ALib C++ Framework
by
Library Version: 2605 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
threadpool.cpp
1using namespace std::literals::chrono_literals;
2
3#if ALIB_STRINGS
6 alib::lang::HeapAllocator>::operator()(
9 target << "TPool(Jobs: " << tp.CountedOpenJobs() << '/' << tp.StatsCountedScheduledJobs()
10 << " Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
11 << "/[" << tp.Strategy.WorkersMin
12 << " <= " << tp.CountedWorkers()
13 << " <= " << tp.Strategy.WorkersMax
14 << "]";
15}
16#endif
17
18
19namespace alib::threadmodel {
20
21void PoolWorker::Run() {
22 ALIB_MESSAGE("TMOD/WORKER", "PoolWorker \"{}\" is running", GetName() )
23 for (;;) {
24 // await next job. Break if null.
25 auto queueEntry= threadPool.pop(this);
26 if ( queueEntry.job == nullptr )
27 break;
28
29 // Prepare job (a no-op with default pool worker) and call Do()
30 PrepareJob(queueEntry.job);
31 if ( queueEntry.job->Do() )
32 goto CONTINUE;
33
34 // not processed!
35 ALIB_ERROR( "TMOD",
36 "Job of type <{}> passed to thread pool has no Job::Do() implementation!",
37 &queueEntry.job->ID )
38
39 CONTINUE:
40 // delete job?
41 if ( !queueEntry.keep )
42 {ALIB_LOCK_WITH(threadPool)
43 auto size= queueEntry.job->SizeOf();
44 queueEntry.job->~Job();
45 threadPool.GetPoolAllocator().free( queueEntry.job, size );
46 } }
47
48 #if ALIB_DEBUG
49 ALIB_MESSAGE( "TMOD/WORKER", "PoolWorker \"{}\" is stopping (leaving method Run())",
50 GetName() )
51 #endif
52}
53
54
56: TCondition (ALIB_DBG(A_CHAR("ThreadPool")))
57, ma (ALIB_DBG( "ThreadPool",) 16)
58, pool (ma)
59, workers (ma)
60#if ALIB_DEBUG
61, DbgKnownJobs (ma)
62#endif
63, queue (pool)
64{
65 #if ALIB_DEBUG
66 Dbg.Name= A_CHAR("DWManager");
67 #endif
68 #if ALIB_DEBUG_CRITICAL_SECTIONS
69 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
70 #endif
71
72
73 #if ALIB_DEBUG
74 assert::RegisterPrintable(typeid(ThreadPool*), [](const std::any& any, std::string& s) {
75 auto& tp= *std::any_cast<ThreadPool*>(any);
76 #if ALIB_STRINGS
77 String256 serialize(tp);
78 ALIB_STRINGS_TO_NARROW(serialize, ns, 256)
79 s+= ns;
80 #else
81 std::ostringstream oss;
82 oss << "TPool(Jobs: " << tp.CountedOpenJobs() << '/' << tp.StatsCountedScheduledJobs()
83 << " Wrks: " << tp.CountedWorkers() - tp.CountedIdleWorkers()
84 << "/[" << tp.Strategy.WorkersMin
85 << " <= " << tp.CountedWorkers()
86 << " <= " << tp.Strategy.WorkersMax
87 << "]";
88 s+= oss.str();
89 #endif
90 });
91 #endif
92
93}
94
95ThreadPool::~ThreadPool() {
96 ALIB_ASSERT_ERROR( IsIdle(), "TMOD",
97 "{}: Destruction while not idle. Please call WaitForAllIdle().\n"
98 "There are still {} workers running. Open jobs: ", this, ctdWorkers - ctdIdle, ctdOpenJobs )
99
100 ALIB_ASSERT_WARNING( ctdWorkers == 0, "TMOD",
101 "{}: There are still {} threads running (whil in destructor).\n"
102 "While Shutdown is called now, it is recommended to explicitly "
103 "shutdown the pool before destruction.", this, ctdWorkers )
104
105 if (ctdWorkers > 0)
106 Shutdown();
107
108 // check if there are still objects in the pool allocator
109 #if ALIB_DEBUG_ALLOCATIONS
110 NString2K warning;
111 for (int i = 2; i < 32; ++i) {
112 auto qtyObjects= pool.DbgCountedOpenAllocations(1L << i);
113 if ( qtyObjects > 0)
114 warning <<
115 "ThreadPool destructor: There is(are) still " << qtyObjects << " object(s) of size "
116 << (1L << i) << " in the PoolAllocator.\n";
117 }
118 if ( warning.IsNotEmpty() ) {
119 warning <<
120 " Hint:\n"
121 " This indicates that Job-objects have not been deleted during the run.\n"
122 " Alternatively, certain jobs used the pool allocator without freeing their data\n"
123 " This is a potential memory leak.\n"
124 " Known Job-types and their sizes are:\n";
125 DbgDumpKnownJobs(warning, " ");
126 ALIB_WARNING( "TMOD", warning )
127 pool.DbgSuppressNonFreedObjectsWarning();
128 }
129 #endif
130}
131
132#if ALIB_DEBUG
133#if ALIB_DEBUG_CRITICAL_SECTIONS
134bool ThreadPool::DCSIsAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
135bool ThreadPool::DCSIsSharedAcquired() const { return Dbg.IsOwnedByCurrentThread(); }
136#endif
137
138#if ALIB_STRINGS
139int ThreadPool::DbgDumpKnownJobs(NAString& target, const NString& linePrefix ) {
140 int i= 0;
141 for ( auto jobIt : DbgKnownJobs ) {
142 target << linePrefix << NField( ++i, 2) << ": "
143 << *jobIt.TID << NTab(30,-1)
144 << NField( jobIt.JobSize, 3) << " (PA "
145 << NField( PoolAllocator::GetAllocationSize(
146 PoolAllocator::GetAllocInformation(jobIt.JobSize)), 3) << ") "
147 "Usage: " << NField(jobIt.Usage, 5) << "\n";
148 }
149 return i;
150}
151#endif
152#endif // ALIB_DEBUG
153
154PoolWorker* ThreadPool::CreateWorker() {
155 #if ALIB_STRINGS
156 return pool().New<PoolWorker>( *this,
157 String128("PoolWorker") << Dec(nextWorkerID++, 3, nullptr) );
158 #else
159 return pool().New<PoolWorker>( *this );
160 #endif
161}
162void ThreadPool::DisposeWorker( PoolWorker* poolWorker ) { pool().Delete(poolWorker); }
163
164void ThreadPool::addThread() {
165 ALIB_MESSAGE( "TMOD/STRGY", "{}: Adding one thread", this )
166
167 // first check if the pool was already used once and is now restarted
168 if ( lastThreadToJoin ) {
169 ALIB_ASSERT_ERROR( ctdWorkers == 0, "TMOD",
170 "{}: Found a last thread to join but there the number of workers is {}\n"
171 "instead of 0. This should never happen!", this, ctdWorkers )
172
173 lastThreadToJoin->Join();
174 delete lastThreadToJoin;
175 lastThreadToJoin= nullptr;
176 }
177
178 auto* newWorker= CreateWorker();
179
180 workers.InsertUnique( newWorker );
181 ++ctdWorkers;
182 newWorker->Start();
183}
184
185
186namespace {
187
188// An internal job used to task the next worker to join a thread that stopped.
189// Note: The last thread will add itself to lastThreadToJoin, which will be joined
190// with the method #".Shutdown" or when a new thread is added.
191struct JobJoin : Job {
192 PoolWorker* workerToJoin;
193 JobJoin() :
194 Job( typeid(JobJoin) )
195 , workerToJoin(nullptr) {}
196};
197
198// we only need one instance
199JobJoin JOB_JOIN;
200
201// An internal job used by #".Shutdown". It is only emplaced if the queue is empty and
202// all types are idle
203struct JobStop : Job {
204 JobStop() :Job(typeid(JobStop)) {}
205};
206
207// we only need one instance
208JobStop JOB_STOP;
209
210}
211
212ThreadPool::QueueEntry ThreadPool::pop(PoolWorker* caller) {
213 START:
214 Acquire(ALIB_CALLER_PRUNED);
215 ++ctdIdle;
216 WaitForNotification(ALIB_CALLER_PRUNED);
217 --ctdIdle;
218 ALIB_ASSERT_ERROR(ctdOpenJobs != 0, "TMOD", "{}: Job pipe empty after wakeup.", this )
219
220 // check for JobJoin singleton
221 if ( queue.back().job == &JOB_JOIN ) {
222 auto& job= queue.back().job->Cast<JobJoin>();
223 job.workerToJoin->Join();
224
225 DisposeWorker(job.workerToJoin);
226
227 queue.pop_back();
228 --ctdOpenJobs;
229 }
230
231 // check for JobStop singleton
232 else if ( queue.back().job == &JOB_STOP ) {
233 queue.pop_back();
234 --ctdOpenJobs;
235 ALIB_ASSERT(queue.empty(), "TMOD")
236 }
237
238 // check if we need to change the pool size
239 int targetSize= Strategy.GetSize( ctdWorkers, ctdIdle, ctdOpenJobs, timeOfLastSizeChange );
240
241
242 // leaving pool?
243 if ( targetSize < ctdWorkers ) {
244 ALIB_MESSAGE( "TMOD/STRGY", "{}: Leaving pool ({}->{})",
245 this, ctdWorkers, targetSize )
246
247 // make sure the calling thread is joined. This is either done by the next
248 // worker, or by the (main-) thread that calls Shutdown, or if a new thread
249 // is added later
250 if (ctdWorkers > 1) {
251 // put 'myself' in the JobJoin singleton and add me to the front of the pipe.
252 JOB_JOIN.workerToJoin= caller;
253 queue.push_back({&JOB_JOIN, false});
254 ctdOpenJobs++;
255 }
256 else
257 lastThreadToJoin= caller;
258
259 // remove the caller from the worker list
260 workers.erase(workers.Find(caller));
261 --ctdWorkers;
262
263 // in any case, mark the caller as done already to avoid a warning if joining comes
264 // faster than the exit:
265 caller->state= Thread::State::Done;
266
267 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
268 return QueueEntry{nullptr, false};
269 }
270
271 // increasing pool?
272 if ( targetSize > ctdWorkers ) {
273 addThread();
274 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED);
275 goto START;
276 }
277
278
279 // start working
280 auto entry= queue.back();
281 queue.pop_back();
282 ALIB_MESSAGE( "TMOD/QUEUE", "{}: Job({}) popped", this, &entry.job->ID )
283
284 --ctdOpenJobs;
285
286 // Sync-job with optional deferred job deletion
287 if ( entry.job->Is<JobSyncer>() ) {
288 auto& job= entry.job->Cast<JobSyncer>();
289
290 if ( job.JobToDelete ) {
291 size_t size= job.JobToDelete->SizeOf();
292 job.JobToDelete->PrepareDeferredDeletion();
293 job.JobToDelete->~Job();
294 GetPoolAllocator().free( job.JobToDelete, size );
295 }
296
297 GetPoolAllocator().free( &job, sizeof(JobSyncer) );
298 ALIB_ASSERT(ctdIdle + 1 == ctdWorkers, "TMOD")
299 ReleaseAndNotifyAll(ALIB_CALLER_PRUNED); // wakeup others (all are idle)
300 goto START;
301 }
302
303 Release(ALIB_CALLER_PRUNED);
304 return entry;
305}
306
307bool ThreadPool::WaitForAllIdle( Ticks::Duration timeout
308 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
309 ALIB_MESSAGE("TMOD", "{}: Waiting for all jobs to be processed.", this )
310
311 Ticks waitStart= Ticks::Now();
312 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
313 while(true) {
314 if ( CountedOpenJobs() == 0 && CountedIdleWorkers() == CountedWorkers() ) {
315 ALIB_MESSAGE("TMOD", "{}: All workers are idle.", this )
316 return true;
317 }
318 #if ALIB_DEBUG
319 if( nextWarning.Age() > dbgWarnAfter) {
320 ALIB_MESSAGE( "TMOD",
321 "{}: Waiting for all workers to be come idle.", this )
322 if ( CountedOpenJobs() ) {
323 for (auto& job : queue )
324 ALIB_MESSAGE( "TMOD", " Job: {}", &job.job->ID)
325 }
326 nextWarning= Ticks::Now();
327 }
328 #endif
329
330 // check timeout
331 if (waitStart.Age() > timeout) {
332 ALIB_WARNING("TMOD", "{} timeout while waiting for idle", this )
333 return false;
334 }
335
336 // sleep
337 Thread::Sleep( IdleWaitTime );
338} }
339
340void ThreadPool::Shutdown() {
341 ALIB_MESSAGE("TMOD", "{}: Shutdown", this)
342
343 ALIB_ASSERT_ERROR( ctdOpenJobs == 0, "TMOD",
344 "{}: Shutdown called while {} jobs are open. "
345 "Call WaitForAllIdle() before shutdown.", this, ctdOpenJobs )
346
347 if (CountedWorkers() == 0) {
348 ALIB_MESSAGE("TMOD", "{}: Shutdown, no workers alive.", this)
349 return;
350 }
351 // Schedule a stop-job.
352 // We do this here to meet the wakeup condition without adding
353 // another term to that condition.
354 Acquire(ALIB_CALLER_PRUNED);
355 Strategy.WorkersMax= 0;
356 queue.push_back( {&JOB_STOP, false} );
357 ++ctdOpenJobs;
358 ReleaseAndNotify(ALIB_CALLER_PRUNED);
359
360
361 // wait for all workers to exit
362 ALIB_DBG(Ticks waitTime);
363 while( CountedWorkers() > 0 ) {
364 Thread::SleepMicros( 50 );
365 ALIB_DBG( if (waitTime.Age().InAbsoluteSeconds() == 1) { waitTime.Reset(); )
366 ALIB_MESSAGE("TMOD", "{}: Waiting for workers to exit.", this)
367
368 ALIB_DBG( } )
369
370 }
371
372 // join the last thread
373 ALIB_ASSERT_ERROR(lastThreadToJoin != nullptr, "TMOD",
374 "{}: The 'lastThreadToJoin' is null. This must not happen (internal error)."
375 , this )
376
377 lastThreadToJoin->Join();
378 DisposeWorker( lastThreadToJoin );
379 lastThreadToJoin= nullptr;
380
381 // Success
382 ALIB_MESSAGE("TMOD", "{}: Shutdown completed.) ", this)
383}
384
385
386} // namespace [alib::threadmodel]
#define ALIB_MESSAGE(domain,...)
#define A_CHAR(STR)
#define ALIB_ASSERT(cond, domain)
#define ALIB_WARNING(domain,...)
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_ERROR(domain,...)
#define ALIB_DEBUG
#define ALIB_DBG(...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
virtual void Join()
Definition thread.cpp:183
virtual const character * GetName() const
Definition thread.hpp:237
threads::TCondition< T > TCondition
Type alias in namespace #"%alib".
strings::TField< nchar > NField
Type alias in namespace #"%alib".
threadmodel::Job Job
Type alias in namespace #"%alib".
Definition jobs.hpp:178
strings::TTab< nchar > NTab
Type alias in namespace #"%alib".
Definition format.hpp:514
strings::TDec< character > Dec
Type alias in namespace #"%alib".
Definition format.hpp:540
void Shutdown(ShutdownPhases targetPhase, camp::Camp *targetCamp)
NLocalString< 2048 > NString2K
Type alias name for #"TLocalString;TLocalString<nchar,2048>".
LocalString< 128 > String128
Type alias name for #"TLocalString;TLocalString<character,128>".
LocalString< 256 > String256
Type alias name for #"TLocalString;TLocalString<character,256>".
characters::character character
Type alias in namespace #"%alib".
threadmodel::PoolWorker PoolWorker
Type alias in namespace #"%alib".
#define ALIB_STRINGS_TO_NARROW( src, dest, bufSize)
virtual void PrepareJob(Job *job)
ThreadPool & threadPool
The pool that this instance belongs to.