Jpp  18.5.2
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDQSimulator.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <limits>
6 #include <map>
7 
8 #include "TRandom3.h"
9 #include "JMath/JRandom.hh"
10 
11 #include "Jeep/JParser.hh"
12 #include "Jeep/JProperties.hh"
13 #include "Jeep/JTimer.hh"
14 #include "Jeep/JTimekeeper.hh"
16 #include "JDAQ/JDAQHeaderIO.hh"
17 #include "JDAQ/JDAQTimesliceIO.hh"
19 #include "JSupport/JSupport.hh"
20 #include "JLang/JException.hh"
21 #include "JLang/JRedirectStream.hh"
23 #include "JIO/JByteArrayIO.hh"
24 #include "JNet/JSocket.hh"
25 #include "JNet/JHostname.hh"
26 #include "JNet/JSocketBlocking.hh"
27 #include "JTools/JRange.hh"
28 
29 
30 namespace KM3NETDAQ {
31 
32 
33  using namespace JPP;
34 
35 
36  /**
37  * Data structure for configuration of JDQSimulator.
38  */
39  class JSource :
40  public std::string
41  {
42  public:
43  /**
44  * Default constructor.
45  */
46  JSource() :
47  std::string()
48  {}
49 
50 
51  /**
52  * Read JSource from input stream.
53  *
54  * \param in input stream
55  * \param source JSource
56  * \return input stream
57  */
58  friend inline std::istream& operator>>(std::istream& in, JSource& source)
59  {
60  int index;
61 
62  in >> index >> static_cast<std::string&>(source);
63 
64  return in;
65  }
66 
67 
68  /**
69  * Write JSource to output stream.
70  *
71  * \param out output stream
72  * \param source JSource
73  * \return output stream
74  */
75  friend inline std::ostream& operator<<(std::ostream& out, const JSource& source)
76  {
77  out << static_cast<const std::string&>(source);
78 
79  return out;
80  }
81  };
82 
83 
84  /**
85  * Data structure for configuration of JDataFilter.
86  */
87  class JTarget :
88  public JSocketBlocking
89  {
90  public:
91  /**
92  * Default constructor.
93  */
94  JTarget() :
96  {}
97 
98 
99  /**
100  * Read JTarget from input stream.
101  *
102  * \param in input stream
103  * \param target JTarget
104  * \return input stream
105  */
106  friend inline std::istream& operator>>(std::istream& in, JTarget& target)
107  {
108  using namespace std;
109  using namespace JPP;
110 
111  int index;
112  JHostname hostname;
113 
114  if (in >> index >> hostname) {
115 
116  try {
117 
118  target.connect(hostname.hostname, hostname.port);
119 
120  target.setTcpNoDelay (true);
121  target.setReuseAddress(true);
122  target.setKeepAlive (true);
123  target.setReceiveBufferSize(1024);
124  target.setSendBufferSize (1024*1024);
125  target.setNonBlocking (false);
126  }
127  catch(const JException& error) {
128  cout << error << endl;
129  target.close();
130  }
131  }
132 
133  return in;
134  }
135 
136 
137  /**
138  * Write JTarget to output stream.
139  *
140  * \param out output stream
141  * \param target JTarget
142  * \return output stream
143  */
144  friend inline std::ostream& operator<<(std::ostream& out, const JTarget& target)
145  {
146  using namespace std;
147 
148  out << "TCP no-delay " << target.getTcpNoDelay() << endl;
149  out << "Reuse address " << target.getReuseAddress() << endl;
150  out << "Keep alive " << target.getKeepAlive() << endl;
151  out << "Receive buffer " << target.getReceiveBufferSize() << endl;
152  out << "Send buffer " << target.getSendBufferSize() << endl;
153  out << "Non blocking " << target.getNonBlocking() << endl;
154 
155  return out;
156  }
157  };
158 
159 
160  typedef JRange<int> range_type; // frame index range
161  typedef std::map<int, range_type> map_type; // module inactivity
162 
163  /**
164  * Runcontrol client to simulate data queue.
165  * In state running, this application will send raw data to the data filters
166  * in a round robin way, based on the frame index.
167  */
168  class JDQSimulator :
169  public JDAQClient
170  {
171  public:
172  /**
173  * Constructor.
174  *
175  * \param name name of client
176  * \param server name of command message server
177  * \param logger pointer to logger
178  * \param level debug level
179  */
181  const std::string& server,
182  JLogger* logger,
183  const int level) :
184  JDAQClient(name, server, logger, level)
185  {
186  replaceEvent(RC_CMD, RC_DQSIMULATOR, ev_configure);
187  }
188 
189 
190  virtual void actionConfigure(int length, const char* buffer) override
191  {
192  using namespace std;
193  using namespace KM3NETDAQ;
194  using namespace JPP;
195 
196 
198  Long64_t numberOfEvents = 1;
199  int numberOfFrames = numeric_limits<int>::max();
200  double P = 0.0;
201 
202  JProperties properties(JEquationParameters("=", ";", "", ""));
203 
204  properties["source"] = source;
205  properties["target"] = target;
206  properties["snooze"] = snooze;
207  properties["inputFile"] = inputFile;
208  properties["numberOfEvents"] = numberOfEvents;
209  properties["numberOfFrames"] = numberOfFrames;
210  properties["probability"] = P;
211 
212  properties.read(string(buffer, length));
213 
214 
215  for (vector<JTarget>::iterator i = target.begin(); i != target.end(); ) {
216  if (i->is_open())
217  ++i;
218  else
219  i = target.erase(i);
220  }
221 
222  if (inputFile.empty()) { JErrorStream(logger) << "No input files"; }
223  if (target .empty()) { JErrorStream(logger) << "No targets"; }
224 
225  if (!snooze.empty()) {
226 
227  ostringstream os;
228 
229  os << "snooze";
230 
231  for (map_type::const_iterator i = snooze.begin(); i != snooze.end(); ++i) {
232  os << ' ' << setw(8) << i->first << " [" << FILL(6,'0') << i->second.getLowerLimit() << "," << FILL(6,'0') << i->second.getUpperLimit() << "]";
233  }
234 
235  JNoticeStream(logger) << os.str();
236  }
237 
238  const unsigned int index = distance(source.begin(), find(source.begin(), source.end(), getName()));
239 
240  int number_of_hits = 0;
241  int number_of_errors = 0;
242 
243  if (index < source.size()) {
244 
245  while (inputFile.hasNext()) {
246 
247  JDAQTimeslice* timeslice = inputFile.next();
248 
249  int i1 = (timeslice->size() * (index + 0)) / source.size();
250  int i2 = (timeslice->size() * (index + 1)) / source.size();
251 
252  if (i2 - i1 > numberOfFrames) {
253  i2 = i1 + numberOfFrames;
254  }
255 
256  for (int i = i1; i != i2; ++i) {
257 
258  JDAQSuperFrame& frame = timeslice->at(i);
259 
260  for (JDAQSuperFrame::iterator hit = frame.begin(); hit != frame.end(); ++hit) {
261 
262  ++number_of_hits;
263 
264  if (gRandom->Rndm() <= P) {
265 
266  *hit = JDAQHit(hit->getPMT(), getRandom<JDAQHit::JTDC_t>(), hit->getToT());
267 
268  ++number_of_errors;
269  }
270  }
271  }
272 
273  JDebugStream(logger) << "Processing timeslice: " << inputFile.getCounter() << " [" << i1 << "," << i2 << "]";
274 
275  data.push_back(JTimeslice(i2 - i1));
276 
277  for (int i = i1; i != i2; ++i) {
278  data.rbegin()->at(i - i1) << timeslice->at(i);
279  }
280  }
281 
282  JNoticeStream(logger) << "Number of errors / hits " << number_of_errors << " / " << number_of_hits << " for P = " << P;
283 
284  } else {
285 
286  JErrorStream(logger) << "Source not found in configuration data: " << getName();
287  }
288 
289  setClockInterval((long long int) (1e-3 * getFrameTime()));
290  }
291 
292 
293  virtual void actionReset(int length, const char* buffer) override
294  {
295  for (std::vector<JTarget>::iterator i = target.begin(); i != target.end(); ++i) {
296  i->close();
297  }
298 
299  target.clear();
300  source.clear();
301  data .clear();
302  }
303 
304 
305  virtual void actionQuit(int length, const char* buffer) override
306  {
307  actionReset(0, NULL);
308  }
309 
310 
311  virtual void actionStart(int length, const char* buffer) override
312  {
313  numberOfSlices = 0;
314  numberOfBytes = 0;
315 
316  data.setRunNumber(getRunNumber());
317  data.reset();
318 
319  timer.reset();
320 
321  resetClock();
322  }
323 
324 
325  virtual void actionStop(int length, const char* buffer) override
326  {
327  if (timer.usec_wall > 0) { JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s"; }
328  if (numberOfSlices > 0) { JNoticeStream(logger) << "Delay/slice " << (int) (getClockDelay() / numberOfSlices) << " us"; }
329  }
330 
331 
332  virtual void actionRunning() override
333  {
334  if (!data.empty() && !target.empty()) {
335 
336  timer.start();
337 
338  try {
339 
340  JDAQPreamble preamble;
341  JDAQSuperFrameHeader header;
342 
343  for (JTimeslice::const_iterator frame = data->begin(); frame != data->end(); ++frame) {
344 
346 
347  in >> preamble;
348  in >> header;
349 
350  if (snooze.count(header.getModuleID()) == 0 || !snooze[header.getModuleID()](header.getFrameIndex())) {
351 
352  JSocketBlocking& socket = target[header.getFrameIndex() % target.size()];
353 
354  socket.write(frame->data(), frame->size());
355 
356  numberOfBytes += frame->size();
357  }
358  }
359 
360  numberOfSlices += 1;
361  }
362  catch(const JException& exception) {
363  JErrorStream(logger) << exception;
364  }
365 
366  data.next();
367 
368  timer.stop();
369  }
370  }
371 
372  private:
376 
377 
378  /**
379  * Memory management for sending of raw data.
380  */
383 
384  class JData :
385  public std::vector<JTimeslice>
386  {
387  public:
388  /**
389  * Default constructor.
390  */
391  JData() :
392  std::vector<JTimeslice>()
393  {}
394 
395 
396  /**
397  * Set run number.
398  *
399  * \param run_number run number
400  */
401  void setRunNumber(int run_number)
402  {
403  JDAQSuperFrameHeader header;
404 
405  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
406 
407  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
408 
410 
411  in.seekg(getSizeof<JDAQPreamble>());
412 
413  in >> header;
414 
415  header.setRunNumber(run_number);
416 
417  frame->seekp(getSizeof<JDAQPreamble>());
418 
419  *frame << header;
420  }
421  }
422  }
423 
424 
425  /**
426  * Reset internal iterator to begin.
427  */
428  void reset()
429  {
430  page = begin();
431  }
432 
433 
434  /**
435  * Increment internal iterator.
436  * When the internal iterator reaches the end of the data,
437  * the frame indices of the data are increased and
438  * the internal iterator is reset to the begin of data.
439  */
440  void next()
441  {
442  if (page != end() && ++page == end()) {
443 
444  JDAQSuperFrameHeader header;
445 
446  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
447 
448  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
449 
451 
452  in.seekg(getSizeof<JDAQPreamble>());
453 
454  in >> header;
455 
456  header.setFrameIndex(header.getFrameIndex() + this->size());
457  header.setTimesliceStart(JDAQUTCExtended(getTimeOfFrame(header.getFrameIndex())));
458 
459  frame->seekp(getSizeof<JDAQPreamble>());
460 
461  *frame << header;
462  }
463  }
464 
465  reset();
466  }
467  }
468 
469 
470  /**
471  * Smart pointer operator.
472  *
473  * \return current iterator
474  */
475  const_iterator operator->()
476  {
477  return page;
478  }
479 
480 
481  private:
482  JData::const_iterator page;
483  };
484 
485 
487 
488  long long int numberOfSlices; // total number of timeslices
489  long long int numberOfBytes; // total number of bytes
490 
492  };
493 }
494 
495 
496 /**
497  * \file
498  *
499  * Program for real-time simulation of data queue.
500  * \author rbruijn
501  */
502 int main(int argc, char* argv[])
503 {
504  using namespace std;
505 
506  string server;
507  string logger;
508  string client_name;
509  bool use_cout;
510  int debug;
511 
512  try {
513 
514  JParser<> zap("Program for real-time simulation of data queue.");
515 
516  zap['H'] = make_field(server) = "localhost";
517  zap['M'] = make_field(logger) = "localhost";
518  zap['u'] = make_field(client_name) = "JDQSimulator";
519  zap['c'] = make_field(use_cout);
520  zap['d'] = make_field(debug) = 3;
521 
522  zap(argc, argv);
523  }
524  catch(const exception &error) {
525  FATAL(error.what() << endl);
526  }
527 
528 
529  using namespace KM3NETDAQ;
530  using namespace JPP;
531 
532  JLogger* out = NULL;
533 
534  if (use_cout)
535  out = new JStreamLogger(cout);
536  else
537  out = new JControlHostLogger(logger);
538 
539  JDQSimulator simbad(client_name, server, out, debug);
540 
541  simbad.enter();
542  simbad.run();
543 }
void setReuseAddress(const bool on)
Set reuse address.
Definition: JSocket.hh:111
Utility class to parse command line options.
Definition: JParser.hh:1514
General exception.
Definition: JException.hh:24
std::vector< value_type >::const_iterator const_iterator
Definition: JTimeslice.hh:34
int getSendBufferSize() const
Get send buffer size.
Definition: JSocket.hh:166
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Exceptions.
int main(int argc, char *argv[])
Definition: Main.cc:15
Target.
Definition: JHead.hh:298
ROOT TTree parameter settings of various packages.
virtual void actionStart(int length, const char *buffer) override
bool getReuseAddress() const
Get reuse address.
Definition: JSocket.hh:122
JTarget()
Default constructor.
Definition: JDQSimulator.cc:94
Message logging based on std::ostream.
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
void setSendBufferSize(const int size)
Set send buffer size.
Definition: JSocket.hh:155
JData()
Default constructor.
void next()
Increment internal iterator.
bool getTcpNoDelay() const
Get TCP no-delay.
Definition: JTCPSocket.hh:139
int getReceiveBufferSize() const
Set receive buffer size.
Definition: JSocket.hh:144
int write(const char *buffer, const int length) override
Write data to socket.
Interface for logging messages.
Definition: JLogger.hh:22
int close()
Close file.
Definition: JFile.hh:57
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:497
std::vector< value_type >::iterator iterator
Definition: JTimeslice.hh:33
virtual void actionReset(int length, const char *buffer) override
Simple data structure to support I/O of equations (see class JLANG::JEquation).
std::vector< JFrame > JTimeslice
Definition of random value generator.
Runcontrol client to simulate data queue.
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
Auxiliary data structure for hostname and port number.
Definition: JHostname.hh:33
Data structure for UTC time.
std::vector< JSource > source
JSource()
Default constructor.
Definition: JDQSimulator.cc:46
Utility class to parse parameter values.
JRange< int > range_type
double getTimeOfFrame(const int frame_index)
Get start time of frame in ns since start of run for a given frame index.
Definition: JDAQClock.hh:185
bool getKeepAlive() const
Get keep alive of socket.
Definition: JSocket.hh:100
friend std::istream & operator>>(std::istream &in, JSource &source)
Read JSource from input stream.
Definition: JDQSimulator.cc:58
long long int numberOfSlices
Scheduling of actions via fixed latency intervals.
virtual void actionStop(int length, const char *buffer) override
virtual void actionConfigure(int length, const char *buffer) override
JIO::JByteArrayWriter JFrame
Memory management for sending of raw data.
JDQSimulator(const std::string &name, const std::string &server, JLogger *logger, const int level)
Constructor.
Hit data structure.
Definition: JDAQHit.hh:34
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
const_iterator begin() const
Definition: JDAQFrame.hh:165
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
long long int numberOfBytes
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
void setTcpNoDelay(const bool on)
Set TCP no-delay.
Definition: JTCPSocket.hh:128
then awk string
Data time slice.
Data structure for configuration of JDQSimulator.
Definition: JDQSimulator.cc:39
Level specific message streamers.
bool getNonBlocking() const
Get non-blocking of I/O.
Definition: JTCPSocket.hh:77
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
std::vector< JTarget > target
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:133
Data structure for configuration of JDataFilter.
Definition: JDQSimulator.cc:87
#define FATAL(A)
Definition: JMessage.hh:67
Scanning of objects from multiple files according a format that follows from the extension of each fi...
Blocking socket I/O.
void reset(T &value)
Reset value.
Auxiliary class to define a range between two values.
Control unit client base class.
Definition: JDAQClient.hh:298
General purpose class for object reading from a list of file names.
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
Utility class to parse command line options.
JData::const_iterator page
void setRunNumber(int run_number)
Set run number.
friend std::ostream & operator<<(std::ostream &out, const JSource &source)
Write JSource to output stream.
Definition: JDQSimulator.cc:75
void connect(const int port)
Connect to port on local host.
Definition: JTCPSocket.hh:163
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JTCPSocket.hh:53
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:60
virtual void actionQuit(int length, const char *buffer) override
size_t getSizeof< JDAQSuperFrameHeader >()
Get size of type.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
static const JNET::JTag RC_DQSIMULATOR
Definition: JDAQTags.hh:63
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:89
void setRunNumber(const int run)
Set run number.
std::map< int, range_type > map_type
Byte array binary output.
friend std::istream & operator>>(std::istream &in, JTarget &target)
Read JTarget from input stream.
size_t getSizeof< JDAQPreamble >()
Get size of type.
KM3NeT DAQ constants, bit handling, etc.
Base class for interprocess communication.
void setFrameIndex(const int frame_index)
Set frame index.
void reset()
Reset internal iterator to begin.
friend std::ostream & operator<<(std::ostream &out, const JTarget &target)
Write JTarget to output stream.
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Data frame of one optical module.
int debug
debug level
then $DIR JPlotNPE PDG P
Definition: JPlotNPE-PDG.sh:62
const_iterator operator->()
Smart pointer operator.
const_iterator end() const
Definition: JDAQFrame.hh:166