7 #ifndef __network_ConnectedCatalog_h__ 8 #define __network_ConnectedCatalog_h__ 15 #define FE_CNC_MAX_CLIENTS 1024 //* avoid realloc 16 #define FE_CNC_IDENTITY_BYTES 5 79 void initialize(
void);
/// Lifecycle entry points. Each takes no arguments and returns a Result.
/// The `override` specifier means each one replaces a virtual function
/// declared in a base class; the bodies are not part of this header.

/// Begin operation.
/// NOTE(review): presumably this is what makes started() report true -- confirm in the implementation.
89 virtual Result start(
void)
override;
/// Counterpart to start().
/// NOTE(review): whether this blocks until worker threads exit is not visible here -- confirm.
97 virtual Result stop(
void)
override;
/// Wait for a connection.
/// NOTE(review): blocking behavior and the effect of setConnectionTimeout() are assumptions to verify.
103 virtual Result waitForConnection(
void)
override;
110 {
return m_started.load(std::memory_order_relaxed); }
116 {
return m_connected.load(std::memory_order_relaxed); }
/// Set the connection timeout.
/// @param a_seconds timeout value; the name indicates seconds (TODO confirm the treatment of zero/negative values).
125 virtual void setConnectionTimeout(Real a_seconds);
/// Overrides the base-class flush(); takes no arguments and returns a Result.
135 virtual Result flush(
void)
override;
/// Overrides the base-class waitForUpdate().
/// @param a_rFlushCount non-const reference, so the callee may modify the caller's value
/// @param a_rSpins non-const reference, so the callee may modify the caller's value
/// @param a_rKeepWaiting volatile flag passed by reference
/// @param a_microSleep sleep interval; the name suggests microseconds (TODO confirm)
/// NOTE(review): `volatile` does not provide inter-thread synchronization in C++;
/// the signature is fixed by the base class, so it cannot be changed here alone.
137 virtual Result waitForUpdate(I32& a_rFlushCount,I32& a_rSpins,
138 volatile BWORD& a_rKeepWaiting,I32 a_microSleep)
override;
/// Overrides the base-class lockAfterUpdate(); same parameter list as waitForUpdate().
/// NOTE(review): presumably returns with a lock held after the update arrives -- confirm who releases it.
139 virtual Result lockAfterUpdate(I32& a_rFlushCount,I32& a_rSpins,
140 volatile BWORD& a_rKeepWaiting,I32 a_microSleep)
override;
/// Connection management hooks. All are virtual and return a Result; none is
/// marked `override`, so they are not declared here as replacing a base-class
/// function.

/// Establish a connection.
/// NOTE(review): presumably dispatches to connectAsServer()/connectAsClient() based on role() -- confirm.
147 virtual Result connect(
void);
/// End a connection.
148 virtual Result disconnect(
void);
/// Drop a connection.
/// NOTE(review): how this differs from disconnect() is not visible in this header -- confirm.
149 virtual Result dropConnection(
void);
/// Connect in the server role.
/// @param a_address address string, passed by value
/// @param a_port 16-bit unsigned port number
151 virtual Result connectAsServer(
String a_address,U16 a_port);
/// Connect in the client role.
/// @param a_address address string, passed by value
/// @param a_port 16-bit unsigned port number
152 virtual Result connectAsClient(
String a_address,U16 a_port);
/// Broadcast an update identified by a name and a property.
/// @param a_name passed by value
/// @param a_property passed by value
154 virtual void broadcastUpdate(
String a_name,
String a_property);
/// Virtual hook with no arguments and no return value.
/// NOTE(review): presumably a periodic keep-alive -- confirm in the implementation.
155 virtual void heartbeat(
void);
/// Virtual hook with no arguments and no return value.
/// NOTE(review): presumably the body run by the service thread -- confirm.
156 virtual void service(
void);
/// Pure virtual (`=0`): a derived class must implement this, and a class
/// that leaves it unimplemented cannot be instantiated.
/// @param a_name passed by value
/// @param a_property passed by value
/// @param a_includeCount presumably the number of entries at a_pIncludes (TODO confirm)
/// @param a_pIncludes pointer to const String(s)
/// @param a_excludeCount presumably the number of entries at a_pExcludes (TODO confirm)
/// @param a_pExcludes pointer to const String(s)
/// @param a_pRawBytes optional read-only byte buffer; defaults to NULL
/// @param a_byteCount defaults to 0; presumably the length of a_pRawBytes (TODO confirm)
/// NOTE(review): default arguments on a virtual function are taken from the
/// static type at the call site, so overrides should repeat the same defaults.
158 virtual void broadcastSelect(
String a_name,
String a_property,
160 I32 a_includeCount,
const String* a_pIncludes,
161 I32 a_excludeCount,
const String* a_pExcludes,
162 const U8* a_pRawBytes=NULL,I32 a_byteCount=0) =0;
/// Const query taking a type name and returning a BWORD.
/// NOTE(review): presumably selects binary rather than text transfer for that type -- confirm.
164 virtual BWORD useBinaryForType(
String a_type)
const;
/// Non-virtual; takes no arguments and returns nothing.
168 void broadcastTick(
void);
/// Two parallel helper sets, "timer" and "ticker", each with a non-const
/// restart and two const queries.
/// NOTE(review): presumably measured against the steady_clock time points
/// m_timerStart and m_tickerStart; the thresholds used by the *Reached()
/// queries are not visible in this header -- confirm.

/// Restart the timer.
170 void timerRestart(
void);
/// Const query returning a BWORD.
171 BWORD timerReached(
void)
const;
/// Const query returning a Real; the name indicates a value in seconds.
172 Real timerSeconds(
void)
const;
/// Restart the ticker.
174 void tickerRestart(
void);
/// Const query returning a BWORD.
175 BWORD tickerReached(
void)
const;
/// Const query returning a Real; the name indicates a value in seconds.
176 Real tickerSeconds(
void)
const;
178 const String& role(
void)
const {
return m_role; }
180 bool aborting(
void)
const 181 {
return m_aborting.load(std::memory_order_relaxed); }
183 void requestDisconnect(
void) { m_disconnecting=
true; }
184 bool disconnecting(
void)
const 185 {
return m_disconnecting.load(
186 std::memory_order_relaxed); }
/// Waiting helpers. All are non-virtual, const, and return nothing.
/// NOTE(review): the actual wait mechanism of each (sleep, spin, thread
/// yield) is not visible in this header -- confirm in the implementation.

188 void stall(
void)
const;
/// @param a_microSeconds pause length; the name indicates microseconds
189 void microPause(I32 a_microSeconds)
const;
190 void yield(
void)
const;
/// NOTE(review): presumably the yield variant used by the service loop -- confirm.
191 void serviceYield(
void)
const;
201 void queueRepeat(Command a_command,
204 U8* a_pRawBytes,I32 a_byteCount);
205 Result flushRepeats(BWORD a_signalsOnly);
207 void update(Command a_command,
String a_source,
210 const U8* a_pRawBytes=NULL,I32 a_byteCount=0);
216 m_initiated(FALSE) {}
220 FE_CNC_IDENTITY_BYTES*
sizeof(U8));
224 Identity& operator=(
const Identity& a_rOther)
226 memcpy(m_data,a_rOther.m_data,
227 FE_CNC_IDENTITY_BYTES);
228 m_text=a_rOther.m_text;
229 m_initiated=a_rOther.m_initiated;
232 bool operator<(
const Identity& a_rOther)
const 233 {
return (memcmp(m_data,a_rOther.m_data,
234 FE_CNC_IDENTITY_BYTES)<0); }
235 bool operator==(
const Identity& a_rOther)
const 236 {
return !memcmp(m_data,a_rOther.m_data,
237 FE_CNC_IDENTITY_BYTES); }
239 {
return (m_text==a_rText); }
241 U8 m_data[FE_CNC_IDENTITY_BYTES];
/// Remove an identity selected by position.
/// @param a_index presumably an index into m_identityArray (TODO confirm valid range and bounds handling)
246 virtual Result removeIdentity(I32 a_index);
/// Overload that selects the identity by value instead of by index.
/// @param a_rIdentity identity to remove, passed by const reference
247 virtual Result removeIdentity(
const Identity& a_rIdentity);
/// Non-virtual; takes no arguments and returns a Result.
/// NOTE(review): presumably calls catchUp() for identities that need it -- confirm.
249 Result catchUpAsNeeded(
void);
/// @param a_rIdentity non-const reference, so the callee may modify the identity
250 virtual Result catchUp(Identity& a_rIdentity);
262 {
if(m_pRawBytes) free(m_pRawBytes); }
268 m_command(e_unknown),
271 KeyProperty(
const KeyProperty& a_keyProperty)
273 m_command=a_keyProperty.m_command;
274 m_key=a_keyProperty.m_key;
275 m_property=a_keyProperty.m_property;
276 m_message=a_keyProperty.m_message;
277 m_source=a_keyProperty.m_source;
278 m_spBytes=a_keyProperty.m_spBytes;
279 m_byteCount=a_keyProperty.m_byteCount;
282 KeyProperty(Command a_command,
285 U8* a_pRawBytes=NULL,I32 a_byteCount=0)
286 {
set(a_command,a_key,a_property,a_message,a_source,
287 a_pRawBytes,a_byteCount); }
289 void set(Command a_command,
292 U8* a_pRawBytes=NULL,I32 a_byteCount=0)
296 m_property=a_property;
299 m_byteCount=a_byteCount;
304 m_spBytes->m_pRawBytes=a_pRawBytes;
321 Identity m_identityArray[FE_CNC_MAX_CLIENTS];
324 BWORD m_lockOnUpdate;
328 void joinThreads(
void);
334 KeyMessage(
const KeyMessage& a_keyMessage)
335 { m_key=a_keyMessage.m_key;
336 m_message=a_keyMessage.m_message; }
339 m_message(a_property) {}
350 Change(
const Change& a_change)
351 { m_command=a_change.m_command;
352 m_source=a_change.m_source;
353 m_key=a_change.m_key;
354 m_property=a_change.m_property;
355 m_type=a_change.m_type;
356 m_text=a_change.m_text;
357 m_byteCount=a_change.m_byteCount;
358 m_pRawBytes=a_change.m_pRawBytes; }
369 class FE_DL_EXPORT ConnectionTask:
public Thread::Functor
372 ConnectionTask(
void);
373 virtual ~ConnectionTask(
void);
374 virtual void operate(
void);
379 class FE_DL_EXPORT ServiceTask:
public Thread::Functor
383 virtual ~ServiceTask(
void);
384 virtual void operate(
void);
// Connection thread state. ConnectionTask is a Thread::Functor subclass.
// NOTE(review): ownership and lifetime of the raw Thread pointer are not
// visible here -- presumably released by joinThreads(); confirm.
389 Result m_connectionResult;
390 Thread* m_pConnectionThread;
391 ConnectionTask m_connectionTask;
// Service thread state. ServiceTask is a Thread::Functor subclass.
393 Thread* m_pServiceThread;
394 ServiceTask m_serviceTask;
// Atomic counter and flags. The inline accessors in this header read the
// four flags with std::memory_order_relaxed.
400 std::atomic<int> m_signalsQueued;
401 std::atomic<bool> m_started;
402 std::atomic<bool> m_connected;
403 std::atomic<bool> m_aborting;
404 std::atomic<bool> m_disconnecting;
// Ordered map from a String key to an array of KeyProperty entries.
// NOTE(review): what the key represents is not visible here -- confirm.
410 std::map<String, Array<KeyProperty> > m_repeatQueueMap;
// Monotonic-clock reference points.
// NOTE(review): presumably set by timerRestart()/tickerRestart() -- confirm.
412 std::chrono::steady_clock::time_point m_timerStart;
415 std::chrono::steady_clock::time_point m_tickerStart;
// NOTE(review): the name suggests this serializes start() and stop();
// confirm which paths lock it.
418 RecursiveMutex m_startStopMutex;
virtual BWORD started(void) const override
Check if the catalog wants to connect.
Definition: ConnectedCatalog.h:109
Heap-based support for classes participating in fe::ptr <>
Definition: Counted.h:35
kernel
Definition: namespace.dox:3
Per-class participation in the Initialized <> mechanism.
Definition: Initialized.h:117
StateCatalog with connected mirroring.
Definition: ConnectedCatalog.h:70
BWORD operator==(const DualString &s1, const DualString &s2)
Compare two DualStrings.
Definition: DualString.h:208
Catalog with extensible mirroring.
Definition: StateCatalog.h:29
Automatically reference-counted string container.
Definition: String.h:128
virtual BWORD connected(void) const override
Check if the catalog has connections.
Definition: ConnectedCatalog.h:115