Jpp  master_rocky-40-g5f0272dcd
the software that should make you happy
JDQSimulator.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <limits>
6 #include <map>
7 
8 #include "TRandom3.h"
9 #include "JMath/JRandom.hh"
10 
11 #include "Jeep/JParser.hh"
12 #include "Jeep/JProperties.hh"
13 #include "Jeep/JTimer.hh"
14 #include "Jeep/JTimekeeper.hh"
16 #include "JDAQ/JDAQHeaderIO.hh"
17 #include "JDAQ/JDAQTimesliceIO.hh"
19 #include "JSupport/JSupport.hh"
20 #include "JLang/JException.hh"
21 #include "JLang/JRedirectStream.hh"
23 #include "JIO/JByteArrayIO.hh"
24 #include "JNet/JSocket.hh"
25 #include "JNet/JHostname.hh"
26 #include "JNet/JSocketBlocking.hh"
27 #include "JTools/JRange.hh"
28 
29 
30 namespace KM3NETDAQ {
31 
32 
33  using namespace JPP;
34 
35 
36  /**
37  * Entry of the "source" list in the configuration of JDQSimulator; the value itself is kept as a std::string.
38  */
39  class JSource :
40  public std::string
41  {
42  public:
43  /**
44  * Default constructor; creates an empty string.
45  */
46  JSource() :
47  std::string()
48  {}
49 
50 
51  /**
52  * Read JSource from input stream.
53  * The input consists of an integer followed by a whitespace-delimited word; only the word is stored.
54  * \param in input stream
55  * \param source JSource
56  * \return input stream
57  */
58  friend inline std::istream& operator>>(std::istream& in, JSource& source)
59  {
60  int index; // leading integer of the entry; read and then discarded
61 
62  in >> index >> static_cast<std::string&>(source);
63 
64  return in;
65  }
66 
67 
68  /**
69  * Write JSource to output stream.
70  * Only the string value is written; no leading integer is written.
71  * \param out output stream
72  * \param source JSource
73  * \return output stream
74  */
75  friend inline std::ostream& operator<<(std::ostream& out, const JSource& source)
76  {
77  out << static_cast<const std::string&>(source);
78 
79  return out;
80  }
81  };
82 
83 
84  /**
85  * Entry of the "target" list in the configuration: a blocking socket that is connected to the host and port read from the input.
86  */
87  class JTarget :
88  public JSocketBlocking
89  {
90  public:
91  /**
92  * Default constructor.
93  */
94  JTarget() : // NOTE(review): the listing jumps from line 94 to line 96, so the base class initialiser is not shown here
96  {}
97 
98 
99  /**
100  * Read JTarget from input stream.
101  * The input consists of an integer (discarded) followed by a JHostname. When both are read, the socket is connected and its options are set. A JException is printed to std::cout and the socket is closed; the state of the input stream is not changed by such a failure.
102  * \param in input stream
103  * \param target JTarget
104  * \return input stream
105  */
106  friend inline std::istream& operator>>(std::istream& in, JTarget& target)
107  {
108  using namespace std;
109  using namespace JPP;
110 
111  int index; // leading integer of the entry; read and then discarded
112  JHostname hostname;
113 
114  if (in >> index >> hostname) {
115 
116  try {
117 
118  target.connect(hostname.hostname, hostname.port);
119 
120  target.setTcpNoDelay (true);
121  target.setReuseAddress(true);
122  target.setKeepAlive (true);
123  target.setReceiveBufferSize(1024);
124  target.setSendBufferSize (1024*1024);
125  target.setNonBlocking (false);
126  }
127  catch(const JException& error) { // connecting or setting an option failed: report and leave the socket closed
128  cout << error << endl;
129  target.close();
130  }
131  }
132 
133  return in;
134  }
135 
136 
137  /**
138  * Write JTarget to output stream.
139  * Prints the values returned by the socket option getters, one labelled line per option.
140  * \param out output stream
141  * \param target JTarget
142  * \return output stream
143  */
144  friend inline std::ostream& operator<<(std::ostream& out, const JTarget& target)
145  {
146  using namespace std;
147 
148  out << "TCP no-delay " << target.getTcpNoDelay() << endl;
149  out << "Reuse address " << target.getReuseAddress() << endl;
150  out << "Keep alive " << target.getKeepAlive() << endl;
151  out << "Receive buffer " << target.getReceiveBufferSize() << endl;
152  out << "Send buffer " << target.getSendBufferSize() << endl;
153  out << "Non blocking " << target.getNonBlocking() << endl;
154 
155  return out;
156  }
157  };
158 
159 
160  typedef JRange<int> range_type; // range of frame indices
161  typedef std::map<int, range_type> map_type; // module identifier -> frame index range; frames for which the range test is true are not sent (see actionRunning)
162 
163  /**
164  * Runcontrol client to simulate data queue.
165  * In state running, this application will send raw data to the data filters
166  * in a round robin way, based on the frame index.
167  */
168  class JDQSimulator :
169  public JDAQClient
170  {
171  public:
172  /**
173  * Constructor.
174  * All arguments are forwarded to the JDAQClient base class.
175  * \param name name of client
176  * \param server name of command message server
177  * \param logger pointer to logger
178  * \param level debug level
179  */
180  JDQSimulator(const std::string& name,
181  const std::string& server,
182  JLogger* logger,
183  const int level) :
184  JDAQClient(name, server, logger, level)
185  {
186  replaceEvent(RC_CMD, RC_DQSIMULATOR, ev_configure); // NOTE(review): presumably binds the configure event to the RC_DQSIMULATOR tag - confirm in JDAQClient
187  }
188 
189 
     /**
      * Configure the simulator from the run control message.
      *
      * The message is parsed with JProperties for the keys "source", "target", "snooze",
      * "inputFile", "numberOfEvents", "numberOfFrames" and "probability".
      * Targets that are not open after parsing are removed.
      * If the name of this client is found in the source list, all timeslices are read from
      * the input, the share of frames belonging to this client is selected, hit times are
      * randomised with the configured probability and the frames are stored in data.
      * Problems (no input files, no targets, unknown source) are logged only.
      *
      * \param length number of bytes in buffer
      * \param buffer configuration data
      */
190  virtual void actionConfigure(int length, const char* buffer) override
191  {
192  using namespace std;
193  using namespace KM3NETDAQ;
194  using namespace JPP;
195 
196  // NOTE(review): line 197 of the original file is not shown in this listing; inputFile, used below, is presumably declared there
198  Long64_t numberOfEvents = 1; // read from the configuration but not used further in this method
199  int numberOfFrames = numeric_limits<int>::max(); // maximal number of frames taken from each timeslice
200  double P = 0.0; // probability to replace the time of a hit by a random value
201 
202  JProperties properties(JEquationParameters("=", ";", "", "")); // presumably "key=value;" syntax - confirm in JEquationParameters
203 
204  properties["source"] = source;
205  properties["target"] = target;
206  properties["snooze"] = snooze;
207  properties["inputFile"] = inputFile;
208  properties["numberOfEvents"] = numberOfEvents;
209  properties["numberOfFrames"] = numberOfFrames;
210  properties["probability"] = P;
211 
212  properties.read(string(buffer, length)); // reading a target also tries to connect it (see JTarget input operator)
213 
214  // remove the targets that are not open
215  for (vector<JTarget>::iterator i = target.begin(); i != target.end(); ) {
216  if (i->is_open())
217  ++i;
218  else
219  i = target.erase(i);
220  }
221  // the two checks below only log an error; configuration continues
222  if (inputFile.empty()) { JErrorStream(logger) << "No input files"; }
223  if (target .empty()) { JErrorStream(logger) << "No targets"; }
224 
225  if (!snooze.empty()) {
226 
227  ostringstream os;
228 
229  os << "snooze";
230 
231  for (map_type::const_iterator i = snooze.begin(); i != snooze.end(); ++i) {
232  os << ' ' << setw(8) << i->first << " [" << FILL(6,'0') << i->second.getLowerLimit() << "," << FILL(6,'0') << i->second.getUpperLimit() << "]"; // map key and limits of its range
233  }
234 
235  JNoticeStream(logger) << os.str();
236  }
237 
238  const unsigned int index = distance(source.begin(), find(source.begin(), source.end(), getName())); // position of this client in the source list; equals source.size() when absent
239 
240  int number_of_hits = 0;
241  int number_of_errors = 0;
242 
243  if (index < source.size()) {
244 
245  while (inputFile.hasNext()) {
246 
247  JDAQTimeslice* timeslice = inputFile.next();
248  // the frames of the timeslice are divided over the sources; this client takes the frames i1 up to but not including i2
249  int i1 = (timeslice->size() * (index + 0)) / source.size();
250  int i2 = (timeslice->size() * (index + 1)) / source.size();
251 
252  if (i2 - i1 > numberOfFrames) {
253  i2 = i1 + numberOfFrames;
254  }
255 
256  for (int i = i1; i != i2; ++i) {
257 
258  JDAQSuperFrame& frame = timeslice->at(i);
259 
260  for (JDAQSuperFrame::iterator hit = frame.begin(); hit != frame.end(); ++hit) {
261 
262  ++number_of_hits;
263 
264  if (gRandom->Rndm() <= P) {
265 
266  *hit = JDAQHit(hit->getPMT(), getRandom<JDAQHit::JTDC_t>(), hit->getToT()); // keep PMT and ToT, replace the second argument by a random JTDC_t value
267 
268  ++number_of_errors;
269  }
270  }
271  }
272 
273  JDebugStream(logger) << "Processing timeslice: " << inputFile.getCounter() << " [" << i1 << "," << i2 << "]"; // note: i2 is exclusive although a closing ']' is printed
274 
275  data.push_back(JTimeslice(i2 - i1)); // one output buffer per selected frame
276 
277  for (int i = i1; i != i2; ++i) {
278  data.rbegin()->at(i - i1) << timeslice->at(i); // write the frame into its buffer
279  }
280  }
281 
282  JNoticeStream(logger) << "Number of errors / hits " << number_of_errors << " / " << number_of_hits << " for P = " << P;
283 
284  } else {
285 
286  JErrorStream(logger) << "Source not found in configuration data: " << getName();
287  }
288 
289  setClockInterval((long long int) (1e-3 * getFrameTime())); // NOTE(review): presumably converts the frame time from ns to us - confirm units
290  }
291 
292 
     /**
      * Close all target sockets and clear the targets, the sources and the stored data.
      * NOTE(review): snooze is not cleared here.
      *
      * \param length not used
      * \param buffer not used
      */
293  virtual void actionReset(int length, const char* buffer) override
294  {
295  for (std::vector<JTarget>::iterator i = target.begin(); i != target.end(); ++i) {
296  i->close();
297  }
298 
299  target.clear();
300  source.clear();
301  data .clear();
302  }
303 
304 
     /**
      * Release sockets and data by calling actionReset.
      *
      * \param length not used
      * \param buffer not used
      */
305  virtual void actionQuit(int length, const char* buffer) override
306  {
307  actionReset(0, NULL);
308  }
309 
310 
     /**
      * Prepare for a new run: zero the counters, write the run number into the stored
      * frame headers, rewind the data and reset the timer and the clock.
      *
      * \param length not used
      * \param buffer not used
      */
311  virtual void actionStart(int length, const char* buffer) override
312  {
313  numberOfSlices = 0;
314  numberOfBytes = 0;
315 
316  data.setRunNumber(getRunNumber());
317  data.reset(); // start sending from the first stored timeslice
318 
319  timer.reset();
320 
321  resetClock();
322  }
323 
324 
     /**
      * Log the I/O rate and the clock delay per timeslice of the run.
      * Each message is only produced when its denominator is larger than zero.
      *
      * \param length not used
      * \param buffer not used
      */
325  virtual void actionStop(int length, const char* buffer) override
326  {
327  if (timer.usec_wall > 0) { JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s"; } // bytes per microsecond, assuming usec_wall is in microseconds
328  if (numberOfSlices > 0) { JNoticeStream(logger) << "Delay/slice " << (int) (getClockDelay() / numberOfSlices) << " us"; }
329  }
330 
331 
     /**
      * Send the frames of the current timeslice to the targets and move on to the next timeslice.
      *
      * Nothing is done when there are no data or no targets.
      * A frame is skipped when its module is listed in snooze and the range test on its frame index is true.
      * The target of a frame is selected by its frame index modulo the number of targets.
      * A JException is logged; the timeslice is then not counted but the data are still advanced.
      */
332  virtual void actionRunning() override
333  {
334  if (!data.empty() && !target.empty()) {
335 
336  timer.start();
337 
338  try {
339 
340  JDAQPreamble preamble;
341  JDAQSuperFrameHeader header;
342 
343  for (JTimeslice::const_iterator frame = data->begin(); frame != data->end(); ++frame) {
344 
346  // NOTE(review): line 345 of the original file is not shown in this listing; the reader 'in' used below is presumably declared there
347  in >> preamble;
348  in >> header;
349 
350  if (snooze.count(header.getModuleID()) == 0 || !snooze[header.getModuleID()](header.getFrameIndex())) {
351 
352  JSocketBlocking& socket = target[header.getFrameIndex() % target.size()]; // round robin over the targets based on the frame index
353 
354  socket.write(frame->data(), frame->size());
355 
356  numberOfBytes += frame->size();
357  }
358  }
359 
360  numberOfSlices += 1;
361  }
362  catch(const JException& exception) {
363  JErrorStream(logger) << exception;
364  }
365 
366  data.next(); // executed also after an exception
367 
368  timer.stop();
369  }
370  }
371 
372  private:
376 
377 
378  /**
379  * Memory management for sending of raw data.
380  */
383 
     /**
      * Container of timeslices (std::vector<JTimeslice>) with an internal iterator
      * pointing to the current timeslice.
      * The internal iterator is not set by the constructor; reset() should be called
      * before next() or operator->() is used.
      */
384  class JData :
385  public std::vector<JTimeslice>
386  {
387  public:
388  /**
389  * Default constructor; creates an empty container.
390  */
391  JData() :
392  std::vector<JTimeslice>()
393  {}
394 
395 
396  /**
397  * Set run number in the header of every stored frame.
398  *
399  * \param run_number run number
400  */
401  void setRunNumber(int run_number)
402  {
403  JDAQSuperFrameHeader header;
404 
405  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
406 
407  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
408 
410  // NOTE(review): lines 409 and 411 of the original file are not shown in this listing; the reader 'in' is presumably set up there
412 
413  in >> header;
414 
415  header.setRunNumber(run_number);
416 
417  frame->seekp(getSizeof<JDAQPreamble>()); // set the write position to the size of the preamble before writing the header back
418 
419  *frame << header;
420  }
421  }
422  }
423 
424 
425  /**
426  * Reset internal iterator to begin.
427  */
428  void reset()
429  {
430  page = begin();
431  }
432 
433 
434  /**
435  * Increment internal iterator.
436  * When the internal iterator reaches the end of the data,
437  * the frame indices of the data are increased and
438  * the internal iterator is reset to the begin of data.
439  */
440  void next()
441  {
442  if (page != end() && ++page == end()) {
443 
444  JDAQSuperFrameHeader header;
445 
446  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
447 
448  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
449 
451  // NOTE(review): lines 450 and 452 of the original file are not shown in this listing; the reader 'in' is presumably set up there
453 
454  in >> header;
455 
456  header.setFrameIndex(header.getFrameIndex() + this->size()); // advance the frame index by the number of stored timeslices
457  header.setTimesliceStart(JDAQUTCExtended(getTimeOfFrame(header.getFrameIndex()))); // start time follows from the new frame index
458 
459  frame->seekp(getSizeof<JDAQPreamble>()); // set the write position to the size of the preamble before writing the header back
460 
461  *frame << header;
462  }
463  }
464 
465  reset();
466  }
467  }
468 
469 
470  /**
471  * Smart pointer operator.
472  * Returns the internal iterator by value; it is set by reset() and next() only.
473  * \return current iterator
474  */
475  const_iterator operator->()
476  {
477  return page;
478  }
479 
480 
481  private:
482  JData::const_iterator page; // current timeslice
483  };
484 
485 
487 
488  long long int numberOfSlices; // total number of timeslices
489  long long int numberOfBytes; // total number of bytes
490 
492  };
493 }
494 
495 
496 /**
497  * \file
498  *
499  * Program for real-time simulation of data queue.
500  * \author rbruijn
501  */
502 int main(int argc, char* argv[])
503 {
504  using namespace std;
505 
506  string server;
507  string logger;
508  string client_name;
509  bool use_cout;
510  int debug;
511 
512  try {
513 
514  JParser<> zap("Program for real-time simulation of data queue.");
515 
516  zap['H'] = make_field(server) = "localhost"; // passed to JDQSimulator as name of the command message server
517  zap['M'] = make_field(logger) = "localhost"; // passed to JControlHostLogger
518  zap['u'] = make_field(client_name) = "JDQSimulator"; // name of this client
519  zap['c'] = make_field(use_cout); // log to std::cout instead of JControlHostLogger
520  zap['d'] = make_field(debug) = 3; // debug level of the client
521 
522  zap(argc, argv);
523  }
524  catch(const exception &error) {
525  FATAL(error.what() << endl); // NOTE(review): presumably terminates the program - confirm in JMessage.hh
526  }
527 
528 
529  using namespace KM3NETDAQ;
530  using namespace JPP;
531 
532  JLogger* out = NULL; // NOTE(review): not deleted in main; presumably the client takes ownership - confirm in JDAQClient
533 
534  if (use_cout)
535  out = new JStreamLogger(cout);
536  else
537  out = new JControlHostLogger(logger);
538 
539  JDQSimulator simbad(client_name, server, out, debug);
540 
541  simbad.enter();
542  simbad.run(); // follow the command messages of the run control
543 }
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Exceptions.
#define FATAL(A)
Definition: JMessage.hh:67
int debug
debug level
Definition: JSirene.cc:69
Scanning of objects from multiple files according to a format that follows from the extension of each file name.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2142
Utility class to parse parameter values.
Definition of random value generator.
Auxiliary class to define a range between two values.
Base class for interprocess communication.
ROOT TTree parameter settings of various packages.
Scheduling of actions via fixed latency intervals.
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
Utility class to parse parameter values.
Definition: JProperties.hh:501
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:33
Byte array binary input.
Definition: JByteArrayIO.hh:27
void seekg(const int pos)
Set read position.
Byte array binary output.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition: JException.hh:24
Message logging based on ControlHost.
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on std::ostream.
Blocking socket I/O.
int write(const char *buffer, const int length) override
Write data to socket.
Utility class to parse command line options.
Definition: JParser.hh:1698
General purpose class for object reading from a list of file names.
virtual bool hasNext() override
Check availability of next element.
counter_type getCounter() const
Get counter.
virtual const pointer_type & next() override
Get next element.
std::vector< value_type >::const_iterator const_iterator
Definition: JTimeslice.hh:34
std::vector< value_type >::iterator iterator
Definition: JTimeslice.hh:33
void setFrameIndex(const int frame_index)
Set frame index.
void setRunNumber(const int run)
Set run number.
Control unit client base class.
Definition: JDAQClient.hh:301
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
const_iterator end() const
Definition: JDAQFrame.hh:166
const_iterator begin() const
Definition: JDAQFrame.hh:165
Hit data structure.
Definition: JDAQHit.hh:35
Data frame of one optical module.
Data structure for UTC time.
void reset()
Reset internal iterator to begin.
JData()
Default constructor.
JData::const_iterator page
void setRunNumber(int run_number)
Set run number.
void next()
Increment internal iterator.
const_iterator operator->()
Smart pointer operator.
Runcontrol client to simulate data queue.
long long int numberOfBytes
std::vector< JFrame > JTimeslice
virtual void actionConfigure(int length, const char *buffer) override
virtual void actionStart(int length, const char *buffer) override
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
virtual void actionStop(int length, const char *buffer) override
std::vector< JTarget > target
JIO::JByteArrayWriter JFrame
Memory management for sending of raw data.
virtual void actionReset(int length, const char *buffer) override
virtual void actionQuit(int length, const char *buffer) override
JDQSimulator(const std::string &name, const std::string &server, JLogger *logger, const int level)
Constructor.
long long int numberOfSlices
std::vector< JSource > source
Data structure for configuration of JDQSimulator.
Definition: JDQSimulator.cc:41
friend std::ostream & operator<<(std::ostream &out, const JSource &source)
Write JSource to output stream.
Definition: JDQSimulator.cc:75
friend std::istream & operator>>(std::istream &in, JSource &source)
Read JSource from input stream.
Definition: JDQSimulator.cc:58
JSource()
Default constructor.
Definition: JDQSimulator.cc:46
Data structure for configuration of JDataFilter.
Definition: JDQSimulator.cc:89
friend std::istream & operator>>(std::istream &in, JTarget &target)
Read JTarget from input stream.
friend std::ostream & operator<<(std::ostream &out, const JTarget &target)
Write JTarget to output stream.
JTarget()
Default constructor.
Definition: JDQSimulator.cc:94
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:62
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
void reset(T &value)
Reset value.
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
double getTimeOfFrame(const int frame_index)
Get start time of frame in ns since start of run for a given frame index.
Definition: JDAQClock.hh:185
JRange< int > range_type
size_t getSizeof< JDAQSuperFrameHeader >()
Get size of type.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:66
size_t getSizeof< JDAQPreamble >()
Get size of type.
std::map< int, range_type > map_type
static const JNET::JTag RC_DQSIMULATOR
Definition: JDAQTags.hh:69
Definition: JSTDTypes.hh:14
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:330
Target.
Definition: JHead.hh:300
Level specific message streamers.
Auxiliary data structure for hostname and port number.
Definition: JHostname.hh:35
std::string hostname
Definition: JHostname.hh:171