Free Electron
JobQueue.h
Go to the documentation of this file.
1 /* Copyright (C) 2003-2021 Free Electron Organization
2  Any use of this software requires a license. If a valid license
3  was not distributed with this file, visit freeelectron.org. */
4 
5 /** @file */
6 
7 #ifndef __thread_JobQueue_h__
8 #define __thread_JobQueue_h__
9 
10 #define FE_JQ_LOCK_DEBUG FALSE
11 
12 namespace fe
13 {
14 namespace ext
15 {
16 
17 class JobQueuePre
18 {
19  public:
20  JobQueuePre(void) { Mutex::confirm(); }
21 };
22 
23 /**************************************************************************//**
24  @brief Queue of tasks
25 
26  @ingroup thread
27 *//***************************************************************************/
28 template<typename T>
29 class JobQueue: public JobQueuePre, public Handled< JobQueue<T> >
30 {
31  public:
32  JobQueue(void):
33  m_workers(0),
34  m_barrierCount(0),
35  m_barrierMet(FALSE),
36  m_pException(NULL)
37  { Counted::setName("JobQueue"); }
38 
39  ~JobQueue(void)
40  {
41  if(m_pException)
42  {
43  delete m_pException;
44  m_pException=NULL;
45  }
46  }
47 
48  U32 workers(void) const { return m_workers; }
49 
50  void post(T job)
51  {
52 #if FE_JQ_LOCK_DEBUG
53  feLog("JobQueue::post %s LOCK\n",c_print(job));
54 #endif
55  Thread::interruptionPoint();
56 
57  RecursiveMutex::Guard lock(m_jobMonitor);
58 // feLog("JobQueue::post %d\n",job);
59  m_posted.push(job);
60  m_jobAvailable.notifyOne();
61  }
62  void post(T first_job,T last_job)
63  {
64 #if FE_JQ_LOCK_DEBUG
65  feLog("JobQueue::post %s %s LOCK\n",
66  c_print(first_job),c_print(last_job));
67 #endif
68  Thread::interruptionPoint();
69 
70  RecursiveMutex::Guard lock(m_jobMonitor);
71 // feLog("JobQueue::post %d\n",job);
72  for(T job=first_job;job<=last_job;job++)
73  {
74  m_posted.push(job);
75  m_jobAvailable.notifyOne();
76  }
77  }
78 
79  BWORD take(T& job)
80  {
81 // feLog("JobQueue::take\n");
82 #if FE_JQ_LOCK_DEBUG
83  feLog("JobQueue::take %s LOCK\n",c_print(job));
84 #endif
85  Thread::interruptionPoint();
86 
87  RecursiveMutex::Guard lock(m_jobMonitor);
88 // feLog("JobQueue::take locked\n");
89  if(m_posted.empty())
90  {
91  return FALSE;
92  }
93  job=m_posted.front();
94  m_posted.pop();
95  return TRUE;
96  }
97  void waitForJob(T& job)
98  {
99 // feLog("JobQueue::waitForJob\n");
100 #if FE_JQ_LOCK_DEBUG
101  feLog("JobQueue::waitForJob %s LOCK\n",c_print(job));
102 #endif
103  Thread::interruptionPoint();
104 
105  RecursiveMutex::Guard lock(m_jobMonitor);
106 // feLog("JobQueue::waitForJob locked\n");
107  while(m_posted.empty())
108  {
109 // feLog("JobQueue::waitForJob empty\n");
110  m_jobAvailable.wait(lock);
111  }
112  FEASSERT(!m_posted.empty());
113  job=m_posted.front();
114 // feLog("JobQueue::waitForJob job=%d\n",job);
115  m_posted.pop();
116  }
117 
118  void deliver(T job)
119  {
120 #if FE_JQ_LOCK_DEBUG
121  feLog("JobQueue::deliver %s LOCK\n",c_print(job));
122 #endif
123  Thread::interruptionPoint();
124 
125  RecursiveMutex::Guard lock(m_workerMonitor);
126 // feLog("JobQueue::deliver %d\n",job);
127  m_delivered.push(job);
128  m_workerDelivered.notifyOne();
129  }
130 
131  BWORD acceptDelivery(T& job)
132  {
133 #if FE_JQ_LOCK_DEBUG
134  feLog("JobQueue::acceptDelivery %s LOCK\n",c_print(job));
135 #endif
136  Thread::interruptionPoint();
137 
138  RecursiveMutex::Guard lock(m_workerMonitor);
139 // feLog("JobQueue::acceptDelivery\n");
140  if(m_delivered.empty())
141  {
142  return FALSE;
143  }
144  job=m_delivered.front();
145  m_delivered.pop();
146  return TRUE;
147  }
148  void waitForDelivery(T& job)
149  {
150 #if FE_JQ_LOCK_DEBUG
151  feLog("JobQueue::waitForDelivery %s LOCK\n",c_print(job));
152 #endif
153  Thread::interruptionPoint();
154 
155  RecursiveMutex::Guard lock(m_workerMonitor);
156 // feLog("JobQueue::waitForDelivery\n");
157  while(m_delivered.empty())
158  {
159 // feLog("JobQueue::waitForDelivery empty\n");
160  m_workerDelivered.wait(lock);
161  }
162  FEASSERT(!m_delivered.empty());
163  job=m_delivered.front();
164  m_delivered.pop();
165  }
166 
167  /** @brief waits until all threads call sync
168 
169  Returns true to exactly one of the threads. */
170  BWORD synchronize(BWORD a_supervised=FALSE)
171  {
172  const U32 participants=m_workers+a_supervised;
173 #if FE_JQ_LOCK_DEBUG
174  feLog("JobQueue::synchronize LOCK\n");
175 #endif
176  Thread::interruptionPoint();
177 
178  RecursiveMutex::Guard lock(m_barrierMonitor);
179 // feLog("JobQueue::sync %d\n",m_barrierCount);
180  while(m_barrierMet)
181  {
182 // feLog("JobQueue::sync previous %d/%d\n",
183 // m_barrierCount,participants);
184  m_barrierHit.wait(lock);
185  }
186 // feLog("JobQueue::sync %d++\n",m_barrierCount);
187  m_barrierCount++;
188  while(!m_barrierMet && m_barrierCount<participants)
189  {
190 // feLog("JobQueue::sync wait %d/%d\n",
191 // m_barrierCount,participants);
192  m_barrierHit.wait(lock);
193  }
194  BWORD result=(m_barrierCount==participants);
195  m_barrierCount--;
196  m_barrierMet=(m_barrierCount>0);
197  m_barrierHit.notifyAll();
198 // feLog("JobQueue::sync complete\n");
199  return result;
200  }
201 
202  void relayException(const Exception& a_rException)
203  {
204 #if FE_JQ_LOCK_DEBUG
205  feLog("JobQueue::relayException LOCK\n");
206 #endif
207  Thread::interruptionPoint();
208 
209  RecursiveMutex::Guard lock(m_workerMonitor);
210 
211  if(!m_pException)
212  {
213  m_pException=new Exception(a_rException);
214  }
215  };
216 
217  void resolveException(void)
218  {
219 #if FE_JQ_LOCK_DEBUG
220  feLog("JobQueue::resolveException LOCK\n");
221 #endif
222  Thread::interruptionPoint();
223 
224  RecursiveMutex::Guard lock(m_workerMonitor);
225 
226  if(m_pException)
227  {
228  Exception exception=*m_pException;
229 
230  delete m_pException;
231  m_pException=NULL;
232 
233  throw exception;
234  }
235  };
236 
237  protected:
238  U32 m_workers;
239 
240  private:
241  RecursiveMutex m_jobMonitor;
242  RecursiveMutex m_workerMonitor;
243  RecursiveMutex::Condition m_jobAvailable;
244  RecursiveMutex::Condition m_workerDelivered;
245 
246  std::queue<T> m_posted;
247  std::queue<T> m_delivered;
248 
249  RecursiveMutex m_barrierMonitor;
250  RecursiveMutex::Condition m_barrierHit;
251  U32 m_barrierCount;
252  U32 m_barrierMet;
253 
254  Exception* m_pException;
255 };
256 
257 } /* namespace ext */
258 } /* namespace fe */
259 
260 #endif // __thread_JobQueue_h__
kernel
Definition: namespace.dox:3
BWORD synchronize(BWORD a_supervised=FALSE)
waits until all threads call sync
Definition: JobQueue.h:170
Generic exception carrying a fe::String payload.
Definition: Exception.h:34
Base class providing an fe::Handle to the derived class.
Definition: Handled.h:209
Queue of tasks.
Definition: JobQueue.h:29