ALib C++ Library
Library Version: 2412 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
threadpool.cpp
1// #################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2024 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6// #################################################################################################
8
12#include "alib/alox.hpp"
13
14
15using namespace std::literals::chrono_literals;
16
17
18namespace alib::threadmodel {
19
20struct PWorker : protected Thread
21{
22 friend class ThreadPool;
23 ThreadPool& tp;
24 PWorker(ThreadPool& ptp, const String& threadName) : Thread(threadName ), tp(ptp) {}
25
26 void Run() override
27 {
28 ALIB_MESSAGE("MGTHR", NString256( "PWorker \"") << GetName() << "\" is running" )
29 for (;;)
30 {
31 // await next job. Break if null.
32 auto queueEntry= tp.pop(this);
33 if ( queueEntry.job == nullptr )
34 break;
35
36
37 // Custom method, implemented with Job::Do()
38 if ( queueEntry.job->Do() )
39 goto CONTINUE;
40
41 // not processed!
42 ALIB_ERROR( "MGTHR", NString512()
43 << "Job of type <" << queueEntry.job->ID
44 << "> passed to thread pool has no Job::Do() implementation!" )
45
46 CONTINUE:
47 // delete job?
48 if ( !queueEntry.keep )
49 {ALIB_LOCK_WITH(tp)
50 auto size= queueEntry.job->SizeOf();
51 queueEntry.job->~Job();
52 tp.GetPoolAllocator().free( queueEntry.job, size );
53 }
54 }
55
56 #if ALIB_DEBUG
57 NString4K msg( "PWorker \""); msg << GetName() << "\" is stopping (leaving method Run()).";
58 ALIB_MESSAGE( "MGTHR", msg )
59 #endif
60 }
61}; // class PWorker
62
63
65: TCondition (ALIB_DBG(A_CHAR("ThreadPool")))
66, ma (ALIB_DBG( "ThreadPool",) 16)
67, pool (ma)
68, workers (ma)
69#if ALIB_DEBUG
70, DbgKnownJobs (ma)
71#endif
72, queue (pool)
73{
74 #if ALIB_DEBUG
75 Dbg.Name= A_CHAR("DWManager");
76 #endif
77 #if ALIB_DEBUG_CRITICAL_SECTIONS
78 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
79 #endif
80}
81
82ThreadPool::~ThreadPool()
83{
84 ALIB_ASSERT_ERROR( IsIdle(), "MGTHR", NString256() <<
85 "ThreadPool destruction while not idle. Please call WaitForAllIdle().\n"
86 "There are still " << (ctdWorkers-ctdIdle)<< " workers running. "
87 "Open jobs: " << ctdOpenJobs )
88
89 ALIB_ASSERT_WARNING( ctdWorkers == 0, "MGTHR", NString256() <<
90 "ThreadPool destructor: There are still " << ctdWorkers<< " threads running.\n"
91 "While ThreadPool::Shudown is called now, it is recommended to explicitly "
92 "shutdown the pool before destruction." )
93
94 if (ctdWorkers > 0)
95 Shutdown();
96
97 // check if there are still objects in the pool allocator
98 #if ALIB_DEBUG_ALLOCATIONS
99 NString2K warning;
100 for (int i = 2; i < 32; ++i)
101 {
102 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
103 if ( qtyObjects > 0)
104 warning <<
105 "ThreadPool destructor: There is(are) still " << qtyObjects << " object(s) of size "
106 << (1L << i) << " in the PoolAllocator.\n";
107 }
108 if ( warning.IsNotEmpty() )
109 {
110 warning <<
111 " Hint:\n"
112 " This indicates that Job-objects have not been deleted during run.\n"
113 " Alternatively, certain jobs used the pool allocator without freeing the their data\n"
114 " This is a potential memory leak.\n"
115 " Known Job-types and their sizes are:\n";
116 DbgDumpKnownJobs(warning, " ");
117 ALIB_WARNING( "MGTHR", warning )
118 pool.DbgSuppressNonFreedObjectsWarning();
119 }
120 #endif
121}
122
123#if ALIB_DEBUG
124#if ALIB_DEBUG_CRITICAL_SECTIONS
125bool ThreadPool::DCSIsAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
126bool ThreadPool::DCSIsSharedAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
127#endif
128
129int ThreadPool::DbgDumpKnownJobs(NAString& target, const NString& linePrefix )
130{
131 int i= 0;
132 for ( auto job : DbgKnownJobs )
133 {
134 target << linePrefix << NFormat::Field( ++i, 2) << ": "
135 << *job.TID << NFormat::Tab(30,-1)
136 << NFormat::Field( job.JobSize, 3) << " (PA "
137 << NFormat::Field( PoolAllocator::GetAllocationSize(
138 PoolAllocator::GetAllocInformation(job.JobSize)), 3) << ") "
139 "Usage: " << NFormat::Field(job.Usage, 5) << "\n";
140 }
141 return i;
142}
143#endif // ALIB_DEBUG
144
145void ThreadPool::addThread()
146{
147 ALIB_MESSAGE( "MGTHR/STRGY", NString2K() <<
148 "Pool(" << ctdOpenJobs << "/" << ctdStatJobsScheduled <<
149 " -> " << ctdIdle << "/" << ctdWorkers << ") "
150 " adding one thread" )
151
152 // first check if the pool was already used once and is now restarted
153 if ( lastThreadToJoin )
154 {
155 ALIB_ASSERT_ERROR( ctdWorkers == 0, "MGTHR", NString2K() <<
156 "ThreadPool::AddThread: Found a last thread to join but there the number of workers is "
157 << ctdWorkers << "\ninstead of 0. This should never happen" )
158
159 lastThreadToJoin->Join();
160 delete lastThreadToJoin;
161 lastThreadToJoin= nullptr;
162 }
163
164 auto* newWorker= new PWorker( *this, String128("PWorker") << Format(nextWorkerID++, 3, nullptr) );
165 workers.InsertUnique( newWorker );
166 ++ctdWorkers;
167 newWorker->Start();
168}
169
170
171namespace {
172 // An internal job used to task the next worker to join a thread that stopped.
173 // Note: The last thread will add itself to #lastThreadToJoin, which will be joined
174 // with the method #Shudown or when a new thread is added.
175 struct JobJoin : Job
176 {
177 PWorker* workerToJoin;
178 JobJoin() : Job( typeid(JobJoin) ), workerToJoin(nullptr) {}
179 };
180
181 // we only need one instance
182 JobJoin JOB_JOIN;
183
184 // An internal job used by #Shudown. It is only emplaced if the queue is empty and
185 // all types are idle
186 struct JobStop : Job { JobStop() :Job(typeid(JobStop)) {} };
187
188 // we only need one instance
189 JobStop JOB_STOP;
190}
191
192ThreadPool::QueueEntry ThreadPool::pop(PWorker* caller)
193{
194 START:
195 Acquire(ALIB_CALLER_PRUNED);
196 ++ctdIdle;
197 WaitForNotification(ALIB_CALLER_PRUNED);
198 ALIB_ASSERT_ERROR(ctdOpenJobs != 0, "MGTHR", "Job pipe empty after wakeup" )
199
200 // check for JobJoin singleton
201 if ( queue.Back().job == &JOB_JOIN )
202 {
203 queue.Back().job->Cast<JobJoin>().workerToJoin->Join();
204 queue.PopBack();
205 --ctdOpenJobs;
206// ALIB_ASSERT(queue.IsNotEmpty())
207 }
208
209 // check for JobStop singleton
210 else if ( queue.Back().job == &JOB_STOP )
211 {
212 queue.PopBack();
213 --ctdOpenJobs;
214 ALIB_ASSERT(queue.IsEmpty())
215 }
216
217 // check if we need to change the pool size
218 int targetSize= Strategy.GetSize( ctdWorkers,
219 ctdIdle--,
220 ctdOpenJobs,
221 timeOfLastSizeChange );
222
223
224 // leaving pool?
225 if ( targetSize < ctdWorkers )
226 {
227 ALIB_MESSAGE( "MGTHR/STRGY", NString2K() <<
228 "Pool(" << ctdOpenJobs << "/" << ctdStatJobsScheduled <<
229 " -> " << ctdIdle << "/" << ctdWorkers << ") "
230 " leaving pool (" << ctdWorkers << "->" << targetSize << ")" )
231
232
233 // make sure the calling thread is joined. This is either done by the next
234 // worker, or by the (main-) thread that calls Shutdown, or if a new thread
235 // is added later
236 if (ctdWorkers > 1)
237 {
238 // put 'myself' in the JobJoin singleton and add me to the front of the pipe.
239 JOB_JOIN.workerToJoin= caller;
240 queue.PushBack({&JOB_JOIN, false});
241 ctdOpenJobs++;
242 }
243 else
244 lastThreadToJoin= caller;
245
246 // remove myself from the worker list
247 workers.Erase(workers.Find(caller));
248 --ctdWorkers;
249
250 // in any case, mark the caller as done already to avoid a warning if joining comes
251 // faster than the exit:
252 caller->state= Thread::State::Done;
253
254 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
255 return QueueEntry{nullptr, false};
256 }
257
258 // increasing pool?
259 else if ( targetSize > ctdWorkers )
260 {
261 addThread();
262 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
263 goto START;
264 }
265
266
267 // start working
268 auto entry= queue.Back();
269 queue.PopBack();
270 ALIB_MESSAGE( "MGTHR/QUEUE", NString2K() <<
271 "Pool(" << ctdOpenJobs << "/" << ctdStatJobsScheduled <<
272 " -> " << ctdIdle << "/" << ctdWorkers << ") "
273 "Job(" << entry.job->ID << ") popped" )
274
275 --ctdOpenJobs;
276
277 // Sync-job with optional deferred job deletion
278 if ( entry.job->Is<JobSyncer>() )
279 {
280 auto& job= entry.job->Cast<JobSyncer>();
281
282 if ( job.JobToDelete )
283 {
284 size_t size= job.JobToDelete->SizeOf();
285 job.JobToDelete->PrepareDeferredDeletion();
286 job.JobToDelete->~Job();
287 GetPoolAllocator().free( job.JobToDelete, size );
288 }
289
290 GetPoolAllocator().free( &job, sizeof(JobSyncer) );
291 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED); // wakeup others (all are idle)
292 goto START;
293 }
294
295 Release(ALIB_CALLER_PRUNED);
296 return entry;
297}
298
299bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
300 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
301{
302 ALIB_MESSAGE("MGTHR", "ThreadPool: Waiting for all jobs to be processed." )
303
304 Ticks waitStart= Ticks::Now();
305 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
306 while(true)
307 {
308 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() )
309 {
310 ALIB_MESSAGE("MGTHR", NString256() <<
311 "ThreadPool: All are idle. Pool("
312 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
313 << CountedIdleWorkers() << "/"
314 << CountedWorkers() << ")" )
315 return true;
316 }
317 #if ALIB_DEBUG
318 if( nextWarning.Age() > dbgWarnAfter)
319 {
320 ALIB_WARNING( "MGTHR", NString256() <<
321 "Waiting for all workers to be come idle. Pool("
322 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
323 << CountedIdleWorkers() << "/"
324 << CountedWorkers() << ")" )
325 nextWarning= Ticks::Now();
326 }
327 #endif
328
329 // check timeout
330 if (waitStart.Age() > timeout)
331 {
332 ALIB_WARNING("MGTHR", "ThreadPool: Timeout while waiting for idle" )
333 return false;
334 }
335
336 // sleep
337 Thread::SleepMicros( 50 );
338 }
339}
340
341void ThreadPool::Shutdown()
342{
343 ALIB_MESSAGE("MGTHR", NString256() <<
344 "ThreadPool::Shutdown: Pool("
345 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
346 << CountedIdleWorkers() << "/"
347 << CountedWorkers() << ")" )
348
349 ALIB_ASSERT_ERROR( ctdOpenJobs == 0, "MGTHR", NString256() <<
350 "ThreadPool::Shutdown called while " << ctdOpenJobs << " jobs are open. "
351 " Call WaitForAllIdle() before shutdown." )
352
353 // Schedule a stop-job.
354 // We do this here to meet the wakeup condition without adding
355 // another term to that condition.
356 Acquire(ALIB_CALLER_PRUNED);
357 Strategy.WorkersMax= 0;
358 queue.PushBack( {&JOB_STOP, false} );
359 ++ctdOpenJobs;
360 ReleaseAndNotify(ALIB_CALLER_PRUNED);
361
362
363 // wait for all workers to exit
364 ALIB_DBG(Ticks waitTime);
365 while( ctdWorkers > 0 )
366 {
367 Thread::SleepMicros( 50 );
368 ALIB_DBG( if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
369 ALIB_MESSAGE("MGTHR", NString256() <<
370 "ThreadPool::Shutdown. Waiting for workers to exit. Pool("
371 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
372 << CountedIdleWorkers() << "/"
373 << CountedWorkers() << ")" )
374 ALIB_DBG( } )
375
376 }
377
378 // join the last thread
379 ALIB_ASSERT_ERROR(lastThreadToJoin != nullptr, "MGTHR",
380 "ThreadPool::Shutdown: lastThreadToJoin is null. This must not happen (internal error)." )
381
382 lastThreadToJoin->Join();
383 delete lastThreadToJoin;
384 lastThreadToJoin= nullptr;
385
386 // Success
387 ALIB_MESSAGE("MGTHR", NString256() <<
388 "ThreadPool::Shutdown completed. Pool("
389 << CountedOpenJobs() << "/" << StatsCountedScheduledJobs() << "->"
390 << CountedIdleWorkers() << "/"
391 << CountedWorkers() << ")" )
392}
393
394
395} // namespace [alib::threadmodel]
396
397
#define ALIB_WARNING(...)
Definition alib.hpp:1268
#define A_CHAR(STR)
#define ALIB_MESSAGE(...)
Definition alib.hpp:1269
#define ALIB_ERROR(...)
Definition alib.hpp:1267
#define ALIB_ASSERT_ERROR(cond,...)
Definition alib.hpp:1271
#define ALIB_ASSERT_WARNING(cond,...)
Definition alib.hpp:1272
#define ALIB_DBG(...)
Definition alib.hpp:390
#define ALIB_CALLER_PRUNED
Definition alib.hpp:1170
#define ALIB_DEBUG
Definition prepro.md:21
threadmodel::ThreadPool ThreadPool
Type alias in namespace alib.
threads::TCondition< T > TCondition
Type alias in namespace alib.
threads::Thread Thread
Type alias in namespace alib.
Definition thread.hpp:379
NLocalString< 2048 > NString2K
Type alias name for TLocalString<nchar,2048>.
NLocalString< 256 > NString256
Type alias name for TLocalString<nchar,256>.
threadmodel::Job Job
Type alias in namespace alib.
Definition jobs.hpp:228
NLocalString< 512 > NString512
Type alias name for TLocalString<nchar,512>.
NLocalString< 4096 > NString4K
Type alias name for TLocalString<nchar,8192>.