ALib C++ Library
Library Version: 2510 R0
Documentation generated by doxygen
alib::threadmodel::ThreadPool Class Reference

Description:

Attention
This class belongs to module ALib ThreadModel, which is not yet in a stable and consistent state. This type is also considered experimental.

This class provides a configurable and scalable thread pooling mechanism for managing concurrent job execution. It supports fixed or dynamically resizing thread pools to balance workload and optimize resource usage, along with several key methods to schedule, manage, and monitor job execution.

Key Features

  1. Thread Pool Management:
    • Supports fixed-size and dynamically resizable thread pools.
    • Dynamic resizing is handled through customizable parameters found in the struct ResizeStrategy.
    • Automatically adjusts the number of threads based on workload.
  2. Jobs:
    The worker threads managed by the pool process Job objects, which
    a) Carry the data needed for processing, and
    b) Optionally provide synchronization mechanics that allow a caller to wait until a job has been processed or to periodically test for completion.

    Allocation and life-cycle management of jobs is efficiently implemented leveraging the mechanics provided by the module ALib Monomem.

  3. Synchronization and Job-Cleanup:
    Provides a method to synchronize all worker threads to ensure that processing continues only after a certain set of jobs has been executed.

  4. Worker Management and Monitoring:
    • Supports querying idle worker status and actively running threads.
    • Debugging options to analyze job types and execution states.
See also
Chapter 2. Class ThreadPool of this module's Programmer's Manual provides a quick source code sample that demonstrates the use of this class.

Definition at line 60 of file threadpool.inl.

Inner Type Index:

struct  DbgKnownJobsEntry
 Entry in the field DbgKnownJobs. More...
 
struct  DbgKnownJobsVD
 Serves as template parameter TValueDescriptor of field DbgKnownJobs. More...
 
struct  JobSyncer
 
struct  QueueEntry
 Container element of the queue. More...
 
struct  ResizeStrategy
 

Public Static Method Index:

static int HardwareConcurrency () noexcept
 

Public Field Index:

ResizeStrategy Strategy
 

Public Method Index:

ALIB_DLL ThreadPool ()
 
ALIB_DLL ~ThreadPool () override
 Destructor. Cleans up and shuts down the thread pool.
 
void Acquire (ALIB_DBG_TAKE_CI)
 
int CountedIdleWorkers ()
 
int CountedOpenJobs ()
 
int CountedWorkers ()
 
ALIB_DLL int DbgDumpKnownJobs (NAString &target, const NString &linePrefix=" ")
 
virtual ALIB_DLL bool DCSIsAcquired () const override
 
virtual ALIB_DLL bool DCSIsSharedAcquired () const override
 
void DeleteJob (Job &job)
 
void DeleteJobDeferred (Job &job)
 
MonoAllocator & GetAllocator ()
 
PoolAllocator & GetPoolAllocator ()
 
bool IsIdle ()
 
void Release (ALIB_DBG_TAKE_CI)
 
template<typename TJob, typename... TArgs>
TJob & Schedule (TArgs &&... args)
 
template<typename TJob, typename... TArgs>
void ScheduleVoid (TArgs &&... args)
 
ALIB_DLL void Shutdown ()
 
uinteger StatsCountedScheduledJobs ()
 
void Sync ()
 
ALIB_DLL bool WaitForAllIdle (Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
 
- Public Method Index: inherited from alib::lang::DbgCriticalSections::AssociatedLock
virtual ~AssociatedLock ()
 Virtual Destructor.
 

Protected Field Index:

int ctdIdle =0
 The counted number of currently idle workers.
 
int ctdOpenJobs {0}
 The number of jobs in the queue.
 
uinteger ctdStatJobsScheduled {0}
 The number of jobs that have been scheduled during the lifetime of this instance (statistics counter).
 
int ctdWorkers =0
 The counted number of current workers.
 
HashTable< MonoAllocator, DbgKnownJobsVD > DbgKnownJobs
 Table of known job types and their sizes.
 
PWorker * lastThreadToJoin = nullptr
 
MonoAllocator ma
 Mono allocator. Used for jobs and by PWorkers.
 
int nextWorkerID =0
 A number that is increased with the creation of new workers and added to their.
 
PoolAllocator pool
 Pool allocator. Used for job objects.
 
List< PoolAllocator, QueueEntry, Recycling::None > queue
 The queue of jobs.
 
Ticks timeOfLastSizeChange
 The point in time of the last change of thread size.
 
HashSet< MonoAllocator, PWorker * > workers
 The list of worker threads.
 
- Protected Field Index: inherited from alib::threads::TCondition< T >
std::condition_variable conditionVariable
 The condition variable used for blocking and notification.
 
DbgConditionAsserter Dbg
 The debug tool instance.
 
std::mutex mutex
 The mutex used for locking this instance.
 

Protected Method Index:

ALIB_DLL void addThread ()
 Internal method that adds a thread. Must only be called when acquired.
 
bool isConditionMet ()
 
QueueEntry pop (PWorker *worker)
 
void pushAndRelease (QueueEntry &&entry)
 
template<typename TJob, typename... TArgs>
TJob * schedule (bool keepJob, TArgs &&... args)
 
- Protected Method Index: inherited from alib::threads::TCondition< T >
 TCondition (const character *dbgName)
 
void Acquire (ALIB_DBG_TAKE_CI)
 
T & cast ()
 
void Release (ALIB_DBG_TAKE_CI)
 
void ReleaseAndNotify (ALIB_DBG_TAKE_CI)
 
void ReleaseAndNotifyAll (ALIB_DBG_TAKE_CI)
 
void WaitForNotification (ALIB_DBG_TAKE_CI)
 
void WaitForNotification (const Ticks &wakeUpTime, const CallerInfo &ci)
 
void WaitForNotification (const Ticks::Duration &maxWaitTimeSpan, const CallerInfo &ci)
 
void WaitForNotification (const Ticks::Duration::TDuration &maxWaitTimeSpan, const CallerInfo &ci)
 
- Protected Method Index: inherited from alib::lang::DbgCriticalSections::AssociatedLock
virtual ~AssociatedLock ()
 Virtual Destructor.
 

Friends And Related Entity Details:

◆ PWorker

friend struct PWorker
friend

Definition at line 66 of file threadpool.inl.

◆ threads::TCondition< ThreadPool >

friend struct threads::TCondition< ThreadPool >
friend

Definition at line 66 of file threadpool.inl.

Field Details:

◆ ctdIdle

int alib::threadmodel::ThreadPool::ctdIdle =0
protected

The counted number of currently idle workers.

Definition at line 177 of file threadpool.inl.

◆ ctdOpenJobs

int alib::threadmodel::ThreadPool::ctdOpenJobs {0}
protected

The number of jobs in the queue.

Definition at line 249 of file threadpool.inl.

◆ ctdStatJobsScheduled

uinteger alib::threadmodel::ThreadPool::ctdStatJobsScheduled {0}
protected

The number of jobs that have been scheduled during the lifetime of this instance (statistics counter).

Definition at line 246 of file threadpool.inl.

◆ ctdWorkers

int alib::threadmodel::ThreadPool::ctdWorkers =0
protected

The counted number of current workers.

Definition at line 174 of file threadpool.inl.

◆ DbgKnownJobs

HashTable<MonoAllocator, DbgKnownJobsVD > alib::threadmodel::ThreadPool::DbgKnownJobs
protected

Table of known job types and their sizes.

Definition at line 207 of file threadpool.inl.

◆ lastThreadToJoin

PWorker* alib::threadmodel::ThreadPool::lastThreadToJoin = nullptr
protected

Set if the last thread is terminated and ctdWorkers goes to 0. This thread is joined by Shutdown or when a new thread is added.

Definition at line 275 of file threadpool.inl.

◆ ma

MonoAllocator alib::threadmodel::ThreadPool::ma
protected

Mono allocator. Used for jobs and by PWorkers.

Definition at line 165 of file threadpool.inl.

◆ nextWorkerID

int alib::threadmodel::ThreadPool::nextWorkerID =0
protected

A number that is increased with the creation of new workers and added to their.

Definition at line 184 of file threadpool.inl.

◆ pool

PoolAllocator alib::threadmodel::ThreadPool::pool
protected

Pool allocator. Used for job objects.

Definition at line 168 of file threadpool.inl.

◆ queue

List<PoolAllocator, QueueEntry, Recycling::None> alib::threadmodel::ThreadPool::queue
protected

The queue of jobs.

Definition at line 242 of file threadpool.inl.

◆ Strategy

ResizeStrategy alib::threadmodel::ThreadPool::Strategy

The parameters used for scaling the amount of worker threads. The values herein can be changed from outside with direct access.

Definition at line 336 of file threadpool.inl.

◆ timeOfLastSizeChange

Ticks alib::threadmodel::ThreadPool::timeOfLastSizeChange
protected

The point in time of the last change of thread size.

Definition at line 180 of file threadpool.inl.

◆ workers

HashSet<MonoAllocator, PWorker*> alib::threadmodel::ThreadPool::workers
protected

The list of worker threads.

Definition at line 171 of file threadpool.inl.

Constructor(s) / Destructor Details:

◆ ThreadPool()

ALIB_DLL alib::threadmodel::ThreadPool::ThreadPool ( )

Constructor. Initializes the thread pool with default settings for field Strategy.

Method Details:

◆ Acquire()

void alib::threads::TCondition< T >::Acquire ( ALIB_DBG_TAKE_CI )
inline

A thread which invokes this method gets registered as the current owner of this object, until the same thread releases the ownership invoking Release. In the case that this object is already owned by another thread, the invoking thread is suspended until ownership can be gained.

Multiple (nested) calls to this method are not supported and constitute undefined behavior. In debug-compilations, an error is raised.

An instance has to be acquired before invoking any of the notify- or wait-methods. On return from a notification method, the instance is released. On return from a wait method, the instance is still held.

Debug Parameter:
Pass macro ALIB_CALLER_PRUNED with invocations.

Definition at line 121 of file condition.inl.
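
The acquire/notify/wait protocol described above can be illustrated with plain standard-library primitives. The following is only a sketch under stated assumptions: the type Flag and its members are hypothetical and are not part of ALib. It merely mirrors the rule that the notifier gives up ownership when it notifies, while the waiter still owns the lock when its wait returns.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the protocol; not the library's TCondition type.
struct Flag {
    std::mutex              mutex;
    std::condition_variable cv;
    bool                    ready = false;

    // Notifier side: acquire, change the state, release, then notify.
    void set() {
        {
            std::lock_guard<std::mutex> lock(mutex);  // corresponds to "acquire"
            ready = true;
        }                                             // ownership released here
        cv.notify_one();
    }

    // Waiter side: acquire, then wait. When wait() returns, the mutex is
    // owned again by this thread, so 'ready' can be read safely.
    bool waitUntilSet() {
        std::unique_lock<std::mutex> lock(mutex);     // corresponds to "acquire"
        cv.wait(lock, [this] { return ready; });
        return ready;                                 // still holding the lock
    }                                                 // released on scope exit
};
```

In a real program, set() and waitUntilSet() would be called from different threads; if the flag is already set, the wait returns immediately.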

◆ CountedIdleWorkers()

int alib::threadmodel::ThreadPool::CountedIdleWorkers ( )
inline

Returns the current number of idle workers.

Returns
The number of workers waiting on jobs to process.

Definition at line 387 of file threadpool.inl.

◆ CountedOpenJobs()

int alib::threadmodel::ThreadPool::CountedOpenJobs ( )
inline

Returns the current number of jobs in the queue.

Note
To get the overall number of unprocessed jobs, the difference between CountedWorkers and CountedIdleWorkers has to be added. However, under race conditions, this difference might be evaluated wrongly. Therefore, if this is crucial, this pool has to be acquired before determining the value.
Returns
The number of jobs to process, not including any currently processed one.

Definition at line 533 of file threadpool.inl.
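
The calculation from the note above can be sketched as follows. The struct PoolCounters is a hypothetical model whose member names merely mirror the documented fields; it is not the library's class. A plain std::mutex stands in for acquiring the pool, so that the three counters are read consistently.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical model of the pool's counters; not the library's class.
struct PoolCounters {
    std::mutex mutex;
    int ctdWorkers  = 0;  // all worker threads
    int ctdIdle     = 0;  // workers currently waiting for a job
    int ctdOpenJobs = 0;  // jobs still waiting in the queue

    // Queued jobs plus jobs that busy workers are processing right now.
    // The lock ensures that all three counters belong to the same moment.
    int unprocessedJobs() {
        std::lock_guard<std::mutex> lock(mutex);
        assert(ctdIdle <= ctdWorkers);
        return ctdOpenJobs + (ctdWorkers - ctdIdle);
    }
};
```

With four workers, one of them idle, and five queued jobs, this model reports eight unprocessed jobs: five queued and three in progress.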

◆ CountedWorkers()

int alib::threadmodel::ThreadPool::CountedWorkers ( )
inline

Returns the current number of worker threads.

Returns
The number of worker threads currently managed by this pool.

Definition at line 383 of file threadpool.inl.

◆ DbgDumpKnownJobs()

ALIB_DLL int alib::threadmodel::ThreadPool::DbgDumpKnownJobs ( NAString & target,
const NString & linePrefix = "  " )

Writes the list of known jobs and their object sizes to the given target.

Availability
This function is available only with debug-compilations and if the module ALib Strings is included in the ALib Build.
See also
Field DbgKnownJobs.
Parameters
target      The string to write to.
linePrefix  A prefix string to each line. Defaults to two spaces.
Returns
The number of known jobs.

◆ DCSIsAcquired()

virtual ALIB_DLL bool alib::threadmodel::ThreadPool::DCSIsAcquired ( ) const
override virtual
Returns
true if the lock is acquired (in non-shared mode), false otherwise.

Implements alib::lang::DbgCriticalSections::AssociatedLock.

◆ DCSIsSharedAcquired()

virtual ALIB_DLL bool alib::threadmodel::ThreadPool::DCSIsSharedAcquired ( ) const
override virtual
Returns
true if the lock is shared-acquired (by at least any thread). Otherwise, returns false.

Implements alib::lang::DbgCriticalSections::AssociatedLock.

◆ DeleteJob()

void alib::threadmodel::ThreadPool::DeleteJob ( Job & job)
inline

Deletes a job object previously scheduled with Schedule.

Attention
The caller must not delete received job instances before they are processed.
In case a caller does not want to wait longer, the method DeleteJobDeferred is to be used, which causes a Sync on this pool. Therefore, it is preferable to either wait on the job and use this method for deletion, or to use the method ScheduleVoid instead of Schedule to not even get involved with job-deletion.
Parameters
job  The job returned from method Schedule.

Definition at line 432 of file threadpool.inl.

◆ DeleteJobDeferred()

void alib::threadmodel::ThreadPool::DeleteJobDeferred ( Job & job)
inline

Same as DeleteJob but schedules the deletion to be performed. This method is useful when a job instance was received with method Schedule, but the caller does not want to continue waiting for the execution of the job.
If jobs indicate that they have been processed, then the method DeleteJob is to be used.

Attention
Calling this method schedules a Sync. Therefore, the use of this method should be avoided.
See also
Methods ScheduleVoid, DeleteJob and Sync.
Parameters
job  The job object to delete.

Definition at line 450 of file threadpool.inl.

◆ GetAllocator()

MonoAllocator & alib::threadmodel::ThreadPool::GetAllocator ( )
inline

Returns the mono allocator used by the thread pool. The pool has to be acquired before using it.

Returns
The mono allocator.

Definition at line 362 of file threadpool.inl.

◆ GetPoolAllocator()

PoolAllocator & alib::threadmodel::ThreadPool::GetPoolAllocator ( )
inline

Returns the pool allocator used by the thread pool. The pool has to be acquired before using it.

Returns
The pool allocator.

Definition at line 367 of file threadpool.inl.

◆ HardwareConcurrency()

int alib::threadmodel::ThreadPool::HardwareConcurrency ( )
inline static noexcept

Just an alias to std::thread::hardware_concurrency.

While the specification says "If the value is not well-defined or not computable, returns 0", this method returns 1 in this case.

Used as the default value for constructor parameter pWorkersMax.

Returns
Returns the maximum number of threads that can be expected to run concurrently.

Definition at line 378 of file threadpool.inl.
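
The fallback described above can be sketched as follows, assuming that the aliased function is std::thread::hardware_concurrency, whose specification contains the quoted sentence. The function name hardwareConcurrencyOrOne is hypothetical, and this is not the library's source.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for the documented behavior: forwards to
// std::thread::hardware_concurrency(), but never reports 0.
inline int hardwareConcurrencyOrOne() noexcept {
    const unsigned int n = std::thread::hardware_concurrency();
    return n == 0u ? 1 : static_cast<int>(n);
}
```

Mapping the "unknown" value 0 to 1 makes the result directly usable as an upper bound for the number of workers without a special case at the call site.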

◆ isConditionMet()

bool alib::threadmodel::ThreadPool::isConditionMet ( )
inline protected

Mandatory method needed and invoked by templated base type TCondition.

Returns
true if field queue is not empty and either no sync-job is next or all are idle.

Definition at line 254 of file threadpool.inl.
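
The documented return value can be read as a simple predicate. The sketch below is a hypothetical model and not the library's implementation: Entry, PoolModel and the isSync flag are assumed names, and the front of a std::deque stands in for "the entry that would be popped next".

```cpp
#include <cassert>
#include <deque>

// Hypothetical queue entry: only the distinction job/sync matters here.
struct Entry { bool isSync; };

// Hypothetical model of the state that the predicate inspects.
struct PoolModel {
    std::deque<Entry> queue;      // front() models the next entry to pop
    int               ctdWorkers = 0;
    int               ctdIdle    = 0;

    // A waiting worker may proceed if there is something to pop and the next
    // entry is either an ordinary job, or a sync request while all workers
    // are idle.
    bool isConditionMet() const {
        if (queue.empty())
            return false;
        return !queue.front().isSync || ctdIdle == ctdWorkers;
    }
};
```

An empty queue never satisfies the condition, an ordinary job always does, and a sync request does so only once every worker has become idle.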

◆ IsIdle()

bool alib::threadmodel::ThreadPool::IsIdle ( )
inline

Checks if all workers are idle.

Returns
true if the number of idle workers equals the number of workers, false otherwise.

Definition at line 392 of file threadpool.inl.

◆ pop()

QueueEntry alib::threadmodel::ThreadPool::pop ( PWorker * worker)
protected

Moves the job of highest priority out of the queue. Blocks the thread until a job is available.

Parameters
worker  The instance that called this method.
Returns
The job with the highest priority.

◆ pushAndRelease()

void alib::threadmodel::ThreadPool::pushAndRelease ( QueueEntry && entry)
inline protected

Pushes the given entry into the priority queue that this class implements.

Parameters
entry  The Job and the deletion flag.

Definition at line 260 of file threadpool.inl.

◆ Release()

void alib::threads::TCondition< T >::Release ( ALIB_DBG_TAKE_CI )
inline

Releases ownership of this object. If this method is invoked on an object that is not acquired or acquired by a different thread, in debug-compilations an error is raised. In release compilations, this leads to undefined behavior.

See also
Method Acquire.
Debug Parameter:
Pass macro ALIB_CALLER_PRUNED with invocations.

Definition at line 147 of file condition.inl.

◆ Schedule()

template<typename TJob, typename... TArgs>
TJob & alib::threadmodel::ThreadPool::Schedule ( TArgs &&... args)
inline nodiscard

Pushes a job of the custom type TJob into the priority queue.
The job is returned to the caller to be able to await results. It is the responsibility of the caller to pass the job to either method DeleteJob or DeleteJobDeferred for disposal. Note that the latter causes a Sync on this pool, while with use of the former, the fulfilment of the returned job object has to be awaited first.

Template Parameters
TJob   The job type to create and schedule.
TArgs  Types of the variadic arguments args that construct TJob.
Parameters
args   Variadic arguments forwarded to the constructor of TJob.
Returns
A reference to the job object for the caller to await results.

Definition at line 406 of file threadpool.inl.
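
The life-cycle rule stated above (receive the job, await its completion, and only then dispose of it) can be sketched with standard-library primitives. Everything in this sketch is hypothetical: SquareJob, its members, and the function scheduleWaitDelete are illustrative names, and a single std::thread stands in for a pool worker. It does not use or reproduce the ALib API.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical job type: carries its input, its result and a completion flag.
struct SquareJob {
    int                     input;
    int                     result   = 0;
    bool                    finished = false;
    std::mutex              mutex;
    std::condition_variable cv;

    explicit SquareJob(int in) : input(in) {}

    // Executed by the worker thread.
    void run() {
        const int r = input * input;
        {
            std::lock_guard<std::mutex> lock(mutex);
            result   = r;
            finished = true;
        }
        cv.notify_all();
    }

    // Executed by the caller: blocks until the job has been processed.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return finished; });
    }
};

inline int scheduleWaitDelete(int value) {
    SquareJob* job = new SquareJob(value);       // the caller receives the job
    std::thread worker([job] { job->run(); });   // stands in for a pool worker
    job->wait();                                 // await completion first
    const int result = job->result;
    worker.join();                               // worker is done with the job
    delete job;                                  // dispose only after processing
    return result;
}
```

The raw new/delete pair is used only to make the caller's disposal responsibility visible. Deleting the job before the worker has finished with it would leave the worker with a dangling pointer, which is the situation the rule above is meant to prevent.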

◆ schedule()

template<typename TJob, typename... TArgs>
TJob * alib::threadmodel::ThreadPool::schedule ( bool keepJob,
TArgs &&... args )
inline nodiscard protected

Implementation of Schedule and ScheduleVoid.

Template Parameters
TJob     The job type to create and schedule.
TArgs    Types of the variadic arguments args that construct TJob.
Parameters
keepJob  Denotes whether the job should be deleted after execution or not.
args     Variadic arguments forwarded to the constructor of TJob.
Returns
The scheduled job.

Definition at line 295 of file threadpool.inl.
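
How a single implementation can serve both a "keep the job" and a "dispose after execution" variant is sketched below in a single-threaded model. All types here (Job, QueueEntry, MiniQueue, AddJob) are hypothetical and use plain new/delete; the library's real types and allocators are not reproduced.

```cpp
#include <cassert>
#include <deque>
#include <utility>

// Hypothetical job base type.
struct Job {
    virtual ~Job() = default;
    virtual void run() = 0;
};

// Hypothetical queue entry: the job plus the flag that controls disposal.
struct QueueEntry { Job* job; bool keep; };

struct MiniQueue {
    std::deque<QueueEntry> queue;

    // Constructs a TJob from the forwarded arguments and enqueues it.
    template<typename TJob, typename... TArgs>
    TJob* schedule(bool keepJob, TArgs&&... args) {
        TJob* job = new TJob(std::forward<TArgs>(args)...);
        queue.push_back(QueueEntry{job, keepJob});
        return job;
    }

    // Runs the next job and disposes of it unless the caller keeps it.
    void processOne() {
        assert(!queue.empty());
        const QueueEntry entry = queue.front();
        queue.pop_front();
        entry.job->run();
        if (!entry.keep)
            delete entry.job;
    }
};

// Hypothetical sample job: writes a + b to an external location.
struct AddJob : Job {
    int  a, b;
    int* out;
    AddJob(int a_, int b_, int* out_) : a(a_), b(b_), out(out_) {}
    void run() override { *out = a + b; }
};
```

A job scheduled with keepJob set to true remains valid after processOne() and has to be deleted by the caller, whereas a job scheduled with false must not be touched again once it has been executed.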

◆ ScheduleVoid()

template<typename TJob, typename... TArgs>
void alib::threadmodel::ThreadPool::ScheduleVoid ( TArgs &&... args)
inline

Pushes a job of the custom type TJob into the priority queue. In contrast to the sibling method Schedule, the job is not returned by this method. Instead, it is scheduled for automatic disposal after execution.

Template Parameters
TJob   The job type to create and schedule.
TArgs  Types of the variadic arguments args that construct TJob.
Parameters
args   Variadic arguments forwarded to the constructor of TJob.

Definition at line 416 of file threadpool.inl.

◆ Shutdown()

ALIB_DLL void alib::threadmodel::ThreadPool::Shutdown ( )

Removes all threads. While this method waits until all jobs are finalized, just as the method WaitForAllIdle does, it is recommended to call WaitForAllIdle explicitly before this method is called. This allows a more graceful shutdown with the possibility to take action on timeouts.

If no jobs were scheduled after the call to WaitForAllIdle, this method is not supposed to block.

◆ StatsCountedScheduledJobs()

uinteger alib::threadmodel::ThreadPool::StatsCountedScheduledJobs ( )
inline

Returns the number of Jobs that have been scheduled during the lifetime of this instance. This is a statistics method.

Returns
The number of jobs scheduled so far.

Definition at line 538 of file threadpool.inl.

◆ Sync()

void alib::threadmodel::ThreadPool::Sync ( )
inline

This method ensures all worker threads in the thread pool complete their currently running jobs and also process all jobs that have been scheduled before a call to this method. This forces synchronization such that no new jobs are processed until the synchronization request is fulfilled.
It is particularly useful for scenarios requiring a consistent state or ensuring all pending asynchronous jobs are complete before proceeding.

Consequently, a call to this method may inherently involve blocking the execution in the pool until all prior tasks are finalized. While it is designed to work efficiently with the thread pool mechanism, unnecessary or frequent calls to this method impose a performance disadvantage.

Invoking Sync() schedules a special synchronization job in the queue of this ThreadPool. Thus, the method is non-blocking and instantly returns.

Necessity for Synchronization Explained with a Sample:
The requirement for synchronization is best illustrated with a practical scenario. Consider a case where the main thread is responsible for scanning the filesystem. For each file that meets certain criteria, a job is scheduled in the ThreadPool to read the content of that file. At this stage, the main thread is focused solely on scheduling file-reading jobs and isn't directly processing the files.
Now, before the application proceeds to schedule further jobs, such as processing, analyzing, or aggregating the file data, it is crucial to ensure that all file-reading jobs have completed. Without a synchronization mechanism, there is a risk that some worker threads are still reading files while other threads, already assigned to the dependent processing tasks, begin before the file data is available.
A call to Sync() resolves this issue by ensuring that all file-reading jobs are completed before any subsequent jobs that rely on their output are scheduled or processed. This guarantees consistency, prevents race conditions, and ensures that no dependent thread gets an incomplete or inconsistent dataset to work with. In summary, synchronization acts as a safeguard in parallelized workflows where the logical order of operations must be maintained across multiple threads, particularly when tasks are interdependent.
See also
Method DeleteJobDeferred, which likewise causes a synchronization.

Definition at line 495 of file threadpool.inl.
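
The ordering guarantee described above can be made concrete with a deterministic, single-threaded model. This is only a sketch under stated assumptions: SimPool, Entry, and the convention that a negative id marks a sync request are all hypothetical, and "running" jobs are merely counted instead of being executed on threads.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical queue entry: a negative id marks a sync request.
struct Entry {
    int  id;
    bool isSync() const { return id < 0; }
};

// Deterministic model of a pool whose queue may contain sync requests.
struct SimPool {
    std::deque<Entry> queue;
    int               running = 0;  // jobs currently "executing"
    std::vector<int>  started;      // job ids in the order they were started

    void schedule(int id) { queue.push_back(Entry{id}); }
    void sync()           { queue.push_back(Entry{-1}); }  // returns at once

    // Tries to start the next job. Returns false if the queue is empty or if
    // a sync request is next and earlier jobs are still running.
    bool tryStart() {
        while (!queue.empty() && queue.front().isSync()) {
            if (running > 0)
                return false;       // barrier not yet fulfilled
            queue.pop_front();      // all earlier jobs done: drop the barrier
        }
        if (queue.empty())
            return false;
        started.push_back(queue.front().id);
        queue.pop_front();
        ++running;
        return true;
    }

    // Marks one running job as completed.
    void finishOne() {
        assert(running > 0);
        --running;
    }
};
```

With two "read" jobs, a sync request, and one "process" job scheduled in that order, the model starts both read jobs, refuses to start the third job while either of them is still running, and starts it only after both have finished.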

◆ WaitForAllIdle()

ALIB_DLL bool alib::threadmodel::ThreadPool::WaitForAllIdle ( Ticks::Duration timeout,
Ticks::Duration dbgWarnAfter )

Waits until all threads are idle.

Parameters
timeout       The maximum time to wait.
dbgWarnAfter  The time after which a warning message will be printed to the debug log if the timeout was reached.
              This parameter is only available in debug-compilations and thus should be passed using macro ALIB_DBG.
Returns
true if all threads are idle, false otherwise.
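
A timed wait of this kind is commonly built on a condition variable with a predicate. The sketch below shows that general technique with standard-library types only; IdleMonitor and its members are hypothetical names, and nothing here claims to reproduce how the library implements the method.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical model: tracks idle workers and lets a caller wait for them.
struct IdleMonitor {
    std::mutex              mutex;
    std::condition_variable cv;
    int                     ctdWorkers = 0;
    int                     ctdIdle    = 0;

    // Called when a worker runs out of work.
    void workerBecameIdle() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            ++ctdIdle;
        }
        cv.notify_all();
    }

    // Returns true if all workers are idle within 'timeout', false otherwise.
    // wait_for() with a predicate tolerates spurious wake-ups and returns the
    // predicate's value once the timeout has expired.
    bool waitForAllIdle(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex);
        return cv.wait_for(lock, timeout,
                           [this] { return ctdIdle == ctdWorkers; });
    }
};
```

A false result gives the caller the chance to log, retry, or escalate before shutting down, which is the reason for the recommendation to wait explicitly before calling Shutdown.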

The documentation for this class was generated from the following file: