ALib C++ Library
Library Version: 2511 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
threadpool.cpp
1//##################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2025 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6//##################################################################################################
7#include "alib_precompile.hpp"
8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
10#endif
11#if ALIB_C20_MODULES
12 module;
13#endif
14//========================================= Global Fragment ========================================
15#include "alib/alib.inl"
16//============================================== Module ============================================
17#if ALIB_C20_MODULES
18 module ALib.ThreadModel;
19# if ALIB_STRINGS
20 import ALib.Strings;
21# endif
22 import ALib.Boxing;
23#else
24# include "ALib.Strings.H"
25# include "ALib.Boxing.H"
26# include "ALib.ThreadModel.H"
27#endif
28//========================================== Implementation ========================================
29using namespace std::literals::chrono_literals;
30
31#if ALIB_STRINGS
34 alib::lang::HeapAllocator>::operator()(
37 target << "TPool(Jobs: " << tp.CountedOpenJobs() << '/' << tp.StatsCountedScheduledJobs()
38 << " Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
39 << "/[" << tp.Strategy.WorkersMin
40 << " <= " << tp.CountedWorkers()
41 << " <= " << tp.Strategy.WorkersMax
42 << "]";
43}
44#endif
45
46
47namespace alib::threadmodel {
48
49void PoolWorker::Run() {
50 ALIB_MESSAGE("TMOD/WORKER", "PoolWorker \"{}\" is running", GetName() )
51 for (;;) {
52 // await next job. Break if null.
53 auto queueEntry= threadPool.pop(this);
54 if ( queueEntry.job == nullptr )
55 break;
56
57 // Prepare job (a noop with default pool worker) and call Do()
58 PrepareJob(queueEntry.job);
59 if ( queueEntry.job->Do() )
60 goto CONTINUE;
61
62 // not processed!
63 ALIB_ERROR( "TMOD",
64 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
65 &queueEntry.job->ID )
66
67 CONTINUE:
68 // delete job?
69 if ( !queueEntry.keep )
70 {ALIB_LOCK_WITH(threadPool)
71 auto size= queueEntry.job->SizeOf();
72 queueEntry.job->~Job();
73 threadPool.GetPoolAllocator().free( queueEntry.job, size );
74 } }
75
76 #if ALIB_DEBUG
77 ALIB_MESSAGE( "TMOD/WORKER", "PoolWorker \"{}\" is stopping (leaving method Run())",
78 GetName() )
79 #endif
80}
81
82
84: TCondition (ALIB_DBG(A_CHAR("ThreadPool")))
85, ma (ALIB_DBG( "ThreadPool",) 16)
86, pool (ma)
87, workers (ma)
88#if ALIB_DEBUG
89, DbgKnownJobs (ma)
90#endif
91, queue (pool)
92{
93 #if ALIB_DEBUG
94 Dbg.Name= A_CHAR("DWManager");
95 #endif
96 #if ALIB_DEBUG_CRITICAL_SECTIONS
97 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
98 #endif
99
100
101 #if ALIB_DEBUG
102 assert::RegisterPrintable(typeid(ThreadPool*), [](const std::any& any, std::string& s) {
103 auto& tp= *std::any_cast<ThreadPool*>(any);
104 #if ALIB_STRINGS
105 String256 serialize(tp);
106 ALIB_STRINGS_TO_NARROW(serialize, ns, 256)
107 s+= ns;
108 #else
109 std::ostringstream oss;
110 oss << "TPool(Jobs: " << tp.CountedOpenJobs() << '/' << tp.StatsCountedScheduledJobs()
111 << " Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
112 << "/[" << tp.Strategy.WorkersMin
113 << " <= " << tp.CountedWorkers()
114 << " <= " << tp.Strategy.WorkersMax
115 << "]";
116 s+= oss.str();
117 #endif
118 });
119 #endif
120
121}
122
123ThreadPool::~ThreadPool() {
124 ALIB_ASSERT_ERROR( IsIdle(), "TMOD",
125 "{}: Destruction while not idle. Please call WaitForAllIdle().\n"
126 "There are still {} workers running. Open jobs: ", this, ctdWorkers - ctdIdle, ctdOpenJobs )
127
128 ALIB_ASSERT_WARNING( ctdWorkers == 0, "TMOD",
129 "{}: There are still {} threads running (whil in destructor).\n"
130 "While Shutdown is called now, it is recommended to explicitly "
131 "shutdown the pool before destruction.", this, ctdWorkers )
132
133 if (ctdWorkers > 0)
134 Shutdown();
135
136 // check if there are still objects in the pool allocator
137 #if ALIB_DEBUG_ALLOCATIONS
138 NString2K warning;
139 for (int i = 2; i < 32; ++i) {
140 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
141 if ( qtyObjects > 0)
142 warning <<
143 "ThreadPool destructor: There is(are) still " << qtyObjects << " object(s) of size "
144 << (1L << i) << " in the PoolAllocator.\n";
145 }
146 if ( warning.IsNotEmpty() ) {
147 warning <<
148 " Hint:\n"
149 " This indicates that Job-objects have not been deleted during the run.\n"
150 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
151 " This is a potential memory leak.\n"
152 " Known Job-types and their sizes are:\n";
153 DbgDumpKnownJobs(warning, " ");
154 ALIB_WARNING( "TMOD", warning )
155 pool.DbgSuppressNonFreedObjectsWarning();
156 }
157 #endif
158}
159
160#if ALIB_DEBUG
161#if ALIB_DEBUG_CRITICAL_SECTIONS
162bool ThreadPool::DCSIsAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
163bool ThreadPool::DCSIsSharedAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
164#endif
165
166#if ALIB_STRINGS
167int ThreadPool::DbgDumpKnownJobs(NAString& target, const NString& linePrefix ) {
168 int i= 0;
169 for ( auto job : DbgKnownJobs ) {
170 target << linePrefix << NField( ++i, 2) << ": "
171 << *job.TID << NTab(30,-1)
172 << NField( job.JobSize, 3) << " (PA "
173 << NField( PoolAllocator::GetAllocationSize(
174 PoolAllocator::GetAllocInformation(job.JobSize)), 3) << ") "
175 "Usage: " << NField(job.Usage, 5) << "\n";
176 }
177 return i;
178}
179#endif
180#endif // ALIB_DEBUG
181
182PoolWorker* ThreadPool::CreateWorker() {
183 #if ALIB_STRINGS
184 return pool().New<PoolWorker>( *this,
185 String128("PoolWorker") << Dec(nextWorkerID++, 3, nullptr) );
186 #else
187 return pool().New<PoolWorker>( *this );
188 #endif
189}
190void ThreadPool::DisposeWorker( PoolWorker* poolWorker ) { pool().Delete(poolWorker); }
191
192void ThreadPool::addThread() {
193 ALIB_MESSAGE( "TMOD/STRGY", "{}: Adding one thread", this )
194
195 // first check if the pool was already used once and is now restarted
196 if ( lastThreadToJoin ) {
197 ALIB_ASSERT_ERROR( ctdWorkers == 0, "TMOD",
198 "{}: Found a last thread to join but there the number of workers is {}\n"
199 "instead of 0. This should never happen!", this, ctdWorkers )
200
201 lastThreadToJoin->Join();
202 delete lastThreadToJoin;
203 lastThreadToJoin= nullptr;
204 }
205
206 auto* newWorker= CreateWorker();
207
208 workers.InsertUnique( newWorker );
209 ++ctdWorkers;
210 newWorker->Start();
211}
212
213
214namespace {
215
216// An internal job used to task the next worker to join a thread that stopped.
217// Note: The last thread will add itself to #lastThreadToJoin, which will be joined
218// with the method #Shutdown or when a new thread is added.
219struct JobJoin : Job
220{
221 PoolWorker* workerToJoin;
222 JobJoin() :
223 Job( typeid(JobJoin) )
224 , workerToJoin(nullptr) {}
225};
226
227// we only need one instance
228JobJoin JOB_JOIN;
229
230// An internal job used by #Shutdown. It is only emplaced if the queue is empty and
231// all types are idle
232struct JobStop : Job {
233 JobStop() :Job(typeid(JobStop)) {}
234};
235
236// we only need one instance
237JobStop JOB_STOP;
238
239}
240
241ThreadPool::QueueEntry ThreadPool::pop(PoolWorker* caller) {
242 START:
243 Acquire(ALIB_CALLER_PRUNED);
244 ++ctdIdle;
245 WaitForNotification(ALIB_CALLER_PRUNED);
246 --ctdIdle;
247 ALIB_ASSERT_ERROR(ctdOpenJobs != 0, "TMOD", "{}: Job pipe empty after wakeup.", this )
248
249 // check for JobJoin singleton
250 if ( queue.back().job == &JOB_JOIN ) {
251 auto& job= queue.back().job->Cast<JobJoin>();
252 job.workerToJoin->Join();
253
254 DisposeWorker(job.workerToJoin);
255
256 queue.pop_back();
257 --ctdOpenJobs;
258 }
259
260 // check for JobStop singleton
261 else if ( queue.back().job == &JOB_STOP ) {
262 queue.pop_back();
263 --ctdOpenJobs;
264 ALIB_ASSERT(queue.empty(), "TMOD")
265 }
266
267 // check if we need to change the pool size
268 int targetSize= Strategy.GetSize( ctdWorkers, ctdIdle, ctdOpenJobs, timeOfLastSizeChange );
269
270
271 // leaving pool?
272 if ( targetSize < ctdWorkers ) {
273 ALIB_MESSAGE( "TMOD/STRGY", "{}: Leaving pool ({}->{})",
274 this, ctdWorkers, targetSize )
275
276 // make sure the calling thread is joined. This is either done by the next
277 // worker, or by the (main-) thread that calls Shutdown, or if a new thread
278 // is added later
279 if (ctdWorkers > 1) {
280 // put 'myself' in the JobJoin singleton and add me to the front of the pipe.
281 JOB_JOIN.workerToJoin= caller;
282 queue.push_back({&JOB_JOIN, false});
283 ctdOpenJobs++;
284 }
285 else
286 lastThreadToJoin= caller;
287
288 // remove the caller from the worker list
289 workers.erase(workers.Find(caller));
290 --ctdWorkers;
291
292 // in any case, mark the caller as done already to avoid a warning if joining comes
293 // faster than the exit:
294 caller->state= Thread::State::Done;
295
296 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
297 return QueueEntry{nullptr, false};
298 }
299
300 // increasing pool?
301 if ( targetSize > ctdWorkers ) {
302 addThread();
303 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
304 goto START;
305 }
306
307
308 // start working
309 auto entry= queue.back();
310 queue.pop_back();
311 ALIB_MESSAGE( "TMOD/QUEUE", "{}: Job({}) popped", this, &entry.job->ID )
312
313 --ctdOpenJobs;
314
315 // Sync-job with optional deferred job deletion
316 if ( entry.job->Is<JobSyncer>() ) {
317 auto& job= entry.job->Cast<JobSyncer>();
318
319 if ( job.JobToDelete ) {
320 size_t size= job.JobToDelete->SizeOf();
321 job.JobToDelete->PrepareDeferredDeletion();
322 job.JobToDelete->~Job();
323 GetPoolAllocator().free( job.JobToDelete, size );
324 }
325
326 GetPoolAllocator().free( &job, sizeof(JobSyncer) );
327 ALIB_ASSERT(ctdIdle + 1 == ctdWorkers, "TMOD")
328 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED); // wakeup others (all are idle)
329 goto START;
330 }
331
332 Release(ALIB_CALLER_PRUNED);
333 return entry;
334}
335
336bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
337 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
338 ALIB_MESSAGE("TMOD", "{}: Waiting for all jobs to be processed.", this )
339
340 Ticks waitStart= Ticks::Now();
341 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
342 while(true) {
343 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() ) {
344 ALIB_MESSAGE("TMOD", "{}: All workers are idle.", this )
345 return true;
346 }
347 #if ALIB_DEBUG
348 if( nextWarning.Age() > dbgWarnAfter) {
349 ALIB_MESSAGE( "TMOD",
350 "{}: Waiting for all workers to be come idle.", this )
351 if ( CountedOpenJobs() ) {
352 for (auto& job : queue )
353 ALIB_MESSAGE( "TMOD", " Job: {}", &job.job->ID)
354 }
355 nextWarning= Ticks::Now();
356 }
357 #endif
358
359 // check timeout
360 if (waitStart.Age() > timeout) {
361 ALIB_WARNING("TMOD", "{} timeout while waiting for idle", this )
362 return false;
363 }
364
365 // sleep
366 Thread::Sleep( IdleWaitTime );
367} }
368
369void ThreadPool::Shutdown() {
370 ALIB_MESSAGE("TMOD", "{}: Shutdown", this)
371
372 ALIB_ASSERT_ERROR( ctdOpenJobs == 0, "TMOD",
373 "{}: Shutdown called while {} jobs are open. "
374 "Call WaitForAllIdle() before shutdown.", this, ctdOpenJobs )
375
376 if (CountedWorkers() == 0) {
377 ALIB_MESSAGE("TMOD", "{}: Shutdown, no workers alive.", this)
378 return;
379 }
380 // Schedule a stop-job.
381 // We do this here to meet the wakeup condition without adding
382 // another term to that condition.
383 Acquire(ALIB_CALLER_PRUNED);
384 Strategy.WorkersMax= 0;
385 queue.push_back( {&JOB_STOP, false} );
386 ++ctdOpenJobs;
387 ReleaseAndNotify(ALIB_CALLER_PRUNED);
388
389
390 // wait for all workers to exit
391 ALIB_DBG(Ticks waitTime);
392 while( CountedWorkers() > 0 ) {
393 Thread::SleepMicros( 50 );
394 ALIB_DBG( if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
395 ALIB_MESSAGE("TMOD", "{}: Waiting for workers to exit.", this)
396
397 ALIB_DBG( } )
398
399 }
400
401 // join the last thread
402 ALIB_ASSERT_ERROR(lastThreadToJoin != nullptr, "TMOD",
403 "{}: The 'lastThreadToJoin' is null. This must not happen (internal error)."
404 , this )
405
406 lastThreadToJoin->Join();
407 DisposeWorker( lastThreadToJoin );
408 lastThreadToJoin= nullptr;
409
410 // Success
411 ALIB_MESSAGE("TMOD", "{}: Shutdown completed.) ", this)
412}
413
414
415} // namespace [alib::threadmodel]
virtual ALIB_DLL void Join()
Definition thread.cpp:244
virtual const character * GetName() const
Definition thread.inl:241
#define ALIB_MESSAGE(domain,...)
Definition alib.inl:1064
#define A_CHAR(STR)
#define ALIB_ASSERT(cond, domain)
Definition alib.inl:1065
#define ALIB_STRINGS_TO_NARROW( src, dest, bufSize)
#define ALIB_WARNING(domain,...)
Definition alib.inl:1063
#define ALIB_ASSERT_WARNING(cond, domain,...)
Definition alib.inl:1067
#define ALIB_ERROR(domain,...)
Definition alib.inl:1062
#define ALIB_DBG(...)
Definition alib.inl:853
#define ALIB_ASSERT_ERROR(cond, domain,...)
Definition alib.inl:1066
#define ALIB_CALLER_PRUNED
Definition alib.inl:1024
#define ALIB_DEBUG
Definition prepro.dox.md:21
strings::TDec< character > Dec
Type alias in namespace alib.
Definition format.inl:541
threadmodel::Job Job
Type alias in namespace alib.
Definition jobs.inl:180
LocalString< 256 > String256
Type alias name for TLocalString<character,256>.
threads::TCondition< T > TCondition
Type alias in namespace alib.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
LocalString< 128 > String128
Type alias name for TLocalString<character,128>.
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
strings::TTab< nchar > NTab
Type alias in namespace alib.
Definition format.inl:515
strings::TField< nchar > NField
Type alias in namespace alib.
characters::character character
Type alias in namespace alib.
threadmodel::PoolWorker PoolWorker
Type alias in namespace alib.
virtual void PrepareJob(Job *job)
ThreadPool & threadPool
The pool that this instance belongs to.