Ocean
Loading...
Searching...
No Matches
InputDataSerializer.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8#ifndef META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
9#define META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
10
13
14#include <functional>
15
16namespace Ocean
17{
18
19namespace IO
20{
21
22namespace Serialization
23{
24
25/**
26 * This class implements an input data serializer.
27 * The input data serializer deserializes data samples from a stream (e.g., file or network).
28 * @ingroup ioserialization
29 */
31{
32 public:
33
34 /// Definition of a factory function which creates a data sample based on a sample type.
35 using FactoryFunction = std::function<UniqueDataSample(const std::string& sampleType)>;
36
37 /**
38 * Definition of a callback function which is invoked whenever a new channel is parsed.
39 * @param channel The channel information
40 */
41 using ChannelEventFunction = std::function<void(const Channel& channel)>;
42
43 protected:
44
45 /**
46 * This class implements an abstract stream for input data serializers.
47 */
48 class Stream
49 {
50 public:
51
52 /**
53 * Destructs the stream.
54 */
55 virtual ~Stream() = default;
56
57 /**
58 * Returns the input bitstream.
59 * @return The input bitstream
60 */
62
63 /**
64 * Returns whether this stream is valid.
65 * @return True, if so
66 */
67 virtual bool isValid() const = 0;
68 };
69
70 /// Definition of a unique pointer holding a stream.
71 using UniqueStream = std::unique_ptr<Stream>;
72
73 /// Definition of a map mapping sample types to factory functions.
74 using FactoryFunctionMap = std::unordered_map<std::string, FactoryFunction>;
75
76 /**
77 * This class implements an extended channel with factory function.
78 */
79 class ExtendedChannel : public Channel
80 {
81 public:
82
83 /**
84 * Creates a new invalid extended channel.
85 */
86 ExtendedChannel() = default;
87
88 /**
89 * Creates a new extended channel with given channel and factory function.
90 * @param channel The channel
91 * @param factoryFunction The factory function
92 */
93 inline ExtendedChannel(const Channel& channel, const FactoryFunction& factoryFunction);
94
95 public:
96
97 /// The factory function for creating data samples.
99 };
100
101 /// Definition of a map mapping channel ids to extended channels.
102 using ExtendedChannelMap = std::unordered_map<ChannelId, ExtendedChannel>;
103
104 /**
105 * Comparator for SamplePair that orders by playback timestamp (min-heap).
106 * Samples with smaller playback timestamps have higher priority.
107 */
109 {
110 /**
111 * Compares two sample pairs by their playback timestamps.
112 * @param sampleA The first sample pair
113 * @param sampleB The second sample pair
114 * @return True if 'sampleA' should come after 'sampleB' (i.e., 'sampleA' has a larger timestamp)
115 */
116 inline bool operator()(const SamplePair& sampleA, const SamplePair& sampleB) const;
117 };
118
119 /**
120 * A priority queue that allows moving elements out.
121 * This extends std::priority_queue to provide access to the underlying container
122 * for proper move semantics when popping elements.
123 */
124 class SampleQueue : public std::priority_queue<SamplePair, std::vector<SamplePair>, SamplePairComparator>
125 {
126 public:
127
128 /**
129 * Pops the top element from the queue.
130 * @return The top element, moved out of the queue
131 */
132 inline SamplePair popTop();
133 };
134
135 public:
136
137 /**
138 * Initializes the input data serializer.
139 * The serializer will create the input stream and read the header.
140 * Optionally, the serializer can pre-parse all channels before returning.
141 * @param preparsedChannels Optional output parameter to receive all channels that were pre-parsed, nullptr if not of interest
142 * @param isStreamCorrupted Optional output parameter to receive whether the stream is corrupted (missing end-of-stream indication), nullptr if not of interest
143 * @return True, if succeeded
144 */
145 bool initialize(Channels* preparsedChannels = nullptr, bool* isStreamCorrupted = nullptr);
146
147 /**
148 * Starts the serializer.
149 * @return True, if succeeded
150 * @see DataSerializer::start().
151 */
152 bool start() override;
153
154 /**
155 * Stops the serializer.
156 * This function sets a stopping flag and returns immediately; it does not wait for the serializer to actually stop.
157 * The background thread will check the stopping flag at the beginning of each iteration and terminate.
158 * @return True, if the stop request was accepted; False, if the serializer was not started
159 * @see DataSerializer::stop(), stopAndWait(), hasStopped().
160 */
161 bool stop() override;
162
163 /**
164 * Returns whether the serializer has been started.
165 * @return True, if so
166 * @see DataSerializer::isStarted().
167 */
168 [[nodiscard]] bool isStarted() const override;
169
170 /**
171 * Returns whether the serializer has stopped and all remaining samples have been retrieved.
172 * @return True, if so
173 * @see DataSerializer::hasFinished().
174 */
175 [[nodiscard]] bool hasFinished() const override;
176
177 /**
178 * Registers a factory function for a given sample type.
179 * The factory function will be used to create data samples when reading from the stream.
180 * @param sampleType The sample type for which the factory function will be registered, must be non-empty
181 * @param factoryFunction The factory function to register, must be valid
182 * @return True, if succeeded
183 */
184 bool registerFactoryFunction(const std::string& sampleType, const FactoryFunction& factoryFunction);
185
186 /**
187 * Registers a factory function for a sample type T.
188 * The sample type T must provide static functions `sampleType()` and `createSample()`.
189 * This is a convenience function that calls `registerFactoryFunction(T::sampleType(), T::createSample)`.
190 * @tparam T The sample type which must provide `static const std::string& sampleType()` and `static UniqueDataSample createSample(const std::string&)`
191 * @return True, if succeeded
192 */
193 template <typename T>
194 bool registerSample();
195
196 /**
197 * Registers a callback function that will be invoked whenever a new channel is parsed.
198 * @param channelEventFunction The callback function to be invoked, must be valid
199 * @return True if the callback was successfully registered, false otherwise
200 */
201 bool registerChannelEventFunction(const ChannelEventFunction& channelEventFunction);
202
203 /**
204 * Returns the next sample from the stream.
205 * The sample will be removed from the internal queue and returned to the caller.
206 * @param channelId The resulting channel id of the sample
207 * @param speed The playback speed, with range (0, infinity), 0 to return samples as fast as possible, 1 for real-time playback
208 * @return The sample, nullptr if no sample is available yet or if the stream has ended
209 */
210 [[nodiscard]] UniqueDataSample sample(ChannelId& channelId, const double speed = 1.0);
211
212 /**
213 * Returns the channel information for a given channel.
214 * @param channelId The channel id to query
215 * @return The channel information, an invalid channel if the channel doesn't exist
216 */
217 [[nodiscard]] ChannelConfiguration channelConfiguration(const ChannelId channelId) const;
218
219 /**
220 * Returns all channels that have been parsed so far.
221 * @return The channels
222 */
223 [[nodiscard]] Channels channels() const;
224
225 protected:
226
227 /**
228 * Creates the input stream.
229 * @return The input stream, nullptr if the stream could not be created
230 */
231 virtual UniqueStream createStream() const = 0;
232
233 /**
234 * Reads the header from the input bitstream.
235 * @param inputBitstream The input bitstream from which the header will be read
236 * @return True, if succeeded
237 */
238 virtual bool readHeader(InputBitstream& inputBitstream);
239
240 /**
241 * The thread run function.
242 * @see Thread::threadRun().
243 */
244 void threadRun() override;
245
246 protected:
247
248 /// The input stream.
250
251 /// The map mapping sample types to factory functions.
253
254 /// The map mapping channel ids to extended channels (with factory function).
256
257 /// The callback function which is invoked whenever a new channel is parsed.
259
260 /// The priority queue holding samples which are pending to be retrieved, ordered by playback timestamp (smallest first).
262
263 /// The maximum number of pending samples in the queue.
264 static constexpr size_t maxPendingSampleQueueSize_ = 100;
265};
266
267/**
268 * This class implements a file-based input data serializer.
269 * @ingroup ioserialization
270 */
272{
273 protected:
274
275 /**
276 * This class implements a file stream for file input data serializers.
277 */
278 class FileStream : public Stream
279 {
280 public:
281
282 /**
283 * Creates a new file stream with given filename.
284 * @param filename The filename of the file to read, must be valid
285 */
286 inline explicit FileStream(const std::string& filename);
287
288 /**
289 * Destructs the file stream.
290 */
291 inline ~FileStream() override;
292
293 /**
294 * Returns the input bitstream.
295 * @return The input bitstream
296 * @see Stream::inputBitstream().
297 */
298 inline InputBitstream& inputBitstream() override;
299
300 /**
301 * Returns whether this stream is valid.
302 * @return True, if so
303 * @see Stream::isValid().
304 */
305 inline bool isValid() const override;
306
307 protected:
308
309 /// The file stream.
310 std::ifstream stream_;
311
312 /// The input bitstream.
314 };
315
316 public:
317
318 /**
319 * Sets the filename of the file to read.
320 * @param filename The filename of the file to read, must be non-empty
321 * @return True, if succeeded
322 */
323 virtual bool setFilename(const std::string& filename);
324
325 protected:
326
327 /**
328 * Creates the input stream.
329 * @return The input stream, nullptr if the stream could not be created
330 * @see InputDataSerializer::createStream().
331 */
332 UniqueStream createStream() const override;
333
334 protected:
335
336 /// The filename of the file to read.
337 std::string filename_;
338};
339
341 Channel(channel),
342 factoryFunction_(factoryFunction)
343{
344 // nothing to do here
345}
346
347inline bool InputDataSerializer::SamplePairComparator::operator()(const SamplePair& sampleA, const SamplePair& sampleB) const
348{
349 return sampleA.second->playbackTimestamp() > sampleB.second->playbackTimestamp();
350}
351
353{
354 ocean_assert(!empty());
355
356 std::pop_heap(c.begin(), c.end(), comp);
357
358 SamplePair result = std::move(c.back());
359 c.pop_back();
360
361 return result;
362}
363
364inline FileInputDataSerializer::FileStream::FileStream(const std::string& filename) :
365 stream_(filename.c_str(), std::ios::binary),
366 inputBitstream_(stream_)
367
368{
369 // nothing to do here
370}
371
376
378{
379 return inputBitstream_;
380}
381
383{
384 return stream_.is_open() && !stream_.fail();
385}
386
387template <typename T>
389{
390 return registerFactoryFunction(T::sampleType(), T::createSample);
391}
392
393}
394
395}
396
397}
398
399#endif // META_OCEAN_IO_SERIALIZATION_INPUT_DATA_SERIALIZER_H
This class implements an input bitstream.
Definition Bitstream.h:51
This class holds channel configuration (sample type, name, and content type).
Definition DataSerializer.h:52
This class implements a channel with configuration and channel id.
Definition DataSerializer.h:130
This class implements the base class for data serializers.
Definition DataSerializer.h:36
std::pair< ChannelId, UniqueDataSample > SamplePair
Definition of a pair holding a channel id and a unique data sample.
Definition DataSerializer.h:211
std::vector< Channel > Channels
Definition of a vector holding channels.
Definition DataSerializer.h:186
uint32_t ChannelId
Definition of a channel id.
Definition DataSerializer.h:40
This class implements a file stream for file input data serializers.
Definition InputDataSerializer.h:279
~FileStream() override
Destructs the file stream.
Definition InputDataSerializer.h:372
bool isValid() const override
Returns whether this stream is valid.
Definition InputDataSerializer.h:382
FileStream(const std::string &filename)
Creates a new file stream with given filename.
Definition InputDataSerializer.h:364
InputBitstream & inputBitstream() override
Returns the input bitstream.
Definition InputDataSerializer.h:377
InputBitstream inputBitstream_
The input bitstream.
Definition InputDataSerializer.h:313
std::ifstream stream_
The file stream.
Definition InputDataSerializer.h:310
This class implements a file-based input data serializer.
Definition InputDataSerializer.h:272
std::string filename_
The filename of the file to read.
Definition InputDataSerializer.h:337
virtual bool setFilename(const std::string &filename)
Sets the filename of the file to read.
UniqueStream createStream() const override
Creates the input stream.
This class implements an extended channel with factory function.
Definition InputDataSerializer.h:80
FactoryFunction factoryFunction_
The factory function for creating data samples.
Definition InputDataSerializer.h:98
ExtendedChannel()=default
Creates a new invalid extended channel.
A priority queue that allows moving elements out.
Definition InputDataSerializer.h:125
SamplePair popTop()
Pops the top element from the queue.
Definition InputDataSerializer.h:352
This class implements an abstract stream for input data serializers.
Definition InputDataSerializer.h:49
virtual InputBitstream & inputBitstream()=0
Returns the input bitstream.
virtual bool isValid() const =0
Returns whether this stream is valid.
virtual ~Stream()=default
Destructs the stream.
This class implements an input data serializer.
Definition InputDataSerializer.h:31
Channels channels() const
Returns all channels that have been parsed so far.
bool registerSample()
Registers a factory function for a sample type T.
Definition InputDataSerializer.h:388
bool stop() override
Stops the serializer.
bool start() override
Starts the serializer.
void threadRun() override
The thread run function.
std::unique_ptr< Stream > UniqueStream
Definition of a unique pointer holding a stream.
Definition InputDataSerializer.h:71
SampleQueue sampleQueue_
The priority queue holding samples which are pending to be retrieved, ordered by playback timestamp (...
Definition InputDataSerializer.h:261
UniqueStream stream_
The input stream.
Definition InputDataSerializer.h:249
virtual UniqueStream createStream() const =0
Creates the input stream.
ExtendedChannelMap extendedChannelMap_
The map mapping channel ids to extended channels (with factory function).
Definition InputDataSerializer.h:255
virtual bool readHeader(InputBitstream &inputBitstream)
Reads the header from the input bitstream.
std::function< UniqueDataSample(const std::string &sampleType)> FactoryFunction
Definition of a factory function which creates a data sample based on a sample type.
Definition InputDataSerializer.h:35
bool initialize(Channels *preparsedChannels=nullptr, bool *isStreamCorrupted=nullptr)
Initializes the input data serializer.
bool hasFinished() const override
Returns whether the serializer has stopped and all remaining samples have been retrieved.
UniqueDataSample sample(ChannelId &channelId, const double speed=1.0)
Returns the next sample from the stream.
bool isStarted() const override
Returns whether the serializer has been started.
std::unordered_map< std::string, FactoryFunction > FactoryFunctionMap
Definition of a map mapping sample types to factory functions.
Definition InputDataSerializer.h:74
bool registerFactoryFunction(const std::string &sampleType, const FactoryFunction &factoryFunction)
Registers a factory function for a given sample type.
std::unordered_map< ChannelId, ExtendedChannel > ExtendedChannelMap
Definition of a map mapping channel ids to extended channels.
Definition InputDataSerializer.h:102
std::function< void(const Channel &channel)> ChannelEventFunction
Definition of a callback function which is invoked whenever a new channel is parsed.
Definition InputDataSerializer.h:41
ChannelEventFunction channelEventFunction_
The callback function which is invoked whenever a new channel is parsed.
Definition InputDataSerializer.h:258
FactoryFunctionMap factoryFunctionMap_
The map mapping sample types to factory functions.
Definition InputDataSerializer.h:252
static constexpr size_t maxPendingSampleQueueSize_
The maximum number of pending samples in the queue.
Definition InputDataSerializer.h:264
bool registerChannelEventFunction(const ChannelEventFunction &channelEventFunction)
Registers a callback function that will be invoked whenever a new channel is parsed.
ChannelConfiguration channelConfiguration(const ChannelId channelId) const
Returns the channel information for a given channel.
std::unique_ptr< DataSample > UniqueDataSample
Definition of a unique pointer holding a DataSample.
Definition DataSample.h:39
The namespace covering the entire Ocean framework.
Definition Accessor.h:15
Comparator for SamplePair that orders by playback timestamp (min-heap).
Definition InputDataSerializer.h:109
bool operator()(const SamplePair &sampleA, const SamplePair &sampleB) const
Compares two sample pairs by their playback timestamps.
Definition InputDataSerializer.h:347