// Include guard and the FE_JQ_LOCK_DEBUG switch for this header, followed by
// the JobQueuePre constructor, whose body only calls Mutex::confirm().
// NOTE(review): this listing carries the header's own line numbers inline and
// omits the lines in between them.
7 #ifndef __thread_JobQueue_h__ 8 #define __thread_JobQueue_h__ 10 #define FE_JQ_LOCK_DEBUG FALSE 20 JobQueuePre(
void) { Mutex::confirm(); }
// Constructor body: passes the name "JobQueue" to Counted::setName().
// NOTE(review): the constructor's signature line is not visible in this listing.
37 { Counted::setName(
"JobQueue"); }
// Read-only accessor: returns the stored value of m_workers without
// modifying the queue (the method is const).
48 U32 workers(
void)
const {
return m_workers; }
// Fragment of the single-job post(): logs the job, calls
// Thread::interruptionPoint(), constructs a guard on m_jobMonitor and then
// signals one waiter on m_jobAvailable.
// NOTE(review): the signature and the enqueue onto m_posted are presumably on
// lines not shown in this listing -- confirm against the header.
53 feLog(
"JobQueue::post %s LOCK\n",c_print(job));
55 Thread::interruptionPoint();
57 RecursiveMutex::Guard lock(m_jobMonitor);
60 m_jobAvailable.notifyOne();
// Range overload of post(): handles every job value from first_job through
// last_job.
// NOTE(review): the loop body and braces are not visible in this listing, so
// it cannot be determined here whether notifyOne() runs once or once per job.
62 void post(T first_job,T last_job)
65 feLog(
"JobQueue::post %s %s LOCK\n",
66 c_print(first_job),c_print(last_job));
68 Thread::interruptionPoint();
70 RecursiveMutex::Guard lock(m_jobMonitor);
// The bound is inclusive (<=), so last_job itself is visited; T must support
// <= and post-increment.
72 for(T job=first_job;job<=last_job;job++)
75 m_jobAvailable.notifyOne();
// Fragment of take(): the same prologue as the other entry points -- log the
// job, call Thread::interruptionPoint(), construct a guard on m_jobMonitor.
// NOTE(review): the signature and the rest of the body are not visible here.
83 feLog(
"JobQueue::take %s LOCK\n",c_print(job));
85 Thread::interruptionPoint();
87 RecursiveMutex::Guard lock(m_jobMonitor);
// Waits on m_jobAvailable for as long as m_posted is empty, then copies the
// front element of m_posted into the caller's job.
// NOTE(review): removal of that element from m_posted is presumably on a line
// not shown in this listing -- confirm.
97 void waitForJob(T& job)
101 feLog(
"JobQueue::waitForJob %s LOCK\n",c_print(job));
103 Thread::interruptionPoint();
105 RecursiveMutex::Guard lock(m_jobMonitor);
// The emptiness check is a loop, so it is re-evaluated after every wake-up.
107 while(m_posted.empty())
110 m_jobAvailable.wait(lock);
112 FEASSERT(!m_posted.empty());
113 job=m_posted.front();
// Fragment of deliver(): with a guard constructed on m_workerMonitor, pushes
// job onto m_delivered and signals one waiter on m_workerDelivered.
// NOTE(review): the signature line is not visible in this listing.
121 feLog(
"JobQueue::deliver %s LOCK\n",c_print(job));
123 Thread::interruptionPoint();
125 RecursiveMutex::Guard lock(m_workerMonitor);
127 m_delivered.push(job);
128 m_workerDelivered.notifyOne();
// Checks m_delivered with a guard constructed on m_workerMonitor; the visible
// path copies the front element of m_delivered into job.
// NOTE(review): the branch taken when the queue is empty, the removal of the
// element, and the BWORD return values are all on lines not shown here.
131 BWORD acceptDelivery(T& job)
134 feLog(
"JobQueue::acceptDelivery %s LOCK\n",c_print(job));
136 Thread::interruptionPoint();
138 RecursiveMutex::Guard lock(m_workerMonitor);
140 if(m_delivered.empty())
144 job=m_delivered.front();
// Waits on m_workerDelivered for as long as m_delivered is empty, then copies
// the front element of m_delivered into the caller's job.
// NOTE(review): removal of that element from m_delivered is presumably on a
// line not shown in this listing -- confirm.
148 void waitForDelivery(T& job)
151 feLog(
"JobQueue::waitForDelivery %s LOCK\n",c_print(job));
153 Thread::interruptionPoint();
155 RecursiveMutex::Guard lock(m_workerMonitor);
// The emptiness check is a loop, so it is re-evaluated after every wake-up.
157 while(m_delivered.empty())
160 m_workerDelivered.wait(lock);
162 FEASSERT(!m_delivered.empty());
163 job=m_delivered.front();
// Fragment of synchronize(): callers wait on m_barrierHit until the expected
// number of participants has arrived.
// NOTE(review): the signature, the updates to m_barrierCount and the return
// statement are on lines not shown; the notes below cover only what is visible.
// Expected arrivals: the worker count plus a_supervised.
172 const U32 participants=m_workers+a_supervised;
174 feLog(
"JobQueue::synchronize LOCK\n");
176 Thread::interruptionPoint();
178 RecursiveMutex::Guard lock(m_barrierMonitor);
// NOTE(review): the condition controlling this first wait is on a line not shown.
184 m_barrierHit.wait(lock);
// Keep waiting while the barrier is not flagged as met and the count is still
// below participants.
188 while(!m_barrierMet && m_barrierCount<participants)
192 m_barrierHit.wait(lock);
// result is true only when the count equals participants exactly.
194 BWORD result=(m_barrierCount==participants);
// Flag the barrier as met whenever the count is non-zero, then wake every waiter.
196 m_barrierMet=(m_barrierCount>0);
197 m_barrierHit.notifyAll();
// Stores a heap-allocated copy of a_rException in m_pException, with a guard
// constructed on m_workerMonitor.
// NOTE(review): lines between the guard and the assignment are not shown, so
// whether a previously stored exception is checked or released first cannot
// be determined from this listing.
202 void relayException(
const Exception& a_rException)
205 feLog(
"JobQueue::relayException LOCK\n");
207 Thread::interruptionPoint();
209 RecursiveMutex::Guard lock(m_workerMonitor);
213 m_pException=
new Exception(a_rException);
// Only the prologue is visible: log, call Thread::interruptionPoint(), and
// construct a guard on m_workerMonitor (the same monitor relayException() uses).
// NOTE(review): what is done with the stored exception is on lines not shown
// in this listing.
217 void resolveException(
void)
220 feLog(
"JobQueue::resolveException LOCK\n");
222 Thread::interruptionPoint();
224 RecursiveMutex::Guard lock(m_workerMonitor);
// Monitor guarded by the post/take/waitForJob paths.
241 RecursiveMutex m_jobMonitor;
// Monitor guarded by the delivery and exception-relay paths.
242 RecursiveMutex m_workerMonitor;
// Signalled by post(); waited on while m_posted is empty.
243 RecursiveMutex::Condition m_jobAvailable;
// Signalled by deliver(); waited on while m_delivered is empty.
244 RecursiveMutex::Condition m_workerDelivered;
// Jobs waiting to be picked up.
246 std::queue<T> m_posted;
// Jobs handed back through deliver().
247 std::queue<T> m_delivered;
// Monitor and condition used by synchronize().
249 RecursiveMutex m_barrierMonitor;
250 RecursiveMutex::Condition m_barrierHit;
260 #endif // __thread_JobQueue_h__ kernel
Definition: namespace.dox:3
BWORD synchronize(BWORD a_supervised=FALSE)
Waits until all threads have called synchronize().
Definition: JobQueue.h:170
Generic exception carrying a fe::String payload.
Definition: Exception.h:34
Base class providing an fe::Handle to the derived class.
Definition: Handled.h:209
Queue of tasks.
Definition: JobQueue.h:29