Jpp
frame_farm.cc
Go to the documentation of this file.
1 #include "frame_farm.hh"
3 #include <log.hh>
4 
5 /**
6  * \author cpellegrino
7  */
8 
9 static const useconds_t sc_sleep = 100000; // 100000 = 100 ms, 10000 = 10 ms
10 
11 static const int sc_expsec = 0;
12 
13 static const int sc_expusec = 3000000;
14 
16 {
17  for (
18  container_t::iterator it = m_container.begin(), et = m_container.end();
19  it != et;
20  ++it
21  ) {
22  delete it->second;
23  }
24 
25  for (
26  buffers_collector_t::iterator it = m_empty_buffers.begin(), et = m_empty_buffers.end();
27  it != et;
28  ++it
29  ) {
30  delete *it;
31  }
32 }
33 
35 {
36  frame.clear();
37  // Block until data is coming
38  PuzzledFrame* pframe = static_cast<PuzzledFrame*>(m_frames.get());
39 
40  if (pframe) {
41  pframe->getFrame(frame);
42 
43  pframe->reset();
44 
45  boost::mutex::scoped_lock lock(m_ebf_mutex);
46 
47  if (m_empty_buffers.size() > 1000) {
48  delete pframe;
49  } else {
50  m_empty_buffers.push_back(pframe);
51  }
52  }
53 }
54 
56 {
57  /** This method runs on an independent thread the polling of
58  * completed or expired frames.
59  *
60  *
61  **/
62 
63  m_running = true;
64 
65  while (m_running.load()) {
66  {
67  boost::mutex::scoped_lock lock(m_mutex);
68 
69  container_t::iterator it = m_container.begin();
70 
71  container_t::iterator et = m_container.end();
72 
73  m_frames.lock();
74 
75  Timer const t(0, 0);
76 
77  while (it != et) {
78  PuzzledFrame* pframe = it->second;
79  if (pframe->hasExpiredWrt(t)) {
80  // Signal the completion!
81  m_container.erase(it++);
82  et = m_container.end();
83 
84  benchmark::chrono_set(*pframe);
85 
86  m_frames.put_nolock(pframe);
87  } else {
88  ++it;
89  }
90  }
91  m_frames.signal();
92  m_frames.unlock();
93  }
94  usleep(sc_sleep);
95  }
96 }
97 
99 {
100  CLBCommonHeader const*const header = datagram->getCLBCommonHeader();
101 
102  if (!datagram->hasValidTimeStamp()) {
103  LOG_DEBUG
104  << "Time-unreliable datagram detected: "
105  << "DOM ID: " << header->domIdentifier() << ", "
106  << "time stamp secs: " << header->timeStamp().sec() << ", "
107  << "time stamp tics: " << header->timeStamp().tics() << ", "
108  << (m_max_dump_size ? "dumping to file" : "not dumping to file");
109 
110  if (m_max_dump_size) {
111  bool const ok = m_dumpfile.write(datagram, m_detector_id);
112  if (!ok) {
113  LOG_ERROR << "Impossible to write time-unreliable data to dump file";
114  }
115  }
116 
117  Log::Counter::get().add("dumped dgrams");
119  return false;
120  }
121 
122  std::pair<uint32_t, bool> const seq_num = seq_number(
123  *header,
126 
127  if (!seq_num.second) {
128  LOG_DEBUG << "Discarding data which timestamp is before the run start time. "
129  << "DOM ID: " << header->domIdentifier() << ", "
130  << "seq_num: " << seq_num.first << ", "
131  << "run start time in ms: " << m_start_run_ms << ", "
132  << "time stamp secs: " << header->timeStamp().sec() << ", "
133  << "time stamp tics: " << header->timeStamp().tics() << ", "
134  << "timeslice duration in ms: " << m_timeslice_duration;
135 
136  Log::Counter::get().add("old data");
138  return false;
139  }
140 
141  if (
142  m_run_number != -1
143  && header->runNumber() != static_cast<unsigned int>(m_run_number)
144  ) {
145  LOG_DEBUG << "Discarding data which run number ("
146  << header->runNumber()
147  << ") is different from the one of the current run: "
148  << runNumber();
149  Log::Counter::get().add("wrong runno");
151  return false;
152  }
153 
154  frame_idx_t const idx = data2idx(
155  *header,
158 
159  bool insert_value;
160 
161  boost::mutex::scoped_lock lock(m_mutex);
162 
163  benchmark::chrono_set(*datagram);
164 
165  container_t::const_iterator const it = m_container.find(idx);
166 
167  benchmark::chrono_set(*datagram);
168 
169  if (it != m_container.end()) {
170  insert_value = it->second->insert(datagram);
171  } else {
172  PuzzledFrame* const pframe = getEmptyPuzzledFrame();
173 
174  pframe->setSeqNumber(seq_num.first);
175  pframe->setFrameIndex(idx);
176 
177  pframe->setDataType(header->dataType());
178  pframe->setDetectorId(m_detector_id);
179  insert_value = pframe->insert(datagram);
180 
181  container_t::value_type const pair(idx, pframe);
182 
183  m_container.insert(pair);
184  }
185 
186  return insert_value;
187 }
188 
190 {
191  PuzzledFrame* pframe;
192 
193  boost::mutex::scoped_lock lock(m_ebf_mutex);
194 
195  if (m_empty_buffers.size()) {
196  pframe = m_empty_buffers.front();
197 
198  m_empty_buffers.pop_front();
199  lock.unlock();
200  } else {
201  lock.unlock();
202  pframe = new PuzzledFrame;
203  }
204 
205  pframe->setTimer(Timer(sc_expsec, sc_expusec));
206 
207  return pframe;
208 }
CLBCommonHeader::runNumber
uint32_t runNumber() const
Definition: clb_common_header.hh:34
Log::Counter::add
void add(std::string const &tag)
Definition: log.hh:60
CLBCommonHeader::dataType
uint32_t dataType() const
Definition: clb_common_header.hh:29
LOG_DEBUG
#define LOG_DEBUG
Definition: log.hh:109
PuzzledFrame::insert
bool insert(CLBDataGram *datagram)
Definition: puzzled_frame.cc:61
PuzzledFrame::setSeqNumber
void setSeqNumber(unsigned int seqnumber)
Definition: puzzled_frame.hh:63
log.hh
CLBDataGram::getCLBCommonHeader
const CLBCommonHeader * getCLBCommonHeader() const
Definition: clb_datagram.hh:90
FrameFarm::runNumber
int runNumber() const
Definition: frame_farm.hh:87
InBufferCollector::putDataGram
void putDataGram(CLBDataGram *p)
Definition: input_buffer_collector.hh:53
FrameFarm::m_frames
ptrDispatcher m_frames
Definition: frame_farm.hh:31
FrameFarm::m_dumpfile
DumpFile m_dumpfile
Definition: frame_farm.hh:50
UTCTime::sec
uint32_t sec() const
Definition: utctime.hh:17
FrameFarm::m_container
container_t m_container
Definition: frame_farm.hh:29
PuzzledFrame::setDetectorId
void setDetectorId(unsigned int detector_id)
Definition: puzzled_frame.hh:73
FrameFarm::m_start_run_ms
uint64_t m_start_run_ms
Definition: frame_farm.hh:43
FrameFarm::m_mutex
boost::mutex m_mutex
Definition: frame_farm.hh:35
Frame
Template Frame for ARS data.
Definition: frame.hh:12
PuzzledFrame::setDataType
void setDataType(unsigned int datatype)
Definition: puzzled_frame.hh:68
benchmark::chrono_set
void chrono_set(chronometrable &chr)
Definition: time_tracking.hh:39
frame_farm.hh
PuzzledFrame::reset
void reset()
Definition: puzzled_frame.cc:398
seq_number
std::pair< uint32_t, bool > seq_number(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
Definition: clb_common_header.hh:88
Timer
Definition: timer.hh:10
ptrDispatcher::get
void * get()
Definition: ptr_dispatcher.hh:74
FrameFarm::m_timeslice_duration
unsigned int m_timeslice_duration
Definition: frame_farm.hh:41
FrameFarm::m_ebf_mutex
boost::mutex m_ebf_mutex
Definition: frame_farm.hh:37
PuzzledFrame::getFrame
void getFrame(Frame &frame)
Definition: puzzled_frame.cc:304
FrameFarm::m_run_number
int m_run_number
Definition: frame_farm.hh:47
sc_sleep
static const useconds_t sc_sleep
Definition: frame_farm.cc:9
sc_expusec
static const int sc_expusec
Definition: frame_farm.cc:13
FrameFarm::insert
bool insert(CLBDataGram *datagram)
Definition: frame_farm.cc:98
FrameFarm::getFrame
void getFrame(Frame &frame)
Definition: frame_farm.cc:34
ptrDispatcher::unlock
void unlock()
Definition: ptr_dispatcher.hh:57
std::pair
Definition: JSTDTypes.hh:15
data2idx
frame_idx_t data2idx(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
Definition: clb_common_header.hh:101
ptrDispatcher::lock
void lock()
Definition: ptr_dispatcher.hh:52
FrameFarm::m_max_dump_size
std::size_t m_max_dump_size
Definition: frame_farm.hh:45
CLBDataGram::hasValidTimeStamp
bool hasValidTimeStamp() const
Definition: clb_datagram.hh:123
frame_idx_t
uint64_t frame_idx_t
Definition: clb_common_header.hh:85
PuzzledFrame::setTimer
void setTimer(const Timer &exp)
Definition: puzzled_frame.hh:58
CLBCommonHeader
Definition: clb_common_header.hh:17
FrameFarm::operator()
void operator()()
Definition: frame_farm.cc:55
CLBDataGram
Definition: clb_datagram.hh:15
PuzzledFrame::hasExpiredWrt
bool hasExpiredWrt(Timer const &t) const
Definition: puzzled_frame.hh:90
input_buffer_collector.hh
LOG_ERROR
#define LOG_ERROR
Definition: log.hh:111
PuzzledFrame::setFrameIndex
void setFrameIndex(frame_idx_t frame_idx)
Definition: puzzled_frame.hh:78
ptrDispatcher::signal
void signal()
Definition: ptr_dispatcher.hh:47
InBufferCollector::getCollector
static InBufferCollector & getCollector()
Definition: input_buffer_collector.hh:27
Log::Counter::get
static Counter & get()
Definition: log.hh:53
FrameFarm::m_empty_buffers
buffers_collector_t m_empty_buffers
Definition: frame_farm.hh:33
ptrDispatcher::put_nolock
void put_nolock(void *pointer)
Definition: ptr_dispatcher.hh:96
CLBCommonHeader::timeStamp
UTCTime timeStamp() const
Definition: clb_common_header.hh:44
DumpFile::write
bool write(const CLBDataGram *data, unsigned int detector_id)
Definition: dump_file.hh:74
FrameFarm::m_detector_id
unsigned int m_detector_id
Definition: frame_farm.hh:48
FrameFarm::getEmptyPuzzledFrame
PuzzledFrame * getEmptyPuzzledFrame()
Definition: frame_farm.cc:189
PuzzledFrame::PuzzledFrame
PuzzledFrame()
Definition: puzzled_frame.hh:41
FrameFarm::m_running
boost::atomic< bool > m_running
Definition: frame_farm.hh:39
sc_expsec
static const int sc_expsec
Definition: frame_farm.cc:11
FrameFarm::~FrameFarm
~FrameFarm()
Definition: frame_farm.cc:15
PuzzledFrame
Definition: puzzled_frame.hh:18
UTCTime::tics
uint32_t tics() const
Definition: utctime.hh:22
CLBCommonHeader::domIdentifier
uint32_t domIdentifier() const
Definition: clb_common_header.hh:49