Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDOMSimulator.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <limits>
6 
7 #include "Jeep/JParser.hh"
8 #include "Jeep/JProperties.hh"
9 #include "Jeep/JTimer.hh"
10 #include "Jeep/JTimekeeper.hh"
11 #include "JDAQ/JDAQ.hh"
12 #include "JDAQ/JDAQHeader.hh"
13 #include "JDAQ/JDAQTimeslice.hh"
15 #include "JSupport/JSupport.hh"
16 #include "JLang/JException.hh"
17 #include "JLang/JRedirectStream.hh"
19 #include "JIO/JByteArrayIO.hh"
20 #include "JNet/JSocket.hh"
21 #include "JNet/JHostname.hh"
22 #include "JNet/JSocketBlocking.hh"
23 
24 
25 namespace KM3NETDAQ {
26 
27 
28  using namespace JPP;
29 
30 
31  /**
32  * Data structure for configuration of JDOMSimulator.
33  */
34  class JSource :
35  public std::string
36  {
37  public:
38  /**
39  * Default constructor.
40  */
41  JSource() :
42  std::string()
43  {}
44 
45 
46  /**
47  * Read JSource from input stream.
48  *
49  * \param in input stream
50  * \param source JSource
51  * \return input stream
52  */
53  friend inline std::istream& operator>>(std::istream& in, JSource& source)
54  {
55  int index;
56 
57  in >> index >> static_cast<std::string&>(source);
58 
59  return in;
60  }
61 
62 
63  /**
64  * Write JSource to output stream.
65  *
66  * \param out output stream
67  * \param source JSource
68  * \return output stream
69  */
70  friend inline std::ostream& operator<<(std::ostream& out, const JSource& source)
71  {
72  out << static_cast<const std::string&>(source);
73 
74  return out;
75  }
76  };
77 
78 
79  /**
80  * Data structure for configuration of JDataFilter.
81  */
82  class JTarget :
83  public JSocketBlocking
84  {
85  public:
86  /**
87  * Default constructor.
88  */
89  JTarget() :
91  {}
92 
93 
94  /**
95  * Read JTarget from input stream.
96  *
97  * \param in input stream
98  * \param target JTarget
99  * \return input stream
100  */
101  friend inline std::istream& operator>>(std::istream& in, JTarget& target)
102  {
103  using namespace std;
104  using namespace JPP;
105 
106  int index;
107  JHostname hostname;
108 
109  if (in >> index >> hostname) {
110 
111  try {
112 
113  static_cast<JSocketBlocking&>(target) = JSocketBlocking(SOCK_STREAM);
114 
115  target.connect(hostname.hostname, hostname.port);
116 
117  target.setTcpNoDelay (true);
118  target.setReuseAddress(true);
119  target.setKeepAlive (true);
120  target.setReceiveBufferSize(1024);
121  target.setSendBufferSize (1024*1024);
122  //target.setNonBlocking (true);
123  target.setNonBlocking (false);
124  }
125  catch(const JException& error) {
126  cout << error << endl;
127  target.close();
128  }
129  }
130 
131  return in;
132  }
133 
134 
135  /**
136  * Write JTarget to output stream.
137  *
138  * \param out output stream
139  * \param target JTarget
140  * \return output stream
141  */
142  friend inline std::ostream& operator<<(std::ostream& out, const JTarget& target)
143  {
144  using namespace std;
145 
146  out << "TCP no-delay " << target.getTcpNoDelay() << endl;
147  out << "Reuse address " << target.getReuseAddress() << endl;
148  out << "Keep alive " << target.getKeepAlive() << endl;
149  out << "Receive buffer " << target.getReceiveBufferSize() << endl;
150  out << "Send buffer " << target.getSendBufferSize() << endl;
151  out << "Non blocking " << target.getNonBlocking() << endl;
152 
153  return out;
154  }
155  };
156 
157 
158  /**
159  * Runcontrol client to simulate DOM.
160  * In state running, this application will send raw data to the data filters
161  * in a round robin way, based on the frame index.
162  */
164  public JDAQClient
165  {
166  public:
167  /**
168  * Constructor.
169  *
170  * \param name name of client
171  * \param server name of command message server
172  * \param logger pointer to logger
173  * \param level debug level
174  */
175  JDOMSimulator(const std::string& name,
176  const std::string& server,
177  JLogger* logger,
178  const int level) :
179  JDAQClient(name, server, logger, level)
180  {
181  replaceEvent(RC_CMD, RC_DOMSIMULATOR, ev_configure);
182  }
183 
184 
185  virtual void actionConfigure(int length, const char* buffer)
186  {
187  using namespace std;
188  using namespace KM3NETDAQ;
189  using namespace JPP;
190 
191 
193  Long64_t numberOfEvents = 1;
194  int numberOfFrames = numeric_limits<int>::max();
195 
196  JProperties properties(JEquationParameters("=", ";", "", ""));
197 
198  properties["source"] = source;
199  properties["target"] = target;
200  properties["inputFile"] = inputFile;
201  properties["numberOfEvents"] = numberOfEvents;
202  properties["numberOfFrames"] = numberOfFrames;
203 
204  properties.read(string(buffer, length));
205 
206 
207  for (vector<JTarget>::iterator i = target.begin(); i != target.end(); ) {
208  if (i->is_open())
209  ++i;
210  else
211  i = target.erase(i);
212  }
213 
214  if (inputFile.empty()) JErrorStream(logger) << "No input files ";
215  if (target .empty()) JErrorStream(logger) << "No targets";
216 
217  const unsigned int index = distance(source.begin(), find(source.begin(), source.end(), getName()));
218 
219  if (index < source.size()) {
220 
221  while (inputFile.hasNext()) {
222 
223  JDAQTimeslice* timeslice = inputFile.next();
224 
225  int i1 = (timeslice->size() * (index + 0)) / source.size();
226  int i2 = (timeslice->size() * (index + 1)) / source.size();
227 
228  if (i2 - i1 > numberOfFrames) {
229  i2 = i1 + numberOfFrames;
230  }
231 
232  JDebugStream(logger) << "Processing timeslice: " << inputFile.getCounter() << " [" << i1 << "," << i2 << "]";
233 
234  data.push_back(JTimeslice(i2 - i1));
235 
236  for (int i = i1; i != i2; ++i) {
237  data.rbegin()->at(i - i1) << timeslice->at(i);
238  }
239  }
240 
241  } else {
242 
243  JErrorStream(logger) << "Source not found in configuration data: " << getName();
244  }
245 
246  setClockInterval((long long int) (1e-3 * getFrameTime()));
247  }
248 
249 
250  virtual void actionReset(int length, const char* buffer)
251  {
252  for (std::vector<JTarget>::iterator i = target.begin(); i != target.end(); ++i)
253  i->close();
254 
255  target.clear();
256  source.clear();
257  data .clear();
258  }
259 
260 
261  virtual void actionQuit(int length, const char* buffer)
262  {
263  actionReset(0, NULL);
264  }
265 
266 
267  virtual void actionStart(int length, const char* buffer)
268  {
269  numberOfSlices = 0;
270  numberOfBytes = 0;
271 
272  data.setRunNumber(getRunNumber());
273  data.reset();
274 
275  timer.reset();
276 
277  resetClock();
278  }
279 
280 
281  virtual void actionStop(int length, const char* buffer)
282  {
283  if (timer.usec_wall > 0) JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s";
284  if (numberOfSlices > 0) JNoticeStream(logger) << "Delay/slice " << (int) (getClockDelay() / numberOfSlices) << " us";
285  }
286 
287 
288  virtual void actionRunning()
289  {
290  if (!data.empty() && !target.empty()) {
291 
292  timer.start();
293 
294  try {
295 
296  JDAQPreamble preamble;
297  JDAQSuperFrameHeader header;
298 
299  for (JTimeslice::const_iterator frame = data->begin(); frame != data->end(); ++frame) {
300 
302 
303  in >> preamble;
304  in >> header;
305 
306  JSocketBlocking& socket = target[header.getFrameIndex() % target.size()];
307 
308  socket.write(frame->data(), frame->size());
309 
310  numberOfBytes += frame->size();
311  }
312 
313  numberOfSlices += 1;
314  }
315  catch(const JException& exception) {
316  JErrorStream(logger) << exception;
317  }
318 
319  data.next();
320 
321  timer.stop();
322  }
323  }
324 
325 
326  private:
329 
330  /**
331  * Memory management for sending of raw data.
332  */
335 
336  class JData :
337  public std::vector<JTimeslice>
338  {
339  public:
340  /**
341  * Default constructor.
342  */
343  JData() :
344  std::vector<JTimeslice>()
345  {}
346 
347 
348  /**
349  * Set run number.
350  *
351  * \param run_number run number
352  */
353  void setRunNumber(int run_number)
354  {
355  JDAQSuperFrameHeader header;
356 
357  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
358 
359  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
360 
362 
364 
365  in >> header;
366 
367  header.setRunNumber(run_number);
368 
369  frame->seekp(JDAQPreamble::sizeOf());
370 
371  *frame << header;
372  }
373  }
374  }
375 
376 
377  /**
378  * Reset internal iterator to begin.
379  */
380  void reset()
381  {
382  page = begin();
383  }
384 
385 
386  /**
387  * Increment internal iterator.
388  * When the internal iterator reaches the end of the data,
389  * the frame indices of the data are increased and
390  * the internal iterator is reset to the begin of data.
391  */
392  void next()
393  {
394  if (page != end() && ++page == end()) {
395 
396  JDAQSuperFrameHeader header;
397 
398  for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
399 
400  for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
401 
403 
405 
406  in >> header;
407 
408  header.setFrameIndex(header.getFrameIndex() + this->size());
409 
410  frame->seekp(JDAQPreamble::sizeOf());
411 
412  *frame << header;
413  }
414  }
415 
416  reset();
417  }
418  }
419 
420 
421  /**
422  * Smart pointer operator.
423  *
424  * \return current iterator
425  */
426  const_iterator operator->()
427  {
428  return page;
429  }
430 
431 
432  private:
433  JData::const_iterator page;
434  };
435 
436 
438 
439  long long int numberOfSlices; // total number of timeslices
440  long long int numberOfBytes; // total number of bytes
441 
443  };
444 }
445 
446 
447 /**
448  * \file
449  *
450  * Program for real-time simulation of optical modules.
451  * \author rbruijn
452  */
453 int main(int argc, char* argv[])
454 {
455  using namespace std;
456 
457  string server;
458  string logger;
459  string client_name;
460  bool use_cout;
461  int debug;
462 
463  try {
464 
465  JParser<> zap("Program for real-time simulation of optical modules.");
466 
467  zap['H'] = make_field(server) = "localhost";
468  zap['M'] = make_field(logger) = "localhost";
469  zap['u'] = make_field(client_name) = "JDOMSimulator";
470  zap['c'] = make_field(use_cout);
471  zap['d'] = make_field(debug) = 3;
472 
473  zap(argc, argv);
474  }
475  catch(const exception &error) {
476  FATAL(error.what() << endl);
477  }
478 
479 
480  using namespace KM3NETDAQ;
481  using namespace JPP;
482 
483  JLogger* out = NULL;
484 
485  if (use_cout)
486  out = new JStreamLogger(cout);
487  else
488  out = new JControlHostLogger(logger);
489 
490  JDOMSimulator simbad(client_name, server, out, debug);
491 
492  simbad.enter();
493  simbad.run();
494 }
void setReuseAddress(const bool on)
Set reuse address.
Definition: JSocket.hh:206
Utility class to parse command line options.
Definition: JParser.hh:1410
General exception.
Definition: JException.hh:40
std::vector< value_type >::const_iterator const_iterator
Definition: JTimeslice.hh:34
int getSendBufferSize() const
Get send buffer size.
Definition: JSocket.hh:283
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:665
Exceptions.
Target.
Definition: JHead.hh:146
bool getReuseAddress() const
Get reuse address.
Definition: JSocket.hh:217
void seekg(const int pos)
Set read position.
JTarget()
Default constructor.
void close()
Close file.
Definition: JFile.hh:55
void setRunNumber(int run_number)
Set run number.
JData()
Default constructor.
int write(const char *buffer, const int length)
Write data to socket.
std::vector< JFrame > JTimeslice
void setSendBufferSize(const int size)
Set send buffer size.
Definition: JSocket.hh:272
int getReceiveBufferSize() const
Set receive buffer size.
Definition: JSocket.hh:261
static int sizeOf()
Get size of object.
void next()
Increment internal iterator.
bool getNonBlocking() const
Get non-blocking of I/O.
Definition: JSocket.hh:133
Interface for logging messages.
Definition: JLogger.hh:22
const_iterator operator->()
Smart pointer operator.
Utility class to parse parameter values.
Definition: JProperties.hh:484
std::vector< JTarget > target
void reset(JCLBInput &data, size_t size)
Reset CLB buffers.
std::vector< value_type >::iterator iterator
Definition: JTimeslice.hh:33
virtual void actionReset(int length, const char *buffer)
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Runcontrol client to simulate DOM.
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
virtual void actionConfigure(int length, const char *buffer)
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:493
Auxiliary data structure for hostname and port number.
Definition: JHostname.hh:30
JSource()
Default constructor.
Utility class to parse parameter values.
bool getKeepAlive() const
Get keep alive of socket.
Definition: JSocket.hh:162
friend std::istream & operator>>(std::istream &in, JSource &source)
Read JSource from input stream.
Scheduling of actions via fixed latency intervals.
virtual void actionStop(int length, const char *buffer)
JIO::JByteArrayWriter JFrame
Memory management for sending of raw data.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1836
Byte array binary input.
Definition: JByteArrayIO.hh:25
std::vector< JSource > source
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
Data time slice.
Data structure for configuration of JDOMSimulator.
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:59
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
Data structure for configuration of JDataFilter.
#define FATAL(A)
Definition: JMessage.hh:65
Scanning of objects from multiple files according a format that follows from the extension of each fi...
Blocking socket I/O.
JDOMSimulator(const std::string &name, const std::string &server, JLogger *logger, const int level)
Constructor.
Run control client base class.
Definition: JDAQClient.hh:88
General purpose class for object reading from a list of file names.
Utility class to parse command line options.
virtual void actionQuit(int length, const char *buffer)
ROOT TTree parameter settings.
void connect(const int port)
Connect to port on local host.
Definition: JSocket.hh:384
friend std::ostream & operator<<(std::ostream &out, const JSource &source)
Write JSource to output stream.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:143
bool getTcpNoDelay() const
Get TCP no-delay.
Definition: JSocket.hh:239
virtual void actionStart(int length, const char *buffer)
const char * getName()
Get ROOT name of given data type.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
void setRunNumber(const int run)
Set run number.
void setTcpNoDelay(const bool on)
Set TCP no-delay.
Definition: JSocket.hh:228
Byte array binary output.
friend std::istream & operator>>(std::istream &in, JTarget &target)
Read JTarget from input stream.
KM3NeT DAQ constants, bit handling, etc.
void setFrameIndex(const int frame_index)
Set frame index.
JData::const_iterator page
friend std::ostream & operator<<(std::ostream &out, const JTarget &target)
Write JTarget to output stream.
void reset()
Reset internal iterator to begin.
long long int numberOfSlices
static const JNET::JTag RC_DOMSIMULATOR
Definition: JDAQTags.hh:46
static int sizeOf()
Get size of object.
int main(int argc, char *argv[])
Definition: Main.cpp:15