ALib C++ Library
Library Version: 2510 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
threadpool.cpp
1// #################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2025 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6// #################################################################################################
7#include "alib_precompile.hpp"
8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
10#endif
11#if ALIB_C20_MODULES
12 module;
13#endif
14// ====================================== Global Fragment ======================================
15#include "alib/alib.inl"
16// =========================================== Module ==========================================
17#if ALIB_C20_MODULES
18 module ALib.ThreadModel;
19# if ALIB_STRINGS
20 import ALib.Strings;
21# endif
22 import ALib.Boxing;
23#else
24# include "ALib.Strings.H"
25# include "ALib.Boxing.H"
26# include "ALib.ThreadModel.H"
27#endif
28// ====================================== Implementation =======================================
29using namespace std::literals::chrono_literals;
30
31
32namespace alib::threadmodel {
33
34struct PWorker : protected Thread
35{
36 friend class ThreadPool;
37 ThreadPool& tp;
38#if ALIB_STRINGS
39 String16 nameBuffer;
40 PWorker(ThreadPool& ptp, const character* threadName )
41 : Thread(threadName)
42 , tp(ptp)
43 , nameBuffer(threadName)
44 { SetName(nameBuffer); } // fix the name to local pointer
45#else
46 PWorker(ThreadPool& ptp )
47 : Thread("Poolworker")
48 , tp(ptp)
49 {}
50#endif
51
52 void Run() override
53 {
54 ALIB_MESSAGE("MGTHR", "PWorker \"{}\" is running", GetName() )
55 for (;;)
56 {
57 // await next job. Break if null.
58 auto queueEntry= tp.pop(this);
59 if ( queueEntry.job == nullptr )
60 break;
61
62
63 // Custom method, implemented with Job::Do()
64 if ( queueEntry.job->Do() )
65 goto CONTINUE;
66
67 // not processed!
68 ALIB_ERROR( "MGTHR",
69 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
70 &queueEntry.job->ID )
71
72 CONTINUE:
73 // delete job?
74 if ( !queueEntry.keep )
75 {ALIB_LOCK_WITH(tp)
76 auto size= queueEntry.job->SizeOf();
77 queueEntry.job->~Job();
78 tp.GetPoolAllocator().free( queueEntry.job, size );
79 }
80 }
81
82 #if ALIB_DEBUG
83 ALIB_MESSAGE( "MGTHR", "PWorker \"{}\" is stopping (leaving method Run())",
84 GetName() )
85 #endif
86 }
87}; // class PWorker
88
89
91: TCondition (ALIB_DBG(A_CHAR("ThreadPool")))
92, ma (ALIB_DBG( "ThreadPool",) 16)
93, pool (ma)
94, workers (ma)
95#if ALIB_DEBUG
96, DbgKnownJobs (ma)
97#endif
98, queue (pool)
99{
100 #if ALIB_DEBUG
101 Dbg.Name= A_CHAR("DWManager");
102 #endif
103 #if ALIB_DEBUG_CRITICAL_SECTIONS
104 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
105 #endif
106}
107
108ThreadPool::~ThreadPool()
109{
110 ALIB_ASSERT_ERROR( IsIdle(), "MGTHR",
111 "ThreadPool destruction while not idle. Please call WaitForAllIdle().\n"
112 "There are still {} workers running. Open jobs: ", ctdWorkers-ctdIdle, ctdOpenJobs )
113
114 ALIB_ASSERT_WARNING( ctdWorkers == 0, "MGTHR",
115 "ThreadPool destructor: There are still {} threads running.\n"
116 "While ThreadPool::Shutdown is called now, it is recommended to explicitly "
117 "shutdown the pool before destruction.", ctdWorkers )
118
119 if (ctdWorkers > 0)
120 Shutdown();
121
122 // check if there are still objects in the pool allocator
123 #if ALIB_DEBUG_ALLOCATIONS
124 NString2K warning;
125 for (int i = 2; i < 32; ++i)
126 {
127 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
128 if ( qtyObjects > 0)
129 warning <<
130 "ThreadPool destructor: There is(are) still " << qtyObjects << " object(s) of size "
131 << (1L << i) << " in the PoolAllocator.\n";
132 }
133 if ( warning.IsNotEmpty() )
134 {
135 warning <<
136 " Hint:\n"
137 " This indicates that Job-objects have not been deleted during the run.\n"
138 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
139 " This is a potential memory leak.\n"
140 " Known Job-types and their sizes are:\n";
141 DbgDumpKnownJobs(warning, " ");
142 ALIB_WARNING( "MGTHR", warning )
143 pool.DbgSuppressNonFreedObjectsWarning();
144 }
145 #endif
146}
147
148#if ALIB_DEBUG
149#if ALIB_DEBUG_CRITICAL_SECTIONS
150bool ThreadPool::DCSIsAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
151bool ThreadPool::DCSIsSharedAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
152#endif
153
154#if ALIB_STRINGS
155int ThreadPool::DbgDumpKnownJobs(NAString& target, const NString& linePrefix )
156{
157 int i= 0;
158 for ( auto job : DbgKnownJobs )
159 {
160 target << linePrefix << NField( ++i, 2) << ": "
161 << *job.TID << NTab(30,-1)
162 << NField( job.JobSize, 3) << " (PA "
163 << NField( PoolAllocator::GetAllocationSize(
164 PoolAllocator::GetAllocInformation(job.JobSize)), 3) << ") "
165 "Usage: " << NField(job.Usage, 5) << "\n";
166 }
167 return i;
168}
169#endif
170#endif // ALIB_DEBUG
171
172void ThreadPool::addThread()
173{
174 ALIB_MESSAGE( "MGTHR/STRGY", "Pool({}/{} -> {}/{}) adding one thread",
175 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers )
176
177 // first check if the pool was already used once and is now restarted
178 if ( lastThreadToJoin )
179 {
180 ALIB_ASSERT_ERROR( ctdWorkers == 0, "MGTHR",
181 "ThreadPool::AddThread: Found a last thread to join but there the number of workers is {}\n"
182 "instead of 0. This should never happen!", ctdWorkers )
183
184 lastThreadToJoin->Join();
185 delete lastThreadToJoin;
186 lastThreadToJoin= nullptr;
187 }
188
189#if ALIB_STRINGS
190 auto* newWorker= new PWorker( *this, String128("PWorker") << Dec(nextWorkerID++, 3, nullptr) );
191#else
192 auto* newWorker= new PWorker( *this );
193#endif
194 workers.InsertUnique( newWorker );
195 ++ctdWorkers;
196 newWorker->Start();
197}
198
199
200namespace {
201 // An internal job used to task the next worker to join a thread that stopped.
202 // Note: The last thread will add itself to #lastThreadToJoin, which will be joined
203 // with the method #Shutdown or when a new thread is added.
204 struct JobJoin : Job
205 {
206 PWorker* workerToJoin;
207 JobJoin() : Job( typeid(JobJoin) ), workerToJoin(nullptr) {}
208 };
209
210 // we only need one instance
211 JobJoin JOB_JOIN;
212
213 // An internal job used by #Shutdown. It is only emplaced if the queue is empty and
214 // all types are idle
215 struct JobStop : Job { JobStop() :Job(typeid(JobStop)) {} };
216
217 // we only need one instance
218 JobStop JOB_STOP;
219}
220
221ThreadPool::QueueEntry ThreadPool::pop(PWorker* caller)
222{
223 START:
224 Acquire(ALIB_CALLER_PRUNED);
225 ++ctdIdle;
226 WaitForNotification(ALIB_CALLER_PRUNED);
227 ALIB_ASSERT_ERROR(ctdOpenJobs != 0, "MGTHR", "Job pipe empty after wakeup" )
228
229 // check for JobJoin singleton
230 if ( queue.back().job == &JOB_JOIN )
231 {
232 queue.back().job->Cast<JobJoin>().workerToJoin->Join();
233 queue.pop_back();
234 --ctdOpenJobs;
235// ALIB_ASSERT(queue.IsNotEmpty())
236 }
237
238 // check for JobStop singleton
239 else if ( queue.back().job == &JOB_STOP )
240 {
241 queue.pop_back();
242 --ctdOpenJobs;
243 ALIB_ASSERT(queue.empty(), "MGTHR")
244 }
245
246 // check if we need to change the pool size
247 int targetSize= Strategy.GetSize( ctdWorkers,
248 ctdIdle--,
249 ctdOpenJobs,
250 timeOfLastSizeChange );
251
252
253 // leaving pool?
254 if ( targetSize < ctdWorkers )
255 {
256 ALIB_MESSAGE( "MGTHR/STRGY", "Pool({}/{} -> {}/{}) leaving pool ({}->{})" ,
257 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers,
258 ctdWorkers, targetSize )
259
260 // make sure the calling thread is joined. This is either done by the next
261 // worker, or by the (main-) thread that calls Shutdown, or if a new thread
262 // is added later
263 if (ctdWorkers > 1)
264 {
265 // put 'myself' in the JobJoin singleton and add me to the front of the pipe.
266 JOB_JOIN.workerToJoin= caller;
267 queue.push_back({&JOB_JOIN, false});
268 ctdOpenJobs++;
269 }
270 else
271 lastThreadToJoin= caller;
272
273 // remove myself from the worker list
274 workers.erase(workers.Find(caller));
275 --ctdWorkers;
276
277 // in any case, mark the caller as done already to avoid a warning if joining comes
278 // faster than the exit:
279 caller->state= Thread::State::Done;
280
281 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
282 return QueueEntry{nullptr, false};
283 }
284
285 // increasing pool?
286 else if ( targetSize > ctdWorkers )
287 {
288 addThread();
289 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
290 goto START;
291 }
292
293
294 // start working
295 auto entry= queue.back();
296 queue.pop_back();
297 ALIB_MESSAGE( "MGTHR/QUEUE", "Pool({}/{} -> {}/{}) Job({}) popped",
298 ctdOpenJobs, ctdStatJobsScheduled, ctdIdle, ctdWorkers, &entry.job->ID )
299
300 --ctdOpenJobs;
301
302 // Sync-job with optional deferred job deletion
303 if ( entry.job->Is<JobSyncer>() )
304 {
305 auto& job= entry.job->Cast<JobSyncer>();
306
307 if ( job.JobToDelete )
308 {
309 size_t size= job.JobToDelete->SizeOf();
310 job.JobToDelete->PrepareDeferredDeletion();
311 job.JobToDelete->~Job();
312 GetPoolAllocator().free( job.JobToDelete, size );
313 }
314
315 GetPoolAllocator().free( &job, sizeof(JobSyncer) );
316 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED); // wakeup others (all are idle)
317 goto START;
318 }
319
320 Release(ALIB_CALLER_PRUNED);
321 return entry;
322}
323
324bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
325 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
326{
327 ALIB_MESSAGE("MGTHR", "ThreadPool: Waiting for all jobs to be processed." )
328
329 Ticks waitStart= Ticks::Now();
330 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
331 while(true)
332 {
333 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() )
334 {
335 ALIB_MESSAGE("MGTHR", "ThreadPool: All are idle. Pool({}/{} -> {}/{})",
336 CountedOpenJobs() , StatsCountedScheduledJobs(),
337 CountedIdleWorkers(), CountedWorkers() )
338 return true;
339 }
340 #if ALIB_DEBUG
341 if( nextWarning.Age() > dbgWarnAfter)
342 {
343 ALIB_WARNING( "MGTHR",
344 "ThreadPool: Waiting for all workers to be come idle. Pool({}/{} -> {}/{})",
345 CountedOpenJobs() , StatsCountedScheduledJobs(),
346 CountedIdleWorkers(), CountedWorkers() )
347 nextWarning= Ticks::Now();
348 }
349 #endif
350
351 // check timeout
352 if (waitStart.Age() > timeout)
353 {
354 ALIB_WARNING("MGTHR", "ThreadPool: Timeout while waiting for idle" )
355 return false;
356 }
357
358 // sleep
359 Thread::SleepMicros( 50 );
360 }
361}
362
363void ThreadPool::Shutdown()
364{
365 ALIB_MESSAGE("MGTHR", "ThreadPool::Shutdown: Pool({}/{} -> {}/{}) ",
366 CountedOpenJobs() , StatsCountedScheduledJobs(),
367 CountedIdleWorkers(), CountedWorkers() )
368
369
370 ALIB_ASSERT_ERROR( ctdOpenJobs == 0, "MGTHR",
371 "ThreadPool::Shutdown called while {} jobs are open. "
372 "Call WaitForAllIdle() before shutdown.", ctdOpenJobs )
373
374 // Schedule a stop-job.
375 // We do this here to meet the wakeup condition without adding
376 // another term to that condition.
377 Acquire(ALIB_CALLER_PRUNED);
378 Strategy.WorkersMax= 0;
379 queue.push_back( {&JOB_STOP, false} );
380 ++ctdOpenJobs;
381 ReleaseAndNotify(ALIB_CALLER_PRUNED);
382
383
384 // wait for all workers to exit
385 ALIB_DBG(Ticks waitTime);
386 while( ctdWorkers > 0 )
387 {
388 Thread::SleepMicros( 50 );
389 ALIB_DBG( if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
390 ALIB_MESSAGE("MGTHR",
391 "ThreadPool::Shutdown. Waiting for workers to exit. Pool({}/{} -> {}/{})",
392 CountedOpenJobs() , StatsCountedScheduledJobs(),
393 CountedIdleWorkers(), CountedWorkers() )
394
395 ALIB_DBG( } )
396
397 }
398
399 // join the last thread
400 ALIB_ASSERT_ERROR(lastThreadToJoin != nullptr, "MGTHR",
401 "lastThreadToJoin is null. This must not happen (internal error)." )
402
403 lastThreadToJoin->Join();
404 delete lastThreadToJoin;
405 lastThreadToJoin= nullptr;
406
407 // Success
408 ALIB_MESSAGE("MGTHR", "ThreadPool::Shutdown completed. Pool({}/{} -> {}/{})",
409 CountedOpenJobs() , StatsCountedScheduledJobs(),
410 CountedIdleWorkers(), CountedWorkers() )
411}
412
413
414} // namespace [alib::threadmodel]
415
416
#define ALIB_MESSAGE(domain,...)
Definition alib.inl:1047
#define A_CHAR(STR)
#define ALIB_WARNING(domain,...)
Definition alib.inl:1046
#define ALIB_ASSERT_WARNING(cond, domain,...)
Definition alib.inl:1050
#define ALIB_ERROR(domain,...)
Definition alib.inl:1045
#define ALIB_DBG(...)
Definition alib.inl:836
#define ALIB_ASSERT_ERROR(cond, domain,...)
Definition alib.inl:1049
#define ALIB_CALLER_PRUNED
Definition alib.inl:1007
#define ALIB_DEBUG
Definition prepro.md:21
strings::TDec< character > Dec
Type alias in namespace alib.
Definition format.inl:545
threadmodel::Job Job
Type alias in namespace alib.
Definition jobs.inl:183
threads::TCondition< T > TCondition
Type alias in namespace alib.
LocalString< 16 > String16
Type alias name for TLocalString<character,16>.
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
LocalString< 128 > String128
Type alias name for TLocalString<character,128>.
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
threads::Thread Thread
Type alias in namespace alib.
Definition thread.inl:389
strings::TTab< nchar > NTab
Type alias in namespace alib.
Definition format.inl:519
strings::TField< nchar > NField
Type alias in namespace alib.
threadmodel::ThreadPool ThreadPool
Type alias in namespace alib.