blob: e084cc64944c3be33eb93daec722e56205640ec1 [file] [log] [blame]
Anthony Barbier6ff3b192017-09-04 18:44:23 +01001/*
Georgios Pinitas45514032020-12-30 00:03:09 +00002 * Copyright (c) 2016-2021 Arm Limited.
Anthony Barbier6ff3b192017-09-04 18:44:23 +01003 *
4 * SPDX-License-Identifier: MIT
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24#include "arm_compute/runtime/CPP/CPPScheduler.h"
25
26#include "arm_compute/core/CPP/ICPPKernel.h"
27#include "arm_compute/core/Error.h"
28#include "arm_compute/core/Helpers.h"
29#include "arm_compute/core/Utils.h"
Sang-Hoon Park68dd25f2020-10-19 16:00:11 +010030#include "src/runtime/CPUUtils.h"
Pablo Tello27251972019-09-19 16:39:04 +010031#include "support/Mutex.h"
Anthony Barbier6ff3b192017-09-04 18:44:23 +010032
Anthony Barbierd89940e2018-06-28 13:39:35 +010033#include <atomic>
Moritz Pflanzerff06f202017-09-08 13:48:23 +010034#include <condition_variable>
Anthony Barbier6ff3b192017-09-04 18:44:23 +010035#include <iostream>
Georgios Pinitas12833d02019-07-25 13:31:10 +010036#include <list>
Georgios Pinitas40f51a62020-11-21 03:04:18 +000037#include <memory>
Moritz Pflanzerff06f202017-09-08 13:48:23 +010038#include <mutex>
Anthony Barbier6ff3b192017-09-04 18:44:23 +010039#include <system_error>
40#include <thread>
41
Moritz Pflanzerff06f202017-09-08 13:48:23 +010042namespace arm_compute
43{
Anthony Barbier52ecb062018-05-25 13:32:10 +010044namespace
45{
/** Hands out monotonically increasing workload indices to concurrent workers.
 *
 * Safe to call from multiple threads at once: the counter is a single atomic.
 */
class ThreadFeeder
{
public:
    /** Constructor
     *
     * @param[in] start First value that will be returned by the feeder
     * @param[in] end   End condition (The last value returned by get_next() will be end - 1)
     */
    explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0)
        : _counter(start), _end(end)
    {
    }
    /** Return the next element in the range if there is one.
     *
     * @param[out] next Will contain the next element if there is one.
     *
     * @return False if the end of the range has been reached and next wasn't set.
     */
    bool get_next(unsigned int &next)
    {
        // Relaxed ordering is enough: the index itself carries no dependency on other data.
        next = _counter.fetch_add(1u, std::memory_order_relaxed);
        return next < _end;
    }

private:
    std::atomic_uint   _counter; // next index to hand out
    const unsigned int _end;     // one past the last valid index
};
74
75/** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run.
76 *
77 * Will run workloads until the feeder reaches the end of its range.
78 *
79 * @param[in] workloads The array of workloads
80 * @param[in,out] feeder The feeder indicating which workload to execute next.
81 * @param[in] info Threading and CPU info.
82 */
83void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder, const ThreadInfo &info)
84{
85 unsigned int workload_index = info.thread_id;
86 do
87 {
88 ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size());
89 workloads[workload_index](info);
90 }
91 while(feeder.get_next(workload_index));
92}
Anthony Barbier52ecb062018-05-25 13:32:10 +010093
Georgios Pinitas06e890b2020-07-09 18:38:34 +010094void set_thread_affinity(int core_id)
Georgios Pinitas12833d02019-07-25 13:31:10 +010095{
Georgios Pinitas06e890b2020-07-09 18:38:34 +010096 if(core_id < 0)
Georgios Pinitas12833d02019-07-25 13:31:10 +010097 {
Georgios Pinitas06e890b2020-07-09 18:38:34 +010098 return;
Georgios Pinitas12833d02019-07-25 13:31:10 +010099 }
100
Georgios Pinitas45514032020-12-30 00:03:09 +0000101#if !defined(__APPLE__)
Georgios Pinitas06e890b2020-07-09 18:38:34 +0100102 cpu_set_t set;
103 CPU_ZERO(&set);
104 CPU_SET(core_id, &set);
Georgios Pinitas45514032020-12-30 00:03:09 +0000105 ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), "Error setting thread affinity");
106#endif /* !defined(__APPLE__) */
Georgios Pinitas06e890b2020-07-09 18:38:34 +0100107}
Georgios Pinitas12833d02019-07-25 13:31:10 +0100108
/** Worker thread of the scheduler's pool.
 *
 * The OS thread is spawned in the constructor and parks on a condition
 * variable inside worker_thread() until start() hands it workloads.
 * A start() call with a nullptr workload vector is the termination signal
 * (see the destructor).
 */
class Thread final
{
public:
    /** Start a new thread
     *
     * Thread will be pinned to a given core id if value is non-negative
     *
     * @param[in] core_pin Core id to pin the thread on. If negative no thread pinning will take place
     */
    explicit Thread(int core_pin = -1);

    Thread(const Thread &) = delete;
    Thread &operator=(const Thread &) = delete;
    Thread(Thread &&)                 = delete;
    Thread &operator=(Thread &&) = delete;

    /** Destructor. Make the thread join. */
    ~Thread();

    /** Request the worker thread to start executing workloads.
     *
     * The thread will start by executing workloads[info.thread_id] and will then call the feeder to
     * get the index of the following workload to run.
     *
     * @note This function will return as soon as the workloads have been sent to the worker thread.
     * wait() needs to be called to ensure the execution is complete.
     */
    void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);

    /** Wait for the current kernel execution to complete. */
    void wait();

    /** Function ran by the worker thread. */
    void worker_thread();

private:
    std::thread                        _thread{};
    ThreadInfo                         _info{};
    std::vector<IScheduler::Workload> *_workloads{ nullptr };      // nullptr doubles as the "terminate" request
    ThreadFeeder                      *_feeder{ nullptr };
    std::mutex                         _m{};                       // guards the _wait_for_work/_job_complete handshake
    std::condition_variable            _cv{};
    bool                               _wait_for_work{ false };    // set by start(), consumed by worker_thread()
    bool                               _job_complete{ true };      // set by worker_thread(), consumed by wait(); starts true so wait() on an idle thread returns immediately
    std::exception_ptr                 _current_exception{ nullptr }; // exception captured on the worker, rethrown by wait()
    int                                _core_pin{ -1 };            // core to pin the worker to; negative means no pinning
};
156
Thread::Thread(int core_pin)
    : _core_pin(core_pin)
{
    // Spawn the worker immediately; it pins itself (if requested) and then
    // blocks inside worker_thread() until start() provides work.
    _thread = std::thread(&Thread::worker_thread, this);
}
162
Thread::~Thread()
{
    // Make sure worker thread has ended
    if(_thread.joinable())
    {
        // start() with a nullptr workload vector is the termination signal:
        // worker_thread() returns when it wakes and sees no workloads.
        ThreadFeeder feeder;
        start(nullptr, feeder, ThreadInfo());
        _thread.join();
    }
}
173
void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
    // These members are only read by the worker after it re-acquires _m,
    // so the lock/unlock below publishes the writes to it.
    _workloads = workloads;
    _feeder    = &feeder;
    _info      = info;
    {
        std::lock_guard<std::mutex> lock(_m);
        _wait_for_work = true;
        _job_complete  = false;
    }
    // Wake the worker; notify outside the lock so it can run immediately
    _cv.notify_one();
}
186
void Thread::wait()
{
    {
        // Block until the worker flags the current batch as done
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _job_complete; });
    }

    // Re-raise on the scheduling thread any exception the workloads threw on the worker
    if(_current_exception)
    {
        std::rethrow_exception(_current_exception);
    }
}
199
void Thread::worker_thread()
{
    // Pin this thread first (no-op if _core_pin is negative)
    set_thread_affinity(_core_pin);

    while(true)
    {
        // Park until start() raises _wait_for_work
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _wait_for_work; });
        _wait_for_work = false;

        // Clear any exception left over from the previous batch
        _current_exception = nullptr;

        // Time to exit: a nullptr workload vector is the termination signal from ~Thread()
        if(_workloads == nullptr)
        {
            return;
        }

#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        try
        {
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
            process_workloads(*_workloads, *_feeder, _info);

#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        }
        catch(...)
        {
            // Capture for wait() to rethrow on the scheduling thread
            _current_exception = std::current_exception();
        }
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
        // Mark the batch complete while holding the lock, then wake wait()
        _job_complete = true;
        lock.unlock();
        _cv.notify_one();
    }
}
Georgios Pinitas06e890b2020-07-09 18:38:34 +0100236} //namespace
237
/** Scheduler state: requested thread count and the pool of worker threads.
 *
 * The pool holds one Thread fewer than _num_threads because the thread
 * calling run_workloads() also processes workloads itself.
 */
struct CPPScheduler::Impl final
{
    explicit Impl(unsigned int thread_hint)
        : _num_threads(thread_hint), _threads(_num_threads - 1) // assumes thread_hint >= 1 -- TODO confirm num_threads_hint() guarantees this
    {
    }
    /** Resize the pool; a request of 0 falls back to @p thread_hint. */
    void set_num_threads(unsigned int num_threads, unsigned int thread_hint)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;
        _threads.resize(_num_threads - 1);
    }
    /** Resize the pool and pin each thread (including the caller) to the core chosen by @p func. */
    void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;

        // Set affinity on main thread
        set_thread_affinity(func(0, thread_hint));

        // Set affinity on worker threads
        _threads.clear();
        for(auto i = 1U; i < _num_threads; ++i)
        {
            _threads.emplace_back(func(i, thread_hint));
        }
    }
    unsigned int num_threads() const
    {
        return _num_threads;
    }

    void run_workloads(std::vector<IScheduler::Workload> &workloads);

    unsigned int       _num_threads;
    std::list<Thread>  _threads;               // std::list: Thread is neither copyable nor movable, so elements must never relocate
    arm_compute::Mutex _run_workloads_mutex{}; // serializes run_workloads() and thread-count changes
};
Anthony Barbier6ff3b192017-09-04 18:44:23 +0100274
/*
 * This singleton has been deprecated and will be removed in the next release
 */
CPPScheduler &CPPScheduler::get()
{
    // Meyers singleton: function-local static, initialized thread-safely (C++11)
    static CPPScheduler scheduler;
    return scheduler;
}
283
CPPScheduler::CPPScheduler()
    : _impl(std::make_unique<Impl>(num_threads_hint())) // size the pool from the platform's suggested thread count
{
}
288
// Defined out-of-line so Impl is a complete type where unique_ptr<Impl> is destroyed
CPPScheduler::~CPPScheduler() = default;
290
void CPPScheduler::set_num_threads(unsigned int num_threads)
{
    // No changes in the number of threads while current workloads are running
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    _impl->set_num_threads(num_threads, num_threads_hint());
}
297
void CPPScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func)
{
    // No changes in the number of threads while current workloads are running
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    _impl->set_num_threads_with_affinity(num_threads, num_threads_hint(), func);
}
304
/** Number of threads the scheduler will use (pool workers + the calling thread). */
unsigned int CPPScheduler::num_threads() const
{
    return _impl->num_threads();
}
309
#ifndef DOXYGEN_SKIP_THIS
void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
{
    // Mutex to ensure other threads won't interfere with the setup of the current thread's workloads
    // Other threads' workloads will be scheduled after the current thread's workloads have finished
    // This is not great because different threads' workloads won't run in parallel but at least they
    // won't interfere with each other and deadlock.
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    // Never use more threads than there are workloads to run
    const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
    if(num_threads < 1)
    {
        return;
    }
    // Indices [0, num_threads) are taken statically (one per participant, via info.thread_id);
    // the feeder hands out the remaining indices [num_threads, workloads.size()) dynamically.
    ThreadFeeder feeder(num_threads, workloads.size());
    ThreadInfo info;
    info.cpu_info    = &_cpu_info;
    info.num_threads = num_threads;
    unsigned int t = 0;
    auto thread_it = _impl->_threads.begin();
    // Dispatch thread ids 0..num_threads-2 to the pool; the calling thread takes the last id itself
    for(; t < num_threads - 1; ++t, ++thread_it)
    {
        info.thread_id = t;
        thread_it->start(&workloads, feeder, info);
    }

    info.thread_id = t;
    process_workloads(workloads, feeder, info);
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    try
    {
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
        // Wait on every pooled thread; wait() on a thread that was never started
        // returns immediately because _job_complete starts out true
        for(auto &thread : _impl->_threads)
        {
            thread.wait();
        }
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    }
    catch(const std::system_error &e)
    {
        // Report but do not propagate: a join failure must not crash the scheduler
        std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
    }
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
}
#endif /* DOXYGEN_SKIP_THIS */
Anthony Barbier52ecb062018-05-25 13:32:10 +0100354
void CPPScheduler::schedule_op(ICPPKernel *kernel, const Hints &hints, ITensorPack &tensors)
{
    // Operator path: forward the caller-provided tensor pack
    schedule_common(kernel, hints, tensors);
}
359
void CPPScheduler::schedule(ICPPKernel *kernel, const Hints &hints)
{
    // Legacy path: no tensors are passed, so forward an empty pack
    ITensorPack tensors;
    schedule_common(kernel, hints, tensors);
}
Moritz Pflanzerff06f202017-09-08 13:48:23 +0100365} // namespace arm_compute