Ocean
Loading...
Searching...
No Matches
InputDataSerializer.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#ifndef META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
9#define META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
10
13
14#include <functional>
15
16namespace Ocean
17{
18
19namespace IO
20{
21
22namespace Serialization
23{
24
25/**
26 * This class implements an input data serializer.
27 * The input data serializer deserializes data samples from a stream (e.g., file or network) and provides playback functionality with configurable speed.<br>
28 * Before starting playback, factory functions must be registered for each expected sample type so that the serializer can construct the appropriate sample objects when reading from the stream.<br>
29 * Samples for which no factory function is registered are simply skipped during playback.<br>
30 * Samples are returned through the sample() function in playback order, with optional speed control for real-time or accelerated playback.<br>
31 * The class uses a background thread to continuously read and buffer samples, ensuring smooth playback without blocking.
32 * @ingroup ioserialization
33 */
34class OCEAN_IO_SERIALIZATION_EXPORT InputDataSerializer : public DataSerializer
35{
36 public:
37
38 /// Definition of a factory function which creates a data sample based on a sample type.
39 using FactoryFunction = std::function<UniqueDataSample(const std::string& sampleType)>;
40
41 /**
42 * Definition of a callback function which is invoked whenever a new channel is parsed.
43 * @param channel The channel information
44 */
45 using ChannelEventFunction = std::function<void(const Channel& channel)>;
46
47 protected:
48
49 /**
50 * This class implements an abstract stream for input data serializers.
51 */
52 class Stream
53 {
54 public:
55
56 /**
57 * Destructs the stream.
58 */
59 virtual ~Stream() = default;
60
61 /**
62 * Returns the input bitstream.
63 * @return The input bitstream
64 */
66
67 /**
68 * Returns whether this stream is valid.
69 * @return True, if so
70 */
71 virtual bool isValid() const = 0;
72 };
73
74 /// Definition of a unique pointer holding a stream.
75 using UniqueStream = std::unique_ptr<Stream>;
76
77 /// Definition of a map mapping sample types to factory functions.
78 using FactoryFunctionMap = std::unordered_map<std::string, FactoryFunction>;
79
80 /**
81 * This class implements an extended channel with factory function.
82 */
83 class ExtendedChannel : public Channel
84 {
85 public:
86
87 /**
88 * Creates a new invalid extended channel.
89 */
90 ExtendedChannel() = default;
91
92 /**
93 * Creates a new extended channel with given channel and factory function.
94 * @param channel The channel
95 * @param factoryFunction The factory function
96 */
97 inline ExtendedChannel(const Channel& channel, const FactoryFunction& factoryFunction);
98
99 public:
100
101 /// The factory function for creating data samples.
103 };
104
105 /// Definition of a map mapping channel ids to extended channels.
106 using ExtendedChannelMap = std::unordered_map<ChannelId, ExtendedChannel>;
107
108 /**
109 * Comparator for SamplePair that orders by playback timestamp (min-heap).
110 * Samples with smaller playback timestamps have higher priority.
111 */
113 {
114 /**
115 * Compares two sample pairs by their playback timestamps.
116 * @param sampleA The first sample pair
117 * @param sampleB The second sample pair
118 * @return True if 'sampleA' should come after 'sampleB' (i.e., 'sampleA' has a larger timestamp)
119 */
120 inline bool operator()(const SamplePair& sampleA, const SamplePair& sampleB) const;
121 };
122
123 /**
124 * A priority queue that allows moving elements out.
125 * This extends std::priority_queue to provide access to the underlying container
126 * for proper move semantics when popping elements.
127 */
128 class SampleQueue : public std::priority_queue<SamplePair, std::vector<SamplePair>, SamplePairComparator>
129 {
130 public:
131
132 /**
133 * Pops the top element from the queue.
134 * @return The top element, moved out of the queue
135 */
136 inline SamplePair popTop();
137 };
138
139 public:
140
141 /**
142 * Initializes the input data serializer.
143 * The serializer will create the input stream and read the header.
144 * Optionally, the serializer can pre-parse all channels before returning.
145 * @param preparsedChannels Optional output parameter to receive all channels that were pre-parsed, nullptr if not of interest
146 * @param isStreamCorrupted Optional output parameter to receive whether the stream is corrupted (missing end-of-stream indication), nullptr if not of interest
147 * @return True, if succeeded
148 */
149 bool initialize(Channels* preparsedChannels = nullptr, bool* isStreamCorrupted = nullptr);
150
151 /**
152 * Starts the serializer.
153 * @return True, if succeeded
154 * @see DataSerializer::start().
155 */
156 bool start() override;
157
158 /**
159 * Stops the serializer.
160 * This function sets a stopping flag and returns immediately; it does not wait for the serializer to actually stop.
161 * The background thread will check the stopping flag at the beginning of each iteration and terminate.
162 * @return True, if the stop request was accepted; False, if the serializer was not started
163 * @see DataSerializer::stop(), stopAndWait(), hasStopped().
164 */
165 bool stop() override;
166
167 /**
168 * Returns whether the serializer has been started.
169 * @return True, if so
170 * @see DataSerializer::isStarted().
171 */
172 [[nodiscard]] bool isStarted() const override;
173
174 /**
175 * Returns whether the serializer has stopped and all remaining samples have been retrieved.
176 * @return True, if so
177 * @see DataSerializer::hasFinished().
178 */
179 [[nodiscard]] bool hasFinished() const override;
180
181 /**
182 * Registers a factory function for a given sample type.
183 * The factory function will be used to create data samples when reading from the stream.
184 * @param sampleType The sample type for which the factory function will be registered, must be non-empty
185 * @param factoryFunction The factory function to register, must be valid
186 * @return True, if succeeded
187 */
188 bool registerFactoryFunction(const std::string& sampleType, const FactoryFunction& factoryFunction);
189
190 /**
191 * Registers a factory function for a sample type T.
192 * The sample type T must provide static functions `sampleType()` and `createSample()`.
193 * This is a convenience function that calls `registerFactoryFunction(T::sampleType(), T::createSample)`.
194 * @tparam T The sample type which must provide `static const std::string& sampleType()` and `static UniqueDataSample createSample(const std::string&)`
195 * @return True, if succeeded
196 */
197 template <typename T>
198 bool registerSample();
199
200 /**
201 * Registers a callback function that will be invoked whenever a new channel is parsed.
202 * @param channelEventFunction The callback function to be invoked, must be valid
203 * @return True if the callback was successfully registered, false otherwise
204 */
205 bool registerChannelEventFunction(const ChannelEventFunction& channelEventFunction);
206
207 /**
208 * Returns the next sample from the stream.
209 * The sample will be removed from the internal queue and returned to the caller.
210 * @param channelId The resulting channel id of the sample
211 * @param speed The playback speed, with range (0, infinity), 0 to return samples as fast as possible, 1 for real-time playback
212 * @return The sample, nullptr if no sample is available yet or if the stream has ended
213 */
214 [[nodiscard]] UniqueDataSample sample(ChannelId& channelId, const double speed = 1.0);
215
216 /**
217 * Returns the channel information for a given channel.
218 * @param channelId The channel id to query
219 * @return The channel information, an invalid channel if the channel doesn't exist
220 */
221 [[nodiscard]] ChannelConfiguration channelConfiguration(const ChannelId channelId) const;
222
223 /**
224 * Returns all channels that have been parsed so far.
225 * @return The channels
226 */
227 [[nodiscard]] Channels channels() const;
228
229 protected:
230
231 /**
232 * Creates the input stream.
233 * @return The input stream, nullptr if the stream could not be created
234 */
235 virtual UniqueStream createStream() const = 0;
236
237 /**
238 * Reads the header from the input bitstream.
239 * @param inputBitstream The input bitstream from which the header will be read
240 * @return True, if succeeded
241 */
242 virtual bool readHeader(InputBitstream& inputBitstream);
243
244 /**
245 * The thread run function.
246 * @see Thread::threadRun().
247 */
248 void threadRun() override;
249
250 protected:
251
252 /// The input stream.
254
255 /// The map mapping sample types to factory functions.
257
258 /// The map mapping channel ids to extended channels (with factory function).
260
261 /// The callback function which is invoked whenever a new channel is parsed.
263
264 /// The priority queue holding samples which are pending to be retrieved, ordered by playback timestamp (smallest first).
266
267 /// The maximum number of pending samples in the queue.
268 static constexpr size_t maxPendingSampleQueueSize_ = 100;
269};
270
271/**
272 * This class implements a file-based input data serializer.
273 * This specialization of InputDataSerializer reads serialized data from a binary file on disk.<br>
274 * Use setFilename() to configure the input file path before calling initialize() and start().
275 * @ingroup ioserialization
276 */
277class OCEAN_IO_SERIALIZATION_EXPORT FileInputDataSerializer : public InputDataSerializer
278{
279 protected:
280
281 /**
282 * This class implements a file stream for file input data serializers.
283 */
284 class FileStream : public Stream
285 {
286 public:
287
288 /**
289 * Creates a new file stream with given filename.
290 * @param filename The filename of the file to read, must be valid
291 */
292 inline explicit FileStream(const std::string& filename);
293
294 /**
295 * Destructs the file stream.
296 */
297 inline ~FileStream() override;
298
299 /**
300 * Returns the input bitstream.
301 * @return The input bitstream
302 * @see Stream::inputBitstream().
303 */
304 inline InputBitstream& inputBitstream() override;
305
306 /**
307 * Returns whether this stream is valid.
308 * @return True, if so
309 * @see Stream::isValid().
310 */
311 inline bool isValid() const override;
312
313 protected:
314
315 /// The file stream.
316 std::ifstream stream_;
317
318 /// The input bitstream.
320 };
321
322 public:
323
324 /**
325 * Sets the filename of the file to read.
326 * @param filename The filename of the file to read, must be non-empty
327 * @return True, if succeeded
328 */
329 virtual bool setFilename(const std::string& filename);
330
331 protected:
332
333 /**
334 * Creates the input stream.
335 * @return The input stream, nullptr if the stream could not be created
336 * @see InputDataSerializer::createStream().
337 */
338 UniqueStream createStream() const override;
339
340 protected:
341
342 /// The filename of the file to read.
343 std::string filename_;
344};
345
347 Channel(channel),
348 factoryFunction_(factoryFunction)
349{
350 // nothing to do here
351}
352
353inline bool InputDataSerializer::SamplePairComparator::operator()(const SamplePair& sampleA, const SamplePair& sampleB) const
354{
355 return sampleA.second->playbackTimestamp() > sampleB.second->playbackTimestamp();
356}
357
359{
360 ocean_assert(!empty());
361
362 std::pop_heap(c.begin(), c.end(), comp);
363
364 SamplePair result = std::move(c.back());
365 c.pop_back();
366
367 return result;
368}
369
370inline FileInputDataSerializer::FileStream::FileStream(const std::string& filename) :
371 stream_(filename.c_str(), std::ios::binary),
372 inputBitstream_(stream_)
373
374{
375 // nothing to do here
376}
377
382
384{
385 return inputBitstream_;
386}
387
389{
390 return stream_.is_open() && !stream_.fail();
391}
392
393template <typename T>
395{
396 return registerFactoryFunction(T::sampleType(), T::createSample);
397}
398
399}
400
401}
402
403}
404
405#endif // META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
This class implements an input bitstream.
Definition Bitstream.h:51
This class holds channel configuration (sample type, name, and content type).
Definition DataSerializer.h:56
This class implements a channel with configuration and channel id.
Definition DataSerializer.h:136
This class implements the base class for data serializers.
Definition DataSerializer.h:38
std::pair< ChannelId, UniqueDataSample > SamplePair
Definition of a pair holding a channel id and a unique data sample.
Definition DataSerializer.h:217
std::vector< Channel > Channels
Definition of a vector holding channels.
Definition DataSerializer.h:192
uint32_t ChannelId
Definition of a channel id.
Definition DataSerializer.h:42
This class implements a file stream for file input data serializers.
Definition InputDataSerializer.h:285
~FileStream() override
Destructs the file stream.
Definition InputDataSerializer.h:378
bool isValid() const override
Returns whether this stream is valid.
Definition InputDataSerializer.h:388
FileStream(const std::string &filename)
Creates a new file stream with given filename.
Definition InputDataSerializer.h:370
InputBitstream & inputBitstream() override
Returns the input bitstream.
Definition InputDataSerializer.h:383
InputBitstream inputBitstream_
The input bitstream.
Definition InputDataSerializer.h:319
std::ifstream stream_
The file stream.
Definition InputDataSerializer.h:316
This class implements a file-based input data serializer.
Definition InputDataSerializer.h:278
std::string filename_
The filename of the file to read.
Definition InputDataSerializer.h:343
virtual bool setFilename(const std::string &filename)
Sets the filename of the file to read.
UniqueStream createStream() const override
Creates the input stream.
This class implements an extended channel with factory function.
Definition InputDataSerializer.h:84
FactoryFunction factoryFunction_
The factory function for creating data samples.
Definition InputDataSerializer.h:102
ExtendedChannel()=default
Creates a new invalid extended channel.
A priority queue that allows moving elements out.
Definition InputDataSerializer.h:129
SamplePair popTop()
Pops the top element from the queue.
Definition InputDataSerializer.h:358
This class implements an abstract stream for input data serializers.
Definition InputDataSerializer.h:53
virtual InputBitstream & inputBitstream()=0
Returns the input bitstream.
virtual bool isValid() const =0
Returns whether this stream is valid.
virtual ~Stream()=default
Destructs the stream.
This class implements an input data serializer.
Definition InputDataSerializer.h:35
Channels channels() const
Returns all channels that have been parsed so far.
bool registerSample()
Registers a factory function for a sample type T.
Definition InputDataSerializer.h:394
bool stop() override
Stops the serializer.
bool start() override
Starts the serializer.
void threadRun() override
The thread run function.
std::unique_ptr< Stream > UniqueStream
Definition of a unique pointer holding a stream.
Definition InputDataSerializer.h:75
SampleQueue sampleQueue_
The priority queue holding samples which are pending to be retrieved, ordered by playback timestamp (...
Definition InputDataSerializer.h:265
UniqueStream stream_
The input stream.
Definition InputDataSerializer.h:253
virtual UniqueStream createStream() const =0
Creates the input stream.
ExtendedChannelMap extendedChannelMap_
The map mapping channel ids to extended channels (with factory function).
Definition InputDataSerializer.h:259
virtual bool readHeader(InputBitstream &inputBitstream)
Reads the header from the input bitstream.
std::function< UniqueDataSample(const std::string &sampleType)> FactoryFunction
Definition of a factory function which creates a data sample based on a sample type.
Definition InputDataSerializer.h:39
bool initialize(Channels *preparsedChannels=nullptr, bool *isStreamCorrupted=nullptr)
Initializes the input data serializer.
bool hasFinished() const override
Returns whether the serializer has stopped and all remaining samples have been retrieved.
UniqueDataSample sample(ChannelId &channelId, const double speed=1.0)
Returns the next sample from the stream.
bool isStarted() const override
Returns whether the serializer has been started.
std::unordered_map< std::string, FactoryFunction > FactoryFunctionMap
Definition of a map mapping sample types to factory functions.
Definition InputDataSerializer.h:78
bool registerFactoryFunction(const std::string &sampleType, const FactoryFunction &factoryFunction)
Registers a factory function for a given sample type.
std::unordered_map< ChannelId, ExtendedChannel > ExtendedChannelMap
Definition of a map mapping channel ids to extended channels.
Definition InputDataSerializer.h:106
std::function< void(const Channel &channel)> ChannelEventFunction
Definition of a callback function which is invoked whenever a new channel is parsed.
Definition InputDataSerializer.h:45
ChannelEventFunction channelEventFunction_
The callback function which is invoked whenever a new channel is parsed.
Definition InputDataSerializer.h:262
FactoryFunctionMap factoryFunctionMap_
The map mapping sample types to factory functions.
Definition InputDataSerializer.h:256
bool registerChannelEventFunction(const ChannelEventFunction &channelEventFunction)
Registers a callback function that will be invoked whenever a new channel is parsed.
ChannelConfiguration channelConfiguration(const ChannelId channelId) const
Returns the channel information for a given channel.
std::unique_ptr< DataSample > UniqueDataSample
Definition of a unique pointer holding a DataSample.
Definition DataSample.h:39
The namespace covering the entire Ocean framework.
Definition Accessor.h:15
Comparator for SamplePair that orders by playback timestamp (min-heap).
Definition InputDataSerializer.h:113
bool operator()(const SamplePair &sampleA, const SamplePair &sampleB) const
Compares two sample pairs by their playback timestamps.
Definition InputDataSerializer.h:353