Ocean
PackagedConnectionlessServer.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) Meta Platforms, Inc. and affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  */
7 
8 #ifndef FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
9 #define FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
10 
11 #include "ocean/network/Network.h"
13 #include "ocean/network/Server.h"
14 
15 #include "ocean/base/Callback.h"
16 #include "ocean/base/Timestamp.h"
17 
18 namespace Ocean
19 {
20 
21 namespace Network
22 {
23 
24 /**
25  * This class is the base class for all packaged connectionless servers.
26  * @ingroup network
27  */
28 class OCEAN_NETWORK_EXPORT PackagedConnectionlessServer :
29  virtual public PackagedConnectionlessClient,
30  virtual public Server
31 {
32  public:
33 
34  /**
35  * Definition of a data callback function.
36  * Parameter 0 provides the address of the sender.<br>
37  * Parameter 1 provides the port of the sender.<br>
38  * Parameter 2 provides the received buffer, which must be copied, nullptr if the message could not be delivered correctly.<br>
39  * Parameter 3 provides the size of the received buffer, in bytes; 0 if the message could not be delivered correctly.<br>
40  * Parameter 4 provides the id of the message to which the received buffer belongs.
41  */
43 
44  protected:
45 
46  /**
47  * This class implements a message.
48  */
50  {
51  public:
52 
53  /**
54  * Creates an empty data object.
55  */
56  inline MessageData() = default;
57 
58  /**
59  * Copy constructor.
60  * @param messageData Message object to be copied
61  */
62  inline MessageData(const MessageData& messageData);
63 
64  /**
65  * Move constructor.
66  * @param messageData Message object to be moved
67  */
68  inline MessageData(MessageData&& messageData) noexcept;
69 
70  /**
71  * Creates a new message data object.
72  * @param retireTimestamp The timestamp at which this message will be retired as no further packages arrived in the meantime.
73  * @param size The size of the entire message, in bytes
74  * @param remainingPackages The number of packages in which the entire message is divided
75  */
76  inline MessageData(const Timestamp retireTimestamp, const size_t size, const unsigned int remainingPackages);
77 
78  /**
79  * Returns the retire timestamp.
80  * @return Retire timestamp
81  */
82  inline Timestamp retireTimestamp() const;
83 
84  /**
85  * Returns the number of remaining packages.
86  * @return Remaining packages
87  */
88  inline unsigned int remainingPackages() const;
89 
90  /**
91  * Returns the size of the message buffer, in bytes.
92  * @return The number of bytes of the message buffer
93  */
94  inline size_t size() const;
95 
96  /**
97  * Returns the message buffer.
98  * @return Message buffer
99  */
100  inline const uint8_t* buffer() const;
101 
102  /**
103  * Returns the message buffer.
104  * @return Message buffer
105  */
106  inline uint8_t* buffer();
107 
108  /**
109  * Sets or changes the retire timestamp.
110  * @param timestamp The new timestamp
111  */
112  inline void setRetireTimestamp(const Timestamp timestamp);
113 
114  /**
115  * Sets the number of packages which are still missing.
116  * @param packages The number of remaining packages
117  */
118  inline void setRemaininigPackages(const unsigned int packages);
119 
120  /**
121  * Copy assignment operator.
122  * @param messageData Message data object to copy
123  * @return Reference to this object
124  */
125  inline MessageData& operator=(const MessageData& messageData);
126 
127  /**
128  * Move assignment operator.
129  * @param messageData Message data object to move
130  * @return Reference to this object
131  */
132  inline MessageData& operator=(MessageData&& messageData) noexcept;
133 
134  protected:
135 
136  /// The timestamp at which this message will be retired as no further packages arrived in the meantime.
137  Timestamp retireTimestamp_ = Timestamp(false);
138 
139  /// The number of packages which are still missing.
140  unsigned int remainingPackages_ = 0u;
141 
142  /// The entire buffer of the message.
144  };
145 
146  /**
147  * Definition of a triple storing an address, a port and a message id.
148  */
149  class Triple
150  {
151  public:
152 
153  /**
154  * Creates a new triple object.
155  * @param address The address of the object
156  * @param port The port of the object
157  * @param messageId The id of the object
158  */
159  inline Triple(const Address4& address, const Port& port, const MessageId messageId);
160 
161  /**
162  * Returns the address of this object.
163  * @return The address
164  */
165  inline const Address4& address() const;
166 
167  /**
168  * Returns the port of this object.
169  * @return The port
170  */
171  inline const Port& port() const;
172 
173  /**
174  * Returns the message id of this object.
175  * @return The message id
176  */
177  inline MessageId messageId() const;
178 
179  /**
180  * Compares two triple objects.
181  * @param triple The second triple object
182  * @return True, if the left object is less than the right one
183  */
184  inline bool operator<(const Triple& triple) const;
185 
186  protected:
187 
188  /// The address of this object.
190 
191  /// The port of this object.
193 
194  /// The message id of this object.
195  unsigned int messageId_ = invalidMessageId();
196  };
197 
198  /**
199  * Definition of a map mapping triples (address, port, message id) to message data objects.
200  */
201  typedef std::map<Triple, MessageData> MessageMap;
202 
203  public:
204 
205  /**
206  * Destructs a connectionless server object.
207  */
209 
210  /**
211  * Sets the receive data callback function.
212  * @param callback the callback function to be called if a new message arrives.
213  */
214  inline void setReceiveCallback(const ReceiveCallback& callback);
215 
216  protected:
217 
218  /**
219  * Creates a new connectionless server object.
220  */
222 
223  /**
224  * The scheduler event function.
225  * @see Socket::onScheduler().
226  */
227  bool onScheduler() override;
228 
229  protected:
230 
231  /// Data callback function called on new message arrivals.
233 
234  /// The time between the first package of a large message and the decision to retire the message if packages are still missing, in seconds.
235  double maximalMessageTime_ = 5.0;
236 
237  /// Intermediate buffer storing individual parts of a large message.
239 
240  /// The map holding all partially received messages.
242 };
243 
245  retireTimestamp_(messageData.retireTimestamp_),
246  remainingPackages_(messageData.remainingPackages_),
247  buffer_(messageData.buffer_)
248 {
249  // nothing to do here
250 }
251 
253  retireTimestamp_(messageData.retireTimestamp_),
254  remainingPackages_(messageData.remainingPackages_),
255  buffer_(std::move(messageData.buffer_))
256 {
257  messageData.retireTimestamp_.toInvalid();
258  messageData.remainingPackages_ = 0u;
259 }
260 
// Creates a message data object from a retire timestamp, the total message size and the number of
// packages still expected; the message buffer is created with `size` bytes.
261 inline PackagedConnectionlessServer::MessageData::MessageData(const Timestamp retireTimestamp, const size_t size, const unsigned int remainingPackages) :
262  retireTimestamp_(retireTimestamp),
263  remainingPackages_(remainingPackages),
264  buffer_(size)
265 {
 // Sanity check: a non-empty message buffer must come with at least one remaining package.
266  ocean_assert(buffer_.empty() || remainingPackages_ != 0u);
267 }
268 
270 {
271  return retireTimestamp_;
272 }
273 
275 {
276  return remainingPackages_;
277 }
278 
280 {
281  return buffer_.size();
282 }
283 
285 {
286  return buffer_.data();
287 }
288 
290 {
291  return buffer_.data();
292 }
293 
295 {
296  retireTimestamp_ = timestamp;
297 }
298 
300 {
301  remainingPackages_ = packages;
302 }
303 
305 {
306  retireTimestamp_ = messageData.retireTimestamp_;
307  remainingPackages_ = messageData.remainingPackages_;
308  buffer_ = messageData.buffer_;
309 
310  return *this;
311 }
312 
314 {
315  if (this != &messageData)
316  {
317  retireTimestamp_ = messageData.retireTimestamp_;
318  remainingPackages_ = messageData.remainingPackages_;
319  buffer_ = std::move(messageData.buffer_);
320 
321  messageData.retireTimestamp_.toInvalid();
322  messageData.remainingPackages_ = 0u;
323  }
324 
325  return *this;
326 }
327 
329  address_(address),
330  port_(port),
331  messageId_(messageId)
332 {
333  // nothing to do here
334 }
335 
337 {
338  return address_;
339 }
340 
342 {
343  return port_;
344 }
345 
347 {
348  return messageId_;
349 }
350 
352 {
353  return address_ < triple.address_ || (address_ == triple.address_ && (port_ < triple.port_ || (port_ == triple.port_ && messageId_ < triple.messageId_)));
354 }
355 
357 {
358  const ScopedLock scopedLock(lock_);
359 
360  receiveCallback_ = callback;
361 }
362 
363 }
364 
365 }
366 
367 #endif // FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
This class wraps an address number with 32 bits.
Definition: Address4.h:26
This class is the base class for all packaged connectionless clients.
Definition: PackagedConnectionlessClient.h:31
This class implements a message.
Definition: PackagedConnectionlessServer.h:50
unsigned int remainingPackages() const
Returns the number of remaining packages.
Definition: PackagedConnectionlessServer.h:274
const uint8_t * buffer() const
Returns the message buffer.
Definition: PackagedConnectionlessServer.h:284
Buffer buffer_
The entire buffer of the message.
Definition: PackagedConnectionlessServer.h:143
MessageData & operator=(const MessageData &messageData)
Assign operator.
Definition: PackagedConnectionlessServer.h:304
void setRetireTimestamp(const Timestamp timestamp)
Sets or changes the retire timestamp.
Definition: PackagedConnectionlessServer.h:294
void setRemaininigPackages(const unsigned int packages)
Sets the number of packages which are still missing.
Definition: PackagedConnectionlessServer.h:299
Timestamp retireTimestamp() const
Returns the retire timestamp.
Definition: PackagedConnectionlessServer.h:269
Timestamp retireTimestamp_
The timestamp at which this message will be retired as no further packages arrived in the meantime.
Definition: PackagedConnectionlessServer.h:137
MessageData()=default
Creates an empty data object.
size_t size() const
Returns the size of the message buffer, in bytes.
Definition: PackagedConnectionlessServer.h:279
unsigned int remainingPackages_
The number of packages which are still missing.
Definition: PackagedConnectionlessServer.h:140
Definition of a triple storing an address, a port and a message id.
Definition: PackagedConnectionlessServer.h:150
Triple(const Address4 &address, const Port &port, const MessageId messageId)
Creates a new triple object.
Definition: PackagedConnectionlessServer.h:328
bool operator<(const Triple &triple) const
Compares two triple objects.
Definition: PackagedConnectionlessServer.h:351
unsigned int messageId_
The message id of this object.
Definition: PackagedConnectionlessServer.h:195
const Address4 & address() const
Returns the address of this object.
Definition: PackagedConnectionlessServer.h:336
MessageId messageId() const
Returns the message id of this object.
Definition: PackagedConnectionlessServer.h:346
const Port & port() const
Returns the port of this object.
Definition: PackagedConnectionlessServer.h:341
Address4 address_
The address of this object.
Definition: PackagedConnectionlessServer.h:189
Port port_
The port of this object.
Definition: PackagedConnectionlessServer.h:192
This class is the base class for all packaged connectionless servers.
Definition: PackagedConnectionlessServer.h:31
bool onScheduler() override
The scheduler event function.
ReceiveCallback receiveCallback_
Data callback function called on new message arrivals.
Definition: PackagedConnectionlessServer.h:232
~PackagedConnectionlessServer() override
Destructs a connectionless server object.
void setReceiveCallback(const ReceiveCallback &callback)
Sets the receive data callback function.
Definition: PackagedConnectionlessServer.h:356
Callback< void, const Address4 &, const Port &, const void *, const size_t, const MessageId > ReceiveCallback
Definition of a data callback function.
Definition: PackagedConnectionlessServer.h:42
MessageMap connectionlessServerMessageMap
The map holding all partially received messages.
Definition: PackagedConnectionlessServer.h:241
Buffer packageBuffer_
Intermediate buffer storing individual parts of a large message.
Definition: PackagedConnectionlessServer.h:238
std::map< Triple, MessageData > MessageMap
Definition of a map mapping triples (address, port, message id) to message data objects.
Definition: PackagedConnectionlessServer.h:201
PackagedConnectionlessServer()
Creates a new connectionless server object.
uint32_t MessageId
Definition of a message id.
Definition: PackagedSocket.h:185
This class wraps a port number with 16 bits.
Definition: Port.h:26
This class is the base class for all servers.
Definition: Server.h:26
Buffer buffer_
The socket buffer of this server.
Definition: Server.h:57
Address4 address() const
Returns the own address of this socket.
std::vector< uint8_t > Buffer
Definition of a vector holding 8 bit values.
Definition: Socket.h:76
Port port() const
Returns the own port of this socket.
Lock lock_
Socket lock.
Definition: Socket.h:187
This class implements a scoped lock object for recursive lock objects.
Definition: Lock.h:135
This class implements a timestamp.
Definition: Timestamp.h:36
The namespace covering the entire Ocean framework.
Definition: Accessor.h:15