Jpp  master_rocky
the software that should make you happy
frame_farm.cc
Go to the documentation of this file.
1 #include "frame_farm.hh"
3 #include <log.hh>
4 
5 /**
6  * \author cpellegrino
7  */
8 
9 static const useconds_t sc_sleep = 100000; // 100000 = 100 ms, 10000 = 10 ms
10 
11 static const int sc_expsec = 0;
12 
13 static const int sc_expusec = 3000000;
14 
15 static const int error_tf = 3600000; //1 hour
16 
18 {
19  for (
20  container_t::iterator it = m_container.begin(), et = m_container.end();
21  it != et;
22  ++it
23  ) {
24  delete it->second;
25  }
26 
27  for (
28  buffers_collector_t::iterator it = m_empty_buffers.begin(), et = m_empty_buffers.end();
29  it != et;
30  ++it
31  ) {
32  delete *it;
33  }
34 }
35 
37 {
38  frame.clear();
39  // Block until data is coming
40  PuzzledFrame* pframe = static_cast<PuzzledFrame*>(m_frames.get());
41 
42  if (pframe) {
43  pframe->getFrame(frame);
44 
45  pframe->reset();
46 
47  boost::mutex::scoped_lock lock(m_ebf_mutex);
48 
49  if (m_empty_buffers.size() > 1000) {
50  delete pframe;
51  } else {
52  m_empty_buffers.push_back(pframe);
53  }
54  }
55 }
56 
58 {
59  /** This method runs on an independent thread the polling of
60  * completed or expired frames.
61  *
62  *
63  **/
64 
65  m_running = true;
66 
67  while (m_running.load()) {
68  {
69  boost::mutex::scoped_lock lock(m_mutex);
70 
71  container_t::iterator it = m_container.begin();
72 
73  container_t::iterator et = m_container.end();
74 
75  m_frames.lock();
76 
77  Timer const t(0, 0);
78 
79  while (it != et) {
80  PuzzledFrame* pframe = it->second;
81  if (pframe->hasExpiredWrt(t)) {
82  // Signal the completion!
83  m_container.erase(it++);
84  et = m_container.end();
85 
86  benchmark::chrono_set(*pframe);
87 
88  m_frames.put_nolock(pframe);
89  } else {
90  ++it;
91  }
92  }
93  m_frames.signal();
94  m_frames.unlock();
95  }
96  usleep(sc_sleep);
97  }
98 }
99 
101 {
102  CLBCommonHeader const*const header = datagram->getCLBCommonHeader();
103 
104  if (!datagram->hasValidTimeStamp()) {
105  LOG_DEBUG
106  << "Time-unreliable datagram detected: "
107  << "DOM ID: " << header->domIdentifier() << ", "
108  << "time stamp secs: " << header->timeStamp().sec() << ", "
109  << "time stamp tics: " << header->timeStamp().tics() << ", "
110  << (m_max_dump_size ? "dumping to file" : "not dumping to file");
111 
112  if (m_max_dump_size) {
113  bool const ok = m_dumpfile.write(datagram, m_detector_id);
114  if (!ok) {
115  LOG_ERROR << "Impossible to write time-unreliable data to dump file";
116  }
117  }
118 
119  Log::Counter::get().add("dumped dgrams");
121  return false;
122  }
123 
124  std::pair<uint32_t, bool> const seq_num = seq_number(
125  *header,
128 
129  if (!seq_num.second) {
130  if((m_start_run_ms - header->timeStamp().inMilliSeconds())>=error_tf)
131  LOG_ERROR << "Discarding data which timestamp is before the run start time. "
132  << "DOM ID: " << header->domIdentifier() << ", "
133  << "seq_num: " << seq_num.first << ", "
134  << "run start time in ms: " << m_start_run_ms << ", "
135  << "time stamp secs: " << header->timeStamp().sec() << ", "
136  << "time stamp tics: " << header->timeStamp().tics() << ", "
137  << "timeslice duration in ms: " << m_timeslice_duration;
138 
139  else LOG_DEBUG << "Discarding data which timestamp is before the run start time. "
140  << "DOM ID: " << header->domIdentifier() << ", "
141  << "seq_num: " << seq_num.first << ", "
142  << "run start time in ms: " << m_start_run_ms << ", "
143  << "time stamp secs: " << header->timeStamp().sec() << ", "
144  << "time stamp tics: " << header->timeStamp().tics() << ", "
145  << "timeslice duration in ms: " << m_timeslice_duration;
146 
147  Log::Counter::get().add("old data");
149  return false;
150  }
151 
152  if (
153  m_run_number != -1
154  && header->runNumber() != static_cast<unsigned int>(m_run_number)
155  ) {
156  LOG_DEBUG << "Discarding data which run number ("
157  << header->runNumber()
158  << ") is different from the one of the current run: "
159  << runNumber();
160  Log::Counter::get().add("wrong runno");
162  return false;
163  }
164 
165  frame_idx_t const idx = data2idx(
166  *header,
169 
170  bool insert_value;
171 
172  boost::mutex::scoped_lock lock(m_mutex);
173 
174  benchmark::chrono_set(*datagram);
175 
176  container_t::const_iterator const it = m_container.find(idx);
177 
178  benchmark::chrono_set(*datagram);
179 
180  if (it != m_container.end()) {
181  insert_value = it->second->insert(datagram);
182  } else {
183  PuzzledFrame* const pframe = getEmptyPuzzledFrame();
184 
185  pframe->setSeqNumber(seq_num.first);
186  pframe->setFrameIndex(idx);
187 
188  pframe->setDataType(header->dataType());
189  pframe->setDetectorId(m_detector_id);
190  insert_value = pframe->insert(datagram);
191 
192  container_t::value_type const pair(idx, pframe);
193 
194  m_container.insert(pair);
195  }
196 
197  return insert_value;
198 }
199 
201 {
202  PuzzledFrame* pframe;
203 
204  boost::mutex::scoped_lock lock(m_ebf_mutex);
205 
206  if (m_empty_buffers.size()) {
207  pframe = m_empty_buffers.front();
208 
209  m_empty_buffers.pop_front();
210  lock.unlock();
211  } else {
212  lock.unlock();
213  pframe = new PuzzledFrame;
214  }
215 
216  pframe->setTimer(Timer(sc_expsec, sc_expusec));
217 
218  return pframe;
219 }
const CLBCommonHeader * getCLBCommonHeader() const
Definition: clb_datagram.hh:90
bool hasValidTimeStamp() const
bool write(const CLBDataGram *data, unsigned int detector_id)
Definition: dump_file.hh:74
void getFrame(Frame &frame)
Definition: frame_farm.cc:36
ptrDispatcher m_frames
Definition: frame_farm.hh:31
int runNumber() const
Definition: frame_farm.hh:87
std::size_t m_max_dump_size
Definition: frame_farm.hh:45
unsigned int m_detector_id
Definition: frame_farm.hh:48
int m_run_number
Definition: frame_farm.hh:47
void operator()()
Definition: frame_farm.cc:57
bool insert(CLBDataGram *datagram)
Definition: frame_farm.cc:100
PuzzledFrame * getEmptyPuzzledFrame()
Definition: frame_farm.cc:200
DumpFile m_dumpfile
Definition: frame_farm.hh:50
container_t m_container
Definition: frame_farm.hh:29
unsigned int m_timeslice_duration
Definition: frame_farm.hh:41
boost::atomic< bool > m_running
Definition: frame_farm.hh:39
boost::mutex m_ebf_mutex
Definition: frame_farm.hh:37
boost::mutex m_mutex
Definition: frame_farm.hh:35
uint64_t m_start_run_ms
Definition: frame_farm.hh:43
buffers_collector_t m_empty_buffers
Definition: frame_farm.hh:33
Template Frame for ARS data.
Definition: frame.hh:13
void putDataGram(CLBDataGram *p)
static InBufferCollector & getCollector()
void add(std::string const &tag)
Definition: log.hh:60
static Counter & get()
Definition: log.hh:53
void setDetectorId(unsigned int detector_id)
void setSeqNumber(unsigned int seqnumber)
void setFrameIndex(frame_idx_t frame_idx)
void getFrame(Frame &frame)
bool insert(CLBDataGram *datagram)
void setDataType(unsigned int datatype)
bool hasExpiredWrt(Timer const &t) const
void setTimer(const Timer &exp)
Definition: timer.hh:11
void put_nolock(void *pointer)
frame_idx_t data2idx(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
uint64_t frame_idx_t
std::pair< uint32_t, bool > seq_number(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
static const int sc_expusec
Definition: frame_farm.cc:13
static const int error_tf
Definition: frame_farm.cc:15
static const useconds_t sc_sleep
Definition: frame_farm.cc:9
static const int sc_expsec
Definition: frame_farm.cc:11
#define LOG_DEBUG
Definition: log.hh:109
#define LOG_ERROR
Definition: log.hh:111
void chrono_set(chronometrable &chr)
uint32_t dataType() const
UTCTime timeStamp() const
uint32_t runNumber() const
uint32_t domIdentifier() const
uint32_t sec() const
Definition: utctime.hh:17
uint32_t tics() const
Definition: utctime.hh:22
uint64_t inMilliSeconds() const
Definition: utctime.hh:27