15using namespace std::literals::chrono_literals;
20struct PWorker :
protected Thread
24 PWorker(ThreadPool& ptp,
const String& threadName) :
Thread(threadName ), tp(ptp) {}
32 auto queueEntry= tp.pop(
this);
33 if ( queueEntry.job ==
nullptr )
38 if ( queueEntry.job->Do() )
43 <<
"Job of type <" << queueEntry.job->ID
44 <<
"> passed to thread pool has no Job::Do() implementation!" )
48 if ( !queueEntry.keep )
50 auto size= queueEntry.job->SizeOf();
51 queueEntry.job->~Job();
52 tp.GetPoolAllocator().free( queueEntry.job, size );
57 NString4K msg(
"PWorker \""); msg << GetName() <<
"\" is stopping (leaving method Run()).";
75 Dbg.Name=
A_CHAR(
"DWManager");
77 #if ALIB_DEBUG_CRITICAL_SECTIONS
78 ma.DbgCriticalSectionsPH.Get()->DCSLock=
this;
82ThreadPool::~ThreadPool()
85 "ThreadPool destruction while not idle. Please call WaitForAllIdle().\n"
86 "There are still " << (ctdWorkers-ctdIdle)<<
" workers running. "
87 "Open jobs: " << ctdOpenJobs )
90 "ThreadPool destructor: There are still " << ctdWorkers<<
" threads running.\n"
91 "While ThreadPool::Shudown is called now, it is recommended to explicitly "
92 "shutdown the pool before destruction." )
98 #if ALIB_DEBUG_ALLOCATIONS
100 for (
int i = 2; i < 32; ++i)
102 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
105 "ThreadPool destructor: There is(are) still " << qtyObjects <<
" object(s) of size "
106 << (1L << i) <<
" in the PoolAllocator.\n";
108 if ( warning.IsNotEmpty() )
112 " This indicates that Job-objects have not been deleted during run.\n"
113 " Alternatively, certain jobs used the pool allocator without freeing the their data\n"
114 " This is a potential memory leak.\n"
115 " Known Job-types and their sizes are:\n";
116 DbgDumpKnownJobs(warning,
" ");
117 ALIB_WARNING(
"MGTHR", warning )
118 pool.DbgSuppressNonFreedObjectsWarning();
124#if ALIB_DEBUG_CRITICAL_SECTIONS
// Debug-only (compiled under ALIB_DEBUG_CRITICAL_SECTIONS): reports whether the
// member `Dbg` is owned by the calling thread, by forwarding to
// Dbg.IsOwnedByCurrentThread().
125bool ThreadPool::DCSIsAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
// Debug-only: answers the "shared acquisition" query with the very same check as
// DCSIsAcquired(), i.e. Dbg.IsOwnedByCurrentThread().
// NOTE(review): presumably no separate shared-ownership state exists for this
// lock, hence the identical implementation - confirm.
126bool ThreadPool::DCSIsSharedAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
129int ThreadPool::DbgDumpKnownJobs(NAString& target,
const NString& linePrefix )
132 for (
auto job : DbgKnownJobs )
134 target << linePrefix << NFormat::Field( ++i, 2) <<
": "
135 << *job.TID << NFormat::Tab(30,-1)
136 << NFormat::Field( job.JobSize, 3) <<
" (PA "
137 << NFormat::Field( PoolAllocator::GetAllocationSize(
138 PoolAllocator::GetAllocInformation(job.JobSize)), 3) <<
") "
139 "Usage: " << NFormat::Field(job.Usage, 5) <<
"\n";
145void ThreadPool::addThread()
148 "Pool(" << ctdOpenJobs <<
"/" << ctdStatJobsScheduled <<
149 " -> " << ctdIdle <<
"/" << ctdWorkers <<
") "
150 " adding one thread" )
153 if ( lastThreadToJoin )
156 "ThreadPool::AddThread: Found a last thread to join but there the number of workers is "
157 << ctdWorkers <<
"\ninstead of 0. This should never happen" )
159 lastThreadToJoin->Join();
160 delete lastThreadToJoin;
161 lastThreadToJoin=
nullptr;
164 auto* newWorker= new PWorker( *this, String128("PWorker") << Format(nextWorkerID++, 3,
nullptr) );
165 workers.InsertUnique( newWorker );
177 PWorker* workerToJoin;
178 JobJoin() :
Job( typeid(JobJoin) ), workerToJoin(nullptr) {}
// Job type without data members of its own; its constructor only passes
// typeid(JobStop) to the Job base class.
// NOTE(review): presumably the type of the JOB_STOP object whose address is
// pushed to the queue by Shutdown() and compared against in pop() - confirm.
186 struct JobStop :
Job { JobStop() :
Job(typeid(JobStop)) {} };
192ThreadPool::QueueEntry ThreadPool::pop(PWorker* caller)
201 if ( queue.Back().job == &JOB_JOIN )
203 queue.Back().job->Cast<JobJoin>().workerToJoin->Join();
210 else if ( queue.Back().job == &JOB_STOP )
214 ALIB_ASSERT(queue.IsEmpty())
218 int targetSize= Strategy.GetSize( ctdWorkers,
221 timeOfLastSizeChange );
225 if ( targetSize < ctdWorkers )
228 "Pool(" << ctdOpenJobs <<
"/" << ctdStatJobsScheduled <<
229 " -> " << ctdIdle <<
"/" << ctdWorkers <<
") "
230 " leaving pool (" << ctdWorkers <<
"->" << targetSize <<
")" )
239 JOB_JOIN.workerToJoin= caller;
240 queue.PushBack({&JOB_JOIN,
false});
244 lastThreadToJoin= caller;
247 workers.Erase(workers.Find(caller));
252 caller->state= Thread::State::Done;
255 return QueueEntry{
nullptr,
false};
259 else if ( targetSize > ctdWorkers )
268 auto entry= queue.Back();
271 "Pool(" << ctdOpenJobs <<
"/" << ctdStatJobsScheduled <<
272 " -> " << ctdIdle <<
"/" << ctdWorkers <<
") "
273 "Job(" << entry.job->ID <<
") popped" )
278 if ( entry.job->Is<JobSyncer>() )
280 auto& job= entry.job->Cast<JobSyncer>();
282 if ( job.JobToDelete )
284 size_t size= job.JobToDelete->SizeOf();
285 job.JobToDelete->PrepareDeferredDeletion();
286 job.JobToDelete->~Job();
287 GetPoolAllocator().free( job.JobToDelete, size );
290 GetPoolAllocator().free( &job,
sizeof(JobSyncer) );
299bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
300 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
302 ALIB_MESSAGE(
"MGTHR",
"ThreadPool: Waiting for all jobs to be processed." )
304 Ticks waitStart= Ticks::Now();
305 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
308 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() )
311 "ThreadPool: All are idle. Pool("
312 << CountedOpenJobs() <<
"/" << StatsCountedScheduledJobs() <<
"->"
313 << CountedIdleWorkers() <<
"/"
314 << CountedWorkers() <<
")" )
318 if( nextWarning.Age() > dbgWarnAfter)
321 "Waiting for all workers to be come idle. Pool("
322 << CountedOpenJobs() <<
"/" << StatsCountedScheduledJobs() <<
"->"
323 << CountedIdleWorkers() <<
"/"
324 << CountedWorkers() <<
")" )
325 nextWarning= Ticks::Now();
330 if (waitStart.Age() > timeout)
332 ALIB_WARNING(
"MGTHR",
"ThreadPool: Timeout while waiting for idle" )
337 Thread::SleepMicros( 50 );
341void ThreadPool::Shutdown()
344 "ThreadPool::Shutdown: Pool("
345 << CountedOpenJobs() <<
"/" << StatsCountedScheduledJobs() <<
"->"
346 << CountedIdleWorkers() <<
"/"
347 << CountedWorkers() <<
")" )
350 "ThreadPool::Shutdown called while " << ctdOpenJobs << " jobs are open. "
351 " Call WaitForAllIdle() before shutdown." )
357 Strategy.WorkersMax= 0;
358 queue.PushBack( {&JOB_STOP,
false} );
365 while( ctdWorkers > 0 )
367 Thread::SleepMicros( 50 );
368 ALIB_DBG(
if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
369 ALIB_MESSAGE(
"MGTHR", NString256() <<
370 "ThreadPool::Shutdown. Waiting for workers to exit. Pool("
371 << CountedOpenJobs() <<
"/" << StatsCountedScheduledJobs() <<
"->"
372 << CountedIdleWorkers() <<
"/"
373 << CountedWorkers() <<
")" )
380 "ThreadPool::Shutdown: lastThreadToJoin is null. This must not happen (internal error)." )
382 lastThreadToJoin->Join();
383 delete lastThreadToJoin;
384 lastThreadToJoin=
nullptr;
388 "ThreadPool::Shutdown completed. Pool("
389 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
390 << CountedIdleWorkers() << "/"
391 << CountedWorkers() << ")" )
#define ALIB_WARNING(...)
#define ALIB_MESSAGE(...)
#define ALIB_ASSERT_ERROR(cond,...)
#define ALIB_ASSERT_WARNING(cond,...)
#define ALIB_CALLER_PRUNED
threadmodel::ThreadPool ThreadPool
Type alias in namespace alib.
threads::TCondition< T > TCondition
Type alias in namespace alib.
threads::Thread Thread
Type alias in namespace alib.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
NLocalString< 256 > NString256
Type alias name for TLocalString<nchar,256>.
threadmodel::Job Job
Type alias in namespace alib.
NLocalString< 512 > NString512
Type alias name for TLocalString<nchar,512>.
NLocalString< 4096 > NString4K
Type alias name for TLocalString<nchar,4096>.