blob: 886933074d348ce59f3ef53bee5993f88525f9a6 [file] [log] [blame]
Anthony Barbier6ff3b192017-09-04 18:44:23 +01001/*
2 * Copyright (c) 2016, 2017 ARM Limited.
3 *
4 * SPDX-License-Identifier: MIT
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
#include "arm_compute/runtime/CPP/CPPScheduler.h"

#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/core/Error.h"
#include "arm_compute/core/Helpers.h"
#include "arm_compute/core/Utils.h"

#include <algorithm>
#include <cerrno>
#include <iostream>
#include <semaphore.h>
#include <system_error>
#include <thread>
35
36using namespace arm_compute;
37
38class arm_compute::Thread
39{
40public:
41 /** Start a new thread
42 */
43 Thread();
44 Thread(const Thread &) = delete;
45 Thread &operator=(const Thread &) = delete;
46 Thread(Thread &&) = delete;
47 Thread &operator=(Thread &&) = delete;
48 /** Make the thread join
49 */
50 ~Thread();
51 /** Request the worker thread to start executing the given kernel
52 * This function will return as soon as the kernel has been sent to the worker thread.
53 * wait() needs to be called to ensure the execution is complete.
54 */
55 void start(ICPPKernel *kernel, const Window &window);
56 /** Wait for the current kernel execution to complete
57 */
58 void wait();
59 /** Function ran by the worker thread
60 */
61 void worker_thread();
62
63private:
64 std::thread _thread;
65 ICPPKernel *_kernel{ nullptr };
66 Window _window;
67 sem_t _wait_for_work;
68 sem_t _job_complete;
69 std::exception_ptr _current_exception;
70};
71
72Thread::Thread()
73 : _thread(), _window(), _wait_for_work(), _job_complete(), _current_exception(nullptr)
74{
75 int ret = sem_init(&_wait_for_work, 0, 0);
76 ARM_COMPUTE_ERROR_ON(ret < 0);
77 ARM_COMPUTE_UNUSED(ret);
78
79 ret = sem_init(&_job_complete, 0, 0);
80 ARM_COMPUTE_ERROR_ON(ret < 0);
81 ARM_COMPUTE_UNUSED(ret);
82
83 _thread = std::thread(&Thread::worker_thread, this);
84}
85
86Thread::~Thread()
87{
88 ARM_COMPUTE_ERROR_ON(!_thread.joinable());
89
90 start(nullptr, Window());
91 _thread.join();
92
93 int ret = sem_destroy(&_wait_for_work);
94 ARM_COMPUTE_ERROR_ON(ret < 0);
95 ARM_COMPUTE_UNUSED(ret);
96
97 ret = sem_destroy(&_job_complete);
98 ARM_COMPUTE_ERROR_ON(ret < 0);
99 ARM_COMPUTE_UNUSED(ret);
100}
101
102void Thread::start(ICPPKernel *kernel, const Window &window)
103{
104 _kernel = kernel;
105 _window = window;
106 int ret = sem_post(&_wait_for_work);
107 ARM_COMPUTE_UNUSED(ret);
108 ARM_COMPUTE_ERROR_ON(ret < 0);
109}
110
111void Thread::wait()
112{
113 int ret = sem_wait(&_job_complete);
114 ARM_COMPUTE_UNUSED(ret);
115 ARM_COMPUTE_ERROR_ON(ret < 0);
116 if(_current_exception)
117 {
118 std::rethrow_exception(_current_exception);
119 }
120}
121
122void Thread::worker_thread()
123{
124 while(sem_wait(&_wait_for_work) >= 0)
125 {
126 _current_exception = nullptr;
127 // Time to exit
128 if(_kernel == nullptr)
129 {
130 return;
131 }
132
133 try
134 {
135 _window.validate();
136 _kernel->run(_window);
137 }
138 catch(...)
139 {
140 _current_exception = std::current_exception();
141 }
142 int ret = sem_post(&_job_complete);
143 ARM_COMPUTE_UNUSED(ret);
144 ARM_COMPUTE_ERROR_ON(ret < 0);
145 }
146
147 ARM_COMPUTE_ERROR("Wait failed");
148}
149
150namespace
151{
152void delete_threads(Thread *t)
153{
154 delete[] t;
155}
156} // namespace
157
158CPPScheduler &CPPScheduler::get()
159{
160 static CPPScheduler scheduler;
161 return scheduler;
162}
163
164unsigned int CPPScheduler::num_threads() const
165{
166 return _num_threads;
167}
168
169CPPScheduler::CPPScheduler()
170 : _num_threads(std::thread::hardware_concurrency()),
171 _threads(std::unique_ptr<Thread[], void(*)(Thread *)>(new Thread[std::thread::hardware_concurrency() - 1], delete_threads))
172{
173}
174
175void CPPScheduler::set_num_threads(unsigned int num_threads)
176{
177 const unsigned int num_cores = std::thread::hardware_concurrency();
178 _num_threads = num_threads == 0 ? num_cores : num_threads;
179}
180
181void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension)
182{
183 ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel");
184
185 /** [Scheduler example] */
186 const Window &max_window = kernel->window();
187 const unsigned int num_iterations = max_window.num_iterations(split_dimension);
188 const unsigned int num_threads = std::min(num_iterations, _num_threads);
189
190 if(!kernel->is_parallelisable() || 1 == num_threads)
191 {
192 kernel->run(max_window);
193 }
194 else
195 {
196 for(unsigned int t = 0; t < num_threads; ++t)
197 {
198 Window win = max_window.split_window(split_dimension, t, num_threads);
199 win.set_thread_id(t);
200 win.set_num_threads(num_threads);
201
202 if(t != num_threads - 1)
203 {
204 _threads[t].start(kernel, win);
205 }
206 else
207 {
208 kernel->run(win);
209 }
210 }
211
212 try
213 {
214 for(unsigned int t = 1; t < num_threads; ++t)
215 {
216 _threads[t - 1].wait();
217 }
218 }
219 catch(const std::system_error &e)
220 {
221 std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
222 }
223 }
224 /** [Scheduler example] */
225}