ALib C++ Library
Library Version: 2511 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
dedicatedworker.cpp
1//##################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2025 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6//##################################################################################################
7#include "alib_precompile.hpp"
8#if !defined(ALIB_C20_MODULES) || ((ALIB_C20_MODULES != 0) && (ALIB_C20_MODULES != 1))
9# error "Symbol ALIB_C20_MODULES has to be given to the compiler as either 0 or 1"
10#endif
11#if ALIB_C20_MODULES
12 module;
13#endif
14//========================================= Global Fragment ========================================
15#include "alib/alib.inl"
16#if ALIB_DEBUG
17# include <vector>
18# include <any>
19#endif
20#include <algorithm>
21//============================================== Module ============================================
22#if ALIB_C20_MODULES
23 module ALib.ThreadModel;
24#else
25# include "ALib.ThreadModel.H"
26#endif
27//========================================== Implementation ========================================
28
30
31//##################################################################################################
32// DWManager
33//##################################################################################################
34
// Member-initializer list and body of the DWManager constructor.
// NOTE(review): the signature line (listing line 35) is absent from this extract.
//
// The mono allocator `ma` is constructed first; `pool` and `workers` are then constructed
// from it. In debug builds `ma` additionally receives the name "DWManager".
// NOTE(review): the meaning of the literal 16 passed to `ma` is not visible here
// (presumably an initial buffer size) - confirm against the MonoAllocator constructor.
36: ma (ALIB_DBG("DWManager",) 16)
37, pool (ma)
38, workers (ma) {
39 #if ALIB_DEBUG
// Debug builds only: give the lock's debug helper a readable name.
40 Lock::Dbg.Name= "DWManager";
41 #endif
42 #if ALIB_DEBUG_CRITICAL_SECTIONS
// Critical-section debugging only: register this object as the DCSLock of the
// allocator's critical-section debug object.
43 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
44 #endif
45}
46
47
// Body of the routine that registers a worker and starts it (presumably
// DWManager::Add(DedicatedWorker& thread)).
// NOTE(review): the signature line (listing line 48) is absent from this extract, so any
// locking done there is not visible here.
49 #if ALIB_DEBUG
// Debug builds only: raise an error if this worker is already in the list.
50 for( DedicatedWorker* it : workers )
51 ALIB_ASSERT_ERROR( it != &thread, "TMOD", "Thread already added" )
52 #endif
53
// Store the worker's address (not a copy), then start the thread.
54 workers.push_back( &thread );
55 thread.Start();
56}
57
// Removes the given worker from the list, schedules its stop (unless one is already
// scheduled), waits until its state reaches at least Thread::State::Done and joins it.
//
// thread:       The worker to remove.
// stopPriority: Priority passed to ScheduleStop() if no stop is scheduled yet.
// Returns false (after raising a warning) if the worker is not in the list, true otherwise.
58bool DWManager::Remove( DedicatedWorker& thread, Priority stopPriority ) {
// NOTE(review): listing line 59 is absent from this extract. The closing brace at listing
// line 66 suggests that a nested scope is opened there - confirm against the original file.
60 auto it= std::find( workers.begin(), workers.end(), &thread );
61 if( it == workers.end() ) {
62 ALIB_WARNING( "TMOD", "Thread \"{}\" to remove not found", thread.GetName())
63 return false;
64 }
65 workers.erase( it );
66 }
67
// Only schedule a stop if none is pending.
68 if( !thread.StopIsScheduled() )
69 thread.ScheduleStop( stopPriority );
70
71 #if ALIB_DEBUG
72 Ticks waitCheck= Ticks::Now();
73 int nextWarnSecond= 1;
74 #endif
// Wait until the thread reports a state of Done or beyond.
// NOTE(review): listing line 76 is absent from this extract; whatever the loop does
// between two checks in release builds is not visible here.
75 while(thread.GetState() < Thread::State::Done) {
77 #if ALIB_DEBUG
// Debug builds only: warn when the elapsed whole seconds equal nextWarnSecond, then
// advance the threshold by one.
78 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond ) {
79 ALIB_WARNING( "TMOD",
80 "DWManager::Remove: Waiting on thread \"{}\" to stop. State::{}, Load: ",
81 thread.GetName(), thread.GetState(), thread.Load() )
82 nextWarnSecond++;
83 }
84 #endif
85 }
86
// Join unless the thread already reports State::Terminated.
87 if ( thread.GetState() != Thread::State::Terminated )
88 thread.Join();
89
90 return true;
91}
92
// Waits until no registered worker reports a positive Load(), or until the timeout elapses.
//
// timeout:      Maximum time to wait, measured from the start of this call.
// dbgWarnAfter: Debug builds only. Interval after which (and then repeatedly) a warning
//               listing the workers that still have load is raised.
// Returns true as soon as all workers report a load of zero, false if the timeout is exceeded.
93bool DWManager::WaitForAllIdle( Ticks::Duration timeout
94 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
95 ALIB_MESSAGE( "TMOD", "DWManager::WaitForAllIdle" )
96
97 Ticks waitStart= Ticks::Now();
// nextWarning holds the time of the last warning (initially the start time); the check
// below compares its age with dbgWarnAfter. Starting at waitStart makes the first warning
// appear after dbgWarnAfter instead of after twice that interval.
98 ALIB_DBG( Ticks nextWarning= waitStart; )
99 while(true) {
100 // count the workers that still report load
101 int cntRunning= 0;
102 {ALIB_LOCK
103 for( DedicatedWorker* it : workers )
104 if( it->Load() > 0 )
105 cntRunning++;
106 }
107 if( cntRunning == 0 )
108 return true;
109
110 #if ALIB_DEBUG
111 if( nextWarning.Age() > dbgWarnAfter) {
// Collect the message arguments: a headline plus one entry per worker with load.
112 std::vector<std::any> args; args.reserve(32);
113 args.emplace_back( "Waiting on {} thread(s) to become idle.\n");
114 args.emplace_back( cntRunning );
115 int tNr= 0;
116 {ALIB_LOCK
117 for( DedicatedWorker* it : workers )
118 if ( it->Load() > 0 ) {
119 args.emplace_back( ++tNr );
120 args.emplace_back( ": {},\tState::{},\t Load: \n" );
121 args.emplace_back( it->GetName() );
122 args.emplace_back( it->state );
123 args.emplace_back( it->Load() );
124 } }
// NOTE(review): the literal 1 presumably selects the "warning" message type - confirm.
125 assert::raise( ALIB_CALLER_PRUNED, 1, "TMOD", args );
126 nextWarning= Ticks::Now();
127 }
128 #endif
129
130 // check timeout
131 if (waitStart.Age() > timeout)
132 return false;
133
// NOTE(review): listing line 135 (the sleep announced by the comment below) is absent
// from this extract.
134 // sleep
136} }
137
// Stops and joins all registered workers, then empties the worker list.
//
// Every worker without a scheduled stop receives one with the given priority. The method
// then loops until no worker reports a state below Thread::State::Done, joins each worker
// and clears the list.
//
// stopPriority: Priority passed to ScheduleStop() for workers that have no stop scheduled.
138void DWManager::RemoveAll( Priority stopPriority ) {
139 ALIB_MESSAGE( "TMOD", "DWManager::RemoveAll" )
140
141 ALIB_DBG( Ticks waitCheck= Ticks::Now();
142 int nextWarnSecond= 1; )
143
144 // send stop to those unstopped
145 for( DedicatedWorker* it : workers )
146 if( !it->StopIsScheduled() )
147 it->ScheduleStop( stopPriority );
148
// Restart the debug warning timer so that it measures only the waiting phase.
149 ALIB_DBG( waitCheck= Ticks::Now(); nextWarnSecond= 1; )
150 int cntRunning;
151 while(true) {
152 // count the workers that have not reached State::Done yet
153 cntRunning= 0;
154 for( DedicatedWorker* it : workers )
155 if( it->GetState() < Thread::State::Done )
156 cntRunning++;
157 if( cntRunning == 0 )
158 break;
159
// NOTE(review): listing line 160 is absent from this extract; whatever the loop does
// between two checks in release builds is not visible here.
161 #if ALIB_DEBUG
// Debug builds only: warn when the elapsed whole seconds equal nextWarnSecond. The
// list contains every registered worker, not only those still running.
162 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond ) {
163 std::vector<std::any> args; args.reserve(32);
164 args.emplace_back( "DWManager Termination: Waiting on {} thread(s) to stop.\n");
165 args.emplace_back( cntRunning );
166 int tNr= 0;
167 for( DedicatedWorker* it : workers ) {
168 args.emplace_back( ++tNr );
169 args.emplace_back( ": {},\tState::{},\t Load: \n" );
170 args.emplace_back( it->GetName() );
171 args.emplace_back( it->state );
172 args.emplace_back( it->Load() );
173 }
174 assert::raise( ALIB_CALLER_PRUNED, 1, "TMOD", args );
175 nextWarnSecond++;
176 }
177 #endif
178 }
179
180 // join all registered workers and remove them from the list.
181 for( DedicatedWorker* it : workers )
182 it->Join();
183 workers.Clear();
184}
185
186//##################################################################################################
187// DedicatedWorker
188//##################################################################################################
191
// Body of the queue-insertion routine (presumably DedicatedWorker::pushAndRelease; the
// signature line is absent from this extract).
// NOTE(review): listing lines 189-190, 214 and 216 are absent; any locking or notification
// performed there is not visible here.
//
// The new element is inserted in front of the first element whose priority is equal or
// higher. Provided the queue was ordered before, it stays ordered by ascending priority,
// and a new job ends up in front of older jobs of the same priority.
192 // search the first element with equal or higher priority
193 auto it= queue.begin();
194 while(it != queue.end()) {
195 if(jobInfo.priority <= it->priority )
196 break;
197 it++;
198 }
199
200 // insert before found
201 queue.Insert(it, jobInfo );
202
203 #if ALIB_DEBUG
// Debug builds only: report queue size, the address of the job's ID member, priority and
// the keep-flag.
// NOTE(review): the literal 2 presumably selects the "message" type - confirm.
204 std::vector<std::any> args; args.reserve(32);
205 args.emplace_back( "Queue({}) Job({}) pushed. P::{} Keep: ");
206 args.emplace_back( queue.size() );
207 args.emplace_back( &jobInfo.job->ID );
208 args.emplace_back( jobInfo.priority );
209 args.emplace_back( jobInfo.keepJob );
210 assert::raise( ALIB_CALLER_PRUNED, 2, "TMOD/QUEUE", args );
211 #endif
212
// Keep the separately maintained element counter in sync with the queue.
213 ++length;
215
217}
218
// Removes the element at the back of the queue and returns it.
//
// Returns a pair of the job pointer and the element's keepJob flag.
// NOTE(review): listing line 220 is absent from this extract; the assertion text
// "after wakeup" suggests that a wait for notification happens there - confirm.
219std::pair<Job*, bool> DedicatedWorker::pop() {ALIB_LOCK
221 ALIB_ASSERT_ERROR(length != 0, "TMOD", "Job pipe empty after wakeup" )
222 ALIB_MESSAGE( "TMOD/QUEUE", "Queue--, size: ", length )
223
// Copy what is needed from the back element before it is removed.
224 std::pair<Job*, bool> result= { queue.back().job, queue.back().keepJob };
225 ALIB_DBG( auto dbgPriority= queue.back().priority; )
226
227 queue.pop_back();
228 --length;
229
230 #if ALIB_DEBUG
// Debug builds only: report the remaining length, the address of the job's ID member,
// priority and the keep-flag. The text says "popped" to distinguish it from the message
// raised when a job is pushed.
231 std::vector<std::any> args; args.reserve(32);
232 args.emplace_back( "Queue({}) Job({}) popped. P::{} Keep: ");
233 args.emplace_back( length );
234 args.emplace_back( &result.first->ID );
235 args.emplace_back( dbgPriority );
236 args.emplace_back( result.second );
237 assert::raise( ALIB_CALLER_PRUNED, 2, "TMOD/QUEUE", args );
238 #endif
239
240 return result;
241}
242
// Body of the worker's main loop (presumably DedicatedWorker::Run(); the signature line,
// listing line 243, is absent from this extract).
//
// Jobs are popped and dispatched until the stop job was executed. Per job, the order is:
//   1. JobDeleter:  destruct and free the referenced job, then free the deleter itself.
//   2. process():   a true return value marks the job as handled.
//   3. JobStop:     set stopJobExecuted and free the job.
//   4. Job::Do():   a true return value marks the job as handled; it is destructed and
//                   freed unless the keep-flag (jobInfo.second) is set.
// A job handled by none of these raises an error.
244 ALIB_MESSAGE("TMOD", "DedicatedWorker \"{}\" is running", GetName() )
245
246 while(!stopJobExecuted) {
247 auto jobInfo = pop();
248
249 // Deferred job-deletion job
250 if (jobInfo.first->Is<JobDeleter>() ) {
251 auto& job= jobInfo.first->Cast<JobDeleter>();
// NOTE(review): listing lines 252 and 255 are absent from this extract.
253
254 ALIB_ASSERT(jobInfo.second == false, "TMOD")
// The size is read before the destructor runs, then passed to the pool allocator.
256 auto size= job.JobToDelete->SizeOf();
257 job.JobToDelete->~Job();
258 manager.GetPoolAllocator().free( job.JobToDelete, size );
259 manager.GetPoolAllocator().free( &job, sizeof(JobDeleter) );
260
261 goto CONTINUE;
262 }
263
264
265 // overloaded custom process?
266 if ( process(*jobInfo.first) )
267 goto CONTINUE;
268
269
270 // Stop!
271 if (jobInfo.first->Is<JobStop>() ) {
272 stopJobExecuted= true;
273 ALIB_ASSERT(jobInfo.second == false, "TMOD")
// NOTE(review): listing line 274 is absent from this extract.
275 manager.GetPoolAllocator().free( jobInfo.first, sizeof(JobStop) );
276 goto CONTINUE;
277 }
278
279 // Custom method, implemented with Job::Do()
280 if ( jobInfo.first->Do() ) {
281 // delete?
// NOTE(review): listing line 283 is absent from this extract. The closing brace at listing
// line 287 suggests that the scope guarded by this condition is opened there - confirm.
282 if ( !jobInfo.second )
284 auto size= jobInfo.first->SizeOf();
285 jobInfo.first->~Job();
286 manager.GetPoolAllocator().free( jobInfo.first, size );
287 }
288
289 goto CONTINUE;
290 }
291
292 // Not processed!
293 ALIB_ERROR("TMOD",
294 "Job of type <{}> passed to DedicatedWorker, which was neither recognized by\n"
295 "the specialist nor has it a Job::Do() implementation!", &jobInfo.first->ID )
296
// Common exit of every dispatch branch, including the unhandled case above.
297 CONTINUE:
298 statLastJobExecution.Reset();
299 }
300
// Warn if jobs are still queued once the loop has ended.
301 ALIB_ASSERT_WARNING( Load() == 0, "TMOD",
302 "DedicatedWorker \"{}\" has jobs still queued when stopped!", GetName(), Load() )
303
304 ALIB_MESSAGE( "TMOD", "DedicatedWorker \"{}\" is stopping (leaving method Run()).", GetName() )
305}
306
307} // namespace [alib::threadmodel]
ListMA< DedicatedWorker * > workers
The list of workers.
MonoAllocator ma
Mono allocator. Used for commands and by DedicatedWorkers.
ALIB_DLL void RemoveAll(Priority stopPriority=Priority::Lowest)
ALIB_DLL DWManager()
Constructor.
ALIB_DLL void Add(DedicatedWorker &thread)
ALIB_DLL bool Remove(DedicatedWorker &thread, Priority stopPriority=Priority::Lowest)
PoolAllocator pool
Pool allocator. Used for command objects.
ALIB_DLL bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
virtual ALIB_DLL void Run() override
ALIB_DLL void pushAndRelease(QueueElement &&jobInfo)
DWManager & manager
needed as we inherit TCondition
int length
The current number of jobs in the queue.
bool stopJobExecuted
Flag which is set when the stop-job was executed.
virtual const character * GetName() const
Definition thread.inl:241
DbgLockAsserter Dbg
The debug tool instance.
Definition lock.inl:65
@ Terminated
The thread is terminated.
Definition thread.inl:144
static void SleepMicros(int64_t microseconds)
Definition thread.inl:351
virtual ALIB_DLL void Start()
Definition thread.cpp:285
virtual ALIB_DLL void Join()
Definition thread.cpp:244
#define ALIB_MESSAGE(domain,...)
Definition alib.inl:1064
#define ALIB_ASSERT(cond, domain)
Definition alib.inl:1065
#define ALIB_WARNING(domain,...)
Definition alib.inl:1063
#define ALIB_ASSERT_WARNING(cond, domain,...)
Definition alib.inl:1067
#define ALIB_ERROR(domain,...)
Definition alib.inl:1062
#define ALIB_LOCK
Definition alib.inl:1336
#define ALIB_DBG(...)
Definition alib.inl:853
#define ALIB_ASSERT_ERROR(cond, domain,...)
Definition alib.inl:1066
#define ALIB_CALLER_PRUNED
Definition alib.inl:1024
#define ALIB_LOCK_WITH(lock)
Definition alib.inl:1339
void raise(const CallerInfo &ci, int type, std::string_view domain, const std::span< std::any > &args)
Definition assert.cpp:565
Priority
Possible priorities of jobs assigned to a DedicatedWorker.
Definition jobs.inl:168
time::Ticks Ticks
Type alias in namespace alib.
Definition ticks.inl:79
The job sent by method DeleteJobDeferred.
The stop job sent by method ScheduleStop.
virtual void PrepareDeferredDeletion()
Definition jobs.inl:84
void WaitForNotification(ALIB_DBG_TAKE_CI)
void Acquire(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)