ALib C++ Framework
by
Library Version: 2605 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
dedicatedworker.cpp
1
3
4//##################################################################################################
5// DWManager
6//##################################################################################################
7
9: ma (ALIB_DBG("DWManager",) 16)
10, pool (ma)
11, workers (ma) {
12 #if ALIB_DEBUG
13 Lock::Dbg.Name= "DWManager";
14 #endif
15 #if ALIB_DEBUG_CRITICAL_SECTIONS
16 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
17 #endif
18}
19
20
22 #if ALIB_DEBUG
23 for( DedicatedWorker* it : workers )
24 ALIB_ASSERT_ERROR( it != &thread, "TMOD", "Thread already added" )
25 #endif
26
27 workers.push_back( &thread );
28 thread.Start();
29}
30
31bool DWManager::Remove( DedicatedWorker& thread, Priority stopPriority ) {
33 auto it= std::find( workers.begin(), workers.end(), &thread );
34 if( it == workers.end() ) {
35 ALIB_WARNING( "TMOD", "Thread \"{}\" to remove not found", thread.GetName())
36 return false;
37 }
38 workers.erase( it );
39 }
40
41 if( !thread.StopIsScheduled() )
42 thread.ScheduleStop( stopPriority );
43
44 #if ALIB_DEBUG
45 Ticks waitCheck= Ticks::Now();
46 int nextWarnSecond= 1;
47 #endif
48 while(thread.GetState() < Thread::State::Done) {
50 #if ALIB_DEBUG
51 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond ) {
52 ALIB_WARNING( "TMOD",
53 "DWManager::Remove: Waiting on thread \"{}\" to stop. State::{}, Load: ",
54 thread.GetName(), thread.GetState(), thread.Load() )
55 nextWarnSecond++;
56 }
57 #endif
58 }
59
60 if ( thread.GetState() != Thread::State::Terminated )
61 thread.Join();
62
63 return true;
64}
65
66bool DWManager::WaitForAllIdle( Ticks::Duration timeout
67 ALIB_DBG(, Ticks::Duration dbgWarnAfter) ) {
68 ALIB_MESSAGE( "TMOD", "DWManager::StopAndJoinAll" )
69
70 Ticks waitStart= Ticks::Now();
71 ALIB_DBG( Ticks nextWarning= waitStart + dbgWarnAfter; )
72 while(true) {
73 // check that all threads are stopped
74 int cntRunning= 0;
76 for( DedicatedWorker* it : workers )
77 if( it->Load() > 0 )
78 cntRunning++;
79 }
80 if( cntRunning == 0 )
81 return true;
82
83 #if ALIB_DEBUG
84 if( nextWarning.Age() > dbgWarnAfter) {
85 std::vector<std::any> args; args.reserve(32);
86 args.emplace_back( "Waiting on {} thread(s) to become idle.\n");
87 args.emplace_back( cntRunning );
88 int tNr= 0;
90 for( DedicatedWorker* it : workers )
91 if ( it->Load() > 0 ) {
92 args.emplace_back( ++tNr );
93 args.emplace_back( ": {},\tState::{},\t Load: \n" );
94 args.emplace_back( it->GetName() );
95 args.emplace_back( it->state );
96 args.emplace_back( it->Load() );
97 } }
98 assert::raise( ALIB_CALLER_PRUNED, 1, "TMOD", args );
99 nextWarning= Ticks::Now();
100 }
101 #endif
102
103 // check timeout
104 if (waitStart.Age() > timeout)
105 return false;
106
107 // sleep
109} }
110
111void DWManager::RemoveAll( Priority stopPriority ) {
112 ALIB_MESSAGE( "TMOD", "DWManager::StopAndJoinAll" )
113
114 ALIB_DBG( Ticks waitCheck= Ticks::Now();
115 int nextWarnSecond= 1; )
116
117 // send stop to those unstopped
118 for( DedicatedWorker* it : workers )
119 if( !it->StopIsScheduled() )
120 it->ScheduleStop( stopPriority );
121
122 ALIB_DBG( waitCheck= Ticks::Now(); nextWarnSecond= 1; )
123 int cntRunning;
124 while(true) {
125 // check that all threads are stopped
126 cntRunning= 0;
127 for( DedicatedWorker* it : workers )
128 if( it->GetState() < Thread::State::Done )
129 cntRunning++;
130 if( cntRunning == 0 )
131 break;
132
134 #if ALIB_DEBUG
135 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond ) {
136 std::vector<std::any> args; args.reserve(32);
137 args.emplace_back( "DWManager Termination: Waiting on {} thread(s) to stop.\n");
138 args.emplace_back( cntRunning );
139 int tNr= 0;
140 for( DedicatedWorker* it : workers ) {
141 args.emplace_back( ++tNr );
142 args.emplace_back( ": {},\tState::{},\t Load: \n" );
143 args.emplace_back( it->GetName() );
144 args.emplace_back( it->state );
145 args.emplace_back( it->Load() );
146 }
147 assert::raise( ALIB_CALLER_PRUNED, 1, "TMOD", args );
148 nextWarnSecond++;
149 }
150 #endif
151 }
152
153 // terminate all registered MThreads and remove them from our the list.
154 for( DedicatedWorker* it : workers )
155 it->Join();
156 workers.Clear();
157}
158
159//##################################################################################################
160// DedicatedWorker
161//##################################################################################################
164
165 // search the first element with equal or higher priority
166 auto it= queue.begin();
167 while(it != queue.end()) {
168 if(jobInfo.priority <= it->priority )
169 break;
170 it++;
171 }
172
173 // insert before found
174 queue.Insert(it, jobInfo );
175
176 #if ALIB_DEBUG
177 std::vector<std::any> args; args.reserve(32);
178 args.emplace_back( "Queue({}) Job({}) pushed. P::{} Keep: ");
179 args.emplace_back( queue.size() );
180 args.emplace_back( &jobInfo.job->ID );
181 args.emplace_back( jobInfo.priority );
182 args.emplace_back( jobInfo.keepJob );
183 assert::raise( ALIB_CALLER_PRUNED, 2, "TMOD/QUEUE", args );
184 #endif
185
186 ++length;
188
190}
191
192std::pair<Job*, bool> DedicatedWorker::pop() {ALIB_LOCK
194 ALIB_ASSERT_ERROR(length != 0, "TMOD", "Job pipe empty after wakeup" )
195 ALIB_MESSAGE( "TMOD/QUEUE", "Queue--, size: ", length )
196
197 std::pair<Job*, bool> result= { queue.back().job, queue.back().keepJob };
198 ALIB_DBG( auto dbgPriority= queue.back().priority; )
199
200 queue.pop_back();
201 --length;
202
203 #if ALIB_DEBUG
204 std::vector<std::any> args; args.reserve(32);
205 args.emplace_back( "Queue({}) Job({}) pushed. P::{} Keep: ");
206 args.emplace_back( length );
207 args.emplace_back( &result.first->ID );
208 args.emplace_back( dbgPriority );
209 args.emplace_back( result.second );
210 assert::raise( ALIB_CALLER_PRUNED, 2, "TMOD/QUEUE", args );
211 #endif
212
213 return result;
214}
215
217 ALIB_MESSAGE("TMOD", "DedicatedWorker \"{}\" is running", GetName() )
218
219 while(!stopJobExecuted) {
220 auto jobInfo = pop();
221
222 // Deferred job-deletion job
223 if (jobInfo.first->Is<JobDeleter>() ) {
224 auto& job= jobInfo.first->Cast<JobDeleter>();
226
227 ALIB_ASSERT(jobInfo.second == false, "TMOD")
229 auto size= job.JobToDelete->SizeOf();
230 job.JobToDelete->~Job();
231 manager.GetPoolAllocator().free( job.JobToDelete, size );
232 manager.GetPoolAllocator().free( &job, sizeof(JobDeleter) );
233
234 goto CONTINUE;
235 }
236
237
238 // overloaded custom process?
239 if ( process(*jobInfo.first) )
240 goto CONTINUE;
241
242
243 // Stop!
244 if (jobInfo.first->Is<JobStop>() ) {
245 stopJobExecuted= true;
246 ALIB_ASSERT(jobInfo.second == false, "TMOD")
248 manager.GetPoolAllocator().free( jobInfo.first, sizeof(JobStop) );
249 goto CONTINUE;
250 }
251
252 // Custom method, implemented with Job::Do()
253 if ( jobInfo.first->Do() ) {
254 // delete?
255 if ( !jobInfo.second )
257 auto size= jobInfo.first->SizeOf();
258 jobInfo.first->~Job();
259 manager.GetPoolAllocator().free( jobInfo.first, size );
260 }
261
262 goto CONTINUE;
263 }
264
265 // Not processed!
266 ALIB_ERROR("TMOD",
267 "Job of type <{}> passed to DedicatedWorker, which was neither recognized by\n"
268 "the specialist nor has it a Job::Do() implementation!", &jobInfo.first->ID )
269
270 CONTINUE:
271 statLastJobExecution.Reset();
272 }
273
274 ALIB_ASSERT_WARNING( Load() == 0, "TMOD",
275 "DedicatedWorker \"{}\" has jobs still queued when stopped!", GetName(), Load() )
276
277 ALIB_MESSAGE( "TMOD", "DedicatedWorker \"{}\" is stopping (leaving method Run()).", GetName() )
278}
279
280} // namespace [alib::threadmodel]
#define ALIB_MESSAGE(domain,...)
#define ALIB_ASSERT(cond, domain)
#define ALIB_WARNING(domain,...)
#define ALIB_ASSERT_WARNING(cond, domain,...)
#define ALIB_ERROR(domain,...)
#define ALIB_LOCK
#define ALIB_DBG(...)
#define ALIB_ASSERT_ERROR(cond, domain,...)
#define ALIB_CALLER_PRUNED
#define ALIB_LOCK_WITH(lock)
ListMA< DedicatedWorker * > workers
The list of workers.
MonoAllocator ma
Mono allocator. Used for commands and by DedicatedWorkers.
void RemoveAll(Priority stopPriority=Priority::Lowest)
void Add(DedicatedWorker &thread)
bool Remove(DedicatedWorker &thread, Priority stopPriority=Priority::Lowest)
PoolAllocator pool
Pool allocator. Used for command objects.
bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
void pushAndRelease(QueueElement &&jobInfo)
DWManager & manager
Reference to #"%DWManager" instance.
int length
The current number of jobs in the queue.
bool stopJobExecuted
Flag which is set when the stop-job was executed.
virtual const character * GetName() const
Definition thread.hpp:237
DbgLockAsserter Dbg
The debug tool instance.
Definition lock.hpp:65
@ Terminated
The thread is terminated.
Definition thread.hpp:141
static void SleepMicros(int64_t microseconds)
Definition thread.hpp:349
virtual void Start()
Definition thread.cpp:224
virtual void Join()
Definition thread.cpp:183
void raise(const CallerInfo &ci, int type, std::string_view domain, const std::span< std::any > &args)
Definition assert.cpp:552
Priority
Possible priorities of jobs assigned to an #"DedicatedWorker".
Definition jobs.hpp:166
time::Ticks Ticks
Type alias in namespace #"%alib".
Definition ticks.hpp:86
The stop job sent by method #"ScheduleStop".
virtual void PrepareDeferredDeletion()
Definition jobs.hpp:83
void WaitForNotification(ALIB_DBG_TAKE_CI)
void Acquire(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)