8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
29using namespace std::literals::chrono_literals;
34struct PWorker :
protected Thread
40 PWorker(ThreadPool& ptp,
const character* threadName )
43 , nameBuffer(threadName)
44 { SetName(nameBuffer); }
46 PWorker(ThreadPool& ptp )
54 ALIB_MESSAGE(
"MGTHR",
"PWorker \"{}\" is running", GetName() )
58 auto queueEntry= tp.pop(
this);
59 if ( queueEntry.job ==
nullptr )
64 if ( queueEntry.job->Do() )
69 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
74 if ( !queueEntry.keep )
76 auto size= queueEntry.job->SizeOf();
77 queueEntry.job->~Job();
78 tp.GetPoolAllocator().free( queueEntry.job, size );
83 ALIB_MESSAGE(
"MGTHR",
"PWorker \"{}\" is stopping (leaving method Run())",
101 Dbg.Name=
A_CHAR(
"DWManager");
103 #if ALIB_DEBUG_CRITICAL_SECTIONS
104 ma.DbgCriticalSectionsPH.Get()->DCSLock=
this;
108ThreadPool::~ThreadPool()
111 "ThreadPool destruction while not idle. Please call WaitForAllIdle().\n"
112 "There are still {} workers running. Open jobs: ", ctdWorkers-ctdIdle, ctdOpenJobs )
115 "ThreadPool destructor: There are still {} threads running.\n"
116 "While ThreadPool::Shutdown is called now, it is recommended to explicitly "
117 "shutdown the pool before destruction.", ctdWorkers )
123 #if ALIB_DEBUG_ALLOCATIONS
125 for (
int i = 2; i < 32; ++i)
127 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
130 "ThreadPool destructor: There is(are) still " << qtyObjects <<
" object(s) of size "
131 << (1L << i) <<
" in the PoolAllocator.\n";
133 if ( warning.IsNotEmpty() )
137 " This indicates that Job-objects have not been deleted during the run.\n"
138 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
139 " This is a potential memory leak.\n"
140 " Known Job-types and their sizes are:\n";
141 DbgDumpKnownJobs(warning,
" ");
143 pool.DbgSuppressNonFreedObjectsWarning();
149#if ALIB_DEBUG_CRITICAL_SECTIONS
// Reports whether the calling thread currently owns this pool's Dbg object;
// the result is simply forwarded from Dbg.IsOwnedByCurrentThread().
// NOTE(review): directly preceded by "#if ALIB_DEBUG_CRITICAL_SECTIONS", so this
// is presumably compiled only in that debug configuration - the matching #endif
// is not visible in this excerpt.
150bool ThreadPool::DCSIsAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
// Shared-acquisition query. The body is the same single call as in
// DCSIsAcquired(): it returns Dbg.IsOwnedByCurrentThread(), i.e. no separate
// "shared" state is consulted here.
// NOTE(review): whether treating shared and exclusive ownership alike is
// intentional cannot be told from this excerpt - confirm against the interface
// this method implements.
151bool ThreadPool::DCSIsSharedAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
155int ThreadPool::DbgDumpKnownJobs(NAString& target,
const NString& linePrefix )
158 for (
auto job : DbgKnownJobs )
160 target << linePrefix <<
NField( ++i, 2) <<
": "
161 << *job.TID <<
NTab(30,-1)
162 <<
NField( job.JobSize, 3) <<
" (PA "
163 <<
NField( PoolAllocator::GetAllocationSize(
164 PoolAllocator::GetAllocInformation(job.JobSize)), 3) <<
") "
165 "Usage: " <<
NField(job.Usage, 5) <<
"\n";
172void ThreadPool::addThread()
174 ALIB_MESSAGE(
"MGTHR/STRGY",
"Pool({}/{} -> {}/{}) adding one thread",
175 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers )
178 if ( lastThreadToJoin )
181 "ThreadPool::AddThread: Found a last thread to join but there the number of workers is {}\n"
182 "instead of 0. This should never happen!", ctdWorkers )
184 lastThreadToJoin->Join();
185 delete lastThreadToJoin;
186 lastThreadToJoin=
nullptr;
190 auto* newWorker=
new PWorker( *
this,
String128(
"PWorker") <<
Dec(nextWorkerID++, 3,
nullptr) );
192 auto* newWorker=
new PWorker( *
this );
194 workers.InsertUnique( newWorker );
206 PWorker* workerToJoin;
207 JobJoin() :
Job( typeid(JobJoin) ), workerToJoin(nullptr) {}
// Job subtype that adds no members of its own; its default constructor only
// forwards typeid(JobStop) to the Job base-class constructor.
// NOTE(review): presumably the type of the JOB_STOP object whose address pop()
// compares queue entries against and Shutdown() pushes into the queue - the
// declaration of JOB_STOP is not visible in this excerpt.
215 struct JobStop :
Job { JobStop() :
Job(typeid(JobStop)) {} };
221ThreadPool::QueueEntry ThreadPool::pop(PWorker* caller)
230 if ( queue.back().job == &JOB_JOIN )
232 queue.back().job->Cast<JobJoin>().workerToJoin->Join();
239 else if ( queue.back().job == &JOB_STOP )
243 ALIB_ASSERT(queue.empty(),
"MGTHR")
247 int targetSize= Strategy.GetSize( ctdWorkers,
250 timeOfLastSizeChange );
254 if ( targetSize < ctdWorkers )
256 ALIB_MESSAGE(
"MGTHR/STRGY",
"Pool({}/{} -> {}/{}) leaving pool ({}->{})" ,
257 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers,
258 ctdWorkers, targetSize )
266 JOB_JOIN.workerToJoin= caller;
267 queue.push_back({&JOB_JOIN,
false});
271 lastThreadToJoin= caller;
274 workers.erase(workers.Find(caller));
279 caller->state= Thread::State::Done;
282 return QueueEntry{
nullptr,
false};
286 else if ( targetSize > ctdWorkers )
295 auto entry= queue.back();
297 ALIB_MESSAGE(
"MGTHR/QUEUE",
"Pool({}/{} -> {}/{}) Job({}) popped",
298 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers, &entry.job->ID )
303 if ( entry.job->Is<JobSyncer>() )
305 auto& job= entry.job->Cast<JobSyncer>();
307 if ( job.JobToDelete )
309 size_t size= job.JobToDelete->SizeOf();
310 job.JobToDelete->PrepareDeferredDeletion();
311 job.JobToDelete->~Job();
312 GetPoolAllocator().free( job.JobToDelete, size );
315 GetPoolAllocator().free( &job,
sizeof(JobSyncer) );
324bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
325 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
327 ALIB_MESSAGE(
"MGTHR",
"ThreadPool: Waiting for all jobs to be processed." )
329 Ticks waitStart= Ticks::Now();
330 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
333 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() )
335 ALIB_MESSAGE(
"MGTHR",
"ThreadPool: All are idle. Pool({}/{} -> {}/{})",
336 CountedOpenJobs() , StatsCountedScheduledJobs(),
337 CountedIdleWorkers(), CountedWorkers() )
341 if( nextWarning.Age() > dbgWarnAfter)
344 "ThreadPool: Waiting for all workers to be come idle. Pool({}/{} -> {}/{})",
345 CountedOpenJobs() , StatsCountedScheduledJobs(),
346 CountedIdleWorkers(), CountedWorkers() )
347 nextWarning= Ticks::Now();
352 if (waitStart.Age() > timeout)
354 ALIB_WARNING(
"MGTHR",
"ThreadPool: Timeout while waiting for idle" )
359 Thread::SleepMicros( 50 );
363void ThreadPool::Shutdown()
365 ALIB_MESSAGE(
"MGTHR",
"ThreadPool::Shutdown: Pool({}/{} -> {}/{}) ",
366 CountedOpenJobs() , StatsCountedScheduledJobs(),
367 CountedIdleWorkers(), CountedWorkers() )
371 "ThreadPool::Shutdown called while {} jobs are open. "
372 "Call WaitForAllIdle() before shutdown.", ctdOpenJobs )
378 Strategy.WorkersMax= 0;
379 queue.push_back( {&JOB_STOP,
false} );
386 while( ctdWorkers > 0 )
388 Thread::SleepMicros( 50 );
389 ALIB_DBG(
if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
390 ALIB_MESSAGE(
"MGTHR",
391 "ThreadPool::Shutdown. Waiting for workers to exit. Pool({}/{} -> {}/{})",
392 CountedOpenJobs() , StatsCountedScheduledJobs(),
393 CountedIdleWorkers(), CountedWorkers() )
401 "lastThreadToJoin is null. This must not happen (internal error)." )
403 lastThreadToJoin->Join();
404 delete lastThreadToJoin;
405 lastThreadToJoin=
nullptr;
408 ALIB_MESSAGE(
"MGTHR",
"ThreadPool::Shutdown completed. Pool({}/{} -> {}/{})",
409 CountedOpenJobs() , StatsCountedScheduledJobs(),
410 CountedIdleWorkers(), CountedWorkers() )
#define ALIB_MESSAGE(domain,...)
#define ALIB_WARNING(domain,...)
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_ERROR(domain,...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
strings::TDec< character > Dec
Type alias in namespace alib.
threadmodel::Job Job
Type alias in namespace alib.
threads::TCondition< T > TCondition
Type alias in namespace alib.
LocalString< 16 > String16
Type alias name for TLocalString<character,16>.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
LocalString< 128 > String128
Type alias name for TLocalString<character,128>.
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
threads::Thread Thread
Type alias in namespace alib.
strings::TTab< nchar > NTab
Type alias in namespace alib.
strings::TField< nchar > NField
Type alias in namespace alib.
threadmodel::ThreadPool ThreadPool
Type alias in namespace alib.