Jpp  17.3.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDQSimulator.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <limits>
6 
7 #include "TRandom3.h"
8 #include "JMath/JRandom.hh"
9 
10 #include "Jeep/JParser.hh"
11 #include "Jeep/JProperties.hh"
12 #include "Jeep/JTimer.hh"
13 #include "Jeep/JTimekeeper.hh"
15 #include "JDAQ/JDAQHeaderIO.hh"
16 #include "JDAQ/JDAQTimesliceIO.hh"
18 #include "JSupport/JSupport.hh"
19 #include "JLang/JException.hh"
20 #include "JLang/JRedirectStream.hh"
22 #include "JIO/JByteArrayIO.hh"
23 #include "JNet/JSocket.hh"
24 #include "JNet/JHostname.hh"
25 #include "JNet/JSocketBlocking.hh"
26 
27 
28 namespace KM3NETDAQ {
29 
30 
31  using namespace JPP;
32 
33 
34  /**
35  * Data structure for configuration of JDQSimulator.
36  */
37  class JSource :
38  public std::string
39  {
40  public:
41  /**
42  * Default constructor.
43  */
44  JSource() :
45  std::string()
46  {}
47 
48 
49  /**
50  * Read JSource from input stream.
51  *
52  * \param in input stream
53  * \param source JSource
54  * \return input stream
55  */
56  friend inline std::istream& operator>>(std::istream& in, JSource& source)
57  {
58  int index;
59 
60  in >> index >> static_cast<std::string&>(source);
61 
62  return in;
63  }
64 
65 
66  /**
67  * Write JSource to output stream.
68  *
69  * \param out output stream
70  * \param source JSource
71  * \return output stream
72  */
73  friend inline std::ostream& operator<<(std::ostream& out, const JSource& source)
74  {
75  out << static_cast<const std::string&>(source);
76 
77  return out;
78  }
79  };
80 
81 
82  /**
83  * Data structure for configuration of JDataFilter.
84  */
85  class JTarget :
86  public JSocketBlocking
87  {
88  public:
89  /**
90  * Default constructor.
91  */
92  JTarget() :
94  {}
95 
96 
97  /**
98  * Read JTarget from input stream.
99  *
100  * \param in input stream
101  * \param target JTarget
102  * \return input stream
103  */
104  friend inline std::istream& operator>>(std::istream& in, JTarget& target)
105  {
106  using namespace std;
107  using namespace JPP;
108 
109  int index;
110  JHostname hostname;
111 
112  if (in >> index >> hostname) {
113 
114  try {
115 
116  target.connect(hostname.hostname, hostname.port);
117 
118  target.setTcpNoDelay (true);
119  target.setReuseAddress(true);
120  target.setKeepAlive (true);
121  target.setReceiveBufferSize(1024);
122  target.setSendBufferSize (1024*1024);
123  target.setNonBlocking (false);
124  }
125  catch(const JException& error) {
126  cout << error << endl;
127  target.close();
128  }
129  }
130 
131  return in;
132  }
133 
134 
135  /**
136  * Write JTarget to output stream.
137  *
138  * \param out output stream
139  * \param target JTarget
140  * \return output stream
141  */
142  friend inline std::ostream& operator<<(std::ostream& out, const JTarget& target)
143  {
144  using namespace std;
145 
146  out << "TCP no-delay " << target.getTcpNoDelay() << endl;
147  out << "Reuse address " << target.getReuseAddress() << endl;
148  out << "Keep alive " << target.getKeepAlive() << endl;
149  out << "Receive buffer " << target.getReceiveBufferSize() << endl;
150  out << "Send buffer " << target.getSendBufferSize() << endl;
151  out << "Non blocking " << target.getNonBlocking() << endl;
152 
153  return out;
154  }
155  };
156 
157 
158  /**
159  * Runcontrol client to simulate data queue.
160  * In state running, this application will send raw data to the data filters
161  * in a round robin way, based on the frame index.
162  */
163  class JDQSimulator :
164  public JDAQClient
165  {
166  public:
167  /**
168  * Constructor.
169  *
170  * \param name name of client
171  * \param server name of command message server
172  * \param logger pointer to logger
173  * \param level debug level
174  */
176  const std::string& server,
177  JLogger* logger,
178  const int level) :
179  JDAQClient(name, server, logger, level)
180  {
181  replaceEvent(RC_CMD, RC_DQSIMULATOR, ev_configure);
182  }
183 
184 
185  virtual void actionConfigure(int length, const char* buffer) override
186  {
187  using namespace std;
188  using namespace KM3NETDAQ;
189  using namespace JPP;
190 
191 
193  Long64_t numberOfEvents = 1;
194  int numberOfFrames = numeric_limits<int>::max();
195  double P = 0.0;
196 
197  JProperties properties(JEquationParameters("=", ";", "", ""));
198 
199  properties["source"] = source;
200  properties["target"] = target;
201  properties["inputFile"] = inputFile;
202  properties["numberOfEvents"] = numberOfEvents;
203  properties["numberOfFrames"] = numberOfFrames;
204  properties["probability"] = P;
205 
206  properties.read(string(buffer, length));
207 
208 
209  for (vector<JTarget>::iterator i = target.begin(); i != target.end(); ) {
210  if (i->is_open())
211  ++i;
212  else
213  i = target.erase(i);
214  }
215 
216  if (inputFile.empty()) { JErrorStream(logger) << "No input files"; }
217  if (target .empty()) { JErrorStream(logger) << "No targets"; }
218 
219  const unsigned int index = distance(source.begin(), find(source.begin(), source.end(), getName()));
220 
221  int number_of_hits = 0;
222  int number_of_errors = 0;
223 
224  if (index < source.size()) {
225 
226  while (inputFile.hasNext()) {
227 
228  JDAQTimeslice* timeslice = inputFile.next();
229 
230  int i1 = (timeslice->size() * (index + 0)) / source.size();
231  int i2 = (timeslice->size() * (index + 1)) / source.size();
232 
233  if (i2 - i1 > numberOfFrames) {
234  i2 = i1 + numberOfFrames;
235  }
236 
237  for (int i = i1; i != i2; ++i) {
238 
239  JDAQSuperFrame& frame = timeslice->at(i);
240 
241  for (JDAQSuperFrame::iterator hit = frame.begin(); hit != frame.end(); ++hit) {
242 
243  ++number_of_hits;
244 
245  if (gRandom->Rndm() <= P) {
246 
247  *hit = JDAQHit(hit->getPMT(), getRandom<JDAQHit::JTDC_t>(), hit->getToT());
248 
249  ++number_of_errors;
250  }
251  }
252  }
253 
254  JDebugStream(logger) << "Processing timeslice: " << inputFile.getCounter() << " [" << i1 << "," << i2 << "]";
255 
256  data.push_back(JTimeslice(i2 - i1));
257 
258  for (int i = i1; i != i2; ++i) {
259  data.rbegin()->at(i - i1) << timeslice->at(i);
260  }
261  }
262 
263  JNoticeStream(logger) << "Number of errors / hits " << number_of_errors << " / " << number_of_hits << " for P = " << P;
264 
265  } else {
266 
267  JErrorStream(logger) << "Source not found in configuration data: " << getName();
268  }
269 
270  setClockInterval((long long int) (1e-3 * getFrameTime()));
271  }
272 
273 
274  virtual void actionReset(int length, const char* buffer) override
275  {
276  for (std::vector<JTarget>::iterator i = target.begin(); i != target.end(); ++i) {
277  i->close();
278  }
279 
280  target.clear();
281  source.clear();
282  data .clear();
283  }
284 
285 
286  virtual void actionQuit(int length, const char* buffer) override
287  {
288  actionReset(0, NULL);
289  }
290 
291 
292  virtual void actionStart(int length, const char* buffer) override
293  {
294  numberOfSlices = 0;
295  numberOfBytes = 0;
296 
297  data.setRunNumber(getRunNumber());
298  data.reset();
299 
300  timer.reset();
301 
302  resetClock();
303  }
304 
305 
306  virtual void actionStop(int length, const char* buffer) override
307  {
308  if (timer.usec_wall > 0) { JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s"; }
309  if (numberOfSlices > 0) { JNoticeStream(logger) << "Delay/slice " << (int) (getClockDelay() / numberOfSlices) << " us"; }
310  }
311 
312 
313  virtual void actionRunning() override
314  {
315  if (!data.empty() && !target.empty()) {
316 
317  timer.start();
318 
319  try {
320 
321  JDAQPreamble preamble;
322  JDAQSuperFrameHeader header;
323 
324  for (JTimeslice::const_iterator frame = data->begin(); frame != data->end(); ++frame) {
325 
327 
328  in >> preamble;
329  in >> header;
330 
331  JSocketBlocking& socket = target[header.getFrameIndex() % target.size()];
332 
333  socket.write(frame->data(), frame->size());
334 
335  numberOfBytes += frame->size();
336  }
337 
338  numberOfSlices += 1;
339  }
340  catch(const JException& exception) {
341  JErrorStream(logger) << exception;
342  }
343 
344  data.next();
345 
346  timer.stop();
347  }
348  }
349 
350  private:
353 
354  /**
355  * Memory management for sending of raw data.
356  */
359 
360  class JData :
361  public std::vector<JTimeslice>
362  {
363  public:
364  /**
365  * Default constructor.
366  */
367  JData() :
368  std::vector<JTimeslice>()
369  {}
370 
371 
372  /**
373  * Set run number.
374  *
375  * \param run_number run number
376  */
377  void setRunNumber(int run_number)
378  {
379  JDAQSuperFrameHeader header;
380 
381  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
382 
383  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
384 
386 
387  in.seekg(getSizeof<JDAQPreamble>());
388 
389  in >> header;
390 
391  header.setRunNumber(run_number);
392 
393  frame->seekp(getSizeof<JDAQPreamble>());
394 
395  *frame << header;
396  }
397  }
398  }
399 
400 
401  /**
402  * Reset internal iterator to begin.
403  */
404  void reset()
405  {
406  page = begin();
407  }
408 
409 
410  /**
411  * Increment internal iterator.
412  * When the internal iterator reaches the end of the data,
413  * the frame indices of the data are increased and
414  * the internal iterator is reset to the begin of data.
415  */
416  void next()
417  {
418  if (page != end() && ++page == end()) {
419 
420  JDAQSuperFrameHeader header;
421 
422  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
423 
424  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
425 
427 
428  in.seekg(getSizeof<JDAQPreamble>());
429 
430  in >> header;
431 
432  header.setFrameIndex(header.getFrameIndex() + this->size());
433 
434  frame->seekp(getSizeof<JDAQPreamble>());
435 
436  *frame << header;
437  }
438  }
439 
440  reset();
441  }
442  }
443 
444 
445  /**
446  * Smart pointer operator.
447  *
448  * \return current iterator
449  */
450  const_iterator operator->()
451  {
452  return page;
453  }
454 
455 
456  private:
457  JData::const_iterator page;
458  };
459 
460 
462 
463  long long int numberOfSlices; // total number of timeslices
464  long long int numberOfBytes; // total number of bytes
465 
467  };
468 }
469 
470 
471 /**
472  * \file
473  *
474  * Program for real-time simulation of data queue.
475  * \author rbruijn
476  */
477 int main(int argc, char* argv[])
478 {
479  using namespace std;
480 
481  string server;
482  string logger;
483  string client_name;
484  bool use_cout;
485  int debug;
486 
487  try {
488 
489  JParser<> zap("Program for real-time simulation of data queue.");
490 
491  zap['H'] = make_field(server) = "localhost";
492  zap['M'] = make_field(logger) = "localhost";
493  zap['u'] = make_field(client_name) = "JDQSimulator";
494  zap['c'] = make_field(use_cout);
495  zap['d'] = make_field(debug) = 3;
496 
497  zap(argc, argv);
498  }
499  catch(const exception &error) {
500  FATAL(error.what() << endl);
501  }
502 
503 
504  using namespace KM3NETDAQ;
505  using namespace JPP;
506 
507  JLogger* out = NULL;
508 
509  if (use_cout)
510  out = new JStreamLogger(cout);
511  else
512  out = new JControlHostLogger(logger);
513 
514  JDQSimulator simbad(client_name, server, out, debug);
515 
516  simbad.enter();
517  simbad.run();
518 }
void setReuseAddress(const bool on)
Set reuse address.
Definition: JSocket.hh:109
Utility class to parse command line options.
Definition: JParser.hh:1517
General exception.
Definition: JException.hh:23
std::vector< value_type >::const_iterator const_iterator
Definition: JTimeslice.hh:34
int getSendBufferSize() const
Get send buffer size.
Definition: JSocket.hh:164
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
Exceptions.
int main(int argc, char *argv[])
Definition: Main.cc:15
Target.
Definition: JHead.hh:298
ROOT TTree parameter settings of various packages.
virtual void actionStart(int length, const char *buffer) override
bool getReuseAddress() const
Get reuse address.
Definition: JSocket.hh:120
JTarget()
Default constructor.
Definition: JDQSimulator.cc:92
void close()
Close file.
Definition: JFile.hh:55
Message logging based on std::ostream.
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
void setSendBufferSize(const int size)
Set send buffer size.
Definition: JSocket.hh:153
JData()
Default constructor.
void next()
Increment internal iterator.
bool getTcpNoDelay() const
Get TCP no-delay.
Definition: JTCPSocket.hh:128
int getReceiveBufferSize() const
Set receive buffer size.
Definition: JSocket.hh:142
int write(const char *buffer, const int length) override
Write data to socket.
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
std::vector< value_type >::iterator iterator
Definition: JTimeslice.hh:33
virtual void actionReset(int length, const char *buffer) override
Simple data structure to support I/O of equations (see class JLANG::JEquation).
std::vector< JFrame > JTimeslice
Definition of random value generator.
Runcontrol client to simulate data queue.
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:662
Auxiliary data structure for hostname and port number.
Definition: JHostname.hh:30
std::vector< JSource > source
JSource()
Default constructor.
Definition: JDQSimulator.cc:44
Utility class to parse parameter values.
bool getKeepAlive() const
Get keep alive of socket.
Definition: JSocket.hh:98
friend std::istream & operator>>(std::istream &in, JSource &source)
Read JSource from input stream.
Definition: JDQSimulator.cc:56
long long int numberOfSlices
Scheduling of actions via fixed latency intervals.
virtual void actionStop(int length, const char *buffer) override
virtual void actionConfigure(int length, const char *buffer) override
JIO::JByteArrayWriter JFrame
Memory management for sending of raw data.
JDQSimulator(const std::string &name, const std::string &server, JLogger *logger, const int level)
Constructor.
Hit data structure.
Definition: JDAQHit.hh:34
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1993
Byte array binary input.
Definition: JByteArrayIO.hh:25
const_iterator begin() const
Definition: JDAQFrame.hh:164
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
long long int numberOfBytes
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
void setTcpNoDelay(const bool on)
Set TCP no-delay.
Definition: JTCPSocket.hh:117
then awk string
Data time slice.
Data structure for configuration of JDQSimulator.
Definition: JDQSimulator.cc:37
Level specific message streamers.
bool getNonBlocking() const
Get non-blocking of I/O.
Definition: JTCPSocket.hh:66
std::vector< JTarget > target
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:131
Data structure for configuration of JDataFilter.
Definition: JDQSimulator.cc:85
#define FATAL(A)
Definition: JMessage.hh:67
Scanning of objects from multiple files according a format that follows from the extension of each fi...
Blocking socket I/O.
void reset(T &value)
Reset value.
Control unit client base class.
Definition: JDAQClient.hh:273
General purpose class for object reading from a list of file names.
Utility class to parse command line options.
JData::const_iterator page
void setRunNumber(int run_number)
Set run number.
friend std::ostream & operator<<(std::ostream &out, const JSource &source)
Write JSource to output stream.
Definition: JDQSimulator.cc:73
void connect(const int port)
Connect to port on local host.
Definition: JTCPSocket.hh:152
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:364
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JTCPSocket.hh:42
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
virtual void actionQuit(int length, const char *buffer) override
size_t getSizeof< JDAQSuperFrameHeader >()
Get size of type.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
static const JNET::JTag RC_DQSIMULATOR
Definition: JDAQTags.hh:47
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:87
void setRunNumber(const int run)
Set run number.
Byte array binary output.
friend std::istream & operator>>(std::istream &in, JTarget &target)
Read JTarget from input stream.
size_t getSizeof< JDAQPreamble >()
Get size of type.
KM3NeT DAQ constants, bit handling, etc.
Base class for interprocess communication.
void setFrameIndex(const int frame_index)
Set frame index.
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
void reset()
Reset internal iterator to begin.
friend std::ostream & operator<<(std::ostream &out, const JTarget &target)
Write JTarget to output stream.
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Data frame of one optical module.
int debug
debug level
then $DIR JPlotNPE PDG P
Definition: JPlotNPE-PDG.sh:62
const_iterator operator->()
Smart pointer operator.
const_iterator end() const
Definition: JDAQFrame.hh:165