8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
18 module ALib.ThreadModel;
29using namespace std::literals::chrono_literals;
37 target <<
"TPool(Jobs: " << tp.CountedOpenJobs() <<
'/' << tp.StatsCountedScheduledJobs()
38 <<
" Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
39 <<
"/[" << tp.Strategy.WorkersMin
40 <<
" <= " << tp.CountedWorkers()
41 <<
" <= " << tp.Strategy.WorkersMax
54 if ( queueEntry.job ==
nullptr )
59 if ( queueEntry.job->Do() )
64 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
69 if ( !queueEntry.keep )
70 {ALIB_LOCK_WITH(threadPool)
71 auto size= queueEntry.job->SizeOf();
72 queueEntry.job->~Job();
73 threadPool.GetPoolAllocator().free( queueEntry.job, size );
77 ALIB_MESSAGE(
"TMOD/WORKER",
"PoolWorker \"{}\" is stopping (leaving method Run())",
94 Dbg.Name=
A_CHAR(
"DWManager");
96 #if ALIB_DEBUG_CRITICAL_SECTIONS
97 ma.DbgCriticalSectionsPH.Get()->DCSLock=
this;
102 assert::RegisterPrintable(
typeid(ThreadPool*), [](
const std::any& any, std::string& s) {
103 auto& tp= *std::any_cast<ThreadPool*>(any);
109 std::ostringstream oss;
110 oss <<
"TPool(Jobs: " << tp.CountedOpenJobs() <<
'/' << tp.StatsCountedScheduledJobs()
111 <<
" Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
112 <<
"/[" << tp.Strategy.WorkersMin
113 <<
" <= " << tp.CountedWorkers()
114 <<
" <= " << tp.Strategy.WorkersMax
123ThreadPool::~ThreadPool() {
125 "{}: Destruction while not idle. Please call WaitForAllIdle().\n"
126 "There are still {} workers running. Open jobs: ",
this, ctdWorkers - ctdIdle, ctdOpenJobs )
129 "{}: There are still {} threads running (whil in destructor).\n"
130 "While Shutdown is called now, it is recommended to explicitly "
131 "shutdown the pool before destruction.",
this, ctdWorkers )
137 #if ALIB_DEBUG_ALLOCATIONS
139 for (
int i = 2; i < 32; ++i) {
140 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
143 "ThreadPool destructor: There is(are) still " << qtyObjects <<
" object(s) of size "
144 << (1L << i) <<
" in the PoolAllocator.\n";
146 if ( warning.IsNotEmpty() ) {
149 " This indicates that Job-objects have not been deleted during the run.\n"
150 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
151 " This is a potential memory leak.\n"
152 " Known Job-types and their sizes are:\n";
153 DbgDumpKnownJobs(warning,
" ");
155 pool.DbgSuppressNonFreedObjectsWarning();
161#if ALIB_DEBUG_CRITICAL_SECTIONS
162bool ThreadPool::DCSIsAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
163bool ThreadPool::DCSIsSharedAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
167int ThreadPool::DbgDumpKnownJobs(NAString& target,
const NString& linePrefix ) {
169 for (
auto job : DbgKnownJobs ) {
170 target << linePrefix <<
NField( ++i, 2) <<
": "
171 << *job.TID <<
NTab(30,-1)
172 <<
NField( job.JobSize, 3) <<
" (PA "
173 <<
NField( PoolAllocator::GetAllocationSize(
174 PoolAllocator::GetAllocInformation(job.JobSize)), 3) <<
") "
175 "Usage: " <<
NField(job.Usage, 5) <<
"\n";
185 String128(
"PoolWorker") <<
Dec(nextWorkerID++, 3,
nullptr) );
190void ThreadPool::DisposeWorker( PoolWorker* poolWorker ) { pool().Delete(poolWorker); }
192void ThreadPool::addThread() {
193 ALIB_MESSAGE(
"TMOD/STRGY",
"{}: Adding one thread",
this )
196 if ( lastThreadToJoin ) {
198 "{}: Found a last thread to join but there the number of workers is {}\n"
199 "instead of 0. This should never happen!",
this, ctdWorkers )
201 lastThreadToJoin->
Join();
202 delete lastThreadToJoin;
203 lastThreadToJoin=
nullptr;
206 auto* newWorker= CreateWorker();
208 workers.InsertUnique( newWorker );
223 Job( typeid(JobJoin) )
224 , workerToJoin(nullptr) {}
232struct JobStop :
Job {
233 JobStop() :
Job(typeid(JobStop)) {}
241ThreadPool::QueueEntry ThreadPool::pop(PoolWorker* caller) {
247 ALIB_ASSERT_ERROR(ctdOpenJobs != 0,
"TMOD",
"{}: Job pipe empty after wakeup.",
this )
250 if ( queue.back().job == &JOB_JOIN ) {
251 auto& job= queue.back().job->Cast<JobJoin>();
252 job.workerToJoin->Join();
254 DisposeWorker(job.workerToJoin);
261 else if ( queue.back().job == &JOB_STOP ) {
264 ALIB_ASSERT(queue.empty(),
"TMOD")
268 int targetSize= Strategy.GetSize( ctdWorkers, ctdIdle, ctdOpenJobs, timeOfLastSizeChange );
272 if ( targetSize < ctdWorkers ) {
273 ALIB_MESSAGE(
"TMOD/STRGY",
"{}: Leaving pool ({}->{})",
274 this, ctdWorkers, targetSize )
279 if (ctdWorkers > 1) {
281 JOB_JOIN.workerToJoin= caller;
282 queue.push_back({&JOB_JOIN,
false});
286 lastThreadToJoin= caller;
289 workers.erase(workers.Find(caller));
294 caller->state= Thread::State::Done;
297 return QueueEntry{
nullptr,
false};
301 if ( targetSize > ctdWorkers ) {
309 auto entry= queue.back();
311 ALIB_MESSAGE(
"TMOD/QUEUE",
"{}: Job({}) popped",
this, &entry.job->ID )
316 if ( entry.job->Is<JobSyncer>() ) {
317 auto& job= entry.job->Cast<JobSyncer>();
319 if ( job.JobToDelete ) {
320 size_t size= job.JobToDelete->SizeOf();
321 job.JobToDelete->PrepareDeferredDeletion();
322 job.JobToDelete->~Job();
323 GetPoolAllocator().free( job.JobToDelete, size );
326 GetPoolAllocator().free( &job,
sizeof(JobSyncer) );
336bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
337 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
338 ALIB_MESSAGE(
"TMOD",
"{}: Waiting for all jobs to be processed.",
this )
340 Ticks waitStart= Ticks::Now();
341 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
343 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() ) {
344 ALIB_MESSAGE(
"TMOD",
"{}: All workers are idle.",
this )
348 if( nextWarning.Age() > dbgWarnAfter) {
350 "{}: Waiting for all workers to be come idle.",
this )
351 if ( CountedOpenJobs() ) {
352 for (
auto& job : queue )
355 nextWarning= Ticks::Now();
360 if (waitStart.Age() > timeout) {
361 ALIB_WARNING(
"TMOD",
"{} timeout while waiting for idle", this )
366 Thread::Sleep( IdleWaitTime );
369void ThreadPool::Shutdown() {
373 "{}: Shutdown called while {} jobs are open. "
374 "Call WaitForAllIdle() before shutdown.", this, ctdOpenJobs )
376 if (CountedWorkers() == 0) {
377 ALIB_MESSAGE(
"TMOD",
"{}: Shutdown, no workers alive.",
this)
384 Strategy.WorkersMax= 0;
385 queue.push_back( {&JOB_STOP,
false} );
392 while( CountedWorkers() > 0 ) {
393 Thread::SleepMicros( 50 );
394 ALIB_DBG(
if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
395 ALIB_MESSAGE(
"TMOD",
"{}: Waiting for workers to exit.", this)
403 "{}: The 'lastThreadToJoin' is null. This must not happen (internal error)."
406 lastThreadToJoin->Join();
407 DisposeWorker( lastThreadToJoin );
408 lastThreadToJoin=
nullptr;
virtual ALIB_DLL void Join()
virtual const character * GetName() const
#define ALIB_MESSAGE(domain,...)
#define ALIB_ASSERT(cond, domain)
#define ALIB_STRINGS_TO_NARROW( src, dest, bufSize)
#define ALIB_WARNING(domain,...)
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_ERROR(domain,...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
strings::TDec< character > Dec
Type alias in namespace alib.
threadmodel::Job Job
Type alias in namespace alib.
LocalString< 256 > String256
Type alias name for TLocalString<character,256>.
threads::TCondition< T > TCondition
Type alias in namespace alib.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
LocalString< 128 > String128
Type alias name for TLocalString<character,128>.
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
strings::TTab< nchar > NTab
Type alias in namespace alib.
strings::TField< nchar > NField
Type alias in namespace alib.
characters::character character
Type alias in namespace alib.
threadmodel::PoolWorker PoolWorker
Type alias in namespace alib.
virtual void PrepareJob(Job *job)
ThreadPool & threadPool
The pool that this instance belongs to.