Ocean
Loading...
Searching...
No Matches
PackagedConnectionlessServer.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#ifndef FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
9#define FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
10
14
15#include "ocean/base/Callback.h"
17
18namespace Ocean
19{
20
21namespace Network
22{
23
24/**
25 * This class is the base class for all package connectionless server.
26 * @ingroup network
27 */
28class OCEAN_NETWORK_EXPORT PackagedConnectionlessServer :
29 virtual public PackagedConnectionlessClient,
30 virtual public Server
31{
32 public:
33
34 /**
35 * Definition of a data callback function.
36 * Parameter 0 provides the address of the sender.<br>
37 * Parameter 1 provides the port of the sender.<br>
38 * Parameter 2 provides the received buffer, which must be copied, nullptr if the message could not be delivered correctly
39 * Parameter 3 provides the size of the received buffer, in bytes; 0 if the message could not be delivered correctly
40 * Parameter 4 provides the id of the message to which the received buffer belongs
41 */
43
44 protected:
45
46 /**
47 * This class implements a message.
48 */
50 {
51 public:
52
53 /**
54 * Creates an empty data object.
55 */
56 inline MessageData() = default;
57
58 /**
59 * Copy constructor.
60 * @param messageData Message object to be copied
61 */
62 inline MessageData(const MessageData& messageData);
63
64 /**
65 * Move constructor.
66 * @param messageData Message object to be moved
67 */
68 inline MessageData(MessageData&& messageData) noexcept;
69
70 /**
71 * Creates a new message data object.
72 * @param retireTimestamp The timestamp at which this message will be retired as no further packages arrived in the meantime.
73 * @param size The size of the entire message, in bytes
74 * @param remainingPackages The number of packages in which the entire message is divided
75 */
76 inline MessageData(const Timestamp retireTimestamp, const size_t size, const unsigned int remainingPackages);
77
78 /**
79 * Returns the retire timestamp.
80 * @return Retire timestamp
81 */
82 inline Timestamp retireTimestamp() const;
83
84 /**
85 * Returns the number of remaining packages.
86 * @return Remaining packages
87 */
88 inline unsigned int remainingPackages() const;
89
90 /**
91 * Returns the size of the message buffer, in bytes.
92 * @return The number of bytes of the message buffer
93 */
94 inline size_t size() const;
95
96 /**
97 * Returns the message buffer.
98 * @return Message buffer
99 */
100 inline const uint8_t* buffer() const;
101
102 /**
103 * Returns the message buffer.
104 * @return Message buffer
105 */
106 inline uint8_t* buffer();
107
108 /**
109 * Sets or changes the retire timestamp.
110 * @param timestamp The new timestamp
111 */
112 inline void setRetireTimestamp(const Timestamp timestamp);
113
114 /**
115 * Sets the number of packages which are still missing.
116 * @param packages The number of remaining packages
117 */
118 inline void setRemaininigPackages(const unsigned int packages);
119
120 /**
121 * Assign operator.
122 * @param messageData Message data object to copy
123 * @return Reference to this object
124 */
125 inline MessageData& operator=(const MessageData& messageData);
126
127 /**
128 * Move operator.
129 * @param messageData Message data object to move
130 * @return Reference to this object
131 */
132 inline MessageData& operator=(MessageData&& messageData) noexcept;
133
134 protected:
135
136 /// The timestamp at which this message will be retired as no further packages arrived in the meantime.
137 Timestamp retireTimestamp_ = Timestamp(false);
138
139 /// The number of packages which are still missing.
140 unsigned int remainingPackages_ = 0u;
141
142 /// The entire buffer of the message.
144 };
145
146 /**
147 * Definition of a triple storing an address, a port and a message id.
148 */
149 class Triple
150 {
151 public:
152
153 /**
154 * Creates a new triple object.
155 * @param address The address of the object
156 * @param port The port of the object
157 * @param messageId The id of the object
158 */
159 inline Triple(const Address4& address, const Port& port, const MessageId messageId);
160
161 /**
162 * Returns the address of this object.
163 * @return The address
164 */
165 inline const Address4& address() const;
166
167 /**
168 * Returns the port of this object.
169 * @return The port
170 */
171 inline const Port& port() const;
172
173 /**
174 * Returns the message id of this object.
175 * @return The message id
176 */
177 inline MessageId messageId() const;
178
179 /**
180 * Compares two triple objects.
181 * @param triple The second triple object
182 * @return True, if the left object is lesser than the right one
183 */
184 inline bool operator<(const Triple& triple) const;
185
186 protected:
187
188 /// The address of this object.
190
191 /// The port of this object.
193
194 /// The message id of this object.
195 unsigned int messageId_ = invalidMessageId();
196 };
197
198 /**
199 * Definition of a map mapping message ids to massage data objects.
200 */
201 typedef std::map<Triple, MessageData> MessageMap;
202
203 public:
204
205 /**
206 * Destructs a connectionless server object.
207 */
209
210 /**
211 * Sets the receive data callback function.
212 * @param callback the callback function to be called if a new message arrives.
213 */
214 inline void setReceiveCallback(const ReceiveCallback& callback);
215
216 protected:
217
218 /**
219 * Creates a new connectionless server object.
220 */
222
223 /**
224 * The scheduler event function.
225 * Socket::onScheduler().
226 */
227 bool onScheduler() override;
228
229 protected:
230
231 /// Data callback function called on new message arrivals.
233
234 /// The time between the first package of a large message and the decision to retire the message if still packages are missing, in seconds.
235 double maximalMessageTime_ = 5.0;
236
237 /// Intermediate buffer storing individual parts of a large message.
239
240 /// The map holding all partially received message.
242};
243
245 retireTimestamp_(messageData.retireTimestamp_),
246 remainingPackages_(messageData.remainingPackages_),
247 buffer_(messageData.buffer_)
248{
249 // nothing to do here
250}
251
253 retireTimestamp_(messageData.retireTimestamp_),
254 remainingPackages_(messageData.remainingPackages_),
255 buffer_(std::move(messageData.buffer_))
256{
257 messageData.retireTimestamp_.toInvalid();
258 messageData.remainingPackages_ = 0u;
259}
260
261inline PackagedConnectionlessServer::MessageData::MessageData(const Timestamp retireTimestamp, const size_t size, const unsigned int remainingPackages) :
262 retireTimestamp_(retireTimestamp),
263 remainingPackages_(remainingPackages),
264 buffer_(size)
265{
266 ocean_assert(buffer_.empty() || remainingPackages_ != 0u);
267}
268
270{
271 return retireTimestamp_;
272}
273
275{
276 return remainingPackages_;
277}
278
280{
281 return buffer_.size();
282}
283
285{
286 return buffer_.data();
287}
288
290{
291 return buffer_.data();
292}
293
295{
296 retireTimestamp_ = timestamp;
297}
298
300{
301 remainingPackages_ = packages;
302}
303
305{
306 retireTimestamp_ = messageData.retireTimestamp_;
307 remainingPackages_ = messageData.remainingPackages_;
308 buffer_ = messageData.buffer_;
309
310 return *this;
311}
312
314{
315 if (this != &messageData)
316 {
317 retireTimestamp_ = messageData.retireTimestamp_;
318 remainingPackages_ = messageData.remainingPackages_;
319 buffer_ = std::move(messageData.buffer_);
320
321 messageData.retireTimestamp_.toInvalid();
322 messageData.remainingPackages_ = 0u;
323 }
324
325 return *this;
326}
327
329 address_(address),
330 port_(port),
331 messageId_(messageId)
332{
333 // nothing to do here
334}
335
337{
338 return address_;
339}
340
342{
343 return port_;
344}
345
350
352{
353 return address_ < triple.address_ || (address_ == triple.address_ && (port_ < triple.port_ || (port_ == triple.port_ && messageId_ < triple.messageId_)));
354}
355
357{
358 const ScopedLock scopedLock(lock_);
359
360 receiveCallback_ = callback;
361}
362
363}
364
365}
366
367#endif // FACEBOOK_NETWORK_PACKAGED_CONNECTIONLESS_SERVER_H
This class implements a container for callback functions.
Definition Callback.h:3456
This class wraps an address number with 32 bits.
Definition Address4.h:26
This class is the base class for all packaged connectionless clients.
Definition PackagedConnectionlessClient.h:31
This class implements a message.
Definition PackagedConnectionlessServer.h:50
unsigned int remainingPackages() const
Returns the number of remaining packages.
Definition PackagedConnectionlessServer.h:274
const uint8_t * buffer() const
Returns the message buffer.
Definition PackagedConnectionlessServer.h:284
Buffer buffer_
The entire buffer of the message.
Definition PackagedConnectionlessServer.h:143
MessageData & operator=(const MessageData &messageData)
Assign operator.
Definition PackagedConnectionlessServer.h:304
void setRetireTimestamp(const Timestamp timestamp)
Sets or changes the retire timestamp.
Definition PackagedConnectionlessServer.h:294
void setRemaininigPackages(const unsigned int packages)
Sets the number of packages which are still missing.
Definition PackagedConnectionlessServer.h:299
Timestamp retireTimestamp() const
Returns the retire timestamp.
Definition PackagedConnectionlessServer.h:269
Timestamp retireTimestamp_
The timestamp at which this message will be retired as no further packages arrived in the meantime.
Definition PackagedConnectionlessServer.h:137
MessageData()=default
Creates an empty data object.
size_t size() const
Returns the size of the message buffer, in bytes.
Definition PackagedConnectionlessServer.h:279
unsigned int remainingPackages_
The number of packages which are still missing.
Definition PackagedConnectionlessServer.h:140
Definition of a triple storing an address, a port and a message id.
Definition PackagedConnectionlessServer.h:150
Triple(const Address4 &address, const Port &port, const MessageId messageId)
Creates a new triple object.
Definition PackagedConnectionlessServer.h:328
bool operator<(const Triple &triple) const
Compares two triple objects.
Definition PackagedConnectionlessServer.h:351
unsigned int messageId_
The message id of this object.
Definition PackagedConnectionlessServer.h:195
const Address4 & address() const
Returns the address of this object.
Definition PackagedConnectionlessServer.h:336
MessageId messageId() const
Returns the message id of this object.
Definition PackagedConnectionlessServer.h:346
const Port & port() const
Returns the port of this object.
Definition PackagedConnectionlessServer.h:341
Address4 address_
The address of this object.
Definition PackagedConnectionlessServer.h:189
Port port_
The port of this object.
Definition PackagedConnectionlessServer.h:192
This class is the base class for all package connectionless server.
Definition PackagedConnectionlessServer.h:31
bool onScheduler() override
The scheduler event function.
ReceiveCallback receiveCallback_
Data callback function called on new message arrivals.
Definition PackagedConnectionlessServer.h:232
~PackagedConnectionlessServer() override
Destructs a connectionless server object.
void setReceiveCallback(const ReceiveCallback &callback)
Sets the receive data callback function.
Definition PackagedConnectionlessServer.h:356
Callback< void, const Address4 &, const Port &, const void *, const size_t, const MessageId > ReceiveCallback
Definition of a data callback function.
Definition PackagedConnectionlessServer.h:42
MessageMap connectionlessServerMessageMap
The map holding all partially received message.
Definition PackagedConnectionlessServer.h:241
Buffer packageBuffer_
Intermediate buffer storing individual parts of a large message.
Definition PackagedConnectionlessServer.h:238
std::map< Triple, MessageData > MessageMap
Definition of a map mapping message ids to massage data objects.
Definition PackagedConnectionlessServer.h:201
PackagedConnectionlessServer()
Creates a new connectionless server object.
uint32_t MessageId
Definition of a message id.
Definition PackagedSocket.h:192
This class wraps a port number with 16 bits.
Definition Port.h:26
This class is the base class for all server.
Definition Server.h:26
Buffer buffer_
The socket buffer of this server.
Definition Server.h:57
Address4 address() const
Returns the own address of this socket.
std::vector< uint8_t > Buffer
Definition of a vector holding 8 bit values.
Definition Socket.h:76
Port port() const
Returns the own port of this socket.
Lock lock_
Socket lock.
Definition Socket.h:187
This class implements a scoped lock object for recursive lock objects.
Definition Lock.h:135
This class implements a timestamp.
Definition Timestamp.h:36
The namespace covering the entire Ocean framework.
Definition Accessor.h:15