Ocean
Loading...
Searching...
No Matches
Worker.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#ifndef META_OCEAN_BASE_WORKER_H
9#define META_OCEAN_BASE_WORKER_H
10
11#include "ocean/base/Base.h"
12#include "ocean/base/Caller.h"
13#include "ocean/base/Signal.h"
14#include "ocean/base/Thread.h"
15
16#include <vector>
17
18namespace Ocean
19{
20
21/**
22 * This class implements a worker able to distribute function calls over different threads.
23 * Thus, this worker object can be used to distribute complex operations to several CPU cores to speed up the entire computation significantly.<br>
24 * The worker provides several modes to distribute the computational load of a complex operation.<br>
25 * Function calls may be made faster by letting each individual CPU core process only a subset of the entire data.<br>
26 * Further, this worker supports abortable functions executing the same function several times and stops all other threads if the first function receives a valid result.<br>
27 * For more details several code examples are provided:
28 * @see executeFunction(), executeFunctions().
29 * @see WorkerPool.
30 * @ingroup base
31 */
32class OCEAN_BASE_EXPORT Worker
33{
34 public:
35
36 /**
37 * Definition of a worker function caller object for standard functions to be distributed.<br>
38 * Standard functions allow the computation using a subset of the entire data.
39 */
41
42 /**
43 * Definition of a worker function caller object for abort functions.
44 */
46
47 /**
48 * Definition of a vector holding worker functions.
49 */
50 typedef std::vector<Function> Functions;
51
52 /**
53 * Definition of CPU load types.
54 */
55 enum LoadType : uint32_t
56 {
57 /// One CPU core is used.
59 /// Half of the CPU cores are used, minimum is one CPU core.
61 /// All CPU cores but one, minimum is one CPU core.
63 /// All CPU cores are used.
65 /// For each CPU core two threads are used.
67 /// A custom amount of CPU cores is used.
68 TYPE_CUSTOM
69 };
70
71 /**
72 * Definition of a vector holding indices.
73 */
74 typedef std::vector<unsigned int> StartIndices;
75
76 protected:
77
78 /**
79 * This class implements a thread with an explicit external thread function.<br>
80 */
81 class WorkerThread : protected Thread
82 {
83 public:
84
85 /**
86 * Definition of different worker thread states.
87 */
89 {
90 /// The worker thread is in a state with unknown result.
92 /// The worker thread is in a state with positive function result.
94 /// The worker thread is in a state with negative function result.
95 WS_NEGATIVE_RESULT
96 };
97
98 public:
99
100 /**
101 * Creates a new worker thread object.
102 * @param workerSeedValue Worker specific seed value e.g., for random number generators, each thread will work with an own seed value: threadSeedValue = workerSeedValue + workerThreadId
103 * @param workerThreadId Id of the worker thread to distinguish between all threads owned by one worker
104 */
105 explicit WorkerThread(const unsigned int workerSeedValue, const unsigned int workerThreadId);
106
107 /**
108 * Destructs a worker thread object.
109 */
110 virtual ~WorkerThread();
111
112 /**
113 * Returns the id of this worker thread.
114 * @return Worker thread id
115 */
116 inline unsigned int id();
117
118 /**
119 * Starts the worker thread.
120 * @param signal Extern worker signal pushed if this thread has finished
121 * @return True, if succeeded
122 */
123 bool start(Signal& signal);
124
125 /**
126 * Sets a new thread function which will be executed immediately using the internal thread.
127 * @param function Thread function to be set
128 */
129 inline void setThreadFunction(const Function& function);
130
131 /**
132 * Sets/moves a new thread function which will be executed immediately using the internal thread.
133 * @param function Thread function to be moved
134 */
135 inline void setThreadFunction(Function&& function);
136
137 /**
138 * Sets a new thread function which will be executed immediately using the internal thread.
139 * @param abortableFunction Thread function able to be aborted during execution
140 * @param abortIndex Index of the boolean abort parameter, which must be a pointer to a boolean parameter
141 */
142 inline void setThreadFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex);
143
144 /**
145 * Sets/moves a new thread function which will be executed immediately using the internal thread.
146 * @param abortableFunction Thread function able to be aborted during execution, will be moved
147 * @param abortIndex Index of the boolean abort parameter, which must be a pointer to a boolean parameter
148 */
149 inline void setThreadFunction(AbortableFunction&& abortableFunction, const unsigned int abortIndex);
150
151 /**
152 * Returns the current worker thread state.
153 * @return Worker state
154 */
155 inline WorkerState state() const;
156
157 private:
158
159 /**
160 * This thread function has to be overloaded by derived classes.
161 * @see Thread::threadRun().
162 */
163 virtual void threadRun();
164
165 private:
166
167 /// Function using the internal thread to execute.
169
170 /// Abortable function using the internal thread to execute.
172
173 /// Internal signal handling the internal thread execution.
175
176 /// External signal determining the termination of the thread function.
177 Signal* externalSignal_ = nullptr;
178
179 /// Id of the worker thread.
180 unsigned int id_ = (unsigned int)(-1);
181
182 /// Index of the abort parameter for abortable thread functions.
183 unsigned int abortIndex_ = (unsigned int)(-1);
184
185 /// Worker thread result.
187 };
188
189 /**
190 * Definition of a vector holding worker threads.
191 */
192 typedef std::vector<WorkerThread*> WorkerThreads;
193
194 public:
195
196 /**
197 * Creates a new worker object.
198 * The load type defines the number of cores to be used, however the worker will not address more than 'maximalNumberCores'.
199 * @param loadType Load type used for this worker, must not be TYPE_CUSTOM
200 * @param maximalNumberCores The maximal number of cores to be used, with range [1, infinity)
201 */
202 explicit Worker(const LoadType loadType = TYPE_ALL_CORES, const unsigned int maximalNumberCores = 16u);
203
204 /**
205 * Creates a new worker object with a custom amount of worker threads.
206 * @param numberCores The number of threads to use, with range [1, infinity)
207 * @param loadType Must be TYPE_CUSTOM
208 */
209 Worker(const unsigned int numberCores, const LoadType loadType);
210
211 /**
212 * Destructs a worker object.
213 */
215
216 /**
217 * Returns the number of threads this worker uses.
218 * @return Number of used threads
219 */
220 unsigned int threads() const;
221
222 /**
223 * Executes a callback function separable by two function parameters.
224 * The first separable function parameter defines the start point.<br>
225 * The second separable function parameter defines the number of iterations for the specified start point.<br>
226 *
227 * This example shows how to use the worker object in combination with a distributable function:<br>
228 * @code
229 * void distributableFunction(unsigned char* data, const unsigned int firstByte, const unsigned int numberBytes)
230 * {
231 * std::cout << "Function call handling bytes: " << firstByte << " up to " << firstByte + numberBytes - 1 << std::endl;
232 *
233 * // do something with the byte buffer
234 * // Beware: Change bytes inside the range [firstByte, firstByte + numberBytes - 1] only!
235 * // ...
236 * }
237 *
238 * void main()
239 * {
240 * // worker object to execute the distributable function
241 * Worker worker;
242 *
243 * // create byte buffer
244 * unsigned char* data = new unsigned char[1024];
245 *
246 * // initialize buffer with something
247 * // ...
248 *
249 * // call the distributable function to do something with the buffer
250 * // start with element: 0
251 * // number of entire elements: 1024
252 * // the index of the function parameter 'firstByte' is 1 in the 'distributableFunction'
253 * // the index of the function parameter 'numberBytes' is 2 in the 'distributableFunction'
254 * worker.executeFunction(Worker::Function::createStatic(&distributableFunction, data, 0u, 0u), 0u, 1024u, 1u, 2u);
255 *
256 * // we can also omit the indices of the two function parameters 'firstByte' and 'numberBytes' because their order and position match the default case (they are the last two parameters, with the number being the last one):
257 * worker.executeFunction(Worker::Function::createStatic(&distributableFunction, data, 0u, 0u), 0u, 1024u);
258 *
259 * // use the changed buffer
260 * // ...
261 *
262 * delete [] data;
263 * }
264 * @endcode
265 * Output for a CPU with 4 cores would be e.g. (the order of the output may vary):<br>
266 * Function call handling bytes: 0 up to 255<br>
267 * Function call handling bytes: 256 up to 511<br>
268 * Function call handling bytes: 512 up to 767<br>
269 * Function call handling bytes: 768 up to 1023<br>
270 *
271 * @param function Separable function to be executed
272 * @param first First function parameter
273 * @param size Size function parameter
274 * @param firstIndex Index of the worker function parameter receiving the start value, if -1 then the index will be set to the second-to-last parameter, otherwise with range [0, function.parameters())
275 * @param sizeIndex Index of the worker function parameter receiving the number value, if -1 then the index will be set to the last parameter, otherwise with range [0, function.parameters())
276 * @param minimalIterations Minimal number of iterations assigned to one internal thread
277 * @param threadIndex Optional index of the worker function parameter receiving the index of the individual thread
278 * @return True, if succeeded
279 */
280 bool executeFunction(const Function& function, const unsigned int first, const unsigned int size, const unsigned int firstIndex = (unsigned int)(-1), const unsigned int sizeIndex = (unsigned int)(-1), const unsigned int minimalIterations = 1u, const unsigned int threadIndex = (unsigned int)(-1));
281
282 /**
283 * Executes several callback functions concurrently.
284 *
285 * This example shows how to use the worker object in combination with several individual callback functions:<br>
286 * @code
287 * void function0(unsigned int value)
288 * {
289 * std::cout << "Function0 call with value: " << value << std::endl;
290 *
291 * // do something here
292 * // ...
293 * }
294 *
295 * void function1(std::string value)
296 * {
297 * std::cout << "Function1 call with value: " << value << std::endl;
298 *
299 * // do something here
300 * // ...
301 * }
302 *
303 * void main()
304 * {
305 * // worker object to execute the distributable function
306 * Worker worker;
307 *
308 * // vector with callback functions
309 * Worker::Functions functions;
310 *
311 * // creating simple callback functions and adding them to the vector
312 * functions.push_back(Worker::Function::createStatic(&function0, 9));
313 * functions.push_back(Worker::Function::createStatic(&function0, 101));
314 * functions.push_back(Worker::Function::createStatic(&function1, std::string("hallo")));
315 * functions.push_back(Worker::Function::createStatic(&function1, std::string("world")));
316 *
317 * // call the callback functions concurrently
318 * worker.executeFunctions(functions);
319 *
320 * // do something
321 * }
322 * @endcode
323 * Output (the order of the output may vary):<br>
324 * Function0 call with value: 9<br>
325 * Function1 call with value: world<br>
326 * Function0 call with value: 101<br>
327 * Function1 call with value: hallo<br>
328 *
329 * @param functions Callback function to execute
330 * @return True, if succeeded
331 */
332 bool executeFunctions(const Functions& functions);
333
334 /**
335 * Executes an abortable function on several CPU cores.
336 * The function must provide an abort parameter. The parameter is a pointer to a boolean state initialized with False.<br>
337 * @param abortableFunction Function supporting an abort state to terminate the function explicitly
338 * @param abortIndex Index of the abort parameter
339 * @param maximalExecutions Number of maximal CPU core executions
340 * @return True, if one of the functions succeeded
341 */
342 bool executeAbortableFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex, const unsigned int maximalExecutions = 0u);
343
344 /**
345 * Executes an abortable and separable function on several CPU cores.
346 * The function must be separable by two function parameters and must provide an abort parameter allowing to stop the function execution immediately.
347 * @param separableAbortableFunction Function supporting separation and aborting
348 * @param first The first function parameter
349 * @param size The size function parameter
350 * @param firstIndex Index of the worker function parameter receiving the start value
351 * @param sizeIndex Index of the worker function parameter receiving the number value
352 * @param abortIndex Index of the abort parameter
353 * @param minimalIterations Minimal number of iterations assigned to one internal thread
354 * @return True, if one of the abortable functions succeeded
355 */
356 bool executeSeparableAndAbortableFunction(const AbortableFunction& separableAbortableFunction, const unsigned int first, const unsigned int size, const unsigned int firstIndex, const unsigned int sizeIndex, const unsigned int abortIndex, const unsigned int minimalIterations = 1);
357
358 /**
359 * Returns a separation this worker would apply to execute a distributable function.
360 * @param first The first function parameter
361 * @param size The size function parameter
362 * @param minimalIterations Minimal number of iterations assigned to one internal thread
363 * @return Resulting separation (indices of the start points)
364 */
365 StartIndices separation(const unsigned int first, const unsigned int size, const unsigned int minimalIterations = 1u);
366
367 /**
368 * Returns whether this worker uses more than one thread to distribute a function.
369 * @return True, if so
370 */
371 explicit inline operator bool() const;
372
373 /**
374 * Explicit worker pointer operator.
375 * @return Worker operator
376 */
377 inline operator Worker*() const;
378
379 protected:
380
381 /**
382 * Disabled copy constructor.
383 * @param worker Object which would be copied
384 */
385 Worker(const Worker& worker) = delete;
386
387 /**
388 * Disabled copy operator.
389 * @param worker Object which would be copied
390 * @return Reference to this object
391 */
392 Worker& operator=(const Worker& worker) = delete;
393
394 protected:
395
396 /// Worker threads.
398
399 /// Worker signals determining whether all threads have finished their work.
401
402 /// Worker lock.
404};
405
// Returns the id of this worker thread, i.e. the value stored in id_.
// NOTE(review): this accessor only reads id_ but is declared non-const — confirm whether that is intentional.
406inline unsigned int Worker::WorkerThread::id()
407{
408 return id_;
409}
410
412{
413 ocean_assert(externalSignal_);
414
415 ocean_assert(!function_);
416 ocean_assert(!abortableFunction_);
417
418 function_ = function;
419
420 internalSignal_.pulse();
421}
422
424{
425 ocean_assert(externalSignal_);
426
427 ocean_assert(!function_);
428 ocean_assert(!abortableFunction_);
429
430 function_ = std::move(function);
431
432 internalSignal_.pulse();
433}
434
// Assigns an abortable thread function by copying it, remembers the index of its abort parameter
// and pulses the internal signal.
// abortableFunction: the abortable function to be copied into this worker thread.
// abortIndex: index of the function's boolean abort parameter.
// Asserts that the external signal has been set and that neither a standard nor an
// abortable function is currently assigned.
435inline void Worker::WorkerThread::setThreadFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex)
436{
437 ocean_assert(externalSignal_);
438
439 ocean_assert(!function_);
440 ocean_assert(!abortableFunction_);
441
442 abortableFunction_ = abortableFunction;
443 abortIndex_ = abortIndex;
444
// presumably this notifies the internal thread that a function is ready — confirm against threadRun()
445 internalSignal_.pulse();
446}
447
// Assigns an abortable thread function by moving it, remembers the index of its abort parameter
// and pulses the internal signal.
// abortableFunction: the abortable function to be moved into this worker thread.
// abortIndex: index of the function's boolean abort parameter.
// Asserts that the external signal has been set and that neither a standard nor an
// abortable function is currently assigned.
448inline void Worker::WorkerThread::setThreadFunction(AbortableFunction&& abortableFunction, const unsigned int abortIndex)
449{
450 ocean_assert(externalSignal_);
451
452 ocean_assert(!function_);
453 ocean_assert(!abortableFunction_);
454
// the caller's function object is moved from, no copy is made
455 abortableFunction_ = std::move(abortableFunction);
456 abortIndex_ = abortIndex;
457
// presumably this notifies the internal thread that a function is ready — confirm against threadRun()
458 internalSignal_.pulse();
459}
460
462{
463 return workerState_;
464}
465
// Explicit bool conversion: returns true only if this worker uses more than one thread (threads() > 1).
466inline Worker::operator bool() const
467{
468 return threads() > 1;
469}
470
// Conversion to a Worker pointer that is not meant to be used:
// it always triggers the assertion below and returns nullptr.
// As the assertion message states, callers should take the address of the worker explicitly instead.
471inline Worker::operator Worker*() const
472{
473 ocean_assert(false && "Use the explicit address operator instead!");
474 return nullptr;
475}
476
477}
478
479#endif // META_OCEAN_BASE_WORKER_H
This class implements a callback function container using defined function parameters.
Definition Caller.h:1565
This class implements a recursive lock object.
Definition Lock.h:31
This class implements a signal.
Definition Signal.h:31
bool pulse()
Pulses this signal.
This class implements a signal array.
Definition Signal.h:132
This class implements a thread.
Definition Thread.h:115
This class implements a thread with an explicit external thread function.
Definition Worker.h:82
unsigned int id_
Id of the worker thread.
Definition Worker.h:180
WorkerState workerState_
Worker thread result.
Definition Worker.h:186
virtual ~WorkerThread()
Destructs a worker thread object.
Function function_
Function using the internal thread to execute.
Definition Worker.h:168
Signal internalSignal_
Internal signal handling the internal thread execution.
Definition Worker.h:174
WorkerState state() const
Returns the current worker thread state.
Definition Worker.h:461
WorkerThread(const unsigned int workerSeedValue, const unsigned int workerThreadId)
Creates a new worker thread object.
AbortableFunction abortableFunction_
Abortable function using the internal thread to execute.
Definition Worker.h:171
virtual void threadRun()
This thread function has to be overloaded by derived classes.
bool start(Signal &signal)
Starts the worker thread.
void setThreadFunction(const Function &function)
Sets a new thread function which will be executed immediately using the internal thread.
Definition Worker.h:411
WorkerState
Definition of different worker thread states.
Definition Worker.h:89
@ WS_POSITIVE_RESULT
The worker thread is in a state with positive function result.
Definition Worker.h:93
@ WS_UNKNOWN_RESULT
The worker thread is in a state with unknown result.
Definition Worker.h:91
unsigned int id()
Returns the id of this worker thread.
Definition Worker.h:406
This class implements a worker able to distribute function calls over different threads.
Definition Worker.h:33
Lock lock_
Worker lock.
Definition Worker.h:403
Caller< bool > AbortableFunction
Definition of a worker function caller object for abort functions.
Definition Worker.h:45
Worker & operator=(const Worker &worker)=delete
Disabled copy operator.
StartIndices separation(const unsigned int first, const unsigned int size, const unsigned int minimalIterations=1u)
Returns a separation this worker would apply to execute a distributable function.
unsigned int threads() const
Returns the number of threads this worker uses.
WorkerThreads workerThreads_
Worker threads.
Definition Worker.h:397
bool executeFunctions(const Functions &functions)
Executes several callback functions concurrently.
std::vector< WorkerThread * > WorkerThreads
Definition of a vector holding worker threads.
Definition Worker.h:192
Caller< void > Function
Definition of a worker function caller object for standard functions to be distributed.
Definition Worker.h:40
Signals signals_
Worker signals determining whether all threads have finished their work.
Definition Worker.h:400
~Worker()
Destructs a worker object.
bool executeSeparableAndAbortableFunction(const AbortableFunction &separableAbortableFunction, const unsigned int first, const unsigned int size, const unsigned int firstIndex, const unsigned int sizeIndex, const unsigned int abortIndex, const unsigned int minimalIterations=1)
Executes an abortable and separable function on several CPU cores.
Worker(const LoadType loadType=TYPE_ALL_CORES, const unsigned int maximalNumberCores=16u)
Creates a new worker object.
LoadType
Definition of CPU load types.
Definition Worker.h:56
@ TYPE_DOUBLE_CORES
For each CPU core two threads are used.
Definition Worker.h:66
@ TYPE_ALL_CORES
All CPU cores are used.
Definition Worker.h:64
@ TYPE_ONE_CORE
One CPU core is used.
Definition Worker.h:58
@ TYPE_ALL_BUT_ONE_CORE
All CPU cores but one, minimum is one CPU core.
Definition Worker.h:62
@ TYPE_HALF_CORES
Half of the CPU cores are used, minimum is one CPU core.
Definition Worker.h:60
std::vector< unsigned int > StartIndices
Definition of a vector holding indices.
Definition Worker.h:74
bool executeFunction(const Function &function, const unsigned int first, const unsigned int size, const unsigned int firstIndex=(unsigned int)(-1), const unsigned int sizeIndex=(unsigned int)(-1), const unsigned int minimalIterations=1u, const unsigned int threadIndex=(unsigned int)(-1))
Executes a callback function separable by two function parameters.
std::vector< Function > Functions
Definition of a vector holding worker functions.
Definition Worker.h:50
Worker(const Worker &worker)=delete
Disabled copy constructor.
Worker(const unsigned int numberCores, const LoadType loadType)
Creates a new worker object with a custom amount of worker threads.
bool executeAbortableFunction(const AbortableFunction &abortableFunction, const unsigned int abortIndex, const unsigned int maximalExecutions=0u)
Executes an abortable function on several CPU cores.
The namespace covering the entire Ocean framework.
Definition Accessor.h:15