Ocean
Worker.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) Meta Platforms, Inc. and affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  */
7 
8 #ifndef META_OCEAN_BASE_WORKER_H
9 #define META_OCEAN_BASE_WORKER_H
10 
11 #include "ocean/base/Base.h"
12 #include "ocean/base/Caller.h"
13 #include "ocean/base/Signal.h"
14 #include "ocean/base/Thread.h"
15 
16 #include <vector>
17 
18 namespace Ocean
19 {
20 
21 /**
22  * This class implements a worker able to distribute function calls over different threads.
23  * Thus, this worker object can be used to distribute complex operations to several CPU cores to speed up the entire computation significantly.<br>
24  * The worker provides several modes to distribute the computational load of a complex operation.<br>
25  * Function calls may be made faster by letting each individual CPU core process only a subset of the entire data.<br>
26  * Further, this worker supports abortable functions executing the same function several times and stops all other threads if the first function receives a valid result.<br>
27  * For more details several code examples are provided:
28  * @see executeFunction(), executeFunctions().
29  * @see WorkerPool.
30  * @ingroup base
31  */
32 class OCEAN_BASE_EXPORT Worker
33 {
34  public:
35 
36  /**
37  * Definition of a worker function caller object for standard functions to be distributed.<br>
38  * Standard functions allow the computation using a subset of the entire data.
39  */
41 
42  /**
43  * Definition of a worker function caller object for abort functions.
44  */
46 
47  /**
48  * Definition of a vector holding worker functions.
49  */
50  typedef std::vector<Function> Functions;
51 
52  /**
53  * Definition of CPU load types.
54  */
55  enum LoadType : uint32_t
56  {
57  /// One CPU core is used.
59  /// Half of the CPU cores are used, minimum is one CPU core.
61  /// All CPU cores but one, minimum is one CPU core.
63  /// All CPU cores are used.
65  /// For each CPU core two threads are used.
67  /// A custom amount of CPU cores is used.
68  TYPE_CUSTOM
69  };
70 
71  /**
72  * Definition of a vector holding indices.
73  */
74  typedef std::vector<unsigned int> StartIndices;
75 
76  protected:
77 
78  /**
79  * This class implements a thread with an explicit external thread function.<br>
80  */
81  class WorkerThread : protected Thread
82  {
83  public:
84 
85  /**
86  * Definition of different worker thread states.
87  */
89  {
90  /// The worker thread is in a state with unknown result.
92  /// The worker thread is in a state with positive function result.
94  /// The worker thread is in a state with negative function result.
95  WS_NEGATIVE_RESULT
96  };
97 
98  public:
99 
100  /**
101  * Creates a new worker thread object.
102  * @param workerSeedValue Worker specific seed value e.g., for random number generators, each thread will work with an own seed value: threadSeedValue = workerSeedValue + workerThreadId
103  * @param workerThreadId Id of the worker thread to distinguish between all threads owned by one worker
104  */
105  explicit WorkerThread(const unsigned int workerSeedValue, const unsigned int workerThreadId);
106 
107  /**
108  * Destructs a worker thread object.
109  */
110  virtual ~WorkerThread();
111 
112  /**
113  * Returns the id of this worker thread.
114  * @return Worker thread id
115  */
116  inline unsigned int id();
117 
118  /**
119  * Starts the worker thread.
120  * @param signal External worker signal pushed if this thread has finished
121  * @return True, if succeeded
122  */
123  bool start(Signal& signal);
124 
125  /**
126  * Sets a new thread function which will be executed immediately using the internal thread.
127  * @param function Thread function to be set
128  */
129  inline void setThreadFunction(const Function& function);
130 
131  /**
132  * Sets/moves a new thread function which will be executed immediately using the internal thread.
133  * @param function Thread function to be moved
134  */
135  inline void setThreadFunction(Function&& function);
136 
137  /**
138  * Sets a new thread function which will be executed immediately using the internal thread.
139  * @param abortableFunction Thread function able to be aborted during execution
140  * @param abortIndex Index of the boolean abort parameter, which must be a pointer to a boolean parameter
141  */
142  inline void setThreadFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex);
143 
144  /**
145  * Sets/moves a new thread function which will be executed immediately using the internal thread.
146  * @param abortableFunction Thread function able to be aborted during execution, will be moved
147  * @param abortIndex Index of the boolean abort parameter, which must be a pointer to a boolean parameter
148  */
149  inline void setThreadFunction(AbortableFunction&& abortableFunction, const unsigned int abortIndex);
150 
151  /**
152  * Returns the current worker thread state.
153  * @return Worker state
154  */
155  inline WorkerState state() const;
156 
157  private:
158 
159  /**
160  * This thread function has to be overloaded by derived classes.
161  * @see Thread::threadRun().
162  */
163  virtual void threadRun();
164 
165  private:
166 
167  /// Function using the internal thread to execute.
169 
170  /// Abortable function using the internal thread to execute.
172 
173  /// Internal signal handling the internal thread execution.
175 
176  /// External signal determining the termination of the thread function.
177  Signal* externalSignal_ = nullptr;
178 
179  /// Id of the worker thread.
180  unsigned int id_ = (unsigned int)(-1);
181 
182  /// Index of the abort parameter for abortable thread functions.
183  unsigned int abortIndex_ = (unsigned int)(-1);
184 
185  /// Worker thread result.
187  };
188 
189  /**
190  * Definition of a vector holding worker threads.
191  */
192  typedef std::vector<WorkerThread*> WorkerThreads;
193 
194  public:
195 
196  /**
197  * Creates a new worker object.
198  * The load type defines the number of cores to be used, however the worker will not address more than 'maximalNumberCores'.
199  * @param loadType Load type used for this worker, must not be TYPE_CUSTOM
200  * @param maximalNumberCores The maximal number of cores to be used, with range [1, infinity)
201  */
202  explicit Worker(const LoadType loadType = TYPE_ALL_CORES, const unsigned int maximalNumberCores = 16u);
203 
204  /**
205  * Creates a new worker object with a custom amount of worker threads.
206  * @param numberCores The number of threads to use, with range [1, infinity)
207  * @param loadType Must be TYPE_CUSTOM
208  */
209  Worker(const unsigned int numberCores, const LoadType loadType);
210 
211  /**
212  * Destructs a worker object.
213  */
215 
216  /**
217  * Returns the number of threads this worker uses.
218  * @return Number of used threads
219  */
220  unsigned int threads() const;
221 
222  /**
223  * Executes a callback function separable by two function parameters.
224  * The first separable function parameter defines the start point.<br>
225  * The second separable function parameter defines the number of iterations for the specified start point.<br>
226  *
227  * This example shows how to use the worker object in combination with a distributable function:<br>
228  * @code
229  * void distributableFunction(unsigned char* data, const unsigned int firstByte, const unsigned int numberBytes)
230  * {
231  * std::cout << "Function call handling bytes: " << firstByte << " up to " << firstByte + numberBytes - 1 << std::endl;
232  *
233  * // do something with the byte buffer
234  * // Beware: Change bytes inside the range [firstByte, firstByte + numberBytes - 1] only!
235  * // ...
236  * }
237  *
238  * void main()
239  * {
240  * // worker object to execute the distributable function
241  * Worker worker;
242  *
243  * // create byte buffer
244  * unsigned char* data = new unsigned char[1024];
245  *
246  * // initialize buffer with something
247  * // ...
248  *
249  * // call the distributable function to do something with the buffer
250  * // start with element: 0
251  * // number of entire elements: 1024
252  * // the index of the function parameter 'firstByte' is 1 in the 'distributableFunction'
253  * // the index of the function parameter 'numberBytes' is 2 in the 'distributableFunction'
254  * worker.executeFunction(Worker::Function::createStatic(&distributableFunction, data, 0u, 0u), 0u, 1024u, 1u, 2u);
255  *
256  * // we can also skip the indices of the two function parameters 'firstByte' and 'numberBytes' as their order and position match the default case (they are the last two parameters, with the number being the last one):
257  * worker.executeFunction(Worker::Function::createStatic(&distributableFunction, data, 0u, 0u), 0u, 1024u);
258  *
259  * // use the changed buffer
260  * // ...
261  *
262  * delete [] data;
263  * }
264  * @endcode
265  * Output for a CPU with 4 cores would be e.g. (the order of the output may vary):<br>
266  * Function call handling bytes: 0 up to 255<br>
267  * Function call handling bytes: 256 up to 511<br>
268  * Function call handling bytes: 512 up to 767<br>
269  * Function call handling bytes: 768 up to 1023<br>
270  *
271  * @param function Separable function to be executed
272  * @param first First function parameter
273  * @param size Size function parameter
274  * @param firstIndex Index of the worker function parameter receiving the start value, if -1 then the index will be set to the second last parameter, otherwise with range [0, function.parameters())
275  * @param sizeIndex Index of the worker function parameter receiving the number value, if -1 then the index will be set to the last parameter, otherwise with range [0, function.parameters())
276  * @param minimalIterations Minimal number of iterations assigned to one internal thread
277  * @param threadIndex Optional index of the worker function parameter receiving the index of the individual thread
278  * @return True, if succeeded
279  */
280  bool executeFunction(const Function& function, const unsigned int first, const unsigned int size, const unsigned int firstIndex = (unsigned int)(-1), const unsigned int sizeIndex = (unsigned int)(-1), const unsigned int minimalIterations = 1u, const unsigned int threadIndex = (unsigned int)(-1));
281 
282  /**
283  * Executes several callback functions concurrently.
284  *
285  * This example shows how to use the worker object in combination with several individual callback functions:<br>
286  * @code
287  * void function0(unsigned int value)
288  * {
289  * std::cout << "Function0 call with value: " << value << std::endl;
290  *
291  * // do something here
292  * // ...
293  * }
294  *
295  * void function1(std::string value)
296  * {
297  * std::cout << "Function1 call with value: " << value << std::endl;
298  *
299  * // do something here
300  * // ...
301  * }
302  *
303  * void main()
304  * {
305  * // worker object to execute the distributable function
306  * Worker worker;
307  *
308  * // vector with callback functions
309  * Worker::Functions functions;
310  *
311  * // creating simple callback functions and adding them to the vector
312  * functions.push_back(Worker::Function::createStatic(&function0, 9));
313  * functions.push_back(Worker::Function::createStatic(&function0, 101));
314  * functions.push_back(Worker::Function::createStatic(&function1, std::string("hallo")));
315  * functions.push_back(Worker::Function::createStatic(&function1, std::string("world")));
316  *
317  * // call the callback functions concurrently
318  * worker.executeFunctions(functions);
319  *
320  * // do something
321  * }
322  * @endcode
323  * Output (the order of the output may vary):<br>
324  * Function0 call with value: 9<br>
325  * Function1 call with value: world<br>
326  * Function0 call with value: 101<br>
327  * Function1 call with value: hallo<br>
328  *
329  * @param functions Callback function to execute
330  * @return True, if succeeded
331  */
332  bool executeFunctions(const Functions& functions);
333 
334  /**
335  * Executes an abortable function on several CPU cores.
336  * The function must provide an abort parameter. The parameter is a pointer to a boolean state initialized with False.<br>
337  * @param abortableFunction Function supporting an abort state to terminate the function explicitly
338  * @param abortIndex Index of the abort parameter
339  * @param maximalExecutions Number of maximal CPU core executions
340  * @return True, if one of the functions succeeded
341  */
342  bool executeAbortableFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex, const unsigned int maximalExecutions = 0u);
343 
344  /**
345  * Executes an abortable and separable function on several CPU cores.
346  * The function must be separable by two function parameters and must provide an abort parameter allowing to stop the function execution immediately.
347  * @param separableAbortableFunction Function supporting separation and aborting
348  * @param first The first function parameter
349  * @param size The size function parameter
350  * @param firstIndex Index of the worker function parameter receiving the start value
351  * @param sizeIndex Index of the worker function parameter receiving the number value
352  * @param abortIndex Index of the abort parameter
353  * @param minimalIterations Minimal number of iterations assigned to one internal thread
354  * @return True, if one of the abortable functions succeeded
355  */
356  bool executeSeparableAndAbortableFunction(const AbortableFunction& separableAbortableFunction, const unsigned int first, const unsigned int size, const unsigned int firstIndex, const unsigned int sizeIndex, const unsigned int abortIndex, const unsigned int minimalIterations = 1);
357 
358  /**
359  * Returns a separation this worker would apply to execute a distributable function.
360  * @param first The first function parameter
361  * @param size The size function parameter
362  * @param minimalIterations Minimal number of iterations assigned to one internal thread
363  * @return Resulting separation (indices of the start points)
364  */
365  StartIndices separation(const unsigned int first, const unsigned int size, const unsigned int minimalIterations = 1u);
366 
367  /**
368  * Returns whether this worker uses more than one thread to distribute a function.
369  * @return True, if so
370  */
371  explicit inline operator bool() const;
372 
373  /**
374  * Explicit worker pointer operator.
375  * @return Worker operator
376  */
377  inline operator Worker*() const;
378 
379  protected:
380 
381  /**
382  * Disabled copy constructor.
383  * @param worker Object which would be copied
384  */
385  Worker(const Worker& worker) = delete;
386 
387  /**
388  * Disabled copy operator.
389  * @param worker Object which would be copied
390  * @return Reference to this object
391  */
392  Worker& operator=(const Worker& worker) = delete;
393 
394  protected:
395 
396  /// Worker threads.
398 
399  /// Worker signals determining whether all threads have finished their work.
401 
402  /// Worker lock.
404 };
405 
// Accessor for the id of this worker thread: returns the id_ member unchanged.
// NOTE(review): the body only reads id_, yet the function is not declared const — confirm whether that is intentional.
406 inline unsigned int Worker::WorkerThread::id()
407 {
408  return id_;
409 }
410 
412 {
413  ocean_assert(externalSignal_);
414 
415  ocean_assert(!function_);
416  ocean_assert(!abortableFunction_);
417 
418  function_ = function;
419 
420  internalSignal_.pulse();
421 }
422 
424 {
425  ocean_assert(externalSignal_);
426 
427  ocean_assert(!function_);
428  ocean_assert(!abortableFunction_);
429 
430  function_ = std::move(function);
431 
432  internalSignal_.pulse();
433 }
434 
// Hands an abortable function to this worker thread by copying it.
// Preconditions (checked via ocean_assert): the external signal pointer is set and neither a standard nor an abortable function is currently stored.
// NOTE(review): the external signal is presumably assigned by start() — confirm against the implementation.
435 inline void Worker::WorkerThread::setThreadFunction(const AbortableFunction& abortableFunction, const unsigned int abortIndex)
436 {
437  ocean_assert(externalSignal_);
438 
439  ocean_assert(!function_);
440  ocean_assert(!abortableFunction_);
441 
442  abortableFunction_ = abortableFunction; // copy of the caller's function object
443  abortIndex_ = abortIndex; // index of the function's boolean abort parameter
444 
445  internalSignal_.pulse(); // pulsed only after the function and its abort index have been stored
446 }
447 
// Hands an abortable function to this worker thread by moving it out of the caller's object.
// Preconditions (checked via ocean_assert): the external signal pointer is set and neither a standard nor an abortable function is currently stored.
// NOTE(review): the external signal is presumably assigned by start() — confirm against the implementation.
448 inline void Worker::WorkerThread::setThreadFunction(AbortableFunction&& abortableFunction, const unsigned int abortIndex)
449 {
450  ocean_assert(externalSignal_);
451 
452  ocean_assert(!function_);
453  ocean_assert(!abortableFunction_);
454 
455  abortableFunction_ = std::move(abortableFunction); // takes over the caller's function object instead of copying it
456  abortIndex_ = abortIndex; // index of the function's boolean abort parameter
457 
458  internalSignal_.pulse(); // pulsed only after the function and its abort index have been stored
459 }
460 
462 {
463  return workerState_;
464 }
465 
// Explicit bool conversion: true if this worker uses more than one thread, i.e., threads() reports at least two.
// A worker with exactly one thread therefore converts to false.
466 inline Worker::operator bool() const
467 {
468  return threads() > 1;
469 }
470 
// Pointer conversion operator that must not be used: it always triggers ocean_assert(false) and returns nullptr.
// The assert message directs callers to take the address of the worker explicitly instead.
// NOTE(review): where ocean_assert is compiled out, callers would silently receive nullptr — confirm the macro's behavior in release builds.
471 inline Worker::operator Worker*() const
472 {
473  ocean_assert(false && "Use the explicit address operator instead!");
474  return nullptr;
475 }
476 
477 }
478 
479 #endif // META_OCEAN_BASE_WORKER_H
This class implements a recursive lock object.
Definition: Lock.h:31
This class implements a signal.
Definition: Signal.h:31
bool pulse()
Pulses this signal.
This class implements a signal array.
Definition: Signal.h:132
This class implements a thread.
Definition: Thread.h:115
This class implements a thread with an explicit external thread function.
Definition: Worker.h:82
unsigned int id_
Id of the worker thread.
Definition: Worker.h:180
WorkerState workerState_
Worker thread result.
Definition: Worker.h:186
virtual ~WorkerThread()
Destructs a worker thread object.
Function function_
Function using the internal thread to execute.
Definition: Worker.h:168
Signal internalSignal_
Internal signal handling the internal thread execution.
Definition: Worker.h:174
WorkerState state() const
Returns the current worker thread state.
Definition: Worker.h:461
WorkerThread(const unsigned int workerSeedValue, const unsigned int workerThreadId)
Creates a new worker thread object.
AbortableFunction abortableFunction_
Abortable function using the internal thread to execute.
Definition: Worker.h:171
virtual void threadRun()
This thread function has to be overloaded by derived classes.
bool start(Signal &signal)
Starts the worker thread.
void setThreadFunction(const Function &function)
Sets a new thread function which will be executed immediately using the internal thread.
Definition: Worker.h:411
WorkerState
Definition of different worker thread states.
Definition: Worker.h:89
@ WS_POSITIVE_RESULT
The worker thread is in a state with positive function result.
Definition: Worker.h:93
@ WS_UNKNOWN_RESULT
The worker thread is in a state with unknown result.
Definition: Worker.h:91
unsigned int id()
Returns the id of this worker thread.
Definition: Worker.h:406
This class implements a worker able to distribute function calls over different threads.
Definition: Worker.h:33
Lock lock_
Worker lock.
Definition: Worker.h:403
Worker & operator=(const Worker &worker)=delete
Disabled copy operator.
Caller< bool > AbortableFunction
Definition of a worker function caller object for abort functions.
Definition: Worker.h:45
StartIndices separation(const unsigned int first, const unsigned int size, const unsigned int minimalIterations=1u)
Returns a separation this worker would apply to execute a distributable function.
unsigned int threads() const
Returns the number of threads this worker uses.
WorkerThreads workerThreads_
Worker threads.
Definition: Worker.h:397
bool executeFunctions(const Functions &functions)
Executes several callback functions concurrently.
std::vector< WorkerThread * > WorkerThreads
Definition of a vector holding worker threads.
Definition: Worker.h:192
Caller< void > Function
Definition of a worker function caller object for standard functions to be distributed.
Definition: Worker.h:40
Signals signals_
Worker signals determining whether all threads have finished their work.
Definition: Worker.h:400
~Worker()
Destructs a worker object.
bool executeSeparableAndAbortableFunction(const AbortableFunction &separableAbortableFunction, const unsigned int first, const unsigned int size, const unsigned int firstIndex, const unsigned int sizeIndex, const unsigned int abortIndex, const unsigned int minimalIterations=1)
Executes an abortable and separable function on several CPU cores.
Worker(const LoadType loadType=TYPE_ALL_CORES, const unsigned int maximalNumberCores=16u)
Creates a new worker object.
LoadType
Definition of CPU load types.
Definition: Worker.h:56
@ TYPE_DOUBLE_CORES
For each CPU core two threads are used.
Definition: Worker.h:66
@ TYPE_ALL_CORES
All CPU cores are used.
Definition: Worker.h:64
@ TYPE_ONE_CORE
One CPU core is used.
Definition: Worker.h:58
@ TYPE_ALL_BUT_ONE_CORE
All CPU cores but one, minimum is one CPU core.
Definition: Worker.h:62
@ TYPE_HALF_CORES
Half of the CPU cores are used, minimum is one CPU core.
Definition: Worker.h:60
std::vector< unsigned int > StartIndices
Definition of a vector holding indices.
Definition: Worker.h:74
bool executeFunction(const Function &function, const unsigned int first, const unsigned int size, const unsigned int firstIndex=(unsigned int)(-1), const unsigned int sizeIndex=(unsigned int)(-1), const unsigned int minimalIterations=1u, const unsigned int threadIndex=(unsigned int)(-1))
Executes a callback function separable by two function parameters.
std::vector< Function > Functions
Definition of a vector holding worker functions.
Definition: Worker.h:50
Worker(const Worker &worker)=delete
Disabled copy constructor.
Worker(const unsigned int numberCores, const LoadType loadType)
Creates a new worker object with a custom amount of worker threads.
bool executeAbortableFunction(const AbortableFunction &abortableFunction, const unsigned int abortIndex, const unsigned int maximalExecutions=0u)
Executes an abortable function on several CPU cores.
The namespace covering the entire Ocean framework.
Definition: Accessor.h:15