ALib C++ Library
Library Version: 2412 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
ThreadPool Class Reference

Description:

Attention
This class belongs to module ALib ThreadModel, which is not in a stable and consistent state, yet. Also this type is considered experimental.

This class provides a configurable and scalable thread pooling mechanism for managing concurrent job execution. It supports fixed or dynamically resizing thread pools to balance workload and optimize resource usage, along with several key methods to schedule, manage, and monitor job execution.

Key Features

  1. Thread Pool Management:
    • Supports fixed-size and dynamically resizable thread pools.
    • Dynamic resizing is handled through customizable parameters found in the struct ResizeStrategy.
    • Automatically adjusts the number of threads based on workload.
  2. Jobs:
    The worker-threads managed by the pool are processing Job objects, which
    a) Carry the data needed for processing, and
    b) Optionally provide synchronization mechanics that allow a caller to wait until a job was processed or periodically test for completion.

    Allocation and life-cycle management of jobs is efficiently implemented leveraging the mechanics provided by ALib Module ALib Monomem.

  3. Synchronization and Job-Cleanup:
    Provides a method to synchronize all worker threads to ensure that processing continues only after a certain set of jobs has been executed.

  4. Worker Management and Monitoring:
    • Supports querying idle worker status and actively running threads.
    • Debugging options to analyze job types and execution states.
See also
Chapter 2. Class ThreadPool of this module's Programmer's Manual provides a quick source code sample that demonstrates the use this class.

Definition at line 84 of file threadpool.hpp.

#include <threadpool.hpp>

Inheritance diagram for ThreadPool:
[legend]
Collaboration diagram for ThreadPool:
[legend]

Inner Type Index:

struct  DbgKnownJobsEntry
 Entry in the field DbgKnownJobs. More...
 
struct  DbgKnownJobsVD
 Serves as template parameter TValueDescriptor of field DbgKnownJobs. More...
 
struct  JobSyncer
 
struct  QueueEntry
 Container element of the queue. More...
 
struct  ResizeStrategy
 

Public Static Method Index:

static int HardwareConcurrency () noexcept
 

Public Field Index:

ResizeStrategy Strategy
 

Public Method Index:

ALIB_API ThreadPool ()
 
ALIB_API ~ThreadPool () override
 Destructor. Cleans up and shuts down the thread pool.
 
void Acquire (ALIB_DBG_TAKE_CI)
 
int CountedIdleWorkers ()
 
int CountedOpenJobs ()
 
int CountedWorkers ()
 
ALIB_API int DbgDumpKnownJobs (NAString &target, const NString &linePrefix=" ")
 
virtual ALIB_API bool DCSIsAcquired () const override
 
virtual ALIB_API bool DCSIsSharedAcquired () const override
 
void DeleteJob (Job &job)
 
void DeleteJobDeferred (Job &job)
 
MonoAllocatorGetAllocator ()
 
PoolAllocatorGetPoolAllocator ()
 
bool IsIdle ()
 
void Release (ALIB_DBG_TAKE_CI)
 
template<typename TJob , typename... TArgs>
TJob & Schedule (TArgs &&... args)
 
template<typename TJob , typename... TArgs>
void ScheduleVoid (TArgs &&... args)
 
ALIB_API void Shutdown ()
 
uinteger StatsCountedScheduledJobs ()
 
void Sync ()
 
ALIB_API bool WaitForAllIdle (Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
 
- Public Method Index: inherited from DbgCriticalSections::AssociatedLock
virtual ~AssociatedLock ()
 Virtual Destructor.
 

Protected Field Index:

int ctdIdle =0
 The counted number of currently idle workers.
 
int ctdOpenJobs {0}
 The number of jobs in the queue.
 
uinteger ctdStatJobsScheduled {0}
 The number of jobs in the queue.
 
int ctdWorkers =0
 The counted number of currently of workers.
 
HashTable< MonoAllocator, DbgKnownJobsVDDbgKnownJobs
 Table of known job types and their sizes.
 
PWorker * lastThreadToJoin = nullptr
 
MonoAllocator ma
 Mono allocator. Used for jobs and by PWorkers.
 
int nextWorkerID =0
 A number that is increased with the creation of new workers and added to their.
 
PoolAllocator pool
 Pool allocator. Used for job objects.
 
List< PoolAllocator, QueueEntry, Recycling::None > queue
 The queue of jobs.
 
Ticks timeOfLastSizeChange
 The point in time of the last change of thread size.
 
HashSet< MonoAllocator, PWorker * > workers
 The list of worker threads.
 
- Protected Field Index: inherited from TCondition< TDerived >
std::condition_variable conditionVariable
 The condition variable used for blocking and notification.
 
DbgConditionAsserter Dbg
 The debug tool instance.
 
std::mutex mutex
 The mutex used for locking this instance.
 

Protected Method Index:

ALIB_API void addThread ()
 Internal method that adds a thread. Must only be called when acquired.
 
bool isConditionMet ()
 
QueueEntry pop (PWorker *worker)
 
void pushAndRelease (QueueEntry &&entry)
 
template<typename TJob , typename... TArgs>
TJob * schedule (bool keepJob, TArgs &&... args)
 
- Protected Method Index: inherited from TCondition< TDerived >
 TCondition (const String &dbgName)
 
void Acquire (ALIB_DBG_TAKE_CI)
 
TDerived & cast ()
 
void Release (ALIB_DBG_TAKE_CI)
 
void ReleaseAndNotify (ALIB_DBG_TAKE_CI)
 
void ReleaseAndNotifyAll (ALIB_DBG_TAKE_CI)
 
void WaitForNotification (ALIB_DBG_TAKE_CI)
 
void WaitForNotification (const Ticks &wakeUpTime, const CallerInfo &ci)
 
void WaitForNotification (const Ticks::Duration &maxWaitTimeSpan, const CallerInfo &ci)
 
void WaitForNotification (const Ticks::Duration::TDuration &maxWaitTimeSpan, const CallerInfo &ci)
 
- Protected Method Index: inherited from DbgCriticalSections::AssociatedLock
virtual ~AssociatedLock ()
 Virtual Destructor.
 

Friends And Related Entity Details:

◆ PWorker

friend struct PWorker
friend

Definition at line 90 of file threadpool.hpp.

◆ threads::TCondition< ThreadPool >

friend struct threads::TCondition< ThreadPool >
friend

Definition at line 90 of file threadpool.hpp.

Field Details:

◆ ctdIdle

int ctdIdle =0
protected

The counted number of currently idle workers.

Definition at line 201 of file threadpool.hpp.

◆ ctdOpenJobs

int ctdOpenJobs {0}
protected

The number of jobs in the queue.

Definition at line 273 of file threadpool.hpp.

◆ ctdStatJobsScheduled

uinteger ctdStatJobsScheduled {0}
protected

The number of jobs in the queue.

Definition at line 270 of file threadpool.hpp.

◆ ctdWorkers

int ctdWorkers =0
protected

The counted number of currently of workers.

Definition at line 198 of file threadpool.hpp.

◆ DbgKnownJobs

HashTable<MonoAllocator, DbgKnownJobsVD > DbgKnownJobs
protected

Table of known job types and their sizes.

Definition at line 231 of file threadpool.hpp.

◆ lastThreadToJoin

PWorker* lastThreadToJoin = nullptr
protected

Set if the last thread is terminated and ctdWorkers goes to 0. This thread is joined by Shutdown or when a new thread is added.

Definition at line 301 of file threadpool.hpp.

◆ ma

MonoAllocator ma
protected

Mono allocator. Used for jobs and by PWorkers.

Definition at line 189 of file threadpool.hpp.

◆ nextWorkerID

int nextWorkerID =0
protected

A number that is increased with the creation of new workers and added to their.

Definition at line 208 of file threadpool.hpp.

◆ pool

PoolAllocator pool
protected

Pool allocator. Used for job objects.

Definition at line 192 of file threadpool.hpp.

◆ queue

List<PoolAllocator, QueueEntry, Recycling::None> queue
protected

The queue of jobs.

Definition at line 266 of file threadpool.hpp.

◆ Strategy

ResizeStrategy Strategy

The parameters used for scaling the amount of worker threads. The values herein can be changed from outside with direct access.

Definition at line 360 of file threadpool.hpp.

◆ timeOfLastSizeChange

Ticks timeOfLastSizeChange
protected

The point in time of the last change of thread size.

Definition at line 204 of file threadpool.hpp.

◆ workers

HashSet<MonoAllocator, PWorker*> workers
protected

The list of worker threads.

Definition at line 195 of file threadpool.hpp.

Constructor(s) / Destructor Details:

◆ ThreadPool()

Constructor. Initializes the thread pool with default settings for field Strategy.

Method Details:

◆ Acquire()

void Acquire ( ALIB_DBG_TAKE_CI )
inline

A thread which invokes this method gets registered as the current owner of this object, until the same thread releases the ownership invoking Release. In the case that this object is already owned by another thread, the invoking thread is suspended until ownership can be gained.

Multiple (nested) calls to this method are not supported and constitute undefined behavior. In debug-compilations, an assertion is raised.

An instance has to be acquired before invoking any of the notifiy- or wait-methods. When return from a notification method, the instance is released. With return from a wait method, the instance is still held.

Debug Parameter:
Pass macro ALIB_CALLER_PRUNED with invocations.

Definition at line 233 of file condition.hpp.

◆ CountedIdleWorkers()

int CountedIdleWorkers ( )
inline

Returns the current number of idle workers.

Returns
The number of workers waiting on jobs to process.

Definition at line 411 of file threadpool.hpp.

◆ CountedOpenJobs()

int CountedOpenJobs ( )
inline

Returns the current number of jobs in the queue.

Note
To get the overall number of unprocessed jobs, the difference between CountedWorkers and CountedIdleWorkers has to be added. However, under racing conditions, this difference might evaluated wrongly. Therefore, if crucial, this pool has to be acquired before determining this.
Returns
The number of jobs to process, not including any currently processed one.

Definition at line 557 of file threadpool.hpp.

◆ CountedWorkers()

int CountedWorkers ( )
inline

Returns the current number of worker threads.

Returns
The number of jobs to process, including any currently processed one.

Definition at line 407 of file threadpool.hpp.

◆ DbgDumpKnownJobs()

ALIB_API int DbgDumpKnownJobs ( NAString & target,
const NString & linePrefix = "  " )

Writes the list of known jobs and their object sizes to the given target.

See also
Field DbgDumpKnownJobs.
Parameters
targetThe string to write to.
linePrefixA prefix string to each line. Defaults to two spaces.
Returns
The number of known jobs.

◆ DCSIsAcquired()

virtual ALIB_API bool DCSIsAcquired ( ) const
overridevirtual
Returns
true if the lock is acquired (in non-shared mode), false otherwise.

Implements DbgCriticalSections::AssociatedLock.

◆ DCSIsSharedAcquired()

virtual ALIB_API bool DCSIsSharedAcquired ( ) const
overridevirtual
Returns
true if the lock is shared-acquired (by at least any thread). Otherwise, returns false.

Implements DbgCriticalSections::AssociatedLock.

◆ DeleteJob()

void DeleteJob ( Job & job)
inline

Deletes a job object previously scheduled with Schedule.

Attention
The caller must not delete received job instances before they are processed.
In case a caller does not want to wait longer, the method DeleteJobDeferred is to be used, which causes a Sync on this pool. Therefore, it is preferable to either wait on the job and use this method for deletion, or to use the method ScheduleVoid instead of Schedule to not even get involved with job-deletion.
Parameters
jobThe job returned from method Schedule.

Definition at line 456 of file threadpool.hpp.

Here is the call graph for this function:

◆ DeleteJobDeferred()

void DeleteJobDeferred ( Job & job)
inline

Same as DeleteJob but schedules the deletion to be performed. This method is useful when a job instance was received with method Schedule, but the caller does not want to continue waiting for the execution of the job.
If jobs indicate that they have been processed, then the method DeleteJob is to be used.

Attention
Calling this method schedules a Sync. Therefore, the use of this method should be avoided.
See also
Methods ScheduleVoid, DeleteJob and Sync.
Parameters
jobThe job object to delete.

Definition at line 474 of file threadpool.hpp.

Here is the call graph for this function:

◆ GetAllocator()

MonoAllocator & GetAllocator ( )
inline

Returns the mono allocator used by the thread pool. The pool has to be acquired before using it.

Returns
The mono allocator.

Definition at line 386 of file threadpool.hpp.

◆ GetPoolAllocator()

PoolAllocator & GetPoolAllocator ( )
inline

Returns the pool allocator used by the thread pool. The pool has to be acquired before using it.

Returns
The pool allocator.

Definition at line 391 of file threadpool.hpp.

◆ HardwareConcurrency()

static int HardwareConcurrency ( )
inlinestaticnoexcept

Just an alias to Empty Base Optimization .

While the specification says "If the value is not well-defined or not computable, returns \c 0", this method returns 1 in this case.

Used as the default value for constructor parameter pWorkersMax.

Returns
Returns the maximum number of threads that can be expected to run concurrently.

Definition at line 402 of file threadpool.hpp.

◆ isConditionMet()

bool isConditionMet ( )
inlineprotected

Mandatory method needed and invoked by templated base type TCondition.

Returns
true if field queue is not empty and either no sync-job is next or all are idle.

Definition at line 278 of file threadpool.hpp.

◆ IsIdle()

bool IsIdle ( )
inline

Checks if all workers are idle.

Returns
true if the number of idle workers equals the number of workers, false otherwise.

Definition at line 416 of file threadpool.hpp.

◆ pop()

QueueEntry pop ( PWorker * worker)
protected

Moves the job of highest priority out of the queue. Blocks the thread until a job is available.

Parameters
workerThe instance that called this method.
Returns
The job with the highest priority.

◆ pushAndRelease()

void pushAndRelease ( QueueEntry && entry)
inlineprotected

Pushes the given cmd into the priority queue that this class implements.

Parameters
entryThe Job and the deletion flag.

Definition at line 284 of file threadpool.hpp.

Here is the call graph for this function:

◆ Release()

void Release ( ALIB_DBG_TAKE_CI )
inline

Releases ownership of this object. If this method is invoked on an object that is not acquired or acquired by a different thread, in debug-compilations an assertion is raised. In release compilations, this leads to undefined behavior.

See also
Method Acquire.
Debug Parameter:
Pass macro ALIB_CALLER_PRUNED with invocations.

Definition at line 259 of file condition.hpp.

◆ Schedule()

template<typename TJob , typename... TArgs>
TJob & Schedule ( TArgs &&... args)
inlinenodiscard

Pushes a job of the custom type TJob into the priority queue.
The job is returned to the caller to be able to await results. It is the responsibility of the caller to pass the job to either method DeleteJob or DeleteJobDeferred for disposal. Note that the latter causes a Sync on this pool, while with use of the former, the fulfilment of the returned job object has to be awaited first.

Template Parameters
TJobThe job type to create and schedule.
TArgsTypes of the variadic arguments args that construct TJob.
Parameters
argsVariadic arguments forwarded to the constructor of TJob.
Returns
A reference to the job object for the caller to await results.

Definition at line 430 of file threadpool.hpp.

Here is the call graph for this function:

◆ schedule()

template<typename TJob , typename... TArgs>
TJob * schedule ( bool keepJob,
TArgs &&... args )
inlinenodiscardprotected

Implementation of Schedule and ScheduleVoid.

Template Parameters
TJobThe job type to create and schedule.
TArgsTypes of the variadic arguments args that construct TJob.
Parameters
keepJobDenotes whether the job should be deleted after execution or not.
argsVariadic arguments forwarded to the constructor of TJob.
Returns
The scheduled job.

Definition at line 321 of file threadpool.hpp.

Here is the call graph for this function:

◆ ScheduleVoid()

template<typename TJob , typename... TArgs>
void ScheduleVoid ( TArgs &&... args)
inline

Pushes a job of the custom type TJob into the priority queue. In contrast to the sibling method Schedule, the job is not returned by this method. Instead, it is scheduled for automatic disposal after execution.

Template Parameters
TJobThe job type to create and schedule.
TArgsTypes of the variadic arguments args that construct TJob.
Parameters
argsVariadic arguments forwarded to the constructor of TJob.

Definition at line 440 of file threadpool.hpp.

Here is the call graph for this function:

◆ Shutdown()

ALIB_API void Shutdown ( )

Removes all threads. While this method waits that all jobs are finalized just as the method WaitForAllIdle does, it is recommended to WaitForAllIdle explicitly, before this method is called. This allows a more graceful shutdown with the possibility to take action on timeouts.

If after the call to WaitForAllIdle no jobs were scheduled, this method is not # supposed to block.

◆ StatsCountedScheduledJobs()

uinteger StatsCountedScheduledJobs ( )
inline

Returns the number of Jobs that have been scheduled during the lifetime of this instance. This is a statistics method.

Returns
The number of jobs to process, including any currently processed one.

Definition at line 562 of file threadpool.hpp.

◆ Sync()

void Sync ( )
inline

This method ensures all worker threads in the thread pool complete their currently running jobs and also process all jobs that have been scheduled before a call to this method. This forces synchronization such that no new jobs are processed until the synchronization request is fulfilled.
It is particularly useful for scenarios requiring a consistent state or ensuring all pending asynchronous jobs are complete before proceeding.

Consequently, a call to this method may inherently involve blocking the execution in the pool until all prior tasks are finalized. While it is designed to work efficiently with the thread pool mechanism, unnecessary or frequent calls to this method impose a performance disadvantage.

Invoking Sync() schedules a special synchronization job in the queue of this ThreadPool. Thus, the method is non-blocking and instantly returns.

Necessity for Synchronization Explained with a Sample:
The requirement for synchronization is best illustrated with a practical scenario. Consider a case where the main thread is responsible for scanning the filesystem. For each file that meets certain criteria, a job is scheduled in the ThreadPool to read the content of that file. At this stage, the main thread is focused solely on scheduling file-reading jobs and isn't directly processing the files.
Now, before the application proceeds to schedule further jobs, such as processing, analyzing, or aggregating the file data, it is crucial to ensure that all file-reading jobs have completed. Without a synchronization mechanism, there is a risk that some worker threads are still reading files while other threads - already assigned to the dependent processing tasks — begin before the file data is available.
A call to Sync() resolves this issue by ensuring that all file-reading jobs are completed before any subsequent jobs that rely on their output are scheduled or processed. This guarantees consistency, prevents race conditions, and ensures that no dependent thread gets an incomplete or inconsistent dataset to work with. In summary, synchronization acts as a safeguard in parallelized workflows where the logical order of operations must be maintained across multiple threads, particularly when tasks are interdependent.
See also
Method DeleteJobDeferred, which likewise causes a synchronization.

Definition at line 519 of file threadpool.hpp.

Here is the call graph for this function:

◆ WaitForAllIdle()

ALIB_API bool WaitForAllIdle ( Ticks::Duration timeout,
Ticks::Duration dbgWarnAfter )

Waits until all threads are idle.

Parameters
timeoutThe maximum time to wait.
dbgWarnAfterThe time after which a warning message will be printed to the debug log if the timeout was reached.
This parameter is only available in debug-compilations and thus should be passed using macro ALIB_DBG.
Returns
true if all threads are idle, false otherwise.

The documentation for this class was generated from the following file: