ALib C++ Framework
by
Library Version: 2605 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
threadpool.hpp
Go to the documentation of this file.
1//==================================================================================================
2/// \file
3/// This header-file is part of module \alib_threadmodel of the \aliblong.
4///
5/// Copyright 2013-2026 A-Worx GmbH, Germany.
6/// Published under #"mainpage_license".
7//==================================================================================================
8ALIB_EXPORT namespace alib {
9
10/// This is the namespace of \alibmod <b>"ThreadModel"</b>. Please refer to the
11/// #"alib_mod_threadmodel;Programmer's Manual" of this module for information
12/// about how the types found in this namespace are used.
13namespace threadmodel {
14
15//==================================================================================================
16/// Instances of this thread-class are created by #"ThreadPool"s.
17/// Applications may extend this class and override the method #"PrepareJob".
18//==================================================================================================
19struct PoolWorker : protected Thread {
20 #if !DOXYGEN
21 friend class ThreadPool;
22 #endif
23 ThreadPool& threadPool; ///< The pool that this instance belongs to.
24
25#if ALIB_STRINGS
26 String64 nameBuffer; ///< Buffer to store the thread name given with construction.
27
28 /// Constructor.
29 /// @param pThreadPool The pool that constructed us.
30 /// @param threadName The name of this thread (for logging and diagnosis)
31 PoolWorker(ThreadPool& pThreadPool, const character* threadName )
32 : Thread(threadName)
33 , threadPool(pThreadPool)
34 , nameBuffer(threadName)
35 { SetName(nameBuffer); } // fix the name to local pointer
36#else
38 : Thread(A_CHAR("Poolworker"))
39 , threadPool(ptp)
40 {}
41#endif
42
43 /// This default implementation does nothing. Derived types may dynamically cast the
44 /// parameter \p{job} to a known custom type and, for example, attach custom data found in
45 /// a derived version of this class.
46 ///@param job The job to prepare.
47 virtual void PrepareJob(Job* job) { (void) job; }
48
49 /// Implementation of #"Thread::Run;*". Retrieves jobs from the pool, invokes
50 /// #"PrepareJob", and then calls #"Job::Do;*".
51 void Run() override;
52
53}; // class PoolWorker
54
55//==================================================================================================
56/// This class provides a configurable and scalable thread pooling mechanism for managing
57/// concurrent job execution.
58/// It supports fixed-size or dynamically resizing thread pools to balance workload and optimize
59/// resource usage, along with several key methods to schedule, manage, and monitor job execution.
60///
61/// ### Key Features
62/// 1. <b>Thread Pool Management</b>:<br>
63/// - Supports fixed-size and dynamically resizable thread pools.
64/// - Dynamic resizing is handled through customizable parameters found in the struct
65/// #"ThreadPool::ResizeStrategy".
66/// - Automatically adjusts the number of threads based on workload.
67///
68/// 2. <b>Jobs</b>:<br>
69/// The worker-threads managed by the pool are processing #"Job" objects,
70/// which<br>
71/// a) Carry the data needed for processing, and<br>
72/// b) Optionally provide synchronization mechanics that allow a caller to wait until
73/// a job was processed or periodically test for completion.
74///
75/// Allocation and life-cycle management of jobs is efficiently implemented leveraging
76/// the mechanics provided by the module \alib_monomem.
77///
78/// 3. <b>Synchronization and Job-Cleanup</b>:<br>
79/// Provides #"ThreadPool::Sync;a method to synchronize" all worker threads
80/// to ensure that processing continues only after a certain set of jobs has been executed.
81/// <br><br>
82///
83/// 4. <b>Worker Management and Monitoring</b>:<br>
84/// - Supports querying idle worker status and actively running threads.
85/// - Debugging options to analyze job types and execution states.
86///
87/// @see Chapter #"alib_thrmod_threadpool" of this module's Programmer's Manual
88/// provides a quick source code sample that demonstrates the use of this class.
89//==================================================================================================
90class ThreadPool : protected TCondition<ThreadPool>
91#if ALIB_DEBUG_CRITICAL_SECTIONS
93#endif
94
95{
96 #if !DOXYGEN
97 friend struct PoolWorker;
98 friend struct threads::TCondition<ThreadPool>;
99 #endif
100
101 public:
102 /// The set of management parameters that determine how a thread pool balances the number
103 /// of worker threads.
105 #if !DOXYGEN
106 friend class ThreadPool;
107 #endif
108
109 /// The modes, fixed or automatic.
110 enum class Modes {
111 /// The number of threads is fixed.
113
114 /// The number of threads is increased when the load increases and
115 /// decreased when the load decreases.
117 };
118
119 /// The mode of operation.
121
122 /// If #".Mode" equals #"%Modes::Fixed", this is used and all other parameters are ignored.
123 /// If #".Mode" equals #"%Modes::Auto", this field gives the maximum number of workers that
124 /// are run in parallel.<br>
126
127 /// The minimum number of threads to keep alive.
128 int WorkersMin = 0;
129
130 /// A threshold in percent that determines at which overload factor the number of threads
131 /// is increased.
132 /// Defaults to \c 300%. For example, this means if the pool currently holds 10 threads,
133 /// then new threads are created when the load increases to \c 30 unprocessed jobs.
135
136 /// A threshold in percent that determines at which underload factor the number of threads
137 /// is decreased.
138 /// Defaults to \c 50%.
139 /// For example, \c 30% means that if 70% of the threads are idle, the number of
140 /// threads is decreased.
142
143 /// The duration that the pool has to be overloaded before an increase of threads
144 /// starts.
145 /// Defaults to zero time interval.
146 Ticks::Duration IncreaseSchedule = Ticks::Duration::FromAbsoluteMilliseconds(0);
147
148 /// The duration that the pool has to be underloaded before a decrease of threads starts.
149 /// Defaults to 500ms.
150 Ticks::Duration DecreaseSchedule = Ticks::Duration::FromAbsoluteMilliseconds(500);
151
152 protected:
153
154 /// Calculates the target size, depending on the parameters set in this struct and
155 /// the actual values passed.
156 /// @param currentWorkers The number of threads currently in the pool.
157 /// @param idleWorkers The size of the subset of threads currently idle.
158 /// @param load The number of jobs currently queued.
159 /// @param lastChangeTime Time point of last increase or decrease.
160 /// @return The new target size.
161 inline
162 int GetSize( int currentWorkers , int idleWorkers, int load,
163 Ticks& lastChangeTime ) {
164 int target= currentWorkers;
165
166 // fixed mode? -> nothing to do
167 if (Mode == Modes::Fixed)
168 { target= WorkersMax; }
169
170 // check bounds
171 else if( target < WorkersMin ) { target= WorkersMin; }
172 else if( target > WorkersMax ) { target= WorkersMax; }
173
174 // increase?
175 else if( lastChangeTime.Age() >= IncreaseSchedule
176 && (load >= (currentWorkers * IncreaseThreshold / 100) ) )
177 { target = (std::min)(WorkersMax, currentWorkers + 1); }
178
179 // decrease?
180 else if( lastChangeTime.Age() >= DecreaseSchedule
181 && (currentWorkers - idleWorkers) <= (currentWorkers * DecreaseThreshold / 100) ) {
182 target = (std::max)(WorkersMin, currentWorkers - 1);
183 if ( target == 0 && load > 0)
184 target= 1;
185 }
186
187 // that's it
188 if (target != currentWorkers)
189 lastChangeTime.Reset();
190 return target;
191 }
192 };
193
194
195 protected:
196 /// Mono allocator. Used for jobs and by PoolWorkers.
198
199 /// Pool allocator. Used for job objects.
201
202 /// The list of worker threads.
204
205 /// The counted number of current workers.
207
208 /// The counted number of currently idle workers.
209 int ctdIdle =0;
210
211 /// The point in time of the last change of thread size.
213
214 /// A number that is increased with the creation of new workers and added to their
215 /// #"Thread::GetName;thread name".
217
218 #if ALIB_DEBUG
219 /// Entry in the field #"DbgKnownJobs".
221 const std::type_info* TID; ///< The job type.
222 size_t JobSize; ///< The size of the job object.
223 size_t Usage; ///< Counter of scheduled jobs of this type.
224 };
225
226 /// Serves as template parameter \p{TValueDescriptor} of field #"DbgKnownJobs".
228 const std::type_info* >
229 {
230 /// Mandatory function to implement.
231 /// @param entry The table entry to extract the key from.
232 /// @return The key portion of the given \p{entry}.
233 const std::type_info* Key(DbgKnownJobsEntry& entry) const { return entry.TID; }
234 };
235
236 /// Table of known job types and their sizes.
239 #endif
240
241 /// Special synchronization job. Pushed with #"ThreadPool::Sync;2" and
242 /// #"DedicatedWorker::DeleteJobDeferred;2"
243 /// With the latter, the field #".JobToDelete" will be given, otherwise this is nulled.
244 struct JobSyncer : Job {
245 /// Optionally a job to be deleted.
247
248 /// Constructor.
249 /// @param job The job that is scheduled to be deleted.
251 : Job(typeid(JobSyncer))
252 , JobToDelete(job) {}
253
254 /// Overrides the parent function as necessary.
255 /// @return The sizeof this derived type.
256 virtual size_t SizeOf() override { return sizeof(JobSyncer); }
257 };
258
259 //================================================================================================
260 // The queue
261 //================================================================================================
262 /// Container element of the queue.
263 struct QueueEntry {
264 Job* job; ///< The job.
265 bool keep; ///< If true, the job is not deleted by the processing worker,
266 ///< but has to be deleted by the caller.
267 };
268
269 /// The queue of jobs.
271
272
273 /// The number of Jobs that have been scheduled during the lifetime of this instance.
275
276 /// The number of jobs currently in the queue.
277 int ctdOpenJobs {0};
278
279 /// Mandatory method needed and invoked by templated base type #"TCondition".
280 /// @return \c true if the field #".queue" is not empty and either no sync-job is next or
281 /// all are idle.
283 return queue.IsNotEmpty()
284 && ( queue.back().job->ID != typeid(JobSyncer)
285 || ctdIdle == ctdWorkers );
286 }
287
288 /// Pushes the given \p{entry} into the priority queue that this class implements.
289 /// @param entry The Job and the deletion flag.
291 // insert before found
292 queue.emplace_front( entry );
293 ++ctdOpenJobs;
295
296 ALIB_MESSAGE( "TMOD/QUEUE", "{} Job({}) pushed",
297 this, &entry.job->ID )
298
300 }
301
302 /// Set if the last thread is terminated and #".ctdWorkers" goes to \c 0.
303 /// This thread is joined by #".Shutdown" or when a new thread is added.
305
306 /// Moves the job of highest priority out of the queue.
307 /// Blocks the thread until a job is available.
308 /// @param worker The instance that called this method.
309 /// @return The job with the highest priority.
311
312 /// Internal method that adds a thread. Must only be called when acquired.
314 void addThread();
315
316 /// Implementation of #".Schedule" and #".ScheduleVoid".
317 /// @tparam TJob The job type to create and schedule.
318 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
319 /// @param keepJob Denotes whether the job should be deleted after execution or not.
320 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
321 /// @return The scheduled job.
322 template<typename TJob, typename... TArgs>
323 [[nodiscard]]
324 TJob* schedule( bool keepJob, TArgs&&... args ) {
326 // first check if this pool is active (has threads)
327 if (ctdWorkers == 0) {
328 ALIB_ASSERT_ERROR( Strategy.WorkersMax > 0
330 || Strategy.WorkersMax > 0 ), "TMOD/STRGY",
331 "No threads to schedule job. Strategy values:\n"
332 " WorkersMax: {}\n"
333 " Strategy.Mode: ",
334 Strategy.WorkersMax ,
335 Strategy.Mode == ResizeStrategy::Modes::Auto ? "Auto" : "Fixed" )
336
337 addThread();
338 }
339 TJob* job= pool().New<TJob>( std::forward<TArgs>(args)... );
340 ALIB_ASSERT_ERROR( job->SizeOf()==sizeof(TJob), "TMOD",
341 "{} error in schedule: Job size mismatch. Expected {} "
342 "while virtual method SizeOf returns {}.\n"
343 "Override this method for job-type <{}>", this, sizeof(TJob), job->SizeOf(), &typeid(*job) )
344
345 ALIB_ASSERT_ERROR( Strategy.WorkersMax > 0, "TMOD",
346 "{} error: Job pushed while this pool is shut down already. "
347 "(Strategy.WorkersMax == 0) ", this )
348
349 #if ALIB_DEBUG
350 auto pair= DbgKnownJobs.EmplaceIfNotExistent( DbgKnownJobsEntry{ &typeid(TJob),
351 sizeof(TJob), 0 } );
352 ++pair.first.Value().Usage;
353 #endif
354
355 pushAndRelease( {job, keepJob} );
356 return job;
357 }
358
359 public:
360 /// The parameters used for scaling the amount of worker threads.
361 /// The values herein can be changed from outside with direct access.
363
364 /// The wait-time slice used by method #".WaitForAllIdle".
365 Ticks::Duration IdleWaitTime = Ticks::Duration::FromAbsoluteMicroseconds(50);
366
367 /// Constructor.
368 /// Initializes the thread pool with default settings for field #".Strategy".
370
371 #if ALIB_DEBUG_CRITICAL_SECTIONS
372 /// Destructor. Cleans up and shuts down the thread pool.
374
375 /// @return \c true if the lock is acquired (in non-shared mode), \c false otherwise.
376 ALIB_DLL bool DCSIsAcquired() const override;
377
378 /// @return \c true if the lock is shared-acquired (by at least any thread).
379 /// Otherwise, returns \c false.
380 ALIB_DLL bool DCSIsSharedAcquired() const override;
381 #else
382 virtual
384 #endif
385
388
389 /// Returns the mono allocator used by the thread pool.
390 /// The pool has to be acquired before using it.
391 /// @return The mono allocator.
393
394 /// Returns the pool allocator used by the thread pool.
395 /// The pool has to be acquired before using it.
396 /// @return The pool allocator.
398
399 /// Creates a worker using the #".pool" allocator.
400 /// Derived types may override this method and create a derived #"PoolWorker"-type.
401 /// @return A pool worker.
403
404 /// Destructs the given pool worker.
405 /// Derived types may add further logic here.
406 /// @param poolWorker The pool worker to destruct and delete.
407 virtual void DisposeWorker( PoolWorker* poolWorker );
408
/// Just an alias to
/// \https{std::thread::hardware_concurrency,en.cppreference.com/w/cpp/thread/thread/hardware_concurrency}.
///
/// While the specification says
/// <em>"If the value is not well-defined or not computable, returns \c 0"</em>,
/// this method returns \c 1 in this case.
///
/// Used as the default value for constructor parameter \p{pWorkersMax}.
/// @return Returns the maximum number of threads that can be expected to run concurrently,
///         but never less than \c 1.
static int HardwareConcurrency() noexcept {
    // The standard function reports 0 when the value is unknown; map that to 1 as documented.
    const unsigned int hwThreads= std::thread::hardware_concurrency();
    return hwThreads > 0 ? static_cast<int>(hwThreads) : 1;
}
420
421 /// Returns the current number of worker threads.
422 /// @return The number of worker threads currently counted by this pool.
423 int CountedWorkers() const { return ctdWorkers; }
424
425 /// Returns the current number of idle workers.
426 /// @return The number of workers waiting on jobs to process.
427 int CountedIdleWorkers() const { return ctdIdle; }
428
429 /// Checks if all workers are idle.
430 /// @return \c true if the number of idle workers equals the number of workers,
431 /// \c false otherwise.
432 bool IsIdle() const { return ctdIdle == ctdWorkers; }
433
434 /// Pushes a job of the custom type \p{TJob} into the priority queue.<br>
435 /// The job is returned to the caller to be able to await results.
436 /// It is the responsibility of the caller to pass the job to either method
437 /// #".DeleteJob" or #".DeleteJobDeferred" for disposal.
438 /// Note that the latter causes a #".Sync" on this pool, while with use of the former,
439 /// the fulfilment of the returned job object has to be awaited first.
440 /// @tparam TJob The job type to create and schedule.
441 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
442 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
443 /// @return A reference to the job object for the caller to await results.
444 template<typename TJob, typename... TArgs>
445 [[nodiscard]]
446 TJob& Schedule( TArgs&&... args )
447 { return *schedule<TJob, TArgs...>( true, std::forward<TArgs>(args)... ); }
448
449 /// Pushes a job of the custom type \p{TJob} into the priority queue.
450 /// In contrast to the sibling method #".Schedule", the job is not returned by this method.
451 /// Instead, it is scheduled for automatic disposal after execution.
452 /// @tparam TJob The job type to create and schedule.
453 /// @tparam TArgs Types of the variadic arguments \p{args} that construct \p{TJob}.
454 /// @param args Variadic arguments forwarded to the constructor of \p{TJob}.
455 template<typename TJob, typename... TArgs>
456 void ScheduleVoid( TArgs&&... args )
457 { (void) schedule<TJob, TArgs...>( false, std::forward<TArgs>(args)... ); }
458
459 /// Deletes a job object previously scheduled with #".Schedule".
460 ///
461 /// \attention
462 /// The caller must not delete received job instances before they are processed.
463 ///
464 /// \attention
465 /// In case a caller does not want to wait longer, the method #".DeleteJobDeferred" is to be used,
466 /// which causes a #".Sync" on this pool.
467 /// Therefore, it is preferable to either wait on the job and use this method for deletion,
468 /// or to use the method #".ScheduleVoid" instead of #".Schedule" to not even get involved
469 /// with job-deletion.
470 ///
471 /// @param job The job returned from method #".Schedule".
472 void DeleteJob(Job& job) {
474 auto size= job.SizeOf();
475 job.~Job();
476 pool.free(&job, size);
477 }
478
479 /// Same as #".DeleteJob" but schedules the deletion to be performed.
480 /// This method is useful when a job instance was received with the method #".Schedule", but the
481 /// caller does not want to continue waiting for the execution of the job.<br>
482 /// If jobs indicate that they have been processed, then the method #".DeleteJob" is to be used.
483 ///
484 /// \attention Calling this method schedules a #".Sync".
485 /// Therefore, the use of this method should be avoided.
486 ///
487 /// @see Methods #".ScheduleVoid", #".DeleteJob" and #".Sync".
488 /// @param job The job object to delete.
489 void DeleteJobDeferred(Job& job) { (void) schedule<JobSyncer>(false, &job); }
490
491 /// This method ensures all worker threads in the thread pool complete their currently running
492 /// jobs and also process all jobs that have been scheduled before a call to this method.
493 /// This forces synchronization such that no new jobs are processed until the
494 /// synchronization request is fulfilled.<br>
495 /// It is particularly useful for scenarios requiring a consistent state or ensuring all pending
496 /// asynchronous jobs are complete before proceeding.
497 ///
498 /// Consequently, a call to this method may inherently involve blocking the execution in the
499 /// pool until all prior tasks are finalized. While it is designed to work efficiently with the
500 /// thread pool mechanism, unnecessary or frequent calls to this method impose a performance
501 /// disadvantage.
502 ///
503 /// Invoking this method schedules #"JobSyncer;a special synchronization job" in the queue of
504 /// this #"%ThreadPool".
505 /// Thus, the method is non-blocking and instantly returns.
506 ///
507 /// \par Necessity for Synchronization Explained with a Sample:
508 /// The requirement for synchronization is best illustrated with a practical scenario.
509 /// Consider a case where the main thread is responsible for scanning the filesystem.
510 /// For each file that meets certain criteria, a job is scheduled in the #"%ThreadPool"
511 /// to read the content of that file.
512 /// At this stage, the main thread is focused solely on scheduling file-reading jobs and
513 /// isn't directly processing the files.
514 ///
515 /// \par
516 /// Now, before the application proceeds to schedule further jobs, such as processing,
517 /// analyzing, or aggregating the file data, it is crucial to ensure that all file-reading
518 /// jobs have completed.
519 /// Without a synchronization mechanism, there is a risk that some worker threads are still
520 /// reading files while other threads - already assigned to the dependent processing tasks —
521 /// begin before the file data is available.
522 ///
523 /// \par
524 /// A call to this method resolves this issue by ensuring that all file-reading jobs are
525 /// completed before any subsequent jobs that rely on their output are scheduled or processed.
526 /// This guarantees consistency, prevents race conditions, and ensures that no dependent
527 /// thread gets an incomplete or inconsistent dataset to work with.
528 /// In summary, synchronization acts as a safeguard in parallelized workflows where the
529 /// logical order of operations must be maintained across multiple threads, particularly
530 /// when tasks are interdependent.
531 ///
532 /// @see Method #DeleteJobDeferred, which likewise causes a synchronization.
533 void Sync() { (void) schedule<JobSyncer>(false, nullptr); }
534
535 #if DOXYGEN
536 /// Waits until all threads are idle.
537 /// @param timeout The maximum time to wait.
538 /// @param dbgWarnAfter The time after which a warning message will be printed to the
539 /// debug log if the timeout was reached.<br>
540 /// This parameter is only available in debug-compilations and thus
541 /// should be passed using macro #"ALIB_DBG".
542 /// @return \c true if all threads are idle, \c false otherwise.
544 bool WaitForAllIdle( Ticks::Duration timeout,
545 Ticks::Duration dbgWarnAfter );
546 #else
547 ALIB_DLL bool WaitForAllIdle( Ticks::Duration timeout
548 ALIB_DBG(, Ticks::Duration dbgWarnAfter) );
549 bool WaitForAllIdle( Ticks::Duration::TDuration timeout
550 ALIB_DBG(, Ticks::Duration::TDuration dbgWarnAfter) )
551 { return WaitForAllIdle( Ticks::Duration(timeout) ALIB_DBG(,Ticks::Duration(dbgWarnAfter))); }
552
553 #endif
554
555 /// Removes all threads.
556 /// While this method waits that all jobs are finalized just as the method #WaitForAllIdle does,
557 /// it is recommended to #WaitForAllIdle explicitly, before this method is called.
558 /// This allows a more graceful shutdown with the possibility to take action on timeouts.
559 ///
560 /// If after the call to #"%.WaitForAllIdle" no jobs were scheduled, this method is \b not
561 /// supposed to block.
563 void Shutdown();
564
565 /// Returns the current number of jobs in the queue.
566 /// \note To get the overall number of unprocessed jobs, the difference between
567 /// #CountedWorkers and #CountedIdleWorkers has to be added.
568 /// However, under racing conditions, this difference might be evaluated wrongly.
569 /// Therefore, if crucial, this pool has to be acquired before determining this.
570 /// @return The number of jobs to process, not including any currently processed one.
571 int CountedOpenJobs() const { return ctdOpenJobs; }
572
573 /// Returns the number of Jobs that have been scheduled during the lifetime of this instance.
574 /// This is a statistics method.
575 /// @return The number of jobs scheduled so far during the lifetime of this instance.
577
578 #if ALIB_DEBUG && ALIB_STRINGS
579 /// Writes the list of known jobs and their object sizes to the given target.
580 /// \par Availability
581 /// This function is available only with debug-compilations and if the module
582 /// \alib_strings is included in the \alibbuild.
583 /// @see Field #"DbgKnownJobs".
584 ///
585 /// @param target The string to write to.
586 /// @param linePrefix A prefix string to each line. Defaults to two spaces.
587 /// @return The number of known jobs.
589 int DbgDumpKnownJobs(NAString& target, const NString& linePrefix= " " );
590 #endif
591
592}; // class ThreadPool
593
594} // namespace alib[::threadmodel]
595
596/// Type alias in namespace #"%alib".
598
599/// Type alias in namespace #"%alib".
601
602} // namespace [alib]
603
604//##################################################################################################
605// struct AppendableTraits<ThreadPool>
606//##################################################################################################
607
608#if ALIB_STRINGS
609// Faking all template specializations of namespace strings for doxygen into namespace
610// strings::APPENDABLES to keep the documentation of namespace string clean!
611namespace alib::strings {
612#if DOXYGEN
613namespace APPENDABLES {
614#endif
615
616/// Specialization of functor #"AppendableTraits" for type #"ThreadPool".
617template<>
619 /// Writes some information about the given #"%ThreadPool".
620 /// @param target The #"%NAString" that #"%Append(const TAppendable&)" was invoked on.
621 /// @param tpool The thread pool instance.
622 void operator()( AString& target, const alib::threadmodel::ThreadPool& tpool );
623};
624
625#if DOXYGEN
626} // namespace alib::strings[APPENDABLES]
627#endif
628} // namespace [alib::strings]
629#endif
#define ALIB_MESSAGE(domain,...)
#define ALIB_DLL
#define A_CHAR(STR)
#define ALIB_EXPORT
#define ALIB_LOCK
#define ALIB_DBG(...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
bool DCSIsSharedAcquired() const override
int ctdWorkers
The counted number of current workers.
Ticks::Duration IdleWaitTime
The wait-time slice used by method #".WaitForAllIdle".
int ctdIdle
The counted number of currently idle workers.
PoolAllocator pool
Pool allocator. Used for job objects.
static int HardwareConcurrency() noexcept
void pushAndRelease(QueueEntry &&entry)
uinteger StatsCountedScheduledJobs() const
void Acquire(ALIB_DBG_TAKE_CI)
MonoAllocator & GetAllocator()
int DbgDumpKnownJobs(NAString &target, const NString &linePrefix=" ")
virtual void DisposeWorker(PoolWorker *poolWorker)
TJob & Schedule(TArgs &&... args)
HashTable< MonoAllocator, DbgKnownJobsVD > DbgKnownJobs
Table of known job types and their sizes.
int ctdOpenJobs
The number of jobs currently in the queue.
ListPA< QueueEntry, Recycling::None > queue
The queue of jobs.
QueueEntry pop(PoolWorker *worker)
bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
TJob * schedule(bool keepJob, TArgs &&... args)
void addThread()
Internal method that adds a thread. Must only be called when acquired.
PoolAllocator & GetPoolAllocator()
~ThreadPool() override
Destructor. Cleans up and shuts down the thread pool.
HashSet< MonoAllocator, PoolWorker * > workers
The list of worker threads.
bool DCSIsAcquired() const override
virtual PoolWorker * CreateWorker()
uinteger ctdStatJobsScheduled
The number of Jobs that have been scheduled during the lifetime of this instance.
MonoAllocator ma
Mono allocator. Used for jobs and by PoolWorkers.
Ticks timeOfLastSizeChange
The point in time of the last change of thread size.
void ScheduleVoid(TArgs &&... args)
Thread(const character *pName=A_CHAR(""))
Definition thread.hpp:178
virtual void SetName(const character *newName)
Definition thread.hpp:243
Definition alox.cpp:14
monomem::TMonoAllocator< lang::HeapAllocator > MonoAllocator
strings::TString< nchar > NString
Type alias in namespace #"%alib".
Definition string.hpp:2174
containers::HashSet< TAllocator, T, THash, TEqual, THashCaching, TRecycling > HashSet
Type alias in namespace #"%alib". See type definition #"alib::containers::HashSet".
threads::Thread Thread
Type alias in namespace #"%alib".
Definition thread.hpp:387
containers::HashTable< TAllocator, TValueDescriptor, THash, TEqual, THashCaching, TRecycling > HashTable
Type alias in namespace #"%alib". See type definition #"alib::containers::HashTable".
LocalString< 64 > String64
Type alias name for #"TLocalString;TLocalString<character,64>".
strings::TAString< nchar, lang::HeapAllocator > NAString
Type alias in namespace #"%alib".
monomem::TPoolAllocator< MonoAllocator > PoolAllocator
containers::List< T, PoolAllocator, TRecycling > ListPA
Type alias in namespace #"%alib".
Definition list.hpp:693
time::Ticks Ticks
Type alias in namespace #"%alib".
Definition ticks.hpp:86
threadmodel::ThreadPool ThreadPool
Type alias in namespace #"%alib".
strings::TAString< character, lang::HeapAllocator > AString
Type alias in namespace #"%alib".
characters::character character
Type alias in namespace #"%alib".
lang::uinteger uinteger
Type alias in namespace #"%alib".
Definition integers.hpp:152
threadmodel::PoolWorker PoolWorker
Type alias in namespace #"%alib".
virtual size_t SizeOf()
Definition jobs.hpp:94
Job(const std::type_info &id)
Definition jobs.hpp:62
virtual ~Job()=default
Protected destructor.
virtual void PrepareJob(Job *job)
ThreadPool & threadPool
The pool that this instance belongs to.
String64 nameBuffer
Buffer to store the thread name given with construction.
PoolWorker(ThreadPool &pThreadPool, const character *threadName)
Entry in the field #"DbgKnownJobs".
size_t Usage
Counter of scheduled jobs of this type.
size_t JobSize
The size of the job object.
const std::type_info * TID
The job type.
Serves as template parameter TValueDescriptor of field #"DbgKnownJobs".
const std::type_info * Key(DbgKnownJobsEntry &entry) const
Job * JobToDelete
Optionally a job to be deleted.
Container element of the queue.
int WorkersMin
The minimum number of threads to keep alive.
int GetSize(int currentWorkers, int idleWorkers, int load, Ticks &lastChangeTime)
TCondition(const character *dbgName)
void Release(ALIB_DBG_TAKE_CI)
void Acquire(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)