Free Electron
ConnectedCatalog.h
Go to the documentation of this file.
1 /* Copyright (C) 2003-2021 Free Electron Organization
2  Any use of this software requires a license. If a valid license
3  was not distributed with this file, visit freeelectron.org. */
4 
5 /** @file */
6 
7 #ifndef __network_ConnectedCatalog_h__
8 #define __network_ConnectedCatalog_h__
9 
10 namespace fe
11 {
12 namespace ext
13 {
14 
15 #define FE_CNC_MAX_CLIENTS 1024 //* avoid realloc
16 #define FE_CNC_IDENTITY_BYTES 5
17 
18 //* TODO maybe extend configure like ZeroMQ endpoint: "tcp://localhost:7890"
19 
20 /**************************************************************************//**
21  @brief StateCatalog with connected mirroring
22 
23  Before starting a connection, several Catalog values may need to be set.
24  In the short form, the configure method can set up the connection
25  with a single parsable string, such as:
26 
27  "localhost:7890 role=client connectionBlock=true"
28 
29  The first argument in the address and port.
30  For a server, the address is ignored,
31  although the convention is to just use "*".
32 
33  The remaining arguments are name/value pairs.
34  The current list a recognized values are as follows:
35 
36  role "server" or "client"
37  address string address for a client to connect to
38  port integer port number
39  ioPriority usually "normal" or "high" (for Windows only)
40  transport "tcp" or "udp", where appropriate
41  connectionBlock "true" or "false", to block flushes before connection
42 
43  In the long form,
44  the internal catalog settings can be written and read directly.
45  The long form keys used in the catalog match the short form names,
46  but with a "net:" prefix.
47 
48  "net:role" should be set to either "server" or "client".
49 
50  "net:address" should be set to the string address
51  for a client to connect to.
52  This is ignored for a server.
53 
54  "net:port" should be set to the integer port number.
55 
56  "net:ioPriority" can be set to a string, usually either "normal" or "high".
57  The additional values are "idle", "lowest", "low", "highest",
58  and "critical".
59  Currently, this only has effect when using the stdthread module
60  under Windows.
61 
62  "net:transport" can be set to "tcp" or "udp",
63  if appropriate for the implementation.
64 
65  "net:connectionBlock" can be set to "true" or "false",
66  whether to block flushes when waiting for a connection.
67 
68  @ingroup network
69 *//***************************************************************************/
70 class FE_DL_EXPORT ConnectedCatalog:
71  public StateCatalog,
72  public Initialize<ConnectedCatalog>
73 {
74  public:
75 
76  ConnectedCatalog(void);
77 virtual ~ConnectedCatalog(void);
78 
79  void initialize(void);
80 
81  /// @brief Provide a parsable setup string
82 virtual Result configure(String a_line) override;
83 
84  /** @brief Initiate connections
85 
86  A server will begin seeking client connections
87  and a client will try to make a connection.
88  This method may return before making any connections. */
89 virtual Result start(void) override;
90 
91  /** @brief Disconnect and stop initiating connections
92 
93  New connections will no longer be accepted.
94 
95  This method will not return until the connections
96  existing connections are disconnected. */
97 virtual Result stop(void) override;
98 
99  /** @brief Return only after a connection has been made
100 
101  A derived implementation could return a failure result code,
102  indicating a problem prior to making a connection. */
103 virtual Result waitForConnection(void) override;
104 
105  /** @brief Check if the catalog wants to connect
106 
107  The started state means the catalog is seeking connections.
108  A started catlog may have connections. */
109 virtual BWORD started(void) const override
110  { return m_started.load(std::memory_order_relaxed); }
111 
112  /** @brief Check if the catalog has connections
113 
114  A catalog with connections is always started. */
115 virtual BWORD connected(void) const override
116  { return m_connected.load(std::memory_order_relaxed); }
117 
118  /** @brief Indicate how long the server may remain quiet
119 
120  If the server has a gap between flushes exceeding
121  this time limit, the clients may drop the connection.
122  They may try to reconnect with the current
123  connection settings.
124  */
125 virtual void setConnectionTimeout(Real a_seconds);
126 
127  /** @brief Indicate the end of an atomic set of changes
128 
129  This may provoke a transmission of buffered data.
130 
131  Receiving connections should use this to increment
132  the StateCatalog flush count and
133  make a new Snapshot available.
134  */
135 virtual Result flush(void) override;
136 
137 virtual Result waitForUpdate(I32& a_rFlushCount,I32& a_rSpins,
138  volatile BWORD& a_rKeepWaiting,I32 a_microSleep) override;
139 virtual Result lockAfterUpdate(I32& a_rFlushCount,I32& a_rSpins,
140  volatile BWORD& a_rKeepWaiting,I32 a_microSleep) override;
141 
142  protected:
143 
144 virtual Result preGet(String a_name,String a_property) const override;
145 virtual Result postSet(String a_name,String a_property) override;
146 
147 virtual Result connect(void);
148 virtual Result disconnect(void);
149 virtual Result dropConnection(void);
150 
151 virtual Result connectAsServer(String a_address,U16 a_port);
152 virtual Result connectAsClient(String a_address,U16 a_port);
153 
154 virtual void broadcastUpdate(String a_name,String a_property);
155 virtual void heartbeat(void);
156 virtual void service(void);
157 
158 virtual void broadcastSelect(String a_name,String a_property,
159  String a_message,
160  I32 a_includeCount,const String* a_pIncludes,
161  I32 a_excludeCount,const String* a_pExcludes,
162  const U8* a_pRawBytes=NULL,I32 a_byteCount=0) =0;
163 
164 virtual BWORD useBinaryForType(String a_type) const;
165 
166  void broadcastMessage(String a_name,String a_message);
167 
168  void broadcastTick(void);
169 
170  void timerRestart(void);
171  BWORD timerReached(void) const;
172  Real timerSeconds(void) const;
173 
174  void tickerRestart(void);
175  BWORD tickerReached(void) const;
176  Real tickerSeconds(void) const;
177 
178  const String& role(void) const { return m_role; }
179 
180  bool aborting(void) const
181  { return m_aborting.load(std::memory_order_relaxed); }
182 
183  void requestDisconnect(void) { m_disconnecting=true; }
184  bool disconnecting(void) const
185  { return m_disconnecting.load(
186  std::memory_order_relaxed); }
187 
188  void stall(void) const;
189  void microPause(I32 a_microSeconds) const;
190  void yield(void) const;
191  void serviceYield(void) const;
192 
193  enum Command
194  {
195  e_unknown=0,
196  e_update,
197  e_signal,
198  e_remove
199  };
200 
201  void queueRepeat(Command a_command,
202  String a_name,String a_property,
203  String a_message,String a_source,
204  U8* a_pRawBytes,I32 a_byteCount);
205  Result flushRepeats(BWORD a_signalsOnly);
206 
207  void update(Command a_command, String a_source,
208  String a_key,String a_property,
209  String a_type,String a_text,
210  const U8* a_pRawBytes=NULL,I32 a_byteCount=0);
211 
212  class Identity
213  {
214  public:
215  Identity(void):
216  m_initiated(FALSE) {}
217  void reset(void)
218  {
219  memset(m_data,0,
220  FE_CNC_IDENTITY_BYTES*sizeof(U8));
221  m_text="";
222  m_initiated=FALSE;
223  }
224  Identity& operator=(const Identity& a_rOther)
225  {
226  memcpy(m_data,a_rOther.m_data,
227  FE_CNC_IDENTITY_BYTES);
228  m_text=a_rOther.m_text;
229  m_initiated=a_rOther.m_initiated;
230  return *this;
231  }
232  bool operator<(const Identity& a_rOther) const
233  { return (memcmp(m_data,a_rOther.m_data,
234  FE_CNC_IDENTITY_BYTES)<0); }
235  bool operator==(const Identity& a_rOther) const
236  { return !memcmp(m_data,a_rOther.m_data,
237  FE_CNC_IDENTITY_BYTES); }
238  bool operator==(const String& a_rText) const
239  { return (m_text==a_rText); }
240 
241  U8 m_data[FE_CNC_IDENTITY_BYTES];
242  String m_text;
243  BWORD m_initiated;
244  };
245 
246 virtual Result removeIdentity(I32 a_index);
247 virtual Result removeIdentity(const Identity& a_rIdentity);
248 
249  Result catchUpAsNeeded(void);
250 virtual Result catchUp(Identity& a_rIdentity);
251 
252  class KeyProperty
253  {
254  public:
255 
256  class Bytes: public Counted
257  {
258  public:
259  Bytes(void):
260  m_pRawBytes(NULL) {}
261  ~Bytes(void)
262  { if(m_pRawBytes) free(m_pRawBytes); }
263 
264  U8* m_pRawBytes;
265  };
266 
267  KeyProperty(void):
268  m_command(e_unknown),
269  m_byteCount(0) {}
270 
271  KeyProperty(const KeyProperty& a_keyProperty)
272  {
273  m_command=a_keyProperty.m_command;
274  m_key=a_keyProperty.m_key;
275  m_property=a_keyProperty.m_property;
276  m_message=a_keyProperty.m_message;
277  m_source=a_keyProperty.m_source;
278  m_spBytes=a_keyProperty.m_spBytes;
279  m_byteCount=a_keyProperty.m_byteCount;
280  }
281 
282  KeyProperty(Command a_command,
283  String a_key,String a_property,
284  String a_message="",String a_source="",
285  U8* a_pRawBytes=NULL,I32 a_byteCount=0)
286  { set(a_command,a_key,a_property,a_message,a_source,
287  a_pRawBytes,a_byteCount); }
288 
289  void set(Command a_command,
290  String a_key,String a_property,
291  String a_message="",String a_source="",
292  U8* a_pRawBytes=NULL,I32 a_byteCount=0)
293  {
294  m_command=a_command;
295  m_key=a_key;
296  m_property=a_property;
297  m_message=a_message;
298  m_source=a_source;
299  m_byteCount=a_byteCount;
300 
301  if(a_pRawBytes)
302  {
303  m_spBytes=new Bytes;
304  m_spBytes->m_pRawBytes=a_pRawBytes;
305  }
306  else
307  {
308  m_spBytes=NULL;
309  }
310  }
311 
312  Command m_command;
313  String m_key;
314  String m_property;
315  String m_message;
316  String m_source;
317  sp<Bytes> m_spBytes;
318  I32 m_byteCount;
319  };
320 
321  Identity m_identityArray[FE_CNC_MAX_CLIENTS];
322  I32 m_identityCount;
323 
324  BWORD m_lockOnUpdate;
325 
326  private:
327 
328  void joinThreads(void);
329 
330  class KeyMessage
331  {
332  public:
333  KeyMessage(void) {}
334  KeyMessage(const KeyMessage& a_keyMessage)
335  { m_key=a_keyMessage.m_key;
336  m_message=a_keyMessage.m_message; }
337  KeyMessage(String a_key,String a_property):
338  m_key(a_key),
339  m_message(a_property) {}
340  String m_key;
341  String m_message;
342  };
343 
344  class Change
345  {
346  public:
347  Change(void):
348  m_byteCount(0),
349  m_pRawBytes(NULL) {}
350  Change(const Change& a_change)
351  { m_command=a_change.m_command;
352  m_source=a_change.m_source;
353  m_key=a_change.m_key;
354  m_property=a_change.m_property;
355  m_type=a_change.m_type;
356  m_text=a_change.m_text;
357  m_byteCount=a_change.m_byteCount;
358  m_pRawBytes=a_change.m_pRawBytes; }
359  Command m_command;
360  String m_source;
361  String m_key;
362  String m_property;
363  String m_type;
364  String m_text;
365  I32 m_byteCount;
366  U8* m_pRawBytes;
367  };
368 
369  class FE_DL_EXPORT ConnectionTask: public Thread::Functor
370  {
371  public:
372  ConnectionTask(void);
373  virtual ~ConnectionTask(void);
374  virtual void operate(void);
375 
376  ConnectedCatalog* m_pConnectedCatalog;
377  };
378 
379  class FE_DL_EXPORT ServiceTask: public Thread::Functor
380  {
381  public:
382  ServiceTask(void);
383  virtual ~ServiceTask(void);
384  virtual void operate(void);
385 
386  ConnectedCatalog* m_pConnectedCatalog;
387  };
388 
389  Result m_connectionResult;
390  Thread* m_pConnectionThread;
391  ConnectionTask m_connectionTask;
392 
393  Thread* m_pServiceThread;
394  ServiceTask m_serviceTask;
395 
396  String m_role;
397  String m_clientID;
398  String m_listenKey;
399 
400  std::atomic<int> m_signalsQueued;
401  std::atomic<bool> m_started;
402  std::atomic<bool> m_connected;
403  std::atomic<bool> m_aborting;
404  std::atomic<bool> m_disconnecting;
405 
406  Array<KeyProperty> m_flushKeys;
407  Array<KeyMessage> m_flushMessages;
408  Array<Change> m_changes;
409 
410  std::map<String, Array<KeyProperty> > m_repeatQueueMap;
411 
412  std::chrono::steady_clock::time_point m_timerStart;
413  Real m_timerLimit;
414 
415  std::chrono::steady_clock::time_point m_tickerStart;
416  Real m_tickerLimit;
417 
418  RecursiveMutex m_startStopMutex;
419 };
420 
421 } /* namespace ext */
422 } /* namespace fe */
423 
424 #endif /* __network_ConnectedCatalog_h__ */
425 
virtual BWORD started(void) const override
Check if the catalog wants to connect.
Definition: ConnectedCatalog.h:109
Heap-based support for classes participating in fe::ptr <>
Definition: Counted.h:35
kernel
Definition: namespace.dox:3
Per-class participation in the Initialized <> mechanism.
Definition: Initialized.h:117
StateCatalog with connected mirroring.
Definition: ConnectedCatalog.h:70
BWORD operator==(const DualString &s1, const DualString &s2)
Compare two DualString&#39;s.
Definition: DualString.h:208
Catalog with extensible mirroring.
Definition: StateCatalog.h:29
Automatically reference-counted string container.
Definition: String.h:128
Result
Generalized return value indicating success or failure, and why.
Definition: Result.h:24
virtual BWORD connected(void) const override
Check if the catalog has connections.
Definition: ConnectedCatalog.h:115