Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
FrameFarm Class Reference

#include <frame_farm.hh>

Public Member Functions

 FrameFarm (unsigned int DeltaTS, uint64_t run_start_time, std::size_t max_dump_size, const std::string &dump_prefix, const std::string &dump_postfix)
 
 ~FrameFarm ()
 
void operator() ()
 
void stop ()
 
unsigned int getTimeSliceDuration ()
 
bool insert (CLBDataGram *datagram)
 
void getFrame (Frame &frame)
 
int runNumber () const
 
int runNumber (int rn)
 
unsigned int detectorId (unsigned int detector_id)
 

Private Types

typedef std::map< frame_idx_t,
PuzzledFrame * > 
container_t
 
typedef std::deque
< PuzzledFrame * > 
buffers_collector_t
 

Private Member Functions

PuzzledFrame * getEmptyPuzzledFrame ()
 

Private Attributes

container_t m_container
 
ptrDispatcher m_frames
 
buffers_collector_t m_empty_buffers
 
boost::mutex m_mutex
 
boost::mutex m_ebf_mutex
 
boost::atomic< bool > m_running
 
unsigned int m_timeslice_duration
 
uint64_t m_start_run_ms
 
std::size_t m_max_dump_size
 
int m_run_number
 
unsigned int m_detector_id
 
DumpFile m_dumpfile
 

Detailed Description

Author
cpellegrino

Definition at line 23 of file frame_farm.hh.

Member Typedef Documentation

typedef std::map<frame_idx_t, PuzzledFrame*> FrameFarm::container_t
private

Definition at line 25 of file frame_farm.hh.

typedef std::deque<PuzzledFrame*> FrameFarm::buffers_collector_t
private

Definition at line 27 of file frame_farm.hh.

Constructor & Destructor Documentation

FrameFarm::FrameFarm ( unsigned int  DeltaTS,
uint64_t  run_start_time,
std::size_t  max_dump_size,
const std::string &  dump_prefix,
const std::string &  dump_postfix 
)
inline

Definition at line 54 of file frame_farm.hh.

59  :
60  m_timeslice_duration(DeltaTS),
61  m_start_run_ms(run_start_time * 1000),
62  m_max_dump_size(max_dump_size),
63  m_run_number(0),
64  m_detector_id(0),
65  m_dumpfile(FilenameGenerator(dump_prefix, dump_postfix), m_max_dump_size)
66  {}
DumpFile m_dumpfile
Definition: frame_farm.hh:50
std::size_t m_max_dump_size
Definition: frame_farm.hh:45
unsigned int m_timeslice_duration
Definition: frame_farm.hh:41
unsigned int m_detector_id
Definition: frame_farm.hh:48
uint64_t m_start_run_ms
Definition: frame_farm.hh:43
int m_run_number
Definition: frame_farm.hh:47
FrameFarm::~FrameFarm ( )

Definition at line 15 of file frame_farm.cc.

16 {
17  for (
18  container_t::iterator it = m_container.begin(), et = m_container.end();
19  it != et;
20  ++it
21  ) {
22  delete it->second;
23  }
24 
25  for (
26  buffers_collector_t::iterator it = m_empty_buffers.begin(), et = m_empty_buffers.end();
27  it != et;
28  ++it
29  ) {
30  delete *it;
31  }
32 }
buffers_collector_t m_empty_buffers
Definition: frame_farm.hh:33
container_t m_container
Definition: frame_farm.hh:29

Member Function Documentation

void FrameFarm::operator() ( )

This method runs on an independent thread and polls for completed or expired frames.

Definition at line 55 of file frame_farm.cc.

56 {
57  /** This method runs on an independent thread the polling of
58  * completed or expired frames.
59  *
60  *
61  **/
62 
63  m_running = true;
64 
65  while (m_running.load()) {
66  {
67  boost::mutex::scoped_lock lock(m_mutex);
68 
69  container_t::iterator it = m_container.begin();
70 
71  container_t::iterator et = m_container.end();
72 
73  m_frames.lock();
74 
75  Timer const t(0, 0);
76 
77  while (it != et) {
78  PuzzledFrame* pframe = it->second;
79  if (pframe->hasExpiredWrt(t)) {
80  // Signal the completion!
81  m_container.erase(it++);
82  et = m_container.end();
83 
84  benchmark::chrono_set(*pframe);
85 
86  m_frames.put_nolock(pframe);
87  } else {
88  ++it;
89  }
90  }
91  m_frames.signal();
92  m_frames.unlock();
93  }
94  usleep(sc_sleep);
95  }
96 }
void chrono_set(chronometrable &chr)
static const useconds_t sc_sleep
Definition: frame_farm.cc:9
bool hasExpiredWrt(Timer const &t) const
boost::mutex m_mutex
Definition: frame_farm.hh:35
ptrDispatcher m_frames
Definition: frame_farm.hh:31
container_t m_container
Definition: frame_farm.hh:29
boost::atomic< bool > m_running
Definition: frame_farm.hh:39
void put_nolock(void *pointer)
Definition: timer.hh:10
void FrameFarm::stop ( )
inline

Definition at line 72 of file frame_farm.hh.

73  {
74  m_running = false;
75  m_frames.close();
76  }
ptrDispatcher m_frames
Definition: frame_farm.hh:31
boost::atomic< bool > m_running
Definition: frame_farm.hh:39
unsigned int FrameFarm::getTimeSliceDuration ( )
inline

Definition at line 78 of file frame_farm.hh.

79  {
80  return m_timeslice_duration;
81  }
unsigned int m_timeslice_duration
Definition: frame_farm.hh:41
bool FrameFarm::insert ( CLBDataGram *  datagram)

Definition at line 98 of file frame_farm.cc.

99 {
100  if (m_max_dump_size && (!datagram->hasValidTimeStamp())) {
101  if (!m_dumpfile.write(datagram, m_detector_id)) {
102  static unsigned int n = 0;
103  const unsigned int n_iter = 10000;
104  if (n % n_iter == 0) {
105  LOG_DEBUG
106  << "Time-unreliable data detected. Dumped to file. "
107  << "This message has a suppression factor of " << n_iter;
108  n = 1;
109  }
110  }
111  Log::Counter::get().add("dumped dgrams");
112  InBufferCollector::getCollector().putDataGram(datagram);
113  return false;
114  }
115 
116  CLBCommonHeader const*const header = datagram->getCLBCommonHeader();
117 
118  int32_t const seq_num = seq_number(
119  *header,
120  m_start_run_ms,
121  m_timeslice_duration);
122 
123  if (seq_num < 0) {
124  LOG_DEBUG << "Discarding data which timestamp is before the run start time. "
125  << "DOM ID: " << header->domIdentifier() << ", "
126  << "seq_num: " << seq_num << ", "
127  << "run start time in ms: " << m_start_run_ms << ", "
128  << "time stamp secs: " << header->timeStamp().sec() << ", "
129  << "timeslice duration in ms: " << m_timeslice_duration;
130 
131  Log::Counter::get().add("old data");
132  InBufferCollector::getCollector().putDataGram(datagram);
133  return false;
134  }
135 
136  if (
137  m_run_number != -1
138  && header->runNumber() != static_cast<unsigned int>(m_run_number)
139  ) {
140  LOG_DEBUG << "Discarding data which run number ("
141  << header->runNumber()
142  << ") is different from the one of the current run: "
143  << runNumber();
144  Log::Counter::get().add("wrong runno");
145  InBufferCollector::getCollector().putDataGram(datagram);
146  return false;
147  }
148 
149  frame_idx_t const idx = data2idx(
150  *header,
151  m_start_run_ms,
152  m_timeslice_duration);
153 
154  bool insert_value;
155 
156  boost::mutex::scoped_lock lock(m_mutex);
157 
158  benchmark::chrono_set(*datagram);
159 
160  container_t::const_iterator const it = m_container.find(idx);
161 
162  benchmark::chrono_set(*datagram);
163 
164  if (it != m_container.end()) {
165  insert_value = it->second->insert(datagram);
166  } else {
167  PuzzledFrame* const pframe = getEmptyPuzzledFrame();
168 
169  pframe->setSeqNumber(seq_num);
170  pframe->setFrameIndex(idx);
171 
172  pframe->setDataType(header->dataType());
173  pframe->setDetectorId(m_detector_id);
174  insert_value = pframe->insert(datagram);
175 
176  container_t::value_type const pair(idx, pframe);
177 
178  m_container.insert(pair);
179  }
180 
181  return insert_value;
182 }
void chrono_set(chronometrable &chr)
int runNumber() const
Definition: frame_farm.hh:87
#define LOG_DEBUG
Definition: log.hh:109
uint32_t domIdentifier() const
uint32_t sec() const
Definition: utctime.hh:17
bool hasValidTimeStamp() const
const CLBCommonHeader * getCLBCommonHeader() const
Definition: clb_datagram.hh:90
void putDataGram(CLBDataGram *p)
bool write(const CLBDataGram *data, unsigned int detector_id)
Definition: dump_file.hh:74
uint32_t runNumber() const
frame_idx_t data2idx(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
static Counter & get()
Definition: log.hh:53
UTCTime timeStamp() const
bool insert(CLBDataGram *datagram)
PuzzledFrame * getEmptyPuzzledFrame()
Definition: frame_farm.cc:184
boost::mutex m_mutex
Definition: frame_farm.hh:35
DumpFile m_dumpfile
Definition: frame_farm.hh:50
uint64_t frame_idx_t
void setDetectorId(unsigned int detector_id)
std::size_t m_max_dump_size
Definition: frame_farm.hh:45
void setDataType(unsigned int datatype)
unsigned int m_timeslice_duration
Definition: frame_farm.hh:41
void add(std::string const &tag)
Definition: log.hh:60
void setFrameIndex(frame_idx_t frame_idx)
container_t m_container
Definition: frame_farm.hh:29
int32_t seq_number(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
static InBufferCollector & getCollector()
uint32_t dataType() const
unsigned int m_detector_id
Definition: frame_farm.hh:48
void setSeqNumber(unsigned int seqnumber)
uint64_t m_start_run_ms
Definition: frame_farm.hh:43
int m_run_number
Definition: frame_farm.hh:47
void FrameFarm::getFrame ( Frame &  frame)

Definition at line 34 of file frame_farm.cc.

35 {
36  frame.clear();
37  // Block until data is coming
38  PuzzledFrame* pframe = static_cast<PuzzledFrame*>(m_frames.get());
39 
40  if (pframe) {
41  pframe->getFrame(frame);
42 
43  pframe->reset();
44 
45  boost::mutex::scoped_lock lock(m_ebf_mutex);
46 
47  if (m_empty_buffers.size() > 1000) {
48  delete pframe;
49  } else {
50  m_empty_buffers.push_back(pframe);
51  }
52  }
53 }
buffers_collector_t m_empty_buffers
Definition: frame_farm.hh:33
ptrDispatcher m_frames
Definition: frame_farm.hh:31
boost::mutex m_ebf_mutex
Definition: frame_farm.hh:37
void getFrame(Frame &frame)
int FrameFarm::runNumber ( ) const
inline

Definition at line 87 of file frame_farm.hh.

88  {
89  return m_run_number;
90  }
int m_run_number
Definition: frame_farm.hh:47
int FrameFarm::runNumber ( int  rn)
inline

Definition at line 92 of file frame_farm.hh.

93  {
94  return m_run_number = rn;
95  }
int m_run_number
Definition: frame_farm.hh:47
unsigned int FrameFarm::detectorId ( unsigned int  detector_id)
inline

Definition at line 97 of file frame_farm.hh.

98  {
99  return m_detector_id = detector_id;
100  }
unsigned int m_detector_id
Definition: frame_farm.hh:48
PuzzledFrame * FrameFarm::getEmptyPuzzledFrame ( )
private

Definition at line 184 of file frame_farm.cc.

185 {
186  PuzzledFrame* pframe;
187 
188  boost::mutex::scoped_lock lock(m_ebf_mutex);
189 
190  if (m_empty_buffers.size()) {
191  pframe = m_empty_buffers.front();
192 
193  m_empty_buffers.pop_front();
194  lock.unlock();
195  } else {
196  lock.unlock();
197  pframe = new PuzzledFrame;
198  }
199 
200  pframe->setTimer(Timer(sc_expsec, sc_expusec));
201 
202  return pframe;
203 }
static const int sc_expusec
Definition: frame_farm.cc:13
buffers_collector_t m_empty_buffers
Definition: frame_farm.hh:33
void setTimer(const Timer &exp)
static const int sc_expsec
Definition: frame_farm.cc:11
boost::mutex m_ebf_mutex
Definition: frame_farm.hh:37
Definition: timer.hh:10

Member Data Documentation

container_t FrameFarm::m_container
private

Definition at line 29 of file frame_farm.hh.

ptrDispatcher FrameFarm::m_frames
private

Definition at line 31 of file frame_farm.hh.

buffers_collector_t FrameFarm::m_empty_buffers
private

Definition at line 33 of file frame_farm.hh.

boost::mutex FrameFarm::m_mutex
private

Definition at line 35 of file frame_farm.hh.

boost::mutex FrameFarm::m_ebf_mutex
private

Definition at line 37 of file frame_farm.hh.

boost::atomic<bool> FrameFarm::m_running
private

Definition at line 39 of file frame_farm.hh.

unsigned int FrameFarm::m_timeslice_duration
private

Definition at line 41 of file frame_farm.hh.

uint64_t FrameFarm::m_start_run_ms
private

Definition at line 43 of file frame_farm.hh.

std::size_t FrameFarm::m_max_dump_size
private

Definition at line 45 of file frame_farm.hh.

int FrameFarm::m_run_number
private

Definition at line 47 of file frame_farm.hh.

unsigned int FrameFarm::m_detector_id
private

Definition at line 48 of file frame_farm.hh.

DumpFile FrameFarm::m_dumpfile
private

Definition at line 50 of file frame_farm.hh.


The documentation for this class was generated from the following files: