ALib C++ Library
Library Version: 2510 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
dedicatedworker.cpp
1// #################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2025 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6// #################################################################################################
7#include "alib_precompile.hpp"
8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
10#endif
11#if ALIB_C20_MODULES
12 module;
13#endif
14// ====================================== Global Fragment ======================================
15#include "alib/alib.inl"
16#if ALIB_DEBUG
17# include <vector>
18# include <any>
19#endif
20#include <algorithm>
21// =========================================== Module ==========================================
22#if ALIB_C20_MODULES
23 module ALib.ThreadModel;
24#else
25# include "ALib.ThreadModel.H"
26#endif
27// ====================================== Implementation =======================================
28
30
31// #################################################################################################
32// DWManager
33// #################################################################################################
34
// Member-initializer list and body of a constructor (presumably DWManager::DWManager();
// the signature line is not part of this listing).
// - ma is constructed with the value 16 and, in debug builds only (ALIB_DBG), the name "DWManager".
// - pool and workers are both constructed on top of ma.
36: ma (ALIB_DBG("DWManager",) 16)
37, pool (ma)
38, workers (ma)
39{
// Debug builds: give the debug tool of the inherited lock a readable name.
40 #if ALIB_DEBUG
41 Lock::Dbg.Name= "DWManager";
42 #endif
// With critical-section debugging enabled: register this object as the lock (DCSLock)
// of the allocator's critical-section debug object.
43 #if ALIB_DEBUG_CRITICAL_SECTIONS
44 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
45 #endif
46}
47
48
// Body of the method that registers a worker (presumably DWManager::Add(DedicatedWorker&);
// the signature lines 49-50 are not part of this listing).
// Debug builds first assert that the given worker is not yet contained in `workers`.
51 #if ALIB_DEBUG
52 for( DedicatedWorker* it : workers )
53 ALIB_ASSERT_ERROR( it != &thread, "MGTHR", "Thread already added" )
54 #endif
55
// Store the worker's address in the list and start its thread.
56 workers.push_back( &thread );
57 thread.Start();
58}
59
// Removes the given worker from this manager and waits for its thread to end.
//
// The worker is searched in `workers`. If it is not found, a warning is raised and false is
// returned. Otherwise it is erased from the list, a stop is scheduled with stopPriority
// (unless a stop is already scheduled), and the method loops until the thread's state reaches
// Thread::State::Done or a later state. Finally, the thread is joined unless its state is
// already Thread::State::Terminated.
//
// @param thread       The worker to remove.
// @param stopPriority The priority passed to ScheduleStop().
// @return true if the worker was registered and has been stopped, false if it was not found.
60bool DWManager::Remove( DedicatedWorker& thread, Priority stopPriority ) {
// NOTE(review): source line 61 is not part of this listing. The closing brace at line 68
// suggests that it opens a scope (presumably a locked one) - confirm against the original file.
62 auto it= std::find( workers.begin(), workers.end(), &thread );
63 if( it == workers.end() ) {
64 ALIB_WARNING( "MGTHR", "Thread \"{}\" to remove not found", thread.GetName())
65 return false;
66 }
67 workers.erase( it );
68 }
69
// Only schedule a stop if none is pending yet.
70 if( !thread.StopIsScheduled() )
71 thread.ScheduleStop( stopPriority );
72
// Debug builds: remember when waiting started and at which elapsed second to warn next.
73 #if ALIB_DEBUG
74 Ticks waitCheck= Ticks::Now();
75 int nextWarnSecond= 1;
76 #endif
// Wait until the thread has left its run-state.
// NOTE(review): source line 78 is not part of this listing; presumably the loop sleeps there.
77 while(thread.GetState() < Thread::State::Done) {
79 #if ALIB_DEBUG
// Raise one warning each time the elapsed whole seconds equal nextWarnSecond.
// NOTE(review): three arguments are passed, but the format string contains only two "{}"
// placeholders ("Load: " has none) - verify how trailing arguments are handled.
80 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond )
81 {
82 ALIB_WARNING( "MGTHR",
83 "DWManager::Remove: Waiting on thread \"{}\" to stop. State::{}, Load: ",
84 thread.GetName(), thread.GetState(), thread.Load() )
85 nextWarnSecond++;
86 }
87 #endif
88 }
89
// Join the thread unless it is already terminated.
90 if ( thread.GetState() != Thread::State::Terminated )
91 thread.Join();
92
93 return true;
94}
95
// Waits until none of the registered workers reports load, or until the timeout expires.
//
// In a loop, the workers whose Load() is greater than 0 are counted under ALIB_LOCK. If there
// is none, true is returned. Otherwise, once more than `timeout` has elapsed since the call
// started, false is returned.
// Debug builds raise a message listing the busy workers whenever more than dbgWarnAfter has
// passed since the start of the wait, respectively since the previous such message.
//
// @param timeout      The maximum time to wait.
// @param dbgWarnAfter (Debug builds only.) The interval between two "still waiting" messages.
// @return true if all workers became idle, false if the timeout was exceeded.
96bool DWManager::WaitForAllIdle( Ticks::Duration timeout
97 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
98{
99 ALIB_MESSAGE( "MGTHR", "DWManager::WaitForAllIdle" )
100
101 Ticks waitStart= Ticks::Now();
// Holds the reference time of the last warning, initially the start of the wait. With this,
// the first warning is raised after dbgWarnAfter, exactly like every subsequent one.
102 ALIB_DBG( Ticks nextWarning= waitStart; )
103 while(true)
104 {
105 // count the workers that still have load
106 int cntRunning= 0;
107 {ALIB_LOCK
108 for( DedicatedWorker* it : workers )
109 if( it->Load() > 0 )
110 cntRunning++;
111 }
112 if( cntRunning == 0 )
113 return true;
114
115 #if ALIB_DEBUG
116 if( nextWarning.Age() > dbgWarnAfter)
117 {
// Collect a header line plus one entry per busy worker and raise them as one message.
118 std::vector<std::any> args; args.reserve(32);
119 args.emplace_back( "Waiting on {} thread(s) to become idle.\n");
120 args.emplace_back( cntRunning );
121 int tNr= 0;
122 {ALIB_LOCK
123 for( DedicatedWorker* it : workers )
124 if ( it->Load() > 0 ) {
125 args.emplace_back( ++tNr );
126 args.emplace_back( ": {},\tState::{},\t Load: \n" );
127 args.emplace_back( it->GetName() );
128 args.emplace_back( it->state );
129 args.emplace_back( it->Load() );
130 }
131 }
132 assert::raise( ALIB_CALLER_PRUNED, 1, "MGTHR", args );
// Restart the warning interval.
133 nextWarning= Ticks::Now();
134 }
135 #endif
136
137 // check timeout
138 if (waitStart.Age() > timeout)
139 return false;
140
141 // sleep
143 }
144}
145
// Stops and joins all registered workers and clears the list of workers.
//
// A stop is scheduled with stopPriority on every worker that has none scheduled yet. Then the
// method loops until no worker's state is below Thread::State::Done. Debug builds raise a
// message listing the registered workers each time the elapsed whole seconds equal
// nextWarnSecond. Finally, every worker is joined and `workers` is cleared.
//
// @param stopPriority The priority passed to ScheduleStop().
146void DWManager::RemoveAll( Priority stopPriority )
147{
148 ALIB_MESSAGE( "MGTHR", "DWManager::RemoveAll" )
149
150 ALIB_DBG( Ticks waitCheck= Ticks::Now();
151 int nextWarnSecond= 1; )
152
153 // send stop to those unstopped
154 for( DedicatedWorker* it : workers )
155 if( !it->StopIsScheduled() )
156 it->ScheduleStop( stopPriority );
157
// Restart the debug timer, so that the warnings count from the moment the stops were scheduled.
158 ALIB_DBG( waitCheck= Ticks::Now(); nextWarnSecond= 1; )
159 int cntRunning;
160 while(true)
161 {
162 // check that all threads are stopped
163 cntRunning= 0;
164 for( DedicatedWorker* it : workers )
165 if( it->GetState() < Thread::State::Done )
166 cntRunning++;
167 if( cntRunning == 0 )
168 break;
169
171 #if ALIB_DEBUG
172 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond )
173 {
// Collect a header line plus one entry per registered worker and raise them as one message.
174 std::vector<std::any> args; args.reserve(32);
175 args.emplace_back( "DWManager Termination: Waiting on {} thread(s) to stop.\n");
176 args.emplace_back( cntRunning );
177 int tNr= 0;
178 for( DedicatedWorker* it : workers )
179 {
180 args.emplace_back( ++tNr );
181 args.emplace_back( ": {},\tState::{},\t Load: \n" );
182 args.emplace_back( it->GetName() );
183 args.emplace_back( it->state );
184 args.emplace_back( it->Load() );
185 }
186 assert::raise( ALIB_CALLER_PRUNED, 1, "MGTHR", args );
187 nextWarnSecond++;
188 }
189 #endif
190 }
191
192 // join all registered workers and remove them from our list.
193 for( DedicatedWorker* it : workers )
194 it->Join();
195 workers.Clear();
196}
197
198// #################################################################################################
199// DedicatedWorker
200// #################################################################################################
// Body of the method that enqueues a job (presumably DedicatedWorker::pushAndRelease();
// the signature line 201 as well as lines 203, 228 and 230 are not part of this listing).
// The element is inserted into `queue` in front of the first entry whose priority is equal or
// higher than its own. pop() takes its jobs from the back of the same list.
202{
204
205 // search the first element with equal or higher priority
206 auto it= queue.begin();
207 while(it != queue.end())
208 {
209 if(jobInfo.priority <= it->priority )
210 break;
211 it++;
212 }
213
214 // insert before found
215 queue.Insert(it, jobInfo );
216
// Debug builds: report the queue size, the job's ID, its priority and its keep-flag.
217 #if ALIB_DEBUG
218 std::vector<std::any> args; args.reserve(32);
219 args.emplace_back( "Queue({}) Job({}) pushed. P::{} Keep: ");
220 args.emplace_back( queue.size() );
221 args.emplace_back( &jobInfo.job->ID );
222 args.emplace_back( jobInfo.priority );
223 args.emplace_back( jobInfo.keepJob );
224 assert::raise( ALIB_CALLER_PRUNED, 2, "MGTHR/QUEUE", args );
225 #endif
226
// Member length counts the queued jobs separately from queue.size().
227 ++length;
229
231}
232
// Takes the job at the back of the queue and returns it.
//
// Asserts that the queue is not empty, copies the job pointer and the keep-flag of the last
// queue element, removes that element and decrements `length`. Debug builds additionally
// report the remaining queue length, the job's ID, its priority and its keep-flag.
// NOTE(review): source lines 234-235 (the opening brace and whatever precedes the assertion)
// are not part of this listing.
//
// @return A pair of the job pointer and the flag telling whether the job is to be kept.
233std::pair<Job*, bool> DedicatedWorker::pop()
236 ALIB_ASSERT_ERROR(length != 0, "MGTHR", "Job pipe empty after wakeup" )
237 ALIB_MESSAGE( "MGTHR/QUEUE", "Queue--, size: ", length )
238
239 std::pair<Job*, bool> result= { queue.back().job, queue.back().keepJob };
// The priority is needed for the debug message only; it is read before the element is removed.
240 ALIB_DBG( auto dbgPriority= queue.back().priority; )
241
242 queue.pop_back();
243 --length;
244
245 #if ALIB_DEBUG
246 std::vector<std::any> args; args.reserve(32);
247 args.emplace_back( "Queue({}) Job({}) popped. P::{} Keep: ");
248 args.emplace_back( length );
249 args.emplace_back( &result.first->ID );
250 args.emplace_back( dbgPriority );
251 args.emplace_back( result.second );
252 assert::raise( ALIB_CALLER_PRUNED, 2, "MGTHR/QUEUE", args );
253 #endif
254
255 return result;
256}
257
// Body of the worker's execution loop (presumably DedicatedWorker::Run(); the signature
// line 258 and the lines 270, 273, 293 and 303 are not part of this listing).
// Until stopJobExecuted is set, a job is popped and handled by the first matching case:
//   1. JobDeleter: the referenced job is destructed and freed, then the deleter job is freed.
//   2. process() returns true: nothing more is done.
//   3. JobStop: stopJobExecuted is set and the job is freed.
//   4. Job::Do() returns true: the job is destructed and freed, unless it is flagged to be kept.
//   5. Otherwise an error is raised.
// After each job, statLastJobExecution is reset.
259{
260 ALIB_MESSAGE("MGTHR", "DedicatedWorker \"{}\" is running", GetName() )
261
262 while(!stopJobExecuted)
263 {
// jobInfo.first is the job, jobInfo.second the keep-flag (see pop()).
264 auto jobInfo = pop();
265
266 // Deferred job-deletion job
267 if (jobInfo.first->Is<JobDeleter>() )
268 {
269 auto& job= jobInfo.first->Cast<JobDeleter>();
271
// A deleter job must never be flagged to be kept.
272 ALIB_ASSERT(jobInfo.second == false, "MGTHR")
// The size is fetched before the destructor runs, because it is needed to free the memory.
274 auto size= job.JobToDelete->SizeOf();
275 job.JobToDelete->~Job();
276 manager.GetPoolAllocator().free( job.JobToDelete, size );
277 manager.GetPoolAllocator().free( &job, sizeof(JobDeleter) );
278
279 goto CONTINUE;
280 }
281
282
283 // overloaded custom process?
284 if ( process(*jobInfo.first) )
285 goto CONTINUE;
286
287
288 // Stop!
289 if (jobInfo.first->Is<JobStop>() )
290 {
// Setting the flag ends the surrounding while-loop after this iteration.
291 stopJobExecuted= true;
292 ALIB_ASSERT(jobInfo.second == false, "MGTHR")
294 manager.GetPoolAllocator().free( jobInfo.first, sizeof(JobStop) );
295 goto CONTINUE;
296 }
297
298 // Custom method, implemented with Job::Do()
299 if ( jobInfo.first->Do() )
300 {
301 // delete?
// NOTE(review): source line 303 is not part of this listing; presumably it opens the block
// that is closed at line 307 - confirm against the original file.
302 if ( !jobInfo.second )
304 auto size= jobInfo.first->SizeOf();
305 jobInfo.first->~Job();
306 manager.GetPoolAllocator().free( jobInfo.first, size );
307 }
308
309 goto CONTINUE;
310 }
311
312 // Not processed!
313 ALIB_ERROR("MGTHR",
314 "Job of type <{}> passed to DedicatedWorker, which was neither recognized by\n"
315 "the specialist nor has it a Job::Do() implementation!", &jobInfo.first->ID )
316
// Common end of each iteration, reached from all branches above.
317 CONTINUE:
318 statLastJobExecution.Reset();
319 }
320
// Warn if jobs are left in the queue after the stop job was executed.
321 ALIB_ASSERT_WARNING( Load() == 0, "MGTHR",
322 "DedicatedWorker \"{}\" has jobs still queued when stopped!", GetName(), Load() )
323
324 ALIB_MESSAGE( "MGTHR", "DedicatedWorker \"{}\" is stopping (leaving method Run()).", GetName() )
325}
326
327} // namespace [alib::threadmodel]
328
329
MonoAllocator ma
Mono allocator. Used for commands and by DedicatedWorkers.
ALIB_DLL void RemoveAll(Priority stopPriority=Priority::Lowest)
ALIB_DLL DWManager()
Constructor.
ALIB_DLL void Add(DedicatedWorker &thread)
ALIB_DLL bool Remove(DedicatedWorker &thread, Priority stopPriority=Priority::Lowest)
PoolAllocator pool
Pool allocator. Used for command objects.
List< MonoAllocator, DedicatedWorker * > workers
The list of workers.
ALIB_DLL bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
virtual ALIB_DLL void Run() override
ALIB_DLL void pushAndRelease(QueueElement &&jobInfo)
DWManager & manager
needed as we inherit TCondition
int length
The current number of jobs in the queue.
List< HeapAllocator, QueueElement > queue
bool stopJobExecuted
Flag which is set when the stop-job was executed.
virtual const character * GetName() const
Definition thread.inl:241
DbgLockAsserter Dbg
The debug tool instance.
Definition lock.inl:65
@ Terminated
The thread is terminated.
Definition thread.inl:144
static void SleepMicros(int64_t microseconds)
Definition thread.inl:353
virtual ALIB_DLL void Start()
Definition thread.cpp:296
virtual ALIB_DLL void Join()
Definition thread.cpp:251
#define ALIB_MESSAGE(domain,...)
Definition alib.inl:1047
#define ALIB_ASSERT(cond, domain)
Definition alib.inl:1048
#define ALIB_WARNING(domain,...)
Definition alib.inl:1046
#define ALIB_ASSERT_WARNING(cond, domain,...)
Definition alib.inl:1050
#define ALIB_ERROR(domain,...)
Definition alib.inl:1045
#define ALIB_LOCK
Definition alib.inl:1319
#define ALIB_DBG(...)
Definition alib.inl:836
#define ALIB_ASSERT_ERROR(cond, domain,...)
Definition alib.inl:1049
#define ALIB_CALLER_PRUNED
Definition alib.inl:1007
#define ALIB_LOCK_WITH(lock)
Definition alib.inl:1322
void raise(const CallerInfo &ci, int type, std::string_view domain, const std::span< std::any > &args)
Definition assert.cpp:575
Priority
Possible priorities of jobs assigned to a DedicatedWorker.
Definition jobs.inl:171
time::Ticks Ticks
Type alias in namespace alib.
Definition ticks.inl:109
The job sent by method DeleteJobDeferred.
The stop job sent by method ScheduleStop.
virtual void PrepareDeferredDeletion()
Definition jobs.inl:86
void WaitForNotification(ALIB_DBG_TAKE_CI)
void Acquire(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)