// NOTE(review): this file is a garbled extraction of a code-browser page.
// The original source's own line numbers (34, 39, 54, ...) are fused into the
// text, and many lines are missing between fragments (the numbers jump).
// Comments below describe only what the visible fragments show.
//
// ThreadFeeder (fragment): hands out workload indices to worker threads via
// an atomic counter; constructed with a start index and an end bound.
34 #include <condition_variable> 39 #include <system_error> 54 explicit ThreadFeeder(
unsigned int start = 0,
unsigned int end = 0)
// get_next(): atomically claims the next index and writes it to `next`.
64 bool get_next(
unsigned int &next)
// Relaxed ordering on the fetch_add: each call just takes a unique ticket;
// no other data appears (in the visible fragment) to be published through
// this counter.
66 next = atomic_fetch_add_explicit(&_atomic_counter, 1u, std::memory_order_relaxed);
// The function's return statement (presumably `next < _end`, given the _end
// member below) is missing from this fragment -- confirm against the
// original source.
71 std::atomic_uint _atomic_counter;
72 const unsigned int _end;
// process_workloads (fragment): executes workloads on the calling thread.
// Starts with the workload matching the thread's own id, then keeps claiming
// further indices from the feeder until get_next() stops supplying them.
83 void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder,
const ThreadInfo &
info)
// First index processed is this thread's id.
85 unsigned int workload_index = info.thread_id;
89 workloads[workload_index](
info);
// Tail of a do/while loop -- the opening `do {` and any bounds check are
// missing from this fragment.
91 while(feeder.get_next(workload_index));
// set_thread_affinity (fragment): pins the calling thread to core_id.
// Guarded by #if !defined(__APPLE__), so it is compiled out on Apple
// platforms. Only the CPU_SET call is visible here; the cpu_set_t
// declaration/zeroing and the actual sched_setaffinity/pthread call are
// missing from this fragment.
94 void set_thread_affinity(
int core_id)
101 #if !defined(__APPLE__) 104 CPU_SET(core_id, &
set);
// Thread class declaration (fragment): one pooled worker thread.
// Copy and move are both deleted, so instances must live at a stable address
// (consistent with Impl storing them in a std::list further down the file).
118 explicit Thread(
int core_pin = -1);
// core_pin = -1 presumably means "no affinity requested" -- the check is
// not visible in this fragment; confirm against the original source.
120 Thread(
const Thread &) =
delete;
121 Thread &operator=(
const Thread &) =
delete;
122 Thread(Thread &&) =
delete;
123 Thread &operator=(Thread &&) =
delete;
// start(): hand a batch of workloads to this worker.
136 void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder,
const ThreadInfo &info);
// worker_thread(): entry point executed by the pooled std::thread.
142 void worker_thread();
145 std::thread _thread{};
// Non-owning pointers, set per batch by start(); lifetime is the caller's
// responsibility.
147 std::vector<IScheduler::Workload> *_workloads{
nullptr };
148 ThreadFeeder *_feeder{
nullptr };
// Condition variable plus the two flags below implement the
// start()/completion handshake; the guarding mutex (referenced elsewhere
// as _m) is not visible in this fragment.
150 std::condition_variable _cv{};
151 bool _wait_for_work{
false };
152 bool _job_complete{
true };
// Exception thrown while running a batch, parked here so it can be
// rethrown on the scheduling thread.
153 std::exception_ptr _current_exception{
nullptr };
// Thread constructor (fragment): records the core pin and immediately
// launches the worker thread.
157 Thread::Thread(
int core_pin)
158 : _core_pin(core_pin)
160 _thread = std::thread(&Thread::worker_thread,
this);
// Destructor fragment: if the thread is joinable it is started once more
// with a null workload list -- the worker treats nullptr workloads as a
// shutdown signal (see the worker_thread fragment further down). The join
// call itself, and where `feeder` comes from here, are not visible in this
// fragment.
166 if(_thread.joinable())
169 start(
nullptr, feeder, ThreadInfo());
// Thread::start (fragment): publishes a new batch of workloads to the
// worker. Under the mutex it raises "work available" and clears
// "job complete"; the matching _cv notify call is missing from this
// fragment.
174 void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder,
const ThreadInfo &info)
176 _workloads = workloads;
// Flag updates are made while holding _m so the worker's predicate wait
// observes them consistently.
180 std::lock_guard<std::mutex> lock(_m);
181 _wait_for_work =
true;
182 _job_complete =
false;
// Fragment: blocks until the worker reports _job_complete, then rethrows
// any exception the worker captured, on the caller's thread.
// NOTE(review): the enclosing function signature is missing from this
// fragment -- this is presumably a separate wait()-style member (the
// start() fragment above already holds _m via lock_guard, so these lines
// cannot share that scope); confirm against the original source.
190 std::unique_lock<std::mutex> lock(_m);
191 _cv.wait(lock, [&] {
return _job_complete; });
194 if(_current_exception)
196 std::rethrow_exception(_current_exception);
// Thread::worker_thread (fragment): body of the pooled thread.
// The enclosing loop (if any) between the affinity call and the wait is
// missing from this fragment.
200 void Thread::worker_thread()
// Pin this thread to its assigned core first.
202 set_thread_affinity(_core_pin);
// Sleep until start() raises _wait_for_work, then consume the flag.
206 std::unique_lock<std::mutex> lock(_m);
207 _cv.wait(lock, [&] {
return _wait_for_work; });
208 _wait_for_work =
false;
// Clear any exception left over from the previous batch.
210 _current_exception =
nullptr;
// A null workload list is the shutdown signal (the destructor fragment
// above starts the thread with nullptr); the exit statement itself is
// missing from this fragment.
213 if(_workloads ==
nullptr)
// Run the batch. With exceptions enabled (ARM_COMPUTE_EXCEPTIONS_DISABLED
// not defined) this is evidently wrapped in a try/catch whose handler
// parks the exception for the scheduling thread to rethrow.
218 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED 222 process_workloads(*_workloads, *_feeder, _info);
224 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED 228 _current_exception = std::current_exception();
// Signal completion; the matching _cv notify call is missing from this
// fragment.
231 _job_complete =
true;
// CPPScheduler::Impl (fragment): owns the worker-thread pool.
// _num_threads counts the calling thread as well, hence N-1 pooled workers.
238 struct CPPScheduler::Impl final
240 explicit Impl(
unsigned int thread_hint)
241 : _num_threads(thread_hint), _threads(_num_threads - 1)
244 void set_num_threads(
unsigned int num_threads,
unsigned int thread_hint)
// num_threads == 0 means "use the hint".
246 _num_threads = num_threads == 0 ? thread_hint : num_threads;
247 _threads.resize(_num_threads - 1);
249 void set_num_threads_with_affinity(
unsigned int num_threads,
unsigned int thread_hint, BindFunc
func)
251 _num_threads = num_threads == 0 ? thread_hint : num_threads;
// Pin the calling (main) thread using logical id 0 from the bind function...
254 set_thread_affinity(
func(0, thread_hint));
// ...then create pinned workers for ids 1.._num_threads-1.
// NOTE(review): unlike set_num_threads() this emplaces rather than
// resizes; whether _threads is cleared first is not visible in this
// fragment -- confirm against the original source.
258 for(
auto i = 1U; i < _num_threads; ++i)
260 _threads.emplace_back(
func(i, thread_hint));
263 unsigned int num_threads()
const 268 void run_workloads(std::vector<IScheduler::Workload> &workloads);
270 unsigned int _num_threads;
// std::list keeps each Thread at a stable address; Thread deletes copy and
// move (see the class declaration above), so vector storage would not
// compile.
271 std::list<Thread> _threads;
// CPPScheduler member fragments: the constructor's init-list creates the
// pimpl sized by the platform thread-count hint; the public
// setter/getter bodies below forward to Impl. The surrounding signatures
// are missing from this fragment.
285 : _impl(
std::make_unique<Impl>(num_threads_hint()))
302 _impl->set_num_threads_with_affinity(num_threads,
num_threads_hint(), func);
307 return _impl->num_threads();
// CPPScheduler::run_workloads (fragment): splits the workload vector
// across the pooled workers plus the calling thread.
310 #ifndef DOXYGEN_SKIP_THIS 311 void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
// Never use more threads than there are workloads.
318 const unsigned int num_threads = std::min(_impl->num_threads(),
static_cast<unsigned int>(workloads.size()));
// Feeder starts handing out indices at num_threads: indices below that are
// each thread's initial workload (see process_workloads, which begins at
// info.thread_id).
323 ThreadFeeder feeder(num_threads, workloads.size());
325 info.cpu_info = &_cpu_info;
// Kick off num_threads-1 pooled workers (the declaration of `t` and the
// per-iteration info updates are missing from this fragment)...
328 auto thread_it = _impl->_threads.begin();
329 for(; t < num_threads - 1; ++
t, ++thread_it)
332 thread_it->start(&workloads, feeder, info);
// ...while the calling thread processes its own share in-line.
336 process_workloads(workloads, feeder, info);
// Then synchronize with each worker; with exceptions enabled, a rethrown
// std::system_error is reported to stderr. Handling of other exception
// types, if any, is not visible in this fragment.
337 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED 341 for(
auto &thread : _impl->_threads)
345 #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED 347 catch(
const std::system_error &e)
349 std::cerr <<
"Caught system_error with code " << e.code() <<
" meaning " << e.what() <<
'\n';
// schedule_op / schedule body fragments: both delegate to schedule_common.
// The first passes the caller-supplied window; the second uses the kernel's
// own window(). The enclosing signatures are missing from this fragment.
357 schedule_common(kernel, hints, window, tensors);
363 schedule_common(kernel, hints, kernel->
window(), tensors);
static CPPScheduler & get()
Access the scheduler singleton.
const Window & window() const
The maximum window the kernel can be executed on.
unsigned int num_threads() const override
Returns the number of threads that the SingleThreadScheduler has in its pool.
Common interface for all kernels implemented in C++.
#define ARM_COMPUTE_ERROR_ON(cond)
If the condition is true then an error message is printed and an exception thrown.
std::mutex Mutex
Wrapper of Mutex data-object.
Copyright (c) 2017-2021 Arm Limited.
void set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) override
Sets the number of threads the scheduler will use to run the kernels, but also using a binding function to map each thread to a logical core id.
std::function< int(int, int)> BindFunc
Function to be used and map a given thread id to a logical core id.
#define ARM_COMPUTE_EXIT_ON_MSG(cond, msg)
If the condition is true, the given message is printed and program exits.
C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads.
~CPPScheduler()
Default destructor.
void end(TokenStream &in, bool &valid)
void set_num_threads(unsigned int num_threads) override
Sets the number of threads the scheduler will use to run the kernels.
Basic pool of threads to execute CPP/Neon code on several cores in parallel.
ScaleKernelInfo info(interpolation_policy, default_border_mode, PixelValue(), sampling_policy, false)
Information about executing thread and CPU.
void schedule_op(ICPPKernel *kernel, const Hints &hints, const Window &window, ITensorPack &tensors) override
Runs the kernel in the same thread as the caller synchronously.
unsigned int num_threads_hint() const
Get a hint for the best possible number of execution threads.
std::lock_guard< Mutex > lock_guard
Wrapper of lock_guard data-object.
Describe a multidimensional execution window.
void schedule(ICPPKernel *kernel, const Hints &hints) override
Runs the kernel in the same thread as the caller synchronously.
CPPScheduler()
Constructor: create a pool of threads.