COMPMID-417: Clean up CPPScheduler

Change-Id: I45028dc90db5c8c0ed1eba795d4652aa95305b48
Reviewed-on: http://mpd-gerrit.cambridge.arm.com/87053
Tested-by: Kaizen <jeremy.johnson+kaizengerrit@arm.com>
Reviewed-by: Georgios Pinitas <georgios.pinitas@arm.com>
Reviewed-by: Anthony Barbier <anthony.barbier@arm.com>
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index a8382b4..137c18b 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -28,76 +28,67 @@
 #include "arm_compute/core/Helpers.h"
 #include "arm_compute/core/Utils.h"
 
+#include <condition_variable>
 #include <iostream>
+#include <mutex>
 #include <semaphore.h>
 #include <system_error>
 #include <thread>
 
-using namespace arm_compute;
-
-class arm_compute::Thread
+namespace arm_compute
+{
+class Thread
 {
 public:
-    /** Start a new thread
-     */
+    /** Start a new thread. */
     Thread();
+
     Thread(const Thread &) = delete;
     Thread &operator=(const Thread &) = delete;
     Thread(Thread &&)                 = delete;
     Thread &operator=(Thread &&) = delete;
-    /** Make the thread join
-     */
+
+    /** Destructor. Asks the worker thread to exit and joins it. */
     ~Thread();
+
     /** Request the worker thread to start executing the given kernel
      * This function will return as soon as the kernel has been sent to the worker thread.
      * wait() needs to be called to ensure the execution is complete.
      */
     void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info);
-    /** Wait for the current kernel execution to complete
-     */
+
+    /** Wait for the current kernel execution to complete. */
     void wait();
-    /** Function ran by the worker thread
-     */
+
+    /** Function run by the worker thread. */
     void worker_thread();
 
 private:
-    std::thread        _thread;
-    ICPPKernel        *_kernel{ nullptr };
-    Window             _window;
-    ThreadInfo         _info;
-    sem_t              _wait_for_work;
-    sem_t              _job_complete;
-    std::exception_ptr _current_exception;
+    std::thread             _thread;
+    ICPPKernel             *_kernel{ nullptr };
+    Window                  _window;
+    ThreadInfo              _info;
+    std::mutex              _m;
+    std::condition_variable _cv;
+    bool                    _wait_for_work{ false };
+    bool                    _job_complete{ true };
+    std::exception_ptr      _current_exception;
 };
 
 Thread::Thread()
-    : _thread(), _window(), _info(), _wait_for_work(), _job_complete(), _current_exception(nullptr)
+    : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr)
 {
-    int ret = sem_init(&_wait_for_work, 0, 0);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
-    ARM_COMPUTE_UNUSED(ret);
-
-    ret = sem_init(&_job_complete, 0, 0);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
-    ARM_COMPUTE_UNUSED(ret);
-
     _thread = std::thread(&Thread::worker_thread, this);
 }
 
 Thread::~Thread()
 {
-    ARM_COMPUTE_ERROR_ON(!_thread.joinable());
-
-    start(nullptr, Window(), ThreadInfo());
-    _thread.join();
-
-    int ret = sem_destroy(&_wait_for_work);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
-    ARM_COMPUTE_UNUSED(ret);
-
-    ret = sem_destroy(&_job_complete);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
-    ARM_COMPUTE_UNUSED(ret);
+    // Make sure worker thread has ended
+    if(_thread.joinable())
+    {
+        start(nullptr, Window(), ThreadInfo());
+        _thread.join();
+    }
 }
 
 void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info)
@@ -105,16 +96,22 @@
     _kernel = kernel;
     _window = window;
     _info   = info;
-    int ret = sem_post(&_wait_for_work);
-    ARM_COMPUTE_UNUSED(ret);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
+
+    {
+        std::lock_guard<std::mutex> lock(_m);
+        _wait_for_work = true;
+        _job_complete  = false;
+    }
+    _cv.notify_one();
 }
 
 void Thread::wait()
 {
-    int ret = sem_wait(&_job_complete);
-    ARM_COMPUTE_UNUSED(ret);
-    ARM_COMPUTE_ERROR_ON(ret < 0);
+    {
+        std::unique_lock<std::mutex> lock(_m);
+        _cv.wait(lock, [&] { return _job_complete; });
+    }
+
     if(_current_exception)
     {
         std::rethrow_exception(_current_exception);
@@ -123,9 +120,14 @@
 
 void Thread::worker_thread()
 {
-    while(sem_wait(&_wait_for_work) >= 0)
+    while(true)
     {
+        std::unique_lock<std::mutex> lock(_m);
+        _cv.wait(lock, [&] { return _wait_for_work; });
+        _wait_for_work = false;
+
         _current_exception = nullptr;
+
         // Time to exit
         if(_kernel == nullptr)
         {
@@ -141,22 +143,13 @@
         {
             _current_exception = std::current_exception();
         }
-        int ret = sem_post(&_job_complete);
-        ARM_COMPUTE_UNUSED(ret);
-        ARM_COMPUTE_ERROR_ON(ret < 0);
+
+        _job_complete = true;
+        lock.unlock();
+        _cv.notify_one();
     }
-
-    ARM_COMPUTE_ERROR("Wait failed");
 }
 
-namespace
-{
-void delete_threads(Thread *t)
-{
-    delete[] t;
-}
-} // namespace
-
 CPPScheduler &CPPScheduler::get()
 {
     static CPPScheduler scheduler;
@@ -165,15 +158,14 @@
 
 CPPScheduler::CPPScheduler()
     : _num_threads(std::thread::hardware_concurrency()),
-      _threads(std::unique_ptr<Thread[], void(*)(Thread *)>(new Thread[_num_threads - 1], delete_threads))
+      _threads(_num_threads - 1)
 {
 }
 
 void CPPScheduler::set_num_threads(unsigned int num_threads)
 {
-    const unsigned int num_cores = std::thread::hardware_concurrency();
-    _num_threads                 = num_threads == 0 ? num_cores : num_threads;
-    _threads.reset(new Thread[_num_threads - 1]);
+    _num_threads = num_threads == 0 ? std::thread::hardware_concurrency() : num_threads;
+    _threads.resize(_num_threads - 1);
 }
 
 unsigned int CPPScheduler::num_threads() const
@@ -199,32 +191,33 @@
     }
     else
     {
-        for(int t = 0; t < info.num_threads; ++t)
+        int  t         = 0;
+        auto thread_it = _threads.begin();
+
+        for(; t < info.num_threads - 1; ++t, ++thread_it)
         {
             Window win     = max_window.split_window(split_dimension, t, info.num_threads);
             info.thread_id = t;
-
-            if(t != info.num_threads - 1)
-            {
-                _threads[t].start(kernel, win, info);
-            }
-            else
-            {
-                kernel->run(win, info);
-            }
+            thread_it->start(kernel, win, info);
         }
 
+        // Run last part on main thread
+        Window win     = max_window.split_window(split_dimension, t, info.num_threads);
+        info.thread_id = t;
+        kernel->run(win, info);
+
         try
         {
-            for(int t = 1; t < info.num_threads; ++t)
+            for(auto &thread : _threads)
             {
-                _threads[t - 1].wait();
+                thread.wait();
             }
         }
         catch(const std::system_error &e)
         {
-            std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+            std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
         }
     }
     /** [Scheduler example] */
 }
+} // namespace arm_compute