8#ifndef HPP_ALIB_THREADMODEL_THREADPOOL
9#define HPP_ALIB_THREADMODEL_THREADPOOL
15#include "alib/threadmodel/jobs.hpp"
23#include "alib/threads/recursivelock.hpp"
39namespace threadmodel {
85#if ALIB_DEBUG_CRITICAL_SECTIONS
90 friend struct PWorker;
157 int GetSize(
int currentWorkers ,
int idleWorkers,
int load,
158 Ticks& lastChangeTime )
160 int target= currentWorkers;
172 { target = (std::min)(
WorkersMax, currentWorkers + 1); }
176 && (currentWorkers - idleWorkers) <= (currentWorkers *
DecreaseThreshold / 100) )
177 { target = (std::max)(
WorkersMin, currentWorkers - 1); }
180 if (target != currentWorkers)
181 lastChangeTime.
Reset();
214 const std::type_info*
TID;
221 const std::type_info* >
287 queue.EmplaceFront( entry );
294 "Job(" << entry.job->ID <<
") pushed" )
319 template<
typename TJob,
typename... TArgs>
330 "No threads to schedule job. Strategy values:\n"
337 TJob* job=
pool().New<TJob>( std::forward<TArgs>(args)... );
339 "Error in ThreadPool::schedule: Job size mismatch. Expected " <<
sizeof(TJob) <<
340 " while virtual method SizeOf returns "<< job->SizeOf() <<
".\n"
341 "Override this method for job-type <" <<
typeid(*job) <<
">" )
344 "Error in ThreadPool::schedule: Job pushed while this pool is shut down already. "
345 "(Strategy.WorkersMax == 0) " )
350 ++pair.first.Value().Usage;
366 #if ALIB_DEBUG_CRITICAL_SECTIONS
403 {
return int(std::thread::hardware_concurrency()); }
428 template<
typename TJob,
typename... TArgs>
431 {
return *
schedule<TJob, TArgs...>(
true, std::forward<TArgs>(args)... ); }
439 template<
typename TJob,
typename... TArgs>
531 Ticks::Duration dbgWarnAfter );
534 ALIB_DBG(, Ticks::Duration dbgWarnAfter) );
536 ALIB_DBG(, Ticks::Duration::TDuration dbgWarnAfter) )
void free(void *mem, size_t size)
ALIB_API ~ThreadPool() override
Destructor. Cleans up and shuts down the thread pool.
int ctdOpenJobs
The number of jobs in the queue.
TJob * schedule(bool keepJob, TArgs &&... args)
int nextWorkerID
A number that is increased with the creation of new workers and added to their.
HashSet< MonoAllocator, PWorker * > workers
The list of worker threads.
uinteger StatsCountedScheduledJobs()
ALIB_API bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
static int HardwareConcurrency() noexcept
HashTable< MonoAllocator, DbgKnownJobsVD > DbgKnownJobs
Table of known job types and their sizes.
virtual ALIB_API bool DCSIsSharedAcquired() const override
void pushAndRelease(QueueEntry &&entry)
ALIB_API int DbgDumpKnownJobs(NAString &target, const NString &linePrefix=" ")
virtual ALIB_API bool DCSIsAcquired() const override
ALIB_API void addThread()
Internal method that adds a thread. Must only be called when acquired.
uinteger ctdStatJobsScheduled
The number of jobs in the queue.
PoolAllocator & GetPoolAllocator()
void ScheduleVoid(TArgs &&... args)
TJob & Schedule(TArgs &&... args)
void Acquire(ALIB_DBG_TAKE_CI)
List< PoolAllocator, QueueEntry, Recycling::None > queue
The queue of jobs.
MonoAllocator ma
Mono allocator. Used for jobs and by PWorkers.
MonoAllocator & GetAllocator()
int ctdWorkers
The counted number of currently of workers.
int ctdIdle
The counted number of currently idle workers.
void DeleteJobDeferred(Job &job)
QueueEntry pop(PWorker *worker)
PWorker * lastThreadToJoin
Ticks timeOfLastSizeChange
The point in time of the last change of thread size.
PoolAllocator pool
Pool allocator. Used for job objects.
#define ALIB_ASSERT_MODULE(modulename)
#define ALIB_MESSAGE(...)
#define ALIB_ASSERT_ERROR(cond,...)
#define ALIB_CALLER_PRUNED
lang::uinteger uinteger
Type alias in namespace alib.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
monomem::TMonoAllocator< lang::HeapAllocator > MonoAllocator
monomem::TPoolAllocator< MonoAllocator, ALIB_MONOMEM_POOLALLOCATOR_DEFAULT_ALIGNMENT > PoolAllocator
virtual ~Job()=default
Protected destructor.
Entry in the field DbgKnownJobs.
size_t Usage
Counter of scheduled jobs of this type.
const std::type_info * TID
The job type.
size_t JobSize
The size of the job object.
Serves as template parameter TValueDescriptor of field DbgKnownJobs.
const std::type_info * Key(DbgKnownJobsEntry &entry) const
virtual size_t SizeOf() override
Job * JobToDelete
Optionally a job to be deleted.
Container element of the queue.
Ticks::Duration DecreaseSchedule
Modes
The modes, fixed or automatic.
@ Fixed
The number of threads is fixed.
Ticks::Duration IncreaseSchedule
int GetSize(int currentWorkers, int idleWorkers, int load, Ticks &lastChangeTime)
int WorkersMin
The minimum number of threads to keep alive.
int WorkersMax
The maximum number of threads to create.
Modes Mode
The mode of operation.
void Acquire(ALIB_DBG_TAKE_CI)
void Release(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)