Free Electron
Gang.h
Go to the documentation of this file.
1 /* Copyright (C) 2003-2021 Free Electron Organization
2  Any use of this software requires a license. If a valid license
3  was not distributed with this file, visit freeelectron.org. */
4 
5 /** @file */
6 
7 #ifndef __thread_Gang_h__
8 #define __thread_Gang_h__
9 
10 namespace fe
11 {
12 namespace ext
13 {
14 
15 /**************************************************************************//**
16  @brief Group of Worker threads
17 
18  @ingroup thread
19 *//***************************************************************************/
20 template<typename WORKER,typename T>
21 class Gang: public JobQueue<T>
22 {
23  public:
24  Gang(void):
25  m_pGroup(NULL),
26  m_ppThread(NULL),
27  m_ppWorker(NULL)
28 
29  { Counted::setName("Gang"); }
30 
31 virtual ~Gang(void)
32  {
34  {
35  finish();
36  }
37  }
38 
39  BWORD start(sp<Counted> a_spObject=sp<Counted>(NULL),
40  U32 a_threads=0,String a_stage="")
41  {
42  FEASSERT(!JobQueue<T>::m_workers);
43 
44  //* if threads arg is zero, create one thread per core
45  if(!a_threads)
46  {
47  a_threads=Thread::hardwareConcurrency();
48  }
49 
50  JobQueue<T>::m_workers=a_threads;
51  sp< JobQueue<T> > spJobQueue(this);
52 
53  FEASSERT(!m_pGroup);
54  m_pGroup=new Thread::Group();
55  m_ppWorker=new WORKER*[a_threads];
56  m_ppThread=new Thread*[a_threads];
57 
58  for(U32 m=0;m<a_threads;m++)
59  {
60  m_ppWorker[m]=new WORKER(spJobQueue,m,a_stage);
61  m_ppWorker[m]->setObject(a_spObject);
62  m_ppThread[m]=NULL;
63  }
64 
65  for(U32 m=0;m<a_threads;m++)
66  {
67  U32 tries=0;
68  const U32 maxTries=3;
69  while(TRUE)
70  {
71  try
72  {
73  //* boost can throw boost::thread_resource_error
74  m_ppThread[m]=m_pGroup->createThread(
75  m_ppWorker[m]);
76  if(!m_ppThread[m])
77  {
78  feX(e_cannotCreate,"Gang::start",
79  "failed to create thread");
80  }
81 
82  }
83  catch (std::exception& e)
84  {
85 #if !FE_RTTI_HOMEBREW
86  Exception* pException=fe_cast<Exception>(&e);
87  if(pException &&
88  pException->getResult()==e_cannotCreate)
89  {
90 #if FE_CODEGEN<=FE_DEBUG
91  feLog("Gang::start"
92  " failed to create thread\n");
93 #endif
94  return FALSE;
95  }
96 #endif
97 
98  feLog("Gang::start"
99  " failed to create thread "
100  " worker %d/%d try %d/%d"
101  " (\"%s\")\n",
102  m,a_threads,tries,maxTries,e.what());
103  if(++tries>=maxTries)
104  {
105  feLog("Gang::start"
106  " giving up creating thread\n");
107 
108  for(U32 n=0;n<m;n++)
109  {
110  if(m_ppThread[n])
111  {
112  feLog("Gang::start"
113  " interrupt %d/%d\n",n,m);
114  m_ppThread[n]->interrupt();
115  }
116  }
117 
118  feLog("Gang::start cleaning up\n");
119  discardWorkers();
120  feLog("Gang::start done\n");
121 
122  return FALSE;
123  }
124  milliSleep(1000);
125  feLog("Gang::start retrying\n");
126  continue;
127  }
128  if(tries)
129  {
130  feLog("Gang::start retry succeeded\n");
131  }
132  break;
133  }
134  }
135  return TRUE;
136  }
137 
138  void finish(void)
139  {
140  FEASSERT(JobQueue<T>::m_workers);
141  FEASSERT(m_pGroup);
142 
143  m_pGroup->joinAll();
144 
145  discardWorkers();
146 
147  //* empty out queue
148  T job;
149  while(JobQueue<T>::take(job)){}
150  }
151 
152  private:
153 
154  void discardWorkers(void)
155  {
156  for(U32 m=0;m<JobQueue<T>::m_workers;m++)
157  {
158 // if(m_ppThread[m])
159 // {
160 // m_pGroup->remove_thread(m_ppThread[m]);
161 // m_ppThread[m]=NULL;
162 // }
163  if(m_ppThread[m])
164  {
165  delete m_ppThread[m];
166  m_ppThread[m]=NULL;
167  }
168 
169  delete m_ppWorker[m];
170  m_ppWorker[m]=NULL;
171  }
172 
173  delete[] m_ppThread;
174  m_ppThread=NULL;
175 
176  delete[] m_ppWorker;
177  m_ppWorker=NULL;
178 
179  delete m_pGroup;
180  m_pGroup=NULL;
181 
183  }
184 
185  Thread::Group* m_pGroup;
186  Thread** m_ppThread;
187  WORKER** m_ppWorker;
188 };
189 
190 } /* namespace ext */
191 } /* namespace fe */
192 
193 #endif // __thread_Gang_h__
Group of Worker threads.
Definition: Gang.h:21
kernel
Definition: namespace.dox:3
Generic exception carrying a fe::String payload.
Definition: Exception.h:34
Automatically reference-counted string container.
Definition: String.h:128
Intrusive Smart Pointer.
Definition: src/core/ptr.h:53
Queue of tasks.
Definition: JobQueue.h:29
void milliSleep(long milliseconds)
Sleep the current process for the given number of milliseconds.
Definition: SystemTicker.cc:28