1using namespace std::literals::chrono_literals;
9 target <<
"TPool(Jobs: " << tp.CountedOpenJobs() <<
'/' << tp.StatsCountedScheduledJobs()
10 <<
" Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
11 <<
"/[" << tp.Strategy.WorkersMin
12 <<
" <= " << tp.CountedWorkers()
13 <<
" <= " << tp.Strategy.WorkersMax
26 if ( queueEntry.job ==
nullptr )
31 if ( queueEntry.job->Do() )
36 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
41 if ( !queueEntry.keep )
42 {ALIB_LOCK_WITH(threadPool)
43 auto size= queueEntry.job->SizeOf();
44 queueEntry.job->~Job();
45 threadPool.GetPoolAllocator().free( queueEntry.job, size );
49 ALIB_MESSAGE(
"TMOD/WORKER",
"PoolWorker \"{}\" is stopping (leaving method Run())",
66 Dbg.Name=
A_CHAR(
"DWManager");
68 #if ALIB_DEBUG_CRITICAL_SECTIONS
69 ma.DbgCriticalSectionsPH.Get()->DCSLock=
this;
74 assert::RegisterPrintable(
typeid(ThreadPool*), [](
const std::any& any, std::string& s) {
75 auto& tp= *std::any_cast<ThreadPool*>(any);
81 std::ostringstream oss;
82 oss <<
"TPool(Jobs: " << tp.CountedOpenJobs() <<
'/' << tp.StatsCountedScheduledJobs()
83 <<
" Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
84 <<
"/[" << tp.Strategy.WorkersMin
85 <<
" <= " << tp.CountedWorkers()
86 <<
" <= " << tp.Strategy.WorkersMax
95ThreadPool::~ThreadPool() {
97 "{}: Destruction while not idle. Please call WaitForAllIdle().\n"
98 "There are still {} workers running. Open jobs: ",
this, ctdWorkers - ctdIdle, ctdOpenJobs )
101 "{}: There are still {} threads running (whil in destructor).\n"
102 "While Shutdown is called now, it is recommended to explicitly "
103 "shutdown the pool before destruction.",
this, ctdWorkers )
109 #if ALIB_DEBUG_ALLOCATIONS
111 for (
int i = 2; i < 32; ++i) {
112 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
115 "ThreadPool destructor: There is(are) still " << qtyObjects <<
" object(s) of size "
116 << (1L << i) <<
" in the PoolAllocator.\n";
118 if ( warning.IsNotEmpty() ) {
121 " This indicates that Job-objects have not been deleted during the run.\n"
122 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
123 " This is a potential memory leak.\n"
124 " Known Job-types and their sizes are:\n";
125 DbgDumpKnownJobs(warning,
" ");
127 pool.DbgSuppressNonFreedObjectsWarning();
133#if ALIB_DEBUG_CRITICAL_SECTIONS
134bool ThreadPool::DCSIsAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
135bool ThreadPool::DCSIsSharedAcquired()
const {
return Dbg.IsOwnedByCurrentThread(); }
139int ThreadPool::DbgDumpKnownJobs(NAString& target,
const NString& linePrefix ) {
141 for (
auto jobIt : DbgKnownJobs ) {
142 target << linePrefix <<
NField( ++i, 2) <<
": "
143 << *jobIt.TID <<
NTab(30,-1)
144 <<
NField( jobIt.JobSize, 3) <<
" (PA "
145 <<
NField( PoolAllocator::GetAllocationSize(
146 PoolAllocator::GetAllocInformation(jobIt.JobSize)), 3) <<
") "
147 "Usage: " <<
NField(jobIt.Usage, 5) <<
"\n";
157 String128(
"PoolWorker") <<
Dec(nextWorkerID++, 3,
nullptr) );
162void ThreadPool::DisposeWorker( PoolWorker* poolWorker ) { pool().Delete(poolWorker); }
164void ThreadPool::addThread() {
165 ALIB_MESSAGE(
"TMOD/STRGY",
"{}: Adding one thread",
this )
168 if ( lastThreadToJoin ) {
170 "{}: Found a last thread to join but there the number of workers is {}\n"
171 "instead of 0. This should never happen!",
this, ctdWorkers )
173 lastThreadToJoin->
Join();
174 delete lastThreadToJoin;
175 lastThreadToJoin=
nullptr;
178 auto* newWorker= CreateWorker();
180 workers.InsertUnique( newWorker );
191struct JobJoin :
Job {
194 Job( typeid(JobJoin) )
195 , workerToJoin(nullptr) {}
203struct JobStop :
Job {
204 JobStop() :
Job(typeid(JobStop)) {}
212ThreadPool::QueueEntry ThreadPool::pop(PoolWorker* caller) {
218 ALIB_ASSERT_ERROR(ctdOpenJobs != 0,
"TMOD",
"{}: Job pipe empty after wakeup.",
this )
221 if ( queue.back().job == &JOB_JOIN ) {
222 auto& job= queue.back().job->Cast<JobJoin>();
223 job.workerToJoin->Join();
225 DisposeWorker(job.workerToJoin);
232 else if ( queue.back().job == &JOB_STOP ) {
235 ALIB_ASSERT(queue.empty(),
"TMOD")
239 int targetSize= Strategy.GetSize( ctdWorkers, ctdIdle, ctdOpenJobs, timeOfLastSizeChange );
243 if ( targetSize < ctdWorkers ) {
244 ALIB_MESSAGE(
"TMOD/STRGY",
"{}: Leaving pool ({}->{})",
245 this, ctdWorkers, targetSize )
250 if (ctdWorkers > 1) {
252 JOB_JOIN.workerToJoin= caller;
253 queue.push_back({&JOB_JOIN,
false});
257 lastThreadToJoin= caller;
260 workers.erase(workers.Find(caller));
265 caller->state= Thread::State::Done;
268 return QueueEntry{
nullptr,
false};
272 if ( targetSize > ctdWorkers ) {
280 auto entry= queue.back();
282 ALIB_MESSAGE(
"TMOD/QUEUE",
"{}: Job({}) popped",
this, &entry.job->ID )
287 if ( entry.job->Is<JobSyncer>() ) {
288 auto& job= entry.job->Cast<JobSyncer>();
290 if ( job.JobToDelete ) {
291 size_t size= job.JobToDelete->SizeOf();
292 job.JobToDelete->PrepareDeferredDeletion();
293 job.JobToDelete->~Job();
294 GetPoolAllocator().free( job.JobToDelete, size );
297 GetPoolAllocator().free( &job,
sizeof(JobSyncer) );
307bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
308 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
309 ALIB_MESSAGE(
"TMOD",
"{}: Waiting for all jobs to be processed.",
this )
311 Ticks waitStart= Ticks::Now();
312 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
314 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() ) {
315 ALIB_MESSAGE(
"TMOD",
"{}: All workers are idle.",
this )
319 if( nextWarning.Age() > dbgWarnAfter) {
321 "{}: Waiting for all workers to be come idle.",
this )
322 if ( CountedOpenJobs() ) {
323 for (
auto& job : queue )
326 nextWarning= Ticks::Now();
331 if (waitStart.Age() > timeout) {
332 ALIB_WARNING(
"TMOD",
"{} timeout while waiting for idle", this )
337 Thread::Sleep( IdleWaitTime );
340void ThreadPool::Shutdown() {
344 "{}: Shutdown called while {} jobs are open. "
345 "Call WaitForAllIdle() before shutdown.", this, ctdOpenJobs )
347 if (CountedWorkers() == 0) {
348 ALIB_MESSAGE(
"TMOD",
"{}: Shutdown, no workers alive.",
this)
355 Strategy.WorkersMax= 0;
356 queue.push_back( {&JOB_STOP,
false} );
363 while( CountedWorkers() > 0 ) {
364 Thread::SleepMicros( 50 );
365 ALIB_DBG(
if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
366 ALIB_MESSAGE(
"TMOD",
"{}: Waiting for workers to exit.", this)
374 "{}: The 'lastThreadToJoin' is null. This must not happen (internal error)."
377 lastThreadToJoin->Join();
378 DisposeWorker( lastThreadToJoin );
379 lastThreadToJoin=
nullptr;
#define ALIB_MESSAGE(domain,...)
#define ALIB_ASSERT(cond, domain)
#define ALIB_WARNING(domain,...)
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_ERROR(domain,...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
virtual const character * GetName() const
threads::TCondition< T > TCondition
Type alias in namespace alib.
strings::TField< nchar > NField
Type alias in namespace alib.
threadmodel::Job Job
Type alias in namespace alib.
strings::TTab< nchar > NTab
Type alias in namespace alib.
strings::TDec< character > Dec
Type alias in namespace alib.
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
LocalString< 128 > String128
Type alias name for TLocalString<character,128>.
LocalString< 256 > String256
Type alias name for TLocalString<character,256>.
characters::character character
Type alias in namespace alib.
threadmodel::PoolWorker PoolWorker
Type alias in namespace alib.
#define ALIB_STRINGS_TO_NARROW( src, dest, bufSize)
virtual void PrepareJob(Job *job)
ThreadPool & threadPool
The pool that this instance belongs to.