Jpp 19.3.0-rc.2
the software that should make you happy
Loading...
Searching...
No Matches
FrameFarm Class Reference

#include <frame_farm.hh>

Public Member Functions

 FrameFarm (unsigned int DeltaTS, uint64_t run_start_time, std::size_t max_dump_size, const std::string &dump_prefix, const std::string &dump_postfix)
 
 ~FrameFarm ()
 
void operator() ()
 
void stop ()
 
unsigned int getTimeSliceDuration ()
 
bool insert (CLBDataGram *datagram)
 
void getFrame (Frame &frame)
 
int runNumber () const
 
int runNumber (int rn)
 
unsigned int detectorId (unsigned int detector_id)
 

Private Types

typedef std::map< frame_idx_t, PuzzledFrame * > container_t
 
typedef std::deque< PuzzledFrame * > buffers_collector_t
 

Private Member Functions

PuzzledFrame * getEmptyPuzzledFrame ()
 

Private Attributes

container_t m_container
 
ptrDispatcher m_frames
 
buffers_collector_t m_empty_buffers
 
boost::mutex m_mutex
 
boost::mutex m_ebf_mutex
 
boost::atomic< bool > m_running
 
unsigned int m_timeslice_duration
 
uint64_t m_start_run_ms
 
std::size_t m_max_dump_size
 
int m_run_number
 
unsigned int m_detector_id
 
DumpFile m_dumpfile
 

Detailed Description

Author
cpellegrino

Definition at line 23 of file frame_farm.hh.

Member Typedef Documentation

◆ container_t

Definition at line 25 of file frame_farm.hh.

◆ buffers_collector_t

Definition at line 27 of file frame_farm.hh.

Constructor & Destructor Documentation

◆ FrameFarm()

FrameFarm::FrameFarm ( unsigned int DeltaTS,
uint64_t run_start_time,
std::size_t max_dump_size,
const std::string & dump_prefix,
const std::string & dump_postfix )
inline

Definition at line 54 of file frame_farm.hh.

59 :
60 m_timeslice_duration(DeltaTS),
61 m_start_run_ms(run_start_time * 1000),
62 m_max_dump_size(max_dump_size),
63 m_run_number(0),
65 m_dumpfile(FilenameGenerator(dump_prefix, dump_postfix), m_max_dump_size)
66 {}
std::size_t m_max_dump_size
Definition frame_farm.hh:45
unsigned int m_detector_id
Definition frame_farm.hh:48
int m_run_number
Definition frame_farm.hh:47
DumpFile m_dumpfile
Definition frame_farm.hh:50
unsigned int m_timeslice_duration
Definition frame_farm.hh:41
uint64_t m_start_run_ms
Definition frame_farm.hh:43

◆ ~FrameFarm()

FrameFarm::~FrameFarm ( )

Definition at line 17 of file frame_farm.cc.

18{
19 for (
20 container_t::iterator it = m_container.begin(), et = m_container.end();
21 it != et;
22 ++it
23 ) {
24 delete it->second;
25 }
26
27 for (
28 buffers_collector_t::iterator it = m_empty_buffers.begin(), et = m_empty_buffers.end();
29 it != et;
30 ++it
31 ) {
32 delete *it;
33 }
34}
container_t m_container
Definition frame_farm.hh:29
buffers_collector_t m_empty_buffers
Definition frame_farm.hh:33

Member Function Documentation

◆ operator()()

void FrameFarm::operator() ( )

This method runs on an independent thread and polls for completed or expired frames.

Definition at line 57 of file frame_farm.cc.

58{
59 /** This method runs on an independent thread the polling of
60 * completed or expired frames.
61 *
62 *
63 **/
64
65 m_running = true;
66
67 while (m_running.load()) {
68 {
69 boost::mutex::scoped_lock lock(m_mutex);
70
71 container_t::iterator it = m_container.begin();
72
73 container_t::iterator et = m_container.end();
74
75 m_frames.lock();
76
77 Timer const t(0, 0);
78
79 while (it != et) {
80 PuzzledFrame* pframe = it->second;
81 if (pframe->hasExpiredWrt(t)) {
82 // Signal the completion!
83 m_container.erase(it++);
84 et = m_container.end();
85
86 benchmark::chrono_set(*pframe);
87
88 m_frames.put_nolock(pframe);
89 } else {
90 ++it;
91 }
92 }
95 }
96 usleep(sc_sleep);
97 }
98}
ptrDispatcher m_frames
Definition frame_farm.hh:31
boost::atomic< bool > m_running
Definition frame_farm.hh:39
boost::mutex m_mutex
Definition frame_farm.hh:35
bool hasExpiredWrt(Timer const &t) const
Definition timer.hh:11
void put_nolock(void *pointer)
static const useconds_t sc_sleep
Definition frame_farm.cc:9
void chrono_set(chronometrable &chr)

◆ stop()

void FrameFarm::stop ( )
inline

Definition at line 72 of file frame_farm.hh.

73 {
74 m_running = false;
76 }

◆ getTimeSliceDuration()

unsigned int FrameFarm::getTimeSliceDuration ( )
inline

Definition at line 78 of file frame_farm.hh.

79 {
80 return m_timeslice_duration;
81 }

◆ insert()

bool FrameFarm::insert ( CLBDataGram * datagram)

Definition at line 100 of file frame_farm.cc.

101{
102 CLBCommonHeader const*const header = datagram->getCLBCommonHeader();
103
104 if (!datagram->hasValidTimeStamp()) {
106 << "Time-unreliable datagram detected: "
107 << "DOM ID: " << header->domIdentifier() << ", "
108 << "time stamp secs: " << header->timeStamp().sec() << ", "
109 << "time stamp tics: " << header->timeStamp().tics() << ", "
110 << (m_max_dump_size ? "dumping to file" : "not dumping to file");
111
112 if (m_max_dump_size) {
113 bool const ok = m_dumpfile.write(datagram, m_detector_id);
114 if (!ok) {
115 LOG_ERROR << "Impossible to write time-unreliable data to dump file";
116 }
117 }
118
119 Log::Counter::get().add("dumped dgrams");
121 return false;
122 }
123
125 *header,
128
129 if (!seq_num.second) {
131 LOG_ERROR << "Discarding data which timestamp is before the run start time. "
132 << "DOM ID: " << header->domIdentifier() << ", "
133 << "seq_num: " << seq_num.first << ", "
134 << "run start time in ms: " << m_start_run_ms << ", "
135 << "time stamp secs: " << header->timeStamp().sec() << ", "
136 << "time stamp tics: " << header->timeStamp().tics() << ", "
137 << "timeslice duration in ms: " << m_timeslice_duration;
138
139 else LOG_DEBUG << "Discarding data which timestamp is before the run start time. "
140 << "DOM ID: " << header->domIdentifier() << ", "
141 << "seq_num: " << seq_num.first << ", "
142 << "run start time in ms: " << m_start_run_ms << ", "
143 << "time stamp secs: " << header->timeStamp().sec() << ", "
144 << "time stamp tics: " << header->timeStamp().tics() << ", "
145 << "timeslice duration in ms: " << m_timeslice_duration;
146
147 Log::Counter::get().add("old data");
149 return false;
150 }
151
152 if (
153 m_run_number != -1
154 && header->runNumber() != static_cast<unsigned int>(m_run_number)
155 ) {
156 LOG_DEBUG << "Discarding data which run number ("
157 << header->runNumber()
158 << ") is different from the one of the current run: "
159 << runNumber();
160 Log::Counter::get().add("wrong runno");
162 return false;
163 }
164
165 frame_idx_t const idx = data2idx(
166 *header,
169
170 bool insert_value;
171
172 boost::mutex::scoped_lock lock(m_mutex);
173
174 benchmark::chrono_set(*datagram);
175
176 container_t::const_iterator const it = m_container.find(idx);
177
178 benchmark::chrono_set(*datagram);
179
180 if (it != m_container.end()) {
181 insert_value = it->second->insert(datagram);
182 } else {
183 PuzzledFrame* const pframe = getEmptyPuzzledFrame();
184
185 pframe->setSeqNumber(seq_num.first);
186 pframe->setFrameIndex(idx);
187
188 pframe->setDataType(header->dataType());
190 insert_value = pframe->insert(datagram);
191
192 container_t::value_type const pair(idx, pframe);
193
194 m_container.insert(pair);
195 }
196
197 return insert_value;
198}
const CLBCommonHeader * getCLBCommonHeader() const
bool hasValidTimeStamp() const
bool write(const CLBDataGram *data, unsigned int detector_id)
Definition dump_file.hh:74
int runNumber() const
Definition frame_farm.hh:87
PuzzledFrame * getEmptyPuzzledFrame()
void putDataGram(CLBDataGram *p)
static InBufferCollector & getCollector()
void add(std::string const &tag)
Definition log.hh:60
static Counter & get()
Definition log.hh:53
void setDetectorId(unsigned int detector_id)
void setSeqNumber(unsigned int seqnumber)
void setFrameIndex(frame_idx_t frame_idx)
bool insert(CLBDataGram *datagram)
void setDataType(unsigned int datatype)
std::pair< uint32_t, bool > seq_number(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
frame_idx_t data2idx(CLBCommonHeader const &header, uint64_t start_run_ms, int ts_duration_ms)
uint64_t frame_idx_t
static const int error_tf
Definition frame_farm.cc:15
#define LOG_DEBUG
Definition log.hh:109
#define LOG_ERROR
Definition log.hh:111
uint32_t dataType() const
UTCTime timeStamp() const
uint32_t runNumber() const
uint32_t domIdentifier() const
uint32_t sec() const
Definition utctime.hh:17
uint32_t tics() const
Definition utctime.hh:22
uint64_t inMilliSeconds() const
Definition utctime.hh:27

◆ getFrame()

void FrameFarm::getFrame ( Frame & frame)

Definition at line 36 of file frame_farm.cc.

37{
38 frame.clear();
39 // Block until data is coming
40 PuzzledFrame* pframe = static_cast<PuzzledFrame*>(m_frames.get());
41
42 if (pframe) {
43 pframe->getFrame(frame);
44
45 pframe->reset();
46
47 boost::mutex::scoped_lock lock(m_ebf_mutex);
48
49 if (m_empty_buffers.size() > 1000) {
50 delete pframe;
51 } else {
52 m_empty_buffers.push_back(pframe);
53 }
54 }
55}
boost::mutex m_ebf_mutex
Definition frame_farm.hh:37
void getFrame(Frame &frame)

◆ runNumber() [1/2]

int FrameFarm::runNumber ( ) const
inline

Definition at line 87 of file frame_farm.hh.

88 {
89 return m_run_number;
90 }

◆ runNumber() [2/2]

int FrameFarm::runNumber ( int rn)
inline

Definition at line 92 of file frame_farm.hh.

93 {
94 return m_run_number = rn;
95 }

◆ detectorId()

unsigned int FrameFarm::detectorId ( unsigned int detector_id)
inline

Definition at line 97 of file frame_farm.hh.

98 {
99 return m_detector_id = detector_id;
100 }

◆ getEmptyPuzzledFrame()

PuzzledFrame * FrameFarm::getEmptyPuzzledFrame ( )
private

Definition at line 200 of file frame_farm.cc.

201{
202 PuzzledFrame* pframe;
203
204 boost::mutex::scoped_lock lock(m_ebf_mutex);
205
206 if (m_empty_buffers.size()) {
207 pframe = m_empty_buffers.front();
208
209 m_empty_buffers.pop_front();
210 lock.unlock();
211 } else {
212 lock.unlock();
213 pframe = new PuzzledFrame;
214 }
215
217
218 return pframe;
219}
void setTimer(const Timer &exp)
static const int sc_expusec
Definition frame_farm.cc:13
static const int sc_expsec
Definition frame_farm.cc:11

Member Data Documentation

◆ m_container

container_t FrameFarm::m_container
private

Definition at line 29 of file frame_farm.hh.

◆ m_frames

ptrDispatcher FrameFarm::m_frames
private

Definition at line 31 of file frame_farm.hh.

◆ m_empty_buffers

buffers_collector_t FrameFarm::m_empty_buffers
private

Definition at line 33 of file frame_farm.hh.

◆ m_mutex

boost::mutex FrameFarm::m_mutex
private

Definition at line 35 of file frame_farm.hh.

◆ m_ebf_mutex

boost::mutex FrameFarm::m_ebf_mutex
private

Definition at line 37 of file frame_farm.hh.

◆ m_running

boost::atomic<bool> FrameFarm::m_running
private

Definition at line 39 of file frame_farm.hh.

◆ m_timeslice_duration

unsigned int FrameFarm::m_timeslice_duration
private

Definition at line 41 of file frame_farm.hh.

◆ m_start_run_ms

uint64_t FrameFarm::m_start_run_ms
private

Definition at line 43 of file frame_farm.hh.

◆ m_max_dump_size

std::size_t FrameFarm::m_max_dump_size
private

Definition at line 45 of file frame_farm.hh.

◆ m_run_number

int FrameFarm::m_run_number
private

Definition at line 47 of file frame_farm.hh.

◆ m_detector_id

unsigned int FrameFarm::m_detector_id
private

Definition at line 48 of file frame_farm.hh.

◆ m_dumpfile

DumpFile FrameFarm::m_dumpfile
private

Definition at line 50 of file frame_farm.hh.


The documentation for this class was generated from the following files: