ALib C++ Framework
by
Library Version: 2605 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
dedicatedworker.hpp
Go to the documentation of this file.
1//==================================================================================================
2/// \file
3/// This header-file is part of module \alib_threadmodel of the \aliblong.
4///
5/// Copyright 2013-2026 A-Worx GmbH, Germany.
6/// Published under #"mainpage_license".
7//==================================================================================================
8ALIB_EXPORT namespace alib { namespace threadmodel {
9class DWManager;
10
11//==================================================================================================
12/// \attention This class belongs to module \alib_threadmodel, which is not in a fully stable and
13/// consistent state, yet.
14/// Also, this type is considered experimental.
15///
16/// This #"Singleton;singleton-class" manages worker threads of type
17/// #"DedicatedWorker".
18/// Such threads are started by adding them to the singleton instance of this type and are stopped
19/// with removal.
20///
21/// Class #"DedicatedWorker" uses this type's #"%PoolAllocator" to create and dispose #"Job"
22/// objects and derived types are allowed to allocate resources in its #"%MonoAllocator".
23//==================================================================================================
24class DWManager : public singletons::Singleton<DWManager>
25 , public Lock
26
27{
28 friend class singletons::Singleton<DWManager>;
29
30 protected:
31 /// Mono allocator. Used for commands and by <b>DedicatedWorker</b>s.
33
34 /// Pool allocator. Used for command objects.
36
37 /// The list of workers.
39
40 private:
41 /// Constructor.
43
44 public:
45 /// Allows access to the singleton instances' allocator.
46 /// @return The monotonic allocator.
48
49 /// Allows access to the singleton instances' pool allocator.
50 /// @return The pool allocator.
52
53 /// Adds and starts a worker.
54 /// @param thread The thread to add.
55 ALIB_DLL void Add( DedicatedWorker& thread );
56
57 /// Remove a previously added worker.
58 /// The thread is stopped by invoking #"DedicatedWorker::ScheduleStop;*" using the given
59 /// priority. It is waited until it exits, and finally the thread is joined.<br>
60 /// With debug-compilations, an \alib_warning is raised every second if the thread
61 /// does not stop.
62 /// @param thread The thread to remove.
63 /// @param stopPriority The priority passed to <b>DedicatedWorker::ScheduleStop</b>.
64 /// Use #"%Lowest" (the default) to ensure that all other commands are duly
65 /// executed before exiting.
66 /// @return \c true if the thread was found and removed, \c false otherwise.
67 ALIB_DLL bool Remove( DedicatedWorker& thread,
68 Priority stopPriority= Priority::Lowest );
69
70
71 #if DOXYGEN
72 /// Waits until all threads are idle.
73 /// @param timeout The maximum time to wait.
74 /// @param dbgWarnAfter The time after which a warning message will be printed to the
75 /// debug log if the timeout was reached.<br>
76 /// This parameter is only available in debug-compilations and thus
77 /// should be passed using macro #"ALIB_DBG".
78 /// @return \c true if all threads are idle, \c false otherwise.
80 bool WaitForAllIdle( Ticks::Duration timeout,
81 Ticks::Duration dbgWarnAfter );
82 #else
83 ALIB_DLL bool WaitForAllIdle( Ticks::Duration timeout
84 ALIB_DBG(, Ticks::Duration dbgWarnAfter) );
85 bool WaitForAllIdle( Ticks::Duration::TDuration timeout
86 ALIB_DBG(, Ticks::Duration::TDuration dbgWarnAfter) )
87 { return WaitForAllIdle( Ticks::Duration(timeout) ALIB_DBG(,Ticks::Duration(dbgWarnAfter))); }
88
89 #endif
90
91
92 /// Stops execution and terminates all workers by invoking
93 /// #"DedicatedWorker::ScheduleStop;*".<br>
94 /// After this method has been invoked, no further commands should be scheduled, even if
95 /// the given priority equals #"%Lowest". This <b>should be assured</b> by the using code.
96 ///
97 /// With debug-compilations, an \alib_warning is raised every second until all managed
98 /// threads stopped.
99 /// @param stopPriority The priority passed to <b>DedicatedWorker::ScheduleStop</b>.
100 /// Use #"%Lowest" (the default) to ensure that all other commands are duly
101 /// executed before exiting.
102 ALIB_DLL void RemoveAll(Priority stopPriority= Priority::Lowest);
103
104}; // class DWManager
105
106//==================================================================================================
107/// \attention
108/// This class is part of \b experimental module \alib_threadmodel and thus is experimental
109/// in respect to interface, functionality, and documentation.
110///
111///
112/// This class implements a worker thread that receives jobs from a private queue.
113/// Jobs can have a different #"threadmodel::Priority", and thus may not be executed in the
114/// order of insertion.
115/// This concept comprises a very simple scheduling approach and should not be overused.
116/// In the end, any multithreading software has to be designed in a way that a job queue never
117/// grows to a high level, so that all jobs will be processed in a maximum time.
118///
119/// This class is virtual, and it is designed for inheritance: Using software has to derive custom
120/// types that provide the custom functionality.
121///
122/// Jobs are represented with the protected type #"Job".
123///
124/// Derived custom types need to duly fulfill a certain contract of interaction.
125/// In short, the following rules are given:
126/// 1. Derived types define jobs by exposing a type derived from virtual class
127/// #"Job".
128/// Preferably, the derived types are designed to be public inner types because they
129/// usually explicitly belong to the scope of an #"%DedicatedWorker".<br>
130/// A #"%Job" - besides its inherited type-ID - contains input data to the #"%DedicatedWorker"
131/// as well as output data to the requesting thread.
132/// (Both are optional, as can be seen, for example, with the very simple built-in type
133/// #"DedicatedWorker::JobTrigger" that has neither input nor output data.)
134/// <br><br>
135/// 2. The derived type exposes interface methods according to the different jobs available
136/// to be scheduled.
137/// Here, the #"%Job" is constructed and returned to the caller, usually as a reference
138/// in its derived form.<br>
139/// Thus, the worker has knowledge of what is to be expected when extracting a job from its
140/// queue, and with that is able to safely cast the virtual type "up" to the according
141/// derived type.
142/// <br><br>
143/// 3. The #"%Job" objects returned by the interface (usually) contain some sort of thread-safe
144/// acknowledge mechanics, for example, a #"Promise" or a #"threads::Event" that a caller can
145/// decide to wait on.
146/// <br><br>
147/// 4. Disposal of #"%Job" instances has to be explicitly performed by the thread that schedules
148/// a job. There are two methods provided for doing it.
149/// The first is #".DeleteJob", which performs instant deletion. This can be called \b after
150/// the job was processed. The latter has to be awaited, as described in the previous paragraph.
151/// The second is #".DeleteJobDeferred", which covers the case that the caller decides not to
152/// wait for an acknowledgment.
153/// Here, the instance must not be deleted because the #"%DedicatedWorker" will access it when
154/// it is extracted from the queue (or already is working on it).
155/// Thus, this second version schedules a deletion job and lets the worker do it.<br>
156/// The deletion job has a low priority of #"Priority::DeferredDeletion". Jobs scheduled with an
157/// even lower priority must not be deferred-deleted.
158/// \note A similar concept of deferred-deletion is provided with sibling type #"ThreadPool".
159/// There, deferred-deletion has a huge negative impact on the performance of the pool.
160/// With type #"%DedicatedWorker", the negative impact of deferred deletion is negligible.
161/// <br><br>
162///
163/// 5. The custom types #"%Job" types have to likewise fulfill a few contractual rules.
164/// Those are given with the type's #"Job;reference documentation".
165///
166/// ### Starting and Stopping: ###
167/// The class inherits type #"Thread" as a protected base type. With that, the
168/// conventional methods to start and stop a thread are hidden.<br>
169/// Starting and stopping a #"%DedicatedWorker" is instead performed by adding, respectively
170/// removing an instance of this type to singleton class #"DWManager".
171///
172/// ### Triggering: ###
173/// This type implements abstract interface #"Triggered".
174/// If this is considered useful by a derived type, then three things have to be performed:
175/// - The parameterless job #"DedicatedWorker::JobTrigger" has to be processed
176/// in the overridden method #"process".
177/// - Field #"triggerDuration" has to be adjusted as required, or as an alternative, the method
178/// #".triggerPeriod" has to be overridden. The latter might allow more flexibility to
179/// adjust the trigger time dependent on certain custom states.
180/// - The instance of the custom #"%DedicatedWorker" has to be registered by calling method
181/// #"Trigger::Add;*".
182//==================================================================================================
184 , protected Triggered
185 , protected TCondition<DedicatedWorker>
186{
187 #if !DOXYGEN
188 friend class DWManager;
190 friend class lang::Owner<DedicatedWorker&>; /// needed as we inherit TCondition
191 #endif
192 protected:
193
194 //================================================================================================
195 // Fields
196 //================================================================================================
197
198 /// Reference to #"%DWManager" instance.
200
201 /// Statistical information: Point in time of last job execution.
202 /// In case no job was executed, yet, this is the creation time of the object.
204
205 /// Flag which is set when the stop-job was scheduled.
206 bool stopJobPushed = false;
207
208 /// Flag which is set when the stop-job was executed.
209 bool stopJobExecuted = false;
210
211 /// If this #"%DedicatedWorker" is (in addition) attached to a #"DWManager" as a
212 /// triggered object, this duration is returned by overridden method
213 /// #"Triggered::triggerPeriod;*" to determine the interval between
214 /// scheduling two trigger jobs.<br>
215 /// Defaults to one second, which usually is changed by a derived type.
216 Ticks::Duration triggerDuration = Ticks::Duration::FromSeconds(1);
217
218 //================================================================================================
219 // The queue
220 //================================================================================================
221 /// Container element of the queue.
223 Job* job; ///< The job containing the pool-allocated shared data.
224 Priority priority; ///< The job's priority.
225 bool keepJob; ///< Flag which indicates whether the job should be deleted after
226 ///< execution.
227 };
228
229 /// The queue of jobs.
230 /// This is a simple list, instead of a 'real' priority queue.
231 /// This design decision was taken because there should never be too many jobs queued
232 /// and the naive iteration is more efficient than using a 'real' priority queue type.
234
235 /// The current number of jobs in the queue.
237
238 /// Mandatory method needed and invoked by templated base type #"TCondition".
239 /// @return \c true if field #".queue" is not empty, \c false otherwise.
240 bool isConditionMet() { return length > 0; }
241
242 /// Pushes the given \p{jobInfo} into the priority queue that this class implements.
243 /// When invoked, the thread-manager as well as this instance are both locked.
244 /// @param jobInfo The job, the priority, and a flag if this job is to be deleted
245 /// automatically.
247 void pushAndRelease(QueueElement&& jobInfo);
248
249 /// Moves the job of highest priority out of the queue.
250 /// Blocks the thread until a job is available.
251 /// @return The job with the highest priority.
252 std::pair<Job*, bool> pop();
253
254 //================================================================================================
255 // Inner Job types
256 //================================================================================================
257 /// The stop job sent by method #"ScheduleStop".
258 struct JobStop : Job {
259 /// Constructor.
260 JobStop() : Job( typeid(JobStop) ) {}
261 };
262
263 /// The job sent by methods #"DedicatedWorker::DeleteJobDeferred" and
264 /// #"ThreadPool::DeleteJobDeferred".
265 struct JobDeleter : Job {
266 /// The job to be deleted.
268
269 /// Constructor.
270 /// @param job The job that is scheduled to be deleted.
272 : Job( typeid(JobDeleter) )
273 , JobToDelete( job ) {}
274
275 /// Overrides the parent function as necessary.
276 /// @return The sizeof this derived type.
277 virtual size_t SizeOf() override { return sizeof(JobDeleter); }
278 };
279
280 /// The job sent if (optional) trigger-interface #"Triggered::trigger;*"
281 /// is invoked.
282 struct JobTrigger : Job {
283 /// Constructor.
284 JobTrigger() : Job( typeid(JobTrigger) ) {}
285 };
286
287 /// Pushes a custom job into the priority queue.<br>
288 /// This method is protected because derived types should provide dedicated "speaking"
289 /// interfaces for scheduling jobs.
290 /// Those are typically short inline methods, which/ are optimized out by C++ compilers.
291 /// @tparam TJob The job type as well as the job's shared data type.
292 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
293 /// @param priority The priority of the job.
294 /// @param keepJob Denotes whether the job should be deleted after execution or not.
295 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
296 /// @return The shared data. Deletion of the object is the responsibility of the caller and
297 /// is to be done by either invoking #"DedicatedWorker::DeleteJob"
298 /// or #"DedicatedWorker::DeleteJobDeferred".
299 template<typename TJob, typename... TArgs>
300 [[nodiscard]]
301 TJob& schedule( Priority priority,
302 bool keepJob,
303 TArgs&&... args ) {
305 TJob* job= manager.GetPoolAllocator()().New<TJob>( std::forward<TArgs>(args)... );
307 ALIB_ASSERT_ERROR( job->SizeOf()==sizeof(TJob), "TMOD",
308 "Error in DedicatedWorker::schedule: Job size mismatch. Expected {} "
309 "while virtual method SizeOf returns {}.\n"
310 "Override this method for job-type <{}>",
311 sizeof(TJob), job->SizeOf(), &typeid(*job) )
312
314 || GetState() == State::Running, "TMOD",
315 "Error in DedicatedWorker::schedule: Job pushed while this thread was not started, yet. "
316 "State: ", GetState() )
317
318 pushAndRelease( {job, priority, keepJob} );
319 return *job;
320 }
321
322 /// Pushes a custom job into the priority queue.<br>
323 /// The job is returned to the caller to be able to await results.
324 /// It is the responsibility of the caller to pass the job to either method
325 /// #".DeleteJob" or #".DeleteJobDeferred" for disposal.
326 /// @tparam TJob The job type to create and schedule.
327 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
328 /// @param priority The priority of the job.
329 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
330 /// @return The scheduled job.
331 template<typename TJob, typename... TArgs>
332 TJob& Schedule( Priority priority, TArgs&&... args )
333 { return schedule<TJob, TArgs...>( priority, true, std::forward<TArgs>(args)... ); }
334
335 /// Pushes a custom job into the priority queue.
336 /// In contrast to the sibling method #".Schedule", the job is not returned by this method.
337 /// Instead, it is scheduled for automatic disposal after execution.
338 /// @tparam TJob The job type to create and schedule.
339 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
340 /// @param priority The priority of the job.
341 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
342 template<typename TJob, typename... TArgs>
343 void ScheduleVoid( Priority priority, TArgs&&... args )
344 { (void) schedule<TJob, TArgs...>( priority, false, std::forward<TArgs>(args)... ); }
345
346
347 //================================================================================================
348 // Triggered interface implementation
349 //================================================================================================
350 /// Return the sleep time, between two trigger events.
351 /// Precisely, this method is called after #".trigger" has been executed and defines the
352 /// next sleep time.
353 /// @return The desired sleep time between two trigger events.
354 virtual Ticks::Duration triggerPeriod() override { return triggerDuration; }
355
356 /// Schedules #"DedicatedWorker::JobTrigger" need to implement this function and
357 /// perform their trigger actions here.
358 virtual void trigger() override
359 { (void) schedule<JobTrigger>( Priority::Low, false ); }
360
361 //================================================================================================
362 // Protected virtual execution methods
363 //================================================================================================
364 /// This implementation of method #"Thread::Run;*" constitutes a very simple loop
365 /// that waits for jobs in the #".queue" and passes them to likewise virtual method #".process".
366 /// The (only) condition to continuing the loop is that the flag #".stopJobExecuted" is \c false.
367 /// This flag will be set by the internal job type #"DedicatedWorker::JobStop"
368 /// which is pushed with the method #".ScheduleStop".
370 virtual void Run() override;
371
372
373 /// This virtual method is called during thread execution (method #".Run") for each job
374 /// extracted from the internal job-queue.<br>
375 /// While this default-implementation is empty, internally, before a call to this method,
376 /// the following jobs are already detected and processed:
377 /// - #"DedicatedWorker::JobStop", which signals flag #".stopJobExecuted", and
378 /// this way lets calling method #".Run" leave its loop, and
379 /// - #"DedicatedWorker::JobDeleter", which deletes the given other job.
380 /// (See method #".DeleteJobDeferred".)
381 ///
382 /// A derived type may decide to call this parent method before or after checking for its own
383 /// jobs. If the call returns \c true, then the job was processed.
384 ///
385 /// It is also allowed to implement a custom processing, for example, with #"%JobStop" and thus
386 /// not to call this version for the internal jobs covered by the custom version.
387 ///
388 /// It is important that overridden versions do return the \c true if the job was processed
389 /// and \c false if not.
390 /// @param vjob The job to process.
391 /// @return \c true if the job was processed, \c false if not.
392 virtual bool process(Job& vjob) { (void)vjob; return false; }
393
394 public:
395 #if ALIB_DEBUG
396 /// The maximum number of jobs in the queue at any time.
397 /// Available only with debug-compilations.
399 #endif
400
401
402 //================================================================================================
403 // Construction/Destruction
404 //================================================================================================
405 /// Constructor taking a thread name that is passed to parent class #"Thread".
406 /// @param threadName The name of this thread.
407 DedicatedWorker(const character* threadName)
408 : Thread ( threadName )
409 #if ALIB_STRINGS
410 , Triggered ( threadName )
411 #endif
412 , TCondition ( ALIB_DBG(threadName) )
413 , manager { DWManager::GetSingleton() }
414 , statLastJobExecution(lang::Initialization::Nulled )
415 , length {0}
417
418 /// Destructor.
419 ~DedicatedWorker() override {
420 #if ALIB_STRINGS
421 ALIB_ASSERT_WARNING( Load() == 0, "TMOD",
422 "DedicatedWorker \"{}\" destructed, while job-queue is not empty.", GetName() )
423 #else
424 ALIB_ASSERT_WARNING( Load() == 0, "TMOD",
425 "DedicatedWorker destructed, while job-queue is not empty.")
426 #endif
427 }
428
429 using Thread::GetName;
430
431 //================================================================================================
432 // Load/Stop
433 //================================================================================================
434 /// Returns the current number of jobs in the queue.
435 /// @return The number of jobs to process, including any currently processed one.
436 int Load() const { return length; }
437
438 /// Returns the state of the internal flag, which is set with the invocation of #".ScheduleStop".
439 /// @return \c true, if the #".ScheduleStop" was called, \c false otherwise.
441
442 /// Returns the state of the internal flag, which is set with the execution of #".ScheduleStop".
443 /// @return \c true, if the #".ScheduleStop" was processed, \c false otherwise.
445
446
447 //================================================================================================
448 // Job Interface
449 //================================================================================================
450 /// Schedules a stop job into this thread's job queue.
451 /// When this job is processed from the queue, this thread will exit.
452 /// @param priority The priority of the job. Use #"%Lowest" if it is assured that no
453 /// other jobs will be pushed.
454 void ScheduleStop(Priority priority)
455 {
456 stopJobPushed= true;
457 (void) schedule<JobStop>( priority, false );
458 }
459
460 /// Deletes a data object previously received via method #".schedule", one of its siblings, or
461 /// scheduling methods of derived types.
462 ///
463 /// The latter might have names that do not contain the term "schedule" but still internally
464 /// create and return data objects. Any object returned needs to be deleted.
465 ///
466 /// \attention
467 /// Deletion of data objects must not be done by the caller of a schedule-method before
468 /// the job is processed by this #"%DedicatedWorker".
469 /// For example, if a derived type's interface returns an job instance of type
470 /// #"JPromise", then such a result must only be deleted once the promise is
471 /// fulfilled.
472 /// <p>
473 /// This can be cumbersome in case the calling thread is just not interested in
474 /// the result. For this case, the alternative method # DeleteJobDeferred is given.
475 ///
476 /// @param job The job returned when scheduling a command.
477 void DeleteJob(Job& job) {
479 auto size= job.SizeOf();
480 job.~Job();
481 manager.GetPoolAllocator().free(&job, size);
482 }
483
484 /// Same as #".DeleteJob" but schedules the deletion to be performed. Scheduling is
485 /// done with #"Priority::DeferredDeletion".<br>
486 /// This assures that the job deletion is performed \b after the job execution.
487 ///
488 /// This method is useful when the thread that schedules a job with this #"%DedicatedWorker" is
489 /// not interested in the result, i.e., does not perform an asynchronous wait.
490 /// Custom jobs where this is rather usually the case, should be exposed in two versions,
491 /// one that returns a result and another that does not.
492 /// With the second, the derived #"%DedicatedWorker" would delete the given shared data
493 /// with processing the job.
494 /// If this is offered, then this method does not need to be called (and cannot be called
495 /// because the caller does not receive any data).
496 /// @param job The job object to delete.
498 { (void) schedule<JobDeleter>( Priority::DeferredDeletion, false, &job ); }
499
500}; // class DedicatedWorker
501
502
503} // namespace alib[::threadmodel]
504
505/// Type alias in namespace #"%alib".
507
508/// Type alias in namespace #"%alib".
510
511} // namespace [alib]
512
513#if ALIB_ENUMRECORDS
515#endif
#define ALIB_DLL
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_EXPORT
#define ALIB_DBG(...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
#define ALIB_STRINGS
#define ALIB_LOCK_WITH(lock)
ListMA< DedicatedWorker * > workers
The list of workers.
MonoAllocator ma
Mono allocator. Used for commands and by DedicatedWorkers.
void RemoveAll(Priority stopPriority=Priority::Lowest)
void Add(DedicatedWorker &thread)
bool Remove(DedicatedWorker &thread, Priority stopPriority=Priority::Lowest)
PoolAllocator pool
Pool allocator. Used for command objects.
bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
virtual Ticks::Duration triggerPeriod() override
TJob & schedule(Priority priority, bool keepJob, TArgs &&... args)
bool stopJobPushed
Flag which is set when the stop-job was scheduled.
void pushAndRelease(QueueElement &&jobInfo)
DWManager & manager
Reference to #"%DWManager" instance.
int length
The current number of jobs in the queue.
void ScheduleVoid(Priority priority, TArgs &&... args)
TJob & Schedule(Priority priority, TArgs &&... args)
bool stopJobExecuted
Flag which is set when the stop-job was executed.
DedicatedWorker(const character *threadName)
virtual const character * GetName() const
Definition thread.hpp:237
Triggered(const String &pName)
Definition trigger.hpp:46
@ Running
The thread's #".Run" method is currently processed.
Definition thread.hpp:138
@ Started
Method #".Start" was invoked but not running, yet.
Definition thread.hpp:137
Thread(const character *pName=A_CHAR(""))
Definition thread.hpp:178
virtual const character * GetName() const
Definition thread.hpp:237
#define ALIB_ENUMS_ASSIGN_RECORD(TEnum, TRecord)
Priority
Possible priorities of jobs assigned to an #"DedicatedWorker".
Definition jobs.hpp:166
Definition alox.cpp:14
monomem::TMonoAllocator< lang::HeapAllocator > MonoAllocator
threads::Lock Lock
Type alias in namespace #"%alib".
Definition lock.hpp:124
variables::Priority Priority
Type alias in namespace #"%alib".
threads::Thread Thread
Type alias in namespace #"%alib".
Definition thread.hpp:387
containers::List< T, MonoAllocator, TRecycling > ListMA
Type alias in namespace #"%alib".
Definition list.hpp:689
monomem::TPoolAllocator< MonoAllocator > PoolAllocator
time::Ticks Ticks
Type alias in namespace #"%alib".
Definition ticks.hpp:86
threadmodel::DWManager DWManager
Type alias in namespace #"%alib".
containers::List< T, HeapAllocator, TRecycling > List
Type alias in namespace #"%alib".
Definition list.hpp:684
characters::character character
Type alias in namespace #"%alib".
threadmodel::DedicatedWorker DedicatedWorker
Type alias in namespace #"%alib".
Job * job
The job containing the pool-allocated shared data.
virtual size_t SizeOf()
Definition jobs.hpp:94
Job(const std::type_info &id)
Definition jobs.hpp:62
virtual ~Job()=default
Protected destructor.
TCondition(const character *dbgName)