blob: 2a321a1101d659008cb76a4f1b617b9e0b078ff9 [file] [log] [blame]
Anthony Barbier6ff3b192017-09-04 18:44:23 +01001/*
2 * Copyright (c) 2016, 2017 ARM Limited.
3 *
4 * SPDX-License-Identifier: MIT
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24#include "arm_compute/runtime/CPP/CPPScheduler.h"
25
26#include "arm_compute/core/CPP/ICPPKernel.h"
27#include "arm_compute/core/Error.h"
28#include "arm_compute/core/Helpers.h"
29#include "arm_compute/core/Utils.h"
30
#include <algorithm>
#include <cerrno>
#include <exception>
#include <iostream>
#include <semaphore.h>
#include <system_error>
#include <thread>
35
36using namespace arm_compute;
37
38class arm_compute::Thread
39{
40public:
41 /** Start a new thread
42 */
43 Thread();
44 Thread(const Thread &) = delete;
45 Thread &operator=(const Thread &) = delete;
46 Thread(Thread &&) = delete;
47 Thread &operator=(Thread &&) = delete;
48 /** Make the thread join
49 */
50 ~Thread();
51 /** Request the worker thread to start executing the given kernel
52 * This function will return as soon as the kernel has been sent to the worker thread.
53 * wait() needs to be called to ensure the execution is complete.
54 */
55 void start(ICPPKernel *kernel, const Window &window);
56 /** Wait for the current kernel execution to complete
57 */
58 void wait();
59 /** Function ran by the worker thread
60 */
61 void worker_thread();
62
63private:
64 std::thread _thread;
65 ICPPKernel *_kernel{ nullptr };
66 Window _window;
67 sem_t _wait_for_work;
68 sem_t _job_complete;
69 std::exception_ptr _current_exception;
70};
71
72Thread::Thread()
73 : _thread(), _window(), _wait_for_work(), _job_complete(), _current_exception(nullptr)
74{
75 int ret = sem_init(&_wait_for_work, 0, 0);
76 ARM_COMPUTE_ERROR_ON(ret < 0);
77 ARM_COMPUTE_UNUSED(ret);
78
79 ret = sem_init(&_job_complete, 0, 0);
80 ARM_COMPUTE_ERROR_ON(ret < 0);
81 ARM_COMPUTE_UNUSED(ret);
82
83 _thread = std::thread(&Thread::worker_thread, this);
84}
85
86Thread::~Thread()
87{
88 ARM_COMPUTE_ERROR_ON(!_thread.joinable());
89
90 start(nullptr, Window());
91 _thread.join();
92
93 int ret = sem_destroy(&_wait_for_work);
94 ARM_COMPUTE_ERROR_ON(ret < 0);
95 ARM_COMPUTE_UNUSED(ret);
96
97 ret = sem_destroy(&_job_complete);
98 ARM_COMPUTE_ERROR_ON(ret < 0);
99 ARM_COMPUTE_UNUSED(ret);
100}
101
102void Thread::start(ICPPKernel *kernel, const Window &window)
103{
104 _kernel = kernel;
105 _window = window;
106 int ret = sem_post(&_wait_for_work);
107 ARM_COMPUTE_UNUSED(ret);
108 ARM_COMPUTE_ERROR_ON(ret < 0);
109}
110
111void Thread::wait()
112{
113 int ret = sem_wait(&_job_complete);
114 ARM_COMPUTE_UNUSED(ret);
115 ARM_COMPUTE_ERROR_ON(ret < 0);
116 if(_current_exception)
117 {
118 std::rethrow_exception(_current_exception);
119 }
120}
121
122void Thread::worker_thread()
123{
124 while(sem_wait(&_wait_for_work) >= 0)
125 {
126 _current_exception = nullptr;
127 // Time to exit
128 if(_kernel == nullptr)
129 {
130 return;
131 }
132
133 try
134 {
135 _window.validate();
136 _kernel->run(_window);
137 }
138 catch(...)
139 {
140 _current_exception = std::current_exception();
141 }
142 int ret = sem_post(&_job_complete);
143 ARM_COMPUTE_UNUSED(ret);
144 ARM_COMPUTE_ERROR_ON(ret < 0);
145 }
146
147 ARM_COMPUTE_ERROR("Wait failed");
148}
149
150namespace
151{
152void delete_threads(Thread *t)
153{
154 delete[] t;
155}
156} // namespace
157
158CPPScheduler &CPPScheduler::get()
159{
160 static CPPScheduler scheduler;
161 return scheduler;
162}
163
Anthony Barbier6ff3b192017-09-04 18:44:23 +0100164CPPScheduler::CPPScheduler()
165 : _num_threads(std::thread::hardware_concurrency()),
Moritz Pflanzerd929b9c2017-06-28 10:15:48 +0100166 _threads(std::unique_ptr<Thread[], void(*)(Thread *)>(new Thread[std::thread::hardware_concurrency() - 1], delete_threads)),
167 _target(CPUTarget::INTRINSICS)
Anthony Barbier6ff3b192017-09-04 18:44:23 +0100168{
169}
170
171void CPPScheduler::set_num_threads(unsigned int num_threads)
172{
173 const unsigned int num_cores = std::thread::hardware_concurrency();
174 _num_threads = num_threads == 0 ? num_cores : num_threads;
175}
176
Moritz Pflanzerd929b9c2017-06-28 10:15:48 +0100177unsigned int CPPScheduler::num_threads() const
178{
179 return _num_threads;
180}
181
182void CPPScheduler::set_target(CPUTarget target)
183{
184 _target = target;
185}
186
187CPUTarget CPPScheduler::target() const
188{
189 return _target;
190}
191
Anthony Barbier6ff3b192017-09-04 18:44:23 +0100192void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension)
193{
194 ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel");
195
196 /** [Scheduler example] */
197 const Window &max_window = kernel->window();
198 const unsigned int num_iterations = max_window.num_iterations(split_dimension);
199 const unsigned int num_threads = std::min(num_iterations, _num_threads);
200
201 if(!kernel->is_parallelisable() || 1 == num_threads)
202 {
203 kernel->run(max_window);
204 }
205 else
206 {
207 for(unsigned int t = 0; t < num_threads; ++t)
208 {
209 Window win = max_window.split_window(split_dimension, t, num_threads);
210 win.set_thread_id(t);
211 win.set_num_threads(num_threads);
212
213 if(t != num_threads - 1)
214 {
215 _threads[t].start(kernel, win);
216 }
217 else
218 {
219 kernel->run(win);
220 }
221 }
222
223 try
224 {
225 for(unsigned int t = 1; t < num_threads; ++t)
226 {
227 _threads[t - 1].wait();
228 }
229 }
230 catch(const std::system_error &e)
231 {
232 std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
233 }
234 }
235 /** [Scheduler example] */
236}