Compute Library 21.02
CPPScheduler.cpp
/*
 * Copyright (c) 2016-2021 Arm Limited.
 *
 * SPDX-License-Identifier: MIT
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "arm_compute/runtime/CPP/CPPScheduler.h"

#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/core/Error.h"
#include "arm_compute/core/Helpers.h"
#include "arm_compute/core/Utils.h"
#include "src/runtime/CPUUtils.h"
#include "support/Mutex.h"

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <system_error>
#include <thread>

namespace arm_compute
{
namespace
{
class ThreadFeeder
{
public:
    /** Constructor
     *
     * @param[in] start First value that will be returned by the feeder
     * @param[in] end   End condition (the last value returned by get_next() will be end - 1)
     */
    explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0)
        : _atomic_counter(start), _end(end)
    {
    }
    /** Return the next element in the range if there is one.
     *
     * @param[out] next Will contain the next element if there is one.
     *
     * @return False if the end of the range has been reached and next wasn't set.
     */
    bool get_next(unsigned int &next)
    {
        next = atomic_fetch_add_explicit(&_atomic_counter, 1u, std::memory_order_relaxed);
        return next < _end;
    }

private:
    std::atomic_uint   _atomic_counter;
    const unsigned int _end;
};
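// Illustrative note (not in the original file): the feeder is a lock-free work
// queue over the index range [start, end). With 4 threads and 10 workloads,
// run_workloads() below hands out workloads[0..3] directly and constructs
// ThreadFeeder(4, 10), so concurrent get_next() calls dole out indices 4..9
// exactly once each before returning false.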

/** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run.
 *
 * Will run workloads until the feeder reaches the end of its range.
 *
 * @param[in]     workloads The array of workloads
 * @param[in,out] feeder    The feeder indicating which workload to execute next.
 * @param[in]     info      Threading and CPU info.
 */
void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
    unsigned int workload_index = info.thread_id;
    do
    {
        ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size());
        workloads[workload_index](info);
    }
    while(feeder.get_next(workload_index));
}

void set_thread_affinity(int core_id)
{
    if(core_id < 0)
    {
        return;
    }

#if !defined(__APPLE__)
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core_id, &set);
    ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), "Error setting thread affinity");
#endif /* !defined(__APPLE__) */
}
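// Note: pinning relies on the Linux sched_setaffinity() interface and is
// compiled out on Apple platforms, so set_thread_affinity() is a no-op there.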

class Thread final
{
public:
    /** Start a new thread
     *
     * Thread will be pinned to a given core id if value is non-negative
     *
     * @param[in] core_pin Core id to pin the thread on. If negative no thread pinning will take place
     */
    explicit Thread(int core_pin = -1);

    Thread(const Thread &) = delete;
    Thread &operator=(const Thread &) = delete;
    Thread(Thread &&)                 = delete;
    Thread &operator=(Thread &&) = delete;

    /** Destructor. Make the thread join. */
    ~Thread();

    /** Request the worker thread to start executing workloads.
     *
     * The thread will start by executing workloads[info.thread_id] and will then call the feeder to
     * get the index of the following workload to run.
     *
     * @note This function will return as soon as the workloads have been sent to the worker thread.
     * wait() needs to be called to ensure the execution is complete.
     */
    void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);

    /** Wait for the current kernel execution to complete. */
    void wait();

    /** Function run by the worker thread. */
    void worker_thread();

private:
    std::thread                        _thread{};
    ThreadInfo                         _info{};
    std::vector<IScheduler::Workload> *_workloads{ nullptr };
    ThreadFeeder                      *_feeder{ nullptr };
    std::mutex                         _m{};
    std::condition_variable            _cv{};
    bool                               _wait_for_work{ false };
    bool                               _job_complete{ true };
    std::exception_ptr                 _current_exception{ nullptr };
    int                                _core_pin{ -1 };
};
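// The members above implement a simple handshake: start() publishes the work
// under _m and sets _wait_for_work; worker_thread() wakes, runs the workloads,
// then sets _job_complete and notifies; wait() blocks on _job_complete and
// rethrows any exception captured on the worker thread.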

Thread::Thread(int core_pin)
    : _core_pin(core_pin)
{
    _thread = std::thread(&Thread::worker_thread, this);
}

Thread::~Thread()
{
    // Make sure worker thread has ended
    if(_thread.joinable())
    {
        ThreadFeeder feeder;
        start(nullptr, feeder, ThreadInfo());
        _thread.join();
    }
}

void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
    _workloads = workloads;
    _feeder    = &feeder;
    _info      = info;
    {
        std::lock_guard<std::mutex> lock(_m);
        _wait_for_work = true;
        _job_complete  = false;
    }
    _cv.notify_one();
}

void Thread::wait()
{
    {
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _job_complete; });
    }

    if(_current_exception)
    {
        std::rethrow_exception(_current_exception);
    }
}

void Thread::worker_thread()
{
    set_thread_affinity(_core_pin);

    while(true)
    {
        std::unique_lock<std::mutex> lock(_m);
        _cv.wait(lock, [&] { return _wait_for_work; });
        _wait_for_work = false;

        _current_exception = nullptr;

        // Time to exit
        if(_workloads == nullptr)
        {
            return;
        }

#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        try
        {
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
            process_workloads(*_workloads, *_feeder, _info);

#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
        }
        catch(...)
        {
            _current_exception = std::current_exception();
        }
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
        _job_complete = true;
        lock.unlock();
        _cv.notify_one();
    }
}
} // namespace

struct CPPScheduler::Impl final
{
    explicit Impl(unsigned int thread_hint)
        : _num_threads(thread_hint), _threads(_num_threads - 1)
    {
    }
    void set_num_threads(unsigned int num_threads, unsigned int thread_hint)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;
        _threads.resize(_num_threads - 1);
    }
    void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func)
    {
        _num_threads = num_threads == 0 ? thread_hint : num_threads;

        // Set affinity on main thread
        set_thread_affinity(func(0, thread_hint));

        // Set affinity on worker threads
        _threads.clear();
        for(auto i = 1U; i < _num_threads; ++i)
        {
            _threads.emplace_back(func(i, thread_hint));
        }
    }
    unsigned int num_threads() const
    {
        return _num_threads;
    }

    void run_workloads(std::vector<IScheduler::Workload> &workloads);

    unsigned int       _num_threads;
    std::list<Thread>  _threads;
    arm_compute::Mutex _run_workloads_mutex{};
};

/*
 * This singleton has been deprecated and will be removed in future releases
 */
CPPScheduler &CPPScheduler::get()
{
    static CPPScheduler scheduler;
    return scheduler;
}

CPPScheduler::CPPScheduler()
    : _impl(std::make_unique<Impl>(num_threads_hint()))
{
}

CPPScheduler::~CPPScheduler() = default;

void CPPScheduler::set_num_threads(unsigned int num_threads)
{
    // No changes in the number of threads while current workloads are running
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    _impl->set_num_threads(num_threads, num_threads_hint());
}

void CPPScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func)
{
    // No changes in the number of threads while current workloads are running
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    _impl->set_num_threads_with_affinity(num_threads, num_threads_hint(), func);
}
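// Example BindFunc (illustrative only): map logical thread ids onto cores
// round-robin, using the hinted core count passed as the second argument:
//
//   CPPScheduler::get().set_num_threads_with_affinity(
//       4, [](int thread_id, int num_cores) { return thread_id % num_cores; });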

unsigned int CPPScheduler::num_threads() const
{
    return _impl->num_threads();
}

#ifndef DOXYGEN_SKIP_THIS
void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
{
    // Mutex to ensure other threads won't interfere with the setup of the current thread's workloads.
    // Other threads' workloads will be scheduled after the current thread's workloads have finished.
    // This is not ideal because different threads' workloads won't run in parallel, but at least they
    // won't interfere with each other or deadlock.
    arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
    const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
    if(num_threads < 1)
    {
        return;
    }
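    // The first num_threads indices are handed out directly below (one initial
    // workload per thread, including the caller), so the shared feeder starts
    // counting at num_threads.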
    ThreadFeeder feeder(num_threads, workloads.size());
    ThreadInfo   info;
    info.cpu_info    = &_cpu_info;
    info.num_threads = num_threads;
    unsigned int t   = 0;
    auto thread_it   = _impl->_threads.begin();
    for(; t < num_threads - 1; ++t, ++thread_it)
    {
        info.thread_id = t;
        thread_it->start(&workloads, feeder, info);
    }

    info.thread_id = t;
    process_workloads(workloads, feeder, info);
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    try
    {
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
        for(auto &thread : _impl->_threads)
        {
            thread.wait();
        }
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
    }
    catch(const std::system_error &e)
    {
        std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
    }
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
}
#endif /* DOXYGEN_SKIP_THIS */

void CPPScheduler::schedule_op(ICPPKernel *kernel, const Hints &hints, const Window &window, ITensorPack &tensors)
{
    schedule_common(kernel, hints, window, tensors);
}

void CPPScheduler::schedule(ICPPKernel *kernel, const Hints &hints)
{
    ITensorPack tensors;
    schedule_common(kernel, hints, kernel->window(), tensors);
}
} // namespace arm_compute
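
A minimal usage sketch (illustrative, not part of this file; assumes a concrete ICPPKernel configured elsewhere):

    #include "arm_compute/core/Window.h"
    #include "arm_compute/runtime/CPP/CPPScheduler.h"

    void run_on_pool(arm_compute::ICPPKernel &kernel)
    {
        auto &scheduler = arm_compute::CPPScheduler::get(); // deprecated singleton, see above
        scheduler.set_num_threads(0);                       // 0 falls back to num_threads_hint()
        // Split the kernel's maximum window along dimension Y across the pool.
        scheduler.schedule(&kernel, arm_compute::IScheduler::Hints(arm_compute::Window::DimY));
    }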