Jpp 19.3.0-rc.1
the software that should make you happy
Loading...
Searching...
No Matches
puzzled_frame.cc
Go to the documentation of this file.
1#include "puzzled_frame.hh"
2
5
7
8#include <string.h>
9
10#include <log.hh>
11
12#include <utility> // for std::pair and std::make_pair
13#include <stdexcept> // for runtime_error
14
15/**
16 * \author cpellegrino
17 */
18
19boost::atomic<unsigned int> PuzzledFrame::n_obj(0);
20
21uint32_t PuzzledFrame::frameDomStatus(int i) const
22{
23 uint32_t dom_status = 0;
24 for (
25 container_t::const_iterator it = m_container.begin(), et = m_container.end();
26 it != et;
27 ++it
28 ) {
29 const CLBCommonHeader* const clb_header = it->second->getCLBCommonHeader();
30 dom_status = dom_status | clb_header->domStatus(i);
31 }
32 return dom_status;
33}
34
35uint32_t calculate_number_of_items(Frame const& frame)
36{
37 uint32_t nitems = 0;
38 uint32_t const frame_length = frame.size();
39
40 uint32_t const data_type = frame.getHeader()->DataType;
41
42 if (is_optical(data_type)) {
43 nitems = (frame_length - sizeof(DAQCommonHeader)) / 6; // 6 is sizeof(hit)
44 } else if (is_acoustic(data_type)) {
45 InfoWord const*const iw = static_cast<const InfoWord*>(
46 static_cast<const void*>(frame.getPayload()));
47
48 const unsigned int audiowordsize = iw->audioWordSize();
49
50 nitems = (frame_length
51 - sizeof(DAQCommonHeader)
52 - sizeof(InfoWord))
53 / audiowordsize;
54 } else {
55 throw std::runtime_error("Unknown data type");
56 }
57
58 return nitems;
59}
60
62{
63 //Test collision: is m_container[idx] empty?
64 unsigned int const idx = datagram->getUDPSequenceNumber();
65
66 container_t::const_iterator const it = m_container.find(idx);
67
68 if (it != m_container.end()) {
69 LOG_DEBUG << "Datagram collision. idx: " << idx << '\n'
70 << "container.size(): " << m_container.size() << '\n'
71 << "INFO:\n" << *datagram->getCLBCommonHeader();
72
73 Log::Counter::get().add("collision");
75 return false;
76 }
77
78 if (!m_totalsize) {
79 benchmark::chrono_swap(*datagram, *this);
80 }
81
82 m_container.insert(m_container.end(), container_t::value_type(idx, datagram));
83
84 m_totalsize += datagram->size();
85
86 return true;
87}
88
90 CLBDataGram const& dg,
91 std::size_t max_size)
92{
93 std::size_t const previous_packets_size =
95 * (max_size - sizeof(CLBCommonHeader));
96
97 std::size_t const chunk_in_previous = previous_packets_size % 6;
98
99 unsigned int const head =
100 chunk_in_previous
101 ? 6 - chunk_in_previous
102 : 0;
103
104 unsigned int const tail =
105 (dg.size() - sizeof(CLBCommonHeader) - head) % 6;
106
107 return std::make_pair(head, tail);
108}
109
110// Preconditions:
111// - Frame's memory has been already allocated elsewhere.
112// - The header has been already written.
113// - The container has more than 1 datagram, or it contains only the first one,
114// i.e. the datagram with UDPSequenceNumber == 0.
115
118 frame_idx_t frame_idx,
119 Frame& frame)
120{
121 assert(
122 (
123 c.size() > 1
124 || c.begin()->second->getCLBCommonHeader()->udpSequenceNumber() == 0
125 )
126 && "Precondition failed."
127 );
128
129 uint32_t current_seq_number = 0;
130 unsigned int previous_chunk = 0;
131
132 std::size_t const max_size = c.begin()->second->size();
133
134 std::size_t size = sizeof(DAQCommonHeader);
135
136 for (
137 PuzzledFrame::container_t::const_iterator it = c.begin(), et = c.end();
138 it != et;
139 ++it
140 ) {
141
142 CLBDataGram const& dg = *it->second;
144 dg,
145 max_size
146 );
147
148 if (dg.getCLBCommonHeader()->udpSequenceNumber() != current_seq_number) {
149 LOG_DEBUG << "Datagram "
150 << current_seq_number
151 << " not present in optical frame "
152 << frame_idx
153 << ". Packet loss(?)";
154
155 Log::Counter::get().add("missing odgram");
156
157 char* const dest = &frame.front() + size - previous_chunk;
158
159 std::size_t const write_size = dg.getPayloadSize() - bounds.first;
160
161 std::memcpy(dest, dg.getPayload() + bounds.first, write_size);
162
163 size += write_size - previous_chunk;
164 } else {
165 char* const dest = &frame.front() + size;
166
167 std::size_t const write_size = dg.getPayloadSize();
168
169 std::memcpy(dest, dg.getPayload(), write_size);
170
171 size += write_size;
172 }
173
174 current_seq_number = dg.getCLBCommonHeader()->udpSequenceNumber() + 1;
175 previous_chunk = bounds.second;
176 }
177
178 // check whether size is compatible with an integer number of items
179 // and, if not, strip away the boundary
180
181 if (previous_chunk != 0) {
182 LOG_DEBUG << "Missing last datagram(s) in optical frame "
183 << frame_idx
184 << ". Packet loss (?).";
185
186 Log::Counter::get().add("missing odgram");
187 size -= previous_chunk;
188 }
189
190 // check the presence of the trailer
191
192 if (!isTrailer(*c.rbegin()->second->getCLBCommonHeader())) {
193 LOG_DEBUG << "Missing trailer in optical frame "
194 << frame_idx;
195
196 Log::Counter::get().add("missing trailer");
197 }
198
199 frame.resize(size);
200}
201
203 CLBDataGram const& dg,
204 std::size_t max_size,
205 std::size_t item_size)
206{
207 unsigned int head = 0;
208 unsigned int tail = 0;
209
210 if (dg.getCLBCommonHeader()->udpSequenceNumber() != 0) {
211 std::size_t const previous_audio_size =
213 * (max_size - sizeof(CLBCommonHeader))
214 - sizeof(InfoWord);
215
216 std::size_t const chunk_in_previous = previous_audio_size % item_size;
217
218 head = chunk_in_previous ? item_size - chunk_in_previous : 0;
219
220 tail = (dg.size() - sizeof(CLBCommonHeader) - head) % item_size;
221 } else {
222 head = 0;
223 tail = (dg.size() - sizeof(CLBCommonHeader) - sizeof(InfoWord)) % item_size;
224 }
225
226 return std::make_pair(head, tail);
227}
228
229// Preconditions:
230// - Frame's memory has been already allocated elsewhere.
231// - The header has been already written.
232// - The first datagram in the container must have UDPSequenceNumber == 0.
233
236 frame_idx_t frame_idx,
237 Frame& frame)
238{
239 uint32_t current_seq_number = 0;
240 unsigned int previous_chunk = 0;
241
242 std::size_t const max_size = c.begin()->second->size();
243
244 std::size_t size = sizeof(DAQCommonHeader);
245
246 InfoWord const& iw = *static_cast<const InfoWord*>(
247 static_cast<const void*>(c.begin()->second->getPayload()));
248
249 unsigned int const audio_word_size = iw.audioWordSize();
250
251 for (
252 PuzzledFrame::container_t::const_iterator it = c.begin(), et = c.end();
253 it != et;
254 ++it
255 ) {
256
257 CLBDataGram const& dg = *it->second;
258
260 dg,
261 max_size,
262 audio_word_size);
263
264 if (dg.getCLBCommonHeader()->udpSequenceNumber() != current_seq_number) {
265 LOG_DEBUG << "Datagram "
266 << current_seq_number
267 << " not present in acoustic frame "
268 << frame_idx
269 << ". Packet loss(?)";
270
271 Log::Counter::get().add("missing adgram");
272
273 size -= previous_chunk;
274 break;
275 } else {
276 char* const dest = &frame.front() + size;
277
278 std::size_t const write_size = dg.getPayloadSize();
279
280 memcpy(dest, dg.getPayload(), write_size);
281
282 size += write_size;
283 }
284
285 ++current_seq_number;
286 previous_chunk = bounds.second;
287 }
288
289 // check whether size is compatible with an integer number of items
290 // and, if not, strip away the boundary
291
292 if (previous_chunk != 0) {
293 LOG_DEBUG << "Missing last datagram(s) in acoustic frame "
294 << frame_idx
295 << ". Packet loss (?).";
296
297 Log::Counter::get().add("missing adgram");
298 size -= previous_chunk;
299 }
300
301 frame.resize(size);
302}
303
305{
307 benchmark::chrono_swap(*this, frame);
308
309 CLBCommonHeader const*const clb_header
310 = m_container.begin()->second->getCLBCommonHeader();
311
312 int const frame_length = frameLength();
313
314 // test the frame length
315 // must be greater than 0
316 if (frame_length < 0) {
317 LOG_DEBUG << "Anomalous frame size: " << frame_length << '\n'
318 << "Read total size: " << m_totalsize << '\n'
319 << "container.size(): " << m_container.size();
320
321 Log::Counter::get().add("wrong size");
322 frame.clear();
323 return;
324 }
325
326 frame.m_seqnumber = m_seqnumber;
327 frame.resize(frame_length);
328
329 DAQCommonHeader& daq_header = *static_cast<DAQCommonHeader*>(
330 static_cast<void*>(
331 &frame.front()));
332
333 // initialise the parts of the DAQCommonHeader that are already known
334 // before the assembling of the data.
335
336 daq_header.DataType = clb_header->dataType();
337
339
340 daq_header.RunNumber = clb_header->runNumber();
341
342 daq_header.SequenceNumber = m_seqnumber;
343
344 daq_header.Timestamp.Sec = clb_header->timeStamp().sec();
345 daq_header.Timestamp.Tics = clb_header->timeStamp().tics();
346
347 daq_header.DOMIdentifier = clb_header->domIdentifier();
348
349 daq_header.NumberOfPackets = m_container.size();
350 daq_header.HighestPacketNumber
351 = m_container.rbegin()
352 ->second
353 ->getCLBCommonHeader()
354 ->udpSequenceNumber();
355
356 daq_header.DOMStatus1 = frameDomStatus(1);
357 daq_header.DOMStatus2 = frameDomStatus(2);
358 daq_header.DOMStatus3 = 0;
359 daq_header.DOMStatus4 = 0;
360
361 if (is_optical(daq_header.DataType)) {
362 if (
363 !(
364 m_container.size() > 1
365 || m_container.begin()->second->getCLBCommonHeader()->udpSequenceNumber() == 0
366 )
367 ) {
368 LOG_DEBUG << "Optical frame "
369 << m_frame_idx
370 << " contains too few data to be assembled. Ignored.";
371
372 Log::Counter::get().add("skipd oframe");
373 frame.resize(0);
374 } else {
376 }
377 } else if (is_acoustic(daq_header.DataType)) {
378 if (m_container.begin()->second->getUDPSequenceNumber() != 0) {
379 LOG_DEBUG << "Acoustic frame "
380 << m_frame_idx
381 << " without infoword. Ignored.";
382
383 Log::Counter::get().add("skipd aframe");
384 frame.resize(0);
385 } else if (m_container.begin()->second->size() <= sizeof(CLBCommonHeader)) {
386 frame.resize(0);
387 } else {
389 }
390 } else {
391 throw std::runtime_error("Unknown data type");
392 }
393
394 daq_header.FrameLength = frame.size();
395 daq_header.NumberOfItems = calculate_number_of_items(frame);
396}
397
399{
400 for (
401 container_t::const_iterator it = m_container.begin(), et = m_container.end();
402 it != et;
403 ++it
404 )
405 {
407 }
408
409 m_container.clear();
410
411 m_totalsize = 0;
412 m_seqnumber = 0;
413 m_detector_id = 0;
414 m_datatype = 0;
415 m_frame_idx = 0;
416}
const CLBCommonHeader * getCLBCommonHeader() const
const char * getPayload() const
size_t getPayloadSize() const
size_t size() const
uint32_t getUDPSequenceNumber() const
Template Frame for ARS data.
Definition frame.hh:13
const DAQCommonHeader *const getHeader() const
Definition frame.hh:18
unsigned int m_seqnumber
Definition frame.hh:14
const char *const getPayload() const
Definition frame.hh:24
void putDataGram(CLBDataGram *p)
static InBufferCollector & getCollector()
void add(std::string const &tag)
Definition log.hh:60
static Counter & get()
Definition log.hh:53
unsigned int m_totalsize
static boost::atomic< unsigned int > n_obj
unsigned int m_datatype
void getFrame(Frame &frame)
bool insert(CLBDataGram *datagram)
container_t m_container
uint32_t frameDomStatus(int i=1) const
unsigned int m_detector_id
unsigned int m_seqnumber
uint32_t frameLength() const
frame_idx_t m_frame_idx
bool isTrailer(CLBCommonHeader const &header)
uint64_t frame_idx_t
bool is_acoustic(unsigned int dt)
bool is_optical(unsigned int dt)
#define LOG_DEBUG
Definition log.hh:109
void chrono_set(chronometrable &chr)
void chrono_swap(chronometrable &input, chronometrable &output)
void buildAcou(PuzzledFrame::container_t const &c, frame_idx_t frame_idx, Frame &frame)
uint32_t calculate_number_of_items(Frame const &frame)
std::pair< unsigned int, unsigned int > acou_boundaries(CLBDataGram const &dg, std::size_t max_size, std::size_t item_size)
void buildOpto(PuzzledFrame::container_t const &c, frame_idx_t frame_idx, Frame &frame)
std::pair< unsigned int, unsigned int > opt_boundaries(CLBDataGram const &dg, std::size_t max_size)
uint32_t dataType() const
uint32_t udpSequenceNumber() const
UTCTime timeStamp() const
uint32_t runNumber() const
uint32_t domIdentifier() const
uint32_t domStatus(int n=1) const
uint32_t Sec
Definition utctime.hh:14
uint32_t sec() const
Definition utctime.hh:17
uint32_t Tics
Definition utctime.hh:15
uint32_t tics() const
Definition utctime.hh:22