ALib C++ Library
Library Version: 2412 R0
Documentation generated by doxygen
Loading...
Searching...
No Matches
dedicatedworker.cpp
1// #################################################################################################
2// ALib C++ Library
3//
4// Copyright 2013-2024 A-Worx GmbH, Germany
5// Published under 'Boost Software License' (a free software license, see LICENSE.txt)
6// #################################################################################################
8
11
13
14// #################################################################################################
15// threadmodel::Bootstrap()
16// #################################################################################################
// Bootstrap section of this compilation unit; compiled only if ALIB_CAMP evaluates to false.
17#if !ALIB_CAMP
18#if ALIB_DEBUG && !DOXYGEN
// Debug-only flag: 0 until Bootstrap() was invoked (see the assertion and assignment below).
19 namespace{ unsigned int initFlag= 0; }
20#endif // !DOXYGEN
21
22
// Bootstraps this module. Must not be invoked twice: with debug-builds, a second call raises
// an ALib error because initFlag is then no longer 0.
24 void Bootstrap()
25 {
26 ALIB_ASSERT_ERROR( initFlag == 0, "ENUMS", "This method must not be invoked twice." )
// Debug-builds only: mark bootstrapping as done. The value only has to be non-zero for the
// assertion above to trip on a repeated call.
27 ALIB_DBG(initFlag= 0x92A3EF61;)
28
// NOTE(review): this inner guard repeats the enclosing '#if !ALIB_CAMP' and is always true here.
29 #if !ALIB_CAMP
// NOTE(review): listing line 30, which opens the call that receives the table below, is not
// part of this view. Each entry gives a Priority element, its name and an integer, presumably
// the minimum number of characters needed to recognize the name when parsing - confirm.
31 {
32 { Priority::Lowest , A_CHAR("Lowest" ), 4 },
33 { Priority::DeferredDeletion, A_CHAR("DeferredDeletion" ), 1 },
34 { Priority::Low , A_CHAR("Low" ), 1 },
35 { Priority::Standard , A_CHAR("Standard" ), 1 },
36 { Priority::Highest , A_CHAR("Highest" ), 5 },
37 { Priority::High , A_CHAR("High" ), 1 },
38 } );
39 #endif // !ALIB_CAMP
40 }
42#endif // !ALIB_CAMP
43
44// #################################################################################################
45// DWManager
46// #################################################################################################
47
// Constructor of DWManager.
// NOTE(review): the constructor's header (listing line 48) is not part of this view.
// Member 'ma' receives the name "DWManager" in debug-builds only, followed by the value 16
// (presumably its initial buffer size - confirm unit against the allocator's constructor).
// Members 'pool' and 'workers' are both constructed on top of 'ma'.
49: ma (ALIB_DBG("DWManager",) 16)
50, pool (ma)
51, workers (ma)
52{
53 #if ALIB_DEBUG
// Give the inherited lock's debug tool a readable name.
54 Lock::Dbg.Name= "DWManager";
55 #endif
56 #if ALIB_DEBUG_CRITICAL_SECTIONS
// Register this object as the lock associated with the allocator's critical-section checker.
57 ma.DbgCriticalSectionsPH.Get()->DCSLock= this;
58 #endif
59}
60
61
// Body of the method that registers a worker and starts its thread.
// NOTE(review): listing lines 62-63 (the function header and whatever follows it directly) are
// not part of this view; per the member index of this page the signature is presumably
// 'void DWManager::Add(DedicatedWorker& thread)' - confirm.
64 #if ALIB_DEBUG
// Debug-builds only: raise an error if the very same worker object is already registered.
65 for( auto it : workers )
66 ALIB_ASSERT_ERROR( it != &thread, "MGTHR", "Thread already added" )
67 #endif
68
// Store a pointer to the worker (no ownership is taken here), then launch its thread.
69 workers.PushBack( &thread );
70 thread.Start();
71}
72
// Removes a worker from this manager and blocks until its thread has ended.
//   thread       - the worker to remove.
//   stopPriority - priority passed to ScheduleStop() in case no stop was scheduled, yet.
// Returns false (after raising a debug warning) if the worker is not found in 'workers';
// otherwise true, once the thread reached at least State::Done and, unless it already was
// in State::Terminated, has been joined.
73bool DWManager::Remove( DedicatedWorker& thread, Priority stopPriority ) {
// NOTE(review): listing line 74 is not part of this view. The brace on listing line 82 closes
// a scope that must be opened there (presumably one that acquires the manager's lock - confirm).
75 auto it= std::find( workers.begin(), workers.end(), &thread );
76 if( it == workers.end() ) {
// NOTE(review): the two literals concatenate without a blank, so the message reads
// 'Thread "<name>"to remove not found '.
77 ALIB_WARNING( "MGTHR", NString256("Thread \"") << thread.GetName()
78 << "\"to remove not found ")
79 return false;
80 }
81 workers.Erase( it );
82 }
83
// Only schedule a stop if the caller (or someone else) has not done so before.
84 if( !thread.StopIsScheduled() )
85 thread.ScheduleStop( stopPriority );
86
87 #if ALIB_DEBUG
88 Ticks waitCheck= Ticks::Now();
89 int nextWarnSecond= 1;
90 #endif
// Poll until the thread's state is State::Done or beyond.
91 while(thread.GetState() < Thread::State::Done) {
// NOTE(review): listing line 92 is not part of this view (presumably a short sleep - confirm).
93 #if ALIB_DEBUG
// Debug-builds only: warn each time another full second of waiting has passed.
// NOTE(review): the comparison uses '==', so if one loop iteration ever skips a whole second,
// nextWarnSecond is never matched again and no further warnings are raised.
94 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond )
95 {
96 ALIB_WARNING( "MGTHR", NString4K("DWManager::Remove: Waiting on thread \"" )
97 << thread.GetName() << "\" to stop. "
98 "State::" << thread.GetState()
99 << ", Load: " << thread.Load() )
100 nextWarnSecond++;
101 }
102 #endif
103 }
104
// Join the thread unless it is already marked as terminated.
105 if ( thread.GetState() != Thread::State::Terminated )
106 thread.Join();
107
108 return true;
109}
110
// Waits until all workers of this manager are idle, i.e., none of them reports Load() > 0.
//   timeout      - the maximum time to wait.
//   dbgWarnAfter - (debug-builds only) the time after which a warning listing the busy
//                  workers is raised; the warning is repeated with the same interval.
// Returns true as soon as no worker has load, false if 'timeout' elapsed before that.
111bool DWManager::WaitForAllIdle( Ticks::Duration timeout
112 ALIB_DBG(, Ticks::Duration dbgWarnAfter) )
113{
114 ALIB_MESSAGE("MGTHR", "DWManager::WaitForAllIdle" )
115
116 Ticks waitStart= Ticks::Now();
// Reference time for the debug warning: the start of waiting at first, later the time of the
// previous warning. Starting at 'waitStart' makes the first warning due after 'dbgWarnAfter',
// exactly like all following ones (the test below is 'Age() > dbgWarnAfter').
117 ALIB_DBG( Ticks nextWarning= waitStart; )
118 while(true)
119 {
120 // count the workers that still have load
121 int cntRunning= 0;
122 {ALIB_LOCK
123 for( auto it : workers )
124 if( it->Load() > 0 )
125 cntRunning++;
126 }
127 if( cntRunning == 0 )
128 return true;
129
130 #if ALIB_DEBUG
131 if( nextWarning.Age() > dbgWarnAfter)
132 {
// Collect name, state and load of each busy worker (under the lock) and raise one warning.
133 NString4K dbgThreadList;
134 dbgThreadList << "Waiting on " << cntRunning << " thread(s) to become idle:\n";
135 int tNr= 0;
136 {ALIB_LOCK
137 for( auto it : workers )
138 if ( it->Load() > 0 )
139 dbgThreadList << ++tNr << ": " << it->GetName()
140 << ",\tState::" << it->state
141 << ",\t Load: " << it->Load() << NEW_LINE;
142 }
143 ALIB_WARNING( "MGTHR", dbgThreadList )
144 nextWarning= Ticks::Now();
145 }
146 #endif
147
148 // check timeout
149 if (waitStart.Age() > timeout)
150 return false;
151
152 // sleep
// NOTE(review): listing line 153, which holds the sleep statement announced above, is not
// part of this view.
154 }
155}
156
// Stops and joins all workers of this manager and finally clears the list 'workers'.
//   stopPriority - priority passed to ScheduleStop() for each worker that has no stop
//                  scheduled, yet.
// Blocks (polling) until every worker's state is State::Done or beyond.
// NOTE(review): unlike in WaitForAllIdle, no ALIB_LOCK is visible around the iterations over
// 'workers' in this method - confirm that this is intended.
157void DWManager::RemoveAll( Priority stopPriority )
158{
// NOTE(review): the message names "StopAndJoinAll", not this method - presumably a former name.
159 ALIB_MESSAGE("MGTHR", "DWManager::StopAndJoinAll" )
160
161 ALIB_DBG( Ticks waitCheck= Ticks::Now();
162 int nextWarnSecond= 1; )
163
164 // send stop to those unstopped
165 for( auto it : workers )
166 if( !it->StopIsScheduled() )
167 it->ScheduleStop( stopPriority );
168
// Debug-builds only: restart the warning timer now that all stops are scheduled (both
// variables were already initialized with the same values above).
169 ALIB_DBG( waitCheck= Ticks::Now(); nextWarnSecond= 1; )
170 int cntRunning;
171 while(true)
172 {
173 // check that all threads are stopped
174 cntRunning= 0;
175 for( auto it : workers )
176 if( it->GetState() < Thread::State::Done )
177 cntRunning++;
178 if( cntRunning == 0 )
179 break;
180
// NOTE(review): listing line 181 is not part of this view (presumably a short sleep - confirm).
182 #if ALIB_DEBUG
// Debug-builds only: once per elapsed second, warn with a list of all registered workers
// (stopped ones included), giving name, state and load of each.
183 if( waitCheck.Age().InAbsoluteSeconds() == nextWarnSecond )
184 {
185 NString4K dbgThreadList("DWManager Termination: Waiting on " );
186 dbgThreadList << cntRunning << " Threads to stop. List of threads: " << NEW_LINE;
187 int tNr= 0;
188 for( auto it : workers )
189 {
190 dbgThreadList << ++tNr << ": " << it->GetName()
191 << ",\tState::" << it->state
192 << ",\t Load: " << it->Load() << NEW_LINE;
193 }
194 ALIB_WARNING( "MGTHR", dbgThreadList.NewLine().Terminate() )
195 nextWarnSecond++;
196 }
197 #endif
198 }
199
200 // join all registered worker threads and remove them from our list.
201 for( auto it : workers )
202 it->Join();
203 workers.Clear();
204}
205
206// #################################################################################################
207// DedicatedWorker
208// #################################################################################################
// Body of the method that inserts a job into this worker's priority-sorted queue.
// NOTE(review): listing lines 209 (the function header), 211, 232 and 234 are not part of this
// view. Per the member index of this page the signature is presumably
// 'void DedicatedWorker::pushAndRelease(QueueElement&& jobInfo)' - confirm.
210{
212
213 // search the first element with equal or higher priority
214 auto it= queue.begin();
215 while(it != queue.end())
216 {
217 if(jobInfo.priority <= it->priority )
218 break;
219 it++;
220 }
221
// Inserting in front of the found element keeps the queue sorted by ascending priority.
// As pop() takes elements from the back, jobs of equal priority leave in the order they came.
222 // insert before found
223 queue.Insert(it, jobInfo );
224
225 ALIB_MESSAGE( "MGTHR/QUEUE", NString512() <<
226 "Queue(" << queue.Count() << ") "
227 "Job(" << jobInfo.job->ID << ") pushed. "
228 "P::" << jobInfo.priority << ", "
229 "Keep: " << jobInfo.keepJob )
230
// Member 'length' mirrors the number of queued jobs; pop() decrements it again.
231 ++length;
233
235}
236
// Removes the last element of the job queue and returns it.
// Returns a pair of the job pointer and the element's 'keepJob' flag.
// NOTE(review): listing lines 238-239 are not part of this view; they must contain the opening
// brace. The assertion text below suggests that a wait for notification also happens there -
// confirm.
237std::pair<Job*, bool> DedicatedWorker::pop()
240 ALIB_ASSERT_ERROR(length != 0, "MGTHR", "Job pipe empty after wakeup" )
241 ALIB_MESSAGE( "MGTHR/QUEUE", "Queue--, size: ", length )
242
// Copy what is needed from the back element before it is removed from the queue.
243 std::pair<Job*, bool> result= { queue.Back().job, queue.Back().keepJob };
244 ALIB_DBG( auto dbgPriority= queue.Back().priority; )
245
246 queue.PopBack();
247 --length;
248
249 ALIB_MESSAGE( "MGTHR/QUEUE", NString512() <<
250 "Queue(" << length << ") "
251 "Job(" << result.first->ID << ") popped. "
252 "P::" << dbgPriority << ", "
253 "Keep: " << result.second )
254
255
256 return result;
257}
258
// Body of the worker's job loop: pops and handles jobs until the stop-job was executed.
// NOTE(review): listing lines 259 (the function header), 271, 274, 294, 304 and 320 are not part
// of this view. Per the member index of this page the header is presumably
// 'void DedicatedWorker::Run()' - confirm.
// Each popped job is handled by the first matching step of:
//   1. JobDeleter : destruct and free the job it refers to, then free the deleter itself.
//   2. process()  : custom processing; a return value of true ends handling of the job.
//   3. JobStop    : set 'stopJobExecuted' (which ends the loop) and free the job.
//   4. Job::Do()  : if it returns true, the job is destructed and freed unless flagged 'keep'.
//   5. otherwise  : an ALib error is raised, because nobody processed the job.
260{
261 ALIB_MESSAGE("MGTHR", NString256( "DedicatedWorker \"") << GetName() << "\" is running" )
262
263 while(!stopJobExecuted)
264 {
// jobInfo.first is the job, jobInfo.second the 'keep' flag (see pop()).
265 auto jobInfo = pop();
266
267 // Deferred job-deletion job
268 if (jobInfo.first->Is<JobDeleter>() )
269 {
270 auto& job= jobInfo.first->Cast<JobDeleter>();
272
// A deleter job itself must never be flagged 'keep'.
273 ALIB_ASSERT(jobInfo.second == false)
// The size is fetched before the destructor runs; it is needed to return the memory to the pool.
275 auto size= job.JobToDelete->SizeOf();
276 job.JobToDelete->~Job();
277 manager.GetPoolAllocator().free( job.JobToDelete, size );
278 manager.GetPoolAllocator().free( &job, sizeof(JobDeleter) );
279
280 goto CONTINUE;
281 }
282
283
// NOTE(review): process() is consulted before the stop-job check below; an implementation that
// returns true for a JobStop would therefore bypass the stop handling.
284 // overloaded custom process?
285 if ( process(*jobInfo.first) )
286 goto CONTINUE;
287
288
289 // Stop!
290 if (jobInfo.first->Is<JobStop>() )
291 {
292 stopJobExecuted= true;
293 ALIB_ASSERT(jobInfo.second == false)
295 manager.GetPoolAllocator().free( jobInfo.first, sizeof(JobStop) );
296 goto CONTINUE;
297 }
298
299 // Custom method, implemented with Job::Do()
300 if ( jobInfo.first->Do() )
301 {
302 // delete?
// NOTE(review): the opening brace of this 'if' (listing line 304) is not part of this view;
// the brace on listing line 308 closes it.
303 if ( !jobInfo.second )
305 auto size= jobInfo.first->SizeOf();
306 jobInfo.first->~Job();
307 manager.GetPoolAllocator().free( jobInfo.first, size );
308 }
309
310 goto CONTINUE;
311 }
312
// NOTE(review): the literals concatenate without a blank, so the message reads
// 'Job of type <ID>passed to DedicatedWorker, ...'.
313 // Not processed!
314 ALIB_ERROR("MGTHR", NString512() <<
315 "Job of type <" << jobInfo.first->ID << ">"
316 "passed to DedicatedWorker, which was neither recognized by\n"
317 "the specialist nor has it a Job::Do() implementation!" )
318
319 CONTINUE:
321 }
322
// Debug check: warn if jobs are still queued although the stop-job was executed.
323 ALIB_ASSERT_WARNING( Load() == 0, "MGTHR", NString256() <<
324 "DedicatedWorker \"" << GetName() << "\" has " << Load() <<
325 " jobs still queued when stopped!\n" )
326
327 ALIB_MESSAGE( "MGTHR", NString256() <<
328 "DedicatedWorker \"" << GetName() << "\" is stopping (leaving method Run())." )
329}
330
331} // namespace [alib::threadmodel]
332
333
lang::Placeholder< lang::DbgCriticalSections > DbgCriticalSectionsPH
void free(void *mem, size_t size)
constexpr const TChar * Terminate() const
Definition tastring.inl:821
ALIB_API bool WaitForAllIdle(Ticks::Duration timeout, Ticks::Duration dbgWarnAfter)
List< MonoAllocator, DedicatedWorker * > workers
The list of workers.
MonoAllocator ma
Mono allocator. Used for commands and by DedicatedWorkers.
ALIB_API void Add(DedicatedWorker &thread)
ALIB_API void RemoveAll(Priority stopPriority=Priority::Lowest)
ALIB_API bool Remove(DedicatedWorker &thread, Priority stopPriority=Priority::Lowest)
ALIB_API DWManager()
Constructor.
List< HeapAllocator, QueueElement > queue
virtual ALIB_API void Run() override
ALIB_API void pushAndRelease(QueueElement &&jobInfo)
virtual const CString GetName() const
Definition thread.hpp:225
bool stopJobExecuted
Flag which is set when the stop-job was executed.
DWManager & manager
needed as we inherit TCondition
int length
The current number of jobs in the queue.
DbgLockAsserter Dbg
The debug tool instance.
Definition lock.hpp:78
virtual ALIB_API void Start()
Definition thread.cpp:284
@ Terminated
The thread is terminated.
static void SleepMicros(int64_t microseconds)
Definition thread.hpp:332
virtual ALIB_API void Join()
Definition thread.cpp:220
#define ALIB_WARNING(...)
Definition alib.hpp:1268
#define A_CHAR(STR)
#define ALIB_MESSAGE(...)
Definition alib.hpp:1269
#define ALIB_ERROR(...)
Definition alib.hpp:1267
#define ALIB_ASSERT_ERROR(cond,...)
Definition alib.hpp:1271
#define ALIB_LOCK
Definition owner.hpp:453
#define ALIB_ASSERT_WARNING(cond,...)
Definition alib.hpp:1272
#define ALIB_DBG(...)
Definition alib.hpp:390
#define ALIB_CALLER_PRUNED
Definition alib.hpp:1170
#define ALIB_LOCK_WITH(lock)
Definition owner.hpp:456
#define ALIB_ASSERT(cond)
Definition alib.hpp:1270
#define ALIB_CAMP
Definition alib.hpp:201
ALIB_API void Bootstrap()
Priority
Possible priorities of jobs assigned to a DedicatedWorker.
Definition jobs.hpp:187
NLocalString< 256 > NString256
Type alias name for TLocalString<nchar,256>.
constexpr CString NEW_LINE
A zero-terminated string containing the new-line character sequence.
Definition cstring.hpp:580
NLocalString< 512 > NString512
Type alias name for TLocalString<nchar,512>.
NLocalString< 4096 > NString4K
Type alias name for TLocalString<nchar,4096>.
The job sent by method DeleteJobDeferred.
The stop job sent by method ScheduleStop.
virtual void PrepareDeferredDeletion()
Definition jobs.hpp:101
void WaitForNotification(ALIB_DBG_TAKE_CI)
void Acquire(ALIB_DBG_TAKE_CI)
void ReleaseAndNotify(ALIB_DBG_TAKE_CI)