Add a simple work queue abstraction.

Makes it easy to schedule a bunch of work to happen in parallel.

Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c
diff --git a/include/utils/AndroidThreads.h b/include/utils/AndroidThreads.h
index f9f7aa4..5bda0fd 100644
--- a/include/utils/AndroidThreads.h
+++ b/include/utils/AndroidThreads.h
@@ -73,6 +73,7 @@
 // Get pid for the current thread.
 extern pid_t androidGetTid();
 
+#ifdef HAVE_ANDROID_OS
 // Change the scheduling group of a particular thread.  The group
 // should be one of the ANDROID_TGROUP constants.  Returns BAD_VALUE if
 // grp is out of range, else another non-zero value with errno set if
@@ -95,6 +96,7 @@
 // scheduling groups are disabled.  Returns INVALID_OPERATION if unexpected error.
 // Thread ID zero means current thread.
 extern int androidGetThreadSchedulingGroup(pid_t tid);
+#endif
 
 #ifdef __cplusplus
 } // extern "C"
diff --git a/include/utils/WorkQueue.h b/include/utils/WorkQueue.h
new file mode 100644
index 0000000..e3c75b2
--- /dev/null
+++ b/include/utils/WorkQueue.h
@@ -0,0 +1,119 @@
+/*]
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _LIBS_UTILS_WORK_QUEUE_H
+#define _LIBS_UTILS_WORK_QUEUE_H
+
+#include <utils/Errors.h>
+#include <utils/Vector.h>
+#include <utils/threads.h>
+
+namespace android {
+
+/*
+ * A threaded work queue.
+ *
+ * This class is designed to make it easy to run a bunch of isolated work
+ * units in parallel, using up to the specified number of threads.
+ * To use it, write a loop to post work units to the work queue, then synchronize
+ * on the queue at the end.
+ */
+class WorkQueue {
+public:
+    class WorkUnit {
+    public:
+        WorkUnit() { }
+        virtual ~WorkUnit() { }
+
+        /*
+         * Runs the work unit.
+         * If the result is 'true' then the work queue continues scheduling work as usual.
+         * If the result is 'false' then the work queue is canceled.
+         */
+        virtual bool run() = 0;
+    };
+
+    /* Creates a work queue with the specified maximum number of work threads. */
+    WorkQueue(size_t maxThreads, bool canCallJava = true);
+
+    /* Destroys the work queue.
+     * Cancels pending work and waits for all remaining threads to complete.
+     */
+    ~WorkQueue();
+
+    /* Posts a work unit to run later.
+     * If the work queue has been canceled or is already finished, returns INVALID_OPERATION
+     * and does not take ownership of the work unit (caller must destroy it itself).
+     * Otherwise, returns OK and takes ownership of the work unit (the work queue will
+     * destroy it automatically).
+     *
+     * For flow control, this method blocks when the size of the pending work queue is more
+     * 'backlog' times the number of threads.  This condition reduces the rate of entry into
+     * the pending work queue and prevents it from growing much more rapidly than the
+     * work threads can actually handle.
+     *
+     * If 'backlog' is 0, then no throttle is applied.
+     */
+    status_t schedule(WorkUnit* workUnit, size_t backlog = 2);
+
+    /* Cancels all pending work.
+     * If the work queue is already finished, returns INVALID_OPERATION.
+     * If the work queue is already canceled, returns OK and does nothing else.
+     * Otherwise, returns OK, discards all pending work units and prevents additional
+     * work units from being scheduled.
+     *
+     * Call finish() after cancel() to wait for all remaining work to complete.
+     */
+    status_t cancel();
+
+    /* Waits for all work to complete.
+     * If the work queue is already finished, returns INVALID_OPERATION.
+     * Otherwise, waits for all work to complete and returns OK.
+     */
+    status_t finish();
+
+private:
+    class WorkThread : public Thread {
+    public:
+        WorkThread(WorkQueue* workQueue, bool canCallJava);
+        virtual ~WorkThread();
+
+    private:
+        virtual bool threadLoop();
+
+        WorkQueue* const mWorkQueue;
+    };
+
+    status_t cancelLocked();
+    bool threadLoop(); // called from each work thread
+
+    const size_t mMaxThreads;
+    const bool mCanCallJava;
+
+    Mutex mLock;
+    Condition mWorkChangedCondition;
+    Condition mWorkDequeuedCondition;
+
+    bool mCanceled;
+    bool mFinished;
+    size_t mIdleThreads;
+    Vector<sp<WorkThread> > mWorkThreads;
+    Vector<WorkUnit*> mWorkUnits;
+};
+
+}; // namespace android
+
+#endif // _LIBS_UTILS_WORK_QUEUE_H
diff --git a/libs/utils/Android.mk b/libs/utils/Android.mk
index 87d0fb2..b4fc994 100644
--- a/libs/utils/Android.mk
+++ b/libs/utils/Android.mk
@@ -41,6 +41,7 @@
 	Tokenizer.cpp \
 	Unicode.cpp \
 	VectorImpl.cpp \
+	WorkQueue.cpp \
 	misc.cpp
 
 host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS)
diff --git a/libs/utils/Threads.cpp b/libs/utils/Threads.cpp
index ab207f5..f9277de 100644
--- a/libs/utils/Threads.cpp
+++ b/libs/utils/Threads.cpp
@@ -323,6 +323,7 @@
 #endif
 }
 
+#ifdef HAVE_ANDROID_OS
 int androidSetThreadSchedulingGroup(pid_t tid, int grp)
 {
     if (grp > ANDROID_TGROUP_MAX || grp < 0) { 
@@ -425,6 +426,7 @@
 
     return ret;
 }
+#endif
 
 namespace android {
 
diff --git a/libs/utils/WorkQueue.cpp b/libs/utils/WorkQueue.cpp
new file mode 100644
index 0000000..3bb99a1
--- /dev/null
+++ b/libs/utils/WorkQueue.cpp
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "WorkQueue"
+
+#include <utils/Log.h>
+#include <utils/WorkQueue.h>
+
+namespace android {
+
+// --- WorkQueue ---
+
+WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
+        mMaxThreads(maxThreads), mCanCallJava(canCallJava),
+        mCanceled(false), mFinished(false), mIdleThreads(0) {
+}
+
+WorkQueue::~WorkQueue() {
+    if (!cancel()) {
+        finish();
+    }
+}
+
+status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
+    AutoMutex _l(mLock);
+
+    if (mFinished || mCanceled) {
+        return INVALID_OPERATION;
+    }
+
+    if (mWorkThreads.size() < mMaxThreads
+            && mIdleThreads < mWorkUnits.size() + 1) {
+        sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
+        status_t status = workThread->run("WorkQueue::WorkThread");
+        if (status) {
+            return status;
+        }
+        mWorkThreads.add(workThread);
+        mIdleThreads += 1;
+    } else if (backlog) {
+        while (mWorkUnits.size() >= mMaxThreads * backlog) {
+            mWorkDequeuedCondition.wait(mLock);
+            if (mFinished || mCanceled) {
+                return INVALID_OPERATION;
+            }
+        }
+    }
+
+    mWorkUnits.add(workUnit);
+    mWorkChangedCondition.broadcast();
+    return OK;
+}
+
+status_t WorkQueue::cancel() {
+    AutoMutex _l(mLock);
+
+    return cancelLocked();
+}
+
+status_t WorkQueue::cancelLocked() {
+    if (mFinished) {
+        return INVALID_OPERATION;
+    }
+
+    if (!mCanceled) {
+        mCanceled = true;
+
+        size_t count = mWorkUnits.size();
+        for (size_t i = 0; i < count; i++) {
+            delete mWorkUnits.itemAt(i);
+        }
+        mWorkUnits.clear();
+        mWorkChangedCondition.broadcast();
+        mWorkDequeuedCondition.broadcast();
+    }
+    return OK;
+}
+
+status_t WorkQueue::finish() {
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        if (mFinished) {
+            return INVALID_OPERATION;
+        }
+
+        mFinished = true;
+        mWorkChangedCondition.broadcast();
+    } // release lock
+
+    // It is not possible for the list of work threads to change once the mFinished
+    // flag has been set, so we can access mWorkThreads outside of the lock here.
+    size_t count = mWorkThreads.size();
+    for (size_t i = 0; i < count; i++) {
+        mWorkThreads.itemAt(i)->join();
+    }
+    mWorkThreads.clear();
+    return OK;
+}
+
+bool WorkQueue::threadLoop() {
+    WorkUnit* workUnit;
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        for (;;) {
+            if (mCanceled) {
+                return false;
+            }
+
+            if (!mWorkUnits.isEmpty()) {
+                workUnit = mWorkUnits.itemAt(0);
+                mWorkUnits.removeAt(0);
+                mIdleThreads -= 1;
+                mWorkDequeuedCondition.broadcast();
+                break;
+            }
+
+            if (mFinished) {
+                return false;
+            }
+
+            mWorkChangedCondition.wait(mLock);
+        }
+    } // release lock
+
+    bool shouldContinue = workUnit->run();
+    delete workUnit;
+
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        mIdleThreads += 1;
+
+        if (!shouldContinue) {
+            cancelLocked();
+            return false;
+        }
+    } // release lock
+
+    return true;
+}
+
+// --- WorkQueue::WorkThread ---
+
+WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
+        Thread(canCallJava), mWorkQueue(workQueue) {
+}
+
+WorkQueue::WorkThread::~WorkThread() {
+}
+
+bool WorkQueue::WorkThread::threadLoop() {
+    return mWorkQueue->threadLoop();
+}
+
+};  // namespace android