Jpp - the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JPhysics/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packeet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * JDataFilter::setSelect and
93  * JDataFilter::actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
96  * When a JDAQTimeslice is complete,
97  * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
98  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
99  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
100  * <pre>
101  * numberOfFramesPerSlice = frames_per_slice = 1;
102  * </pre>
103  * Note that this parameter can change during operation (see below).
104  *
105  * A timeout may occur when the total amount of data or the number of incomplete time slices
106  * in the queue exceeds the corresponding pre-set limit:\n
107  * <pre>
108  * queueSize = <maximal queue size [B]>;
109  * queueDepth = <maximal queue depth>;
110  * </pre>
111  * Note that these values apply per JDataFilter.\n
112  *
113  * When a timeout occurs,
114  * the parameter <tt>frames_per_slice<</tt> is set to the number of frames in the oldest time slice.\n
115  * When data are received with a frame index below the current frame index,
116  * the parameter <tt>frames_per_slice</tt> is incremented by one,
117  * upto the original value set at JDataFilter::actionConfigure.\n
118  *
119  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
120  * The following parameters can be used to configure the circular buffer.\n
121  * <pre>
122  * path = <write directory for circular buffer>;
123  * c_sizeL0 = <L0 buffer size>;
124  * c_sizeL1 = <L1 buffer size>;
125  * c_sizeL2 = <L2 buffer size>;
126  * c_sizeSN = <SN buffer size>;
127  * </pre>
128  *
129  * Note that when path is set to a valid director:
130  * - There will always be a file created which is closed when the state machine is exited;
131  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
132  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
133  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
134  * - After closure of the file, a new file will be opened if path is set;
135  *
136  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
137  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
138  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
139  * (e.g. when there is more than one alert on a given day).
140  *
141  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
142  *
143  * The script JDOMandDataFilter.sh can be used to test this application.
144  */
145  class JDataFilter :
146  public JDAQClient
147  {
148  public:
149 
152 
153  typedef double hit_type;
159 
160  /**
161  * Circular buffer.
162  */
164  public JTreeRecorder<JDAQTimesliceTypes_t>
165  {
167 
168  /**
169  * Default constructor.
170  */
172  {
173  disable();
174  }
175 
176  /**
177  * Open file.
178  *
179  * \param path directory
180  * \param tag unique process tag
181  */
182  void open(const std::string& path, const JTag& tag)
183  {
184  using namespace std;
185 
186  const JDateAndTime cal;
187 
188  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
189 
190  ostringstream os;
191 
192  os << getFullPath(path)
193  << "KM3NeT"
194  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
195  << "_" << tag;
196 
197  if (i != 0) {
198  os << "_" << i;
199  }
200 
201  os << ".root";
202 
203  try {
204  this->open(os.str().c_str());
205  }
206  catch(const exception& error) {}
207  }
208  }
209 
210  /**
211  * Disable writing.
212  */
213  void disable()
214  {
215  sizeL0 = 0;
216  sizeL1 = 0;
217  sizeL2 = 0;
218  sizeSN = 0;
219  }
220 
221  /**
222  * Check whether writing of data is enabled.
223  *
224  * \return true if writing enabled; else false
225  */
226  bool is_enabled() const
227  {
228  return (sizeL0 > 0 ||
229  sizeL1 > 0 ||
230  sizeL2 > 0 ||
231  sizeSN > 0);
232  }
233 
234  /**
235  * Write circular buffer to output stream.
236  *
237  * \param out output stream
238  * \param object circular buffer
239  * \return output stream
240  */
241  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
242  {
243  if (object.is_open())
244  out << object.getFile()->GetName();
245  else
246  out << "void";
247 
248  out << ' ';
249 
250  return out << object.sizeL0 << '/'
251  << object.sizeL1 << '/'
252  << object.sizeL2 << '/'
253  << object.sizeSN << '/';
254  }
255 
256  Long64_t sizeL0; //!< Number of L0 time slices
257  Long64_t sizeL1; //!< Number of L1 time slices
258  Long64_t sizeL2; //!< Number of L2 time slices
259  Long64_t sizeSN; //!< Number of SN time slices
260  };
261 
262 
263  /**
264  * Sort DAQ process by index.
265  *
266  * \param first first DAQ process
267  * \param second second DAQ process
268  * \return true if index of first DAQ process less than that of second; else false
269  */
270  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
271  {
272  return first.index < second.index;
273  }
274 
275 
276  /**
277  * Constructor.
278  *
279  * \param name name of client
280  * \param server name of command message server
281  * \param hostname name of data server
282  * \param logger pointer to logger
283  * \param level debug level
284  * \param port server port
285  * \param backlog server backlog
286  * \param buffer_size server buffer
287  * \param path directory for writing circular buffer
288  */
289  JDataFilter(const std::string& name,
290  const std::string& server,
291  const std::string& hostname,
292  JLogger* logger,
293  const int level,
294  const int port,
295  const int backlog,
296  const int buffer_size,
297  const std::string& path) :
298  JDAQClient (name,server,logger,level),
299  hostname (hostname),
300  port (port),
301  backlog (backlog),
302  buffer_size(buffer_size),
303  path (path)
304  {
305  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
306 
307  addSubscription(JSubscriptionAll(RC_ALERT));
308 
309  totalCPURAM = getRAM();
310  current_slice_index = -1;
311  reporting = false;
312  }
313 
314 
315  virtual void actionEnter() override
316  {}
317 
318 
319  virtual void actionExit() override
320  {
321  if (c_buffer.is_open()) {
322 
323  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
324 
325  c_buffer.close();
326  }
327 
328  datawriter.reset();
329  }
330 
331 
332  virtual void actionInit(int length, const char* buffer) override
333  {
334  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
335 
336  try {
337 
338  JDebugStream(logger) << "Start server.";
339 
340  if (serversocket.is_valid()) {
341  serversocket->shutdown();
342  }
343 
344  serversocket.reset(new JServerSocket(port,backlog));
345  }
346  catch(const std::exception& error) {
347  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
348  ev_error();
349  }
350  }
351 
352 
353  virtual void actionConfigure(int length, const char* buffer) override
354  {
355  using namespace std;
356 
357  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
358 
359  long long int update_s = 10;
360  long long int logger_s = 5;
361 
362  parameters .reset();
363  dataFilters.clear();
364  dataQueues .clear();
365 
366  reporting = false;
367 
368  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
369 
370  properties["dataWriter"] = hostname;
371  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
372  properties["detector"] = detector;
373  properties["triggerParameters"] = parameters;
374  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
375  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
376  properties["frameIndex"] = maximal_frame_index = 100000;
377  properties["logger_s"] = logger_s;
378  properties["update_s"] = update_s;
379  properties["JDataFilter"] = dataFilters;
380  properties["DataQueue"] = dataQueues;
381  properties["path"] = path;
382  properties["c_sizeL0"] = c_buffer.sizeL0;
383  properties["c_sizeL1"] = c_buffer.sizeL1;
384  properties["c_sizeL2"] = c_buffer.sizeL2;
385  properties["c_sizeSN"] = c_buffer.sizeSN;
386 
387  try {
388  properties.read(string(buffer, length));
389  }
390  catch(const exception& error) {
391  JErrorStream(logger) << error.what();
392  }
393 
394  if (update_s <= 0) { update_s = 1; }
395  if (logger_s <= 0) { logger_s = 1; }
396 
397  setClockInterval(update_s * 1000000LL);
398 
399  hostname = trim(hostname);
400 
401  if (hostname != "")
402  datawriter.reset(new JControlHost_t(hostname));
403  else
404  throw JException("Undefined data writer host name.");
405 
406  maximum_frames_per_slice = frames_per_slice;
407 
408  // process processlist
409 
410  if (dataFilters.empty()) {
411  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
412  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
413  }
414 
415  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
416 
417  unsigned int numberOfDataFiltersOnThisMachine = 0;
418  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
419 
421 
422  {
423  JNoticeStream notice(logger);
424 
425  notice << "My IP addresses:";
426 
427  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
428  notice << ' ' << *i;
429  }
430  }
431 
432  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
433 
434  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
435 
436  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
437 
438  numberOfDataFiltersOnThisMachine++;
439 
440  if (i->port == this->port) {
441  thisProcess = i;
442  }
443  }
444  }
445 
446  if (numberOfDataFiltersOnThisMachine == 0) {
447  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
448  << "Assuming one datafilter on this machine.";
449  numberOfDataFiltersOnThisMachine = 1;
450  }
451 
452  if (thisProcess == dataFilters.end()) {
453  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
454  }
455 
456  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
457  JErrorStream(logger) << "Mismatch between given process names: "
458  << "I am called " << getName()
459  << ", but in the process list I am referred to as " << thisProcess->index;
460  }
461 
462  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
463  reporting = true;
464  }
465 
466  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
467 
468  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
469 
470  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
471  << "Queue size reduced to "
472  << maxQueueSize << " bytes." ;
473  }
474 
475  // trigger parameters
476 
478 
479  triggerNB .reset(new JTriggerNB (parameters));
480  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
481  trigger3DShower.reset(new JTrigger3DShower(parameters));
482  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
483 
484  moduleRouter.reset(new JModuleRouter(detector));
485 
486  if (reporting) {
487  JNoticeStream(logger) << "This data filter process will report.";
488  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
489  JDebugStream (logger) << "Trigger parameters: " << parameters;
490  JDebugStream (logger) << "Detector description: " << endl << detector;
491  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
492  }
493 
494  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
495 
496  // set L1, L2 and SN builders
497 
498  buildL1.reset(new JBuildL1_t(parameters));
499  buildL2.reset(new JBuildL2_t(parameters.L2));
500  buildSN.reset(new JBuildL2_t(parameters.SN));
501  buildNB.reset(new JBuildL2_t(parameters.NB));
502 
503  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
504  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
505  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
506  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
507 
508  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
509  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
510  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
511  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
512 
513  if (c_buffer.is_enabled()) {
514 
515  if (!c_buffer.is_open()) {
516 
517  c_buffer.open(path, getUniqueTag());
518 
519  if (c_buffer.is_open()) {
520 
521  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
522 
523  putObject(c_buffer.getFile(), meta);
524 
525  } else {
526 
527  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
528 
529  c_buffer.disable();
530  }
531 
532  } else {
533 
534  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
535  }
536 
537  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
538  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
539  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
540  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
541 
542  } else {
543 
544  if (c_buffer.is_open()) {
545  c_buffer.close();
546  }
547  }
548  }
549 
550 
551  virtual void actionStart(int length, const char* buffer) override
552  {
553  using namespace std;
554 
555  if (reporting) {
556  JNoticeStream(logger) << "Start run " << getRunNumber();
557  }
558 
559  timeslices.clear();
560 
561  current_slice_index = -1;
562  queueSize = 0;
563 
564  numberOfEvents = 0;
565  numberOfBytes = 0;
566  numberOfTimeslicesProcessed = 0;
567  numberOfIncompleteTimeslicesProcessed = 0;
568 
569  number_of_packets_received = 0;
570  number_of_packets_discarded = 0;
571  number_of_bytes_received = 0;
572  number_of_reads = 0;
573 
574  minFrameNumber = numeric_limits<int>::max();
575  maxFrameNumber = numeric_limits<int>::min();
576 
577  // Reset global trigger counter.
578 
580 
581  logErrorRun .reset();
582  logErrorDetector .reset();
583  logErrorIndex .reset();
584  logErrorIncomplete.reset();
585 
586  timer.reset();
587  timer.start();
588 
589  Qt.reset();
590 
591  // send trigger parameters to the datawriter
592 
593  ostringstream os;
594 
595  os << getRunNumber() << ' ' << parameters;
596 
597  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
598  }
599 
600 
601  virtual void actionPause(int length, const char* buffer) override
602  {
603  using namespace std;
604 
605  if (!timeslices.empty()) {
606 
607  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
608 
609  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
610  queueSize -= getSizeof(*i);
611  }
612 
613  timeslices.clear();
614  }
615 
616  { // force clearance of memory
617 
619 
620  timeslices.swap(buffer);
621  }
622 
623  if (queueSize != 0) {
624  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
625  }
626 
627  current_slice_index = -1;
628  queueSize = 0;
629 
630  timer.stop();
631  }
632 
633 
634  virtual void actionContinue(int length, const char* buffer) override
635  {
636  timer.start();
637  }
638 
639 
640  virtual void actionStop(int length, const char* buffer) override
641  {
642  typeout();
643 
644  datawriter.reset();
645  }
646 
647 
648  virtual void actionReset(int length, const char* buffer) override
649  {
650  if (serversocket.is_valid()) {
651  serversocket->shutdown();
652  }
653 
654  serversocket.reset();
655  }
656 
657 
658  virtual void actionQuit(int length, const char* buffer) override
659  {
660  datawriter.reset();
661  }
662 
663 
664  virtual void setSelect(JFileDescriptorMask& mask) const override
665  {
666  if (serversocket.is_valid()) {
667  mask.set(*serversocket);
668  }
669 
670  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
671  if (!channel->isReady()) {
672  mask.set(channel->getFileDescriptor());
673  }
674  }
675  }
676 
677 
678  virtual void actionSelect(const JFileDescriptorMask& mask) override
679  {
680  using namespace std;
681 
682  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
683 
684  try {
685 
686  if (mask.has(channel->getFileDescriptor())) {
687  channel->read();
688  }
689 
690  if (channel->isReady()) {
691 
692  number_of_packets_received += 1;
693  number_of_reads += channel->getCounter();
694  number_of_bytes_received += channel->size();
695 
696  if (isRunning()) {
697 
698  updateFrameQueue(channel);
699 
700  } else {
701 
702  JErrorStream(logErrorRun) << "Receiving data while not running.";
703 
704  number_of_packets_discarded += 1;
705  }
706 
707  channel->reset();
708  }
709 
710  ++channel;
711  }
712  catch(const exception& error) {
713 
714  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
715 
716  channel->shutdown();
717 
718  channel = channelList.erase(channel);
719  }
720  }
721 
722 
723  if (serversocket.is_valid()) {
724 
725  if (mask.has(*serversocket)) {
726 
727  JSocket socket;
728 
729  socket.accept(serversocket->getFileDescriptor());
730 
731  //socket.setSendBufferSize (buffer_size);
733 
734  socket.setKeepAlive (true);
735  socket.setNonBlocking(true);
736 
737  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
738 
739  channelList.push_back(JSocketInputChannel_t(socket));
740  }
741  }
742 
743 
744  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
745  timeslices.size() >= maxQueueDepth ||
746  queueSize >= maxQueueSize)) {
747 
748  const JDAQTimesliceL0& pending_slice = timeslices.front();
749  queueSize -= getSizeof(pending_slice);
750 
751  current_slice_index = pending_slice.getFrameIndex();
752  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
753  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
754 
755  if (pending_slice.size() > frames_per_slice) {
756 
757  JErrorStream(logger) << "More frames in timeslice than expected "
758  << pending_slice.size() << " > " << frames_per_slice;
759 
760  if (pending_slice.size() <= maximum_frames_per_slice) {
761 
762  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
763 
764  frames_per_slice = pending_slice.size();
765  }
766  }
767 
768  if (!pending_slice.empty()) {
769 
770  const localtime_t t0 = getLocalTime();
771 
772  processTimeSlice(pending_slice);
773 
774  const localtime_t t1 = getLocalTime();
775 
776  numberOfTimeslicesProcessed += 1;
777 
778  Qt.put(t1 - t0);
779 
780  if (pending_slice.size() < frames_per_slice) {
781 
782  numberOfIncompleteTimeslicesProcessed += 1;
783 
784  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
785  << "Frame index = " << pending_slice.getFrameIndex() << ';'
786  << "Size of timeslice = " << pending_slice.size() << ';'
787  << "Queue depth = " << timeslices.size() << ';'
788  << "Queue size = " << queueSize;
789 
790  if (!timeslices.empty()) {
791 
792  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
793 
794  frames_per_slice = pending_slice.size();
795  }
796  }
797  }
798 
799  timeslices.pop_front();
800  }
801  }
802 
803 
804  virtual void actionRunning() override
805  {
806  if (reporting) {
807  typeout();
808  }
809  }
810 
811 
812  /**
813  * Update queue with data frames.
814  *
815  * Note that any discarded data will be reported.
816  *
817  * \param channel incoming data channel
818  */
819  void updateFrameQueue(const JChannelList_t::const_iterator channel)
820  {
821  using namespace std;
822 
823  JByteArrayReader in(channel->data(), channel->size());
824 
825  JDAQPreamble preamble;
826  JDAQSuperFrameHeader header;
827 
828  in >> preamble;
829  in >> header;
830 
831  if (preamble.getLength() != channel->size()) {
832 
833  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
834  << "preamble.getLength() = " << preamble.getLength() << ';'
835  << "channel->size(): " << channel->size() << ';';
836 
837  number_of_packets_discarded += 1;
838 
839  return;
840  }
841 
842  if (header.getRunNumber() != getRunNumber()) {
843 
844  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
845  << " unequal to current run " << getRunNumber()
846  << " -> Dropping frame.";
847 
848  number_of_packets_discarded += 1;
849 
850  return;
851  }
852 
853  if (header.getFrameIndex() <= current_slice_index) {
854 
855  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
856  << " -> Dropping frame.";
857 
858  number_of_packets_discarded += 1;
859 
860  if (frames_per_slice < maximum_frames_per_slice) {
861 
862  frames_per_slice++;
863 
864  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
865  }
866 
867  return;
868  }
869 
870  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
871 
872  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
873  << " -> Dropping frame.";
874 
875  number_of_packets_discarded += 1;
876 
877  return;
878  }
879 
880  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
881 
882  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
883  ++timesliceIterator;
884  }
885 
886  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
887 
888  // The corresponding time slice already exists
889 
890  } else {
891 
892  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
893 
894  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
895 
896  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
897 
898  queueSize += getSizeof(*timesliceIterator);
899  }
900 
901  timesliceIterator->push_back(JDAQSuperFrame(header));
902 
903  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
904 
905  queueSize += getSizeof(*timesliceIterator->rbegin());
906  }
907 
908 
909  /**
910  * Process time slice.
911  *
912  * \param timeslice time slice
913  */
914  void processTimeSlice(const JDAQTimesliceL0& timeslice)
915  {
916  using namespace std;
917 
918  try {
919 
920  if (parameters.writeSummary()) {
921  this->put(JDAQSummaryslice(timeslice));
922  }
923 
924  if (parameters.trigger3DMuon.enabled ||
925  parameters.trigger3DShower.enabled ||
926  parameters.triggerMXShower.enabled ||
927  parameters.triggerNB.enabled ||
928  parameters.writeL0.prescale ||
929  parameters.writeL1.prescale ||
930  parameters.writeL2.prescale ||
931  parameters.writeSN.prescale ||
932  c_buffer.is_enabled()) {
933 
934  timesliceRouter->configure(timeslice);
935 
936  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
937  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
938  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
939  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
940  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
941  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
942 
943  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
944 
945  if (moduleRouter->hasModule(frame->getModuleID())) {
946 
947  if (!checksum(*frame)) {
948 
949  JErrorStream(logger) << "Invalid data at "
950  << "run = " << timeslice.getRunNumber() << ";"
951  << "frame index = " << timeslice.getFrameIndex() << ";"
952  << "module = " << frame->getModuleID() << ";"
953  << "discard and dump";
954 
955  timesliceTX.push_back(*frame);
956 
957  continue;
958  }
959 
960  const JModule& module = moduleRouter->getModule(frame->getModuleID());
961  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
962 
963  // Apply high-rate veto
964 
965  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
966 
967  // L0
968 
969  timesliceL0.push_back(JSuperFrame1D_t(buffer));
970 
971  // Nano-beacon trigger
972 
973  if (parameters.triggerNB.enabled) {
974 
975  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
976 
977  if (buffer.begin() != __end) {
978 
979  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
980  frame->getModuleIdentifier(),
981  module.getPosition()));
982 
983  JSuperFrame1D_t zbuf;
984 
985  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
986 
987  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
988  }
989  }
990 
991  // L1
992 
993  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
994  frame->getModuleIdentifier(),
995  module.getPosition()));
996 
997  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
998 
999  // L2
1000 
1001  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1002  frame->getModuleIdentifier(),
1003  module.getPosition()));
1004 
1005  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1006 
1007  // SN
1008 
1009  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1010  frame->getModuleIdentifier(),
1011  module.getPosition()));
1012 
1013  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1014 
1015  } else {
1016 
1017  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1018  }
1019  }
1020 
1021  if (!timesliceTX.empty()) {
1022  this->put(timesliceTX);
1023  }
1024 
1025  // Trigger
1026 
1027  if (parameters.triggerNB.enabled) {
1028 
1029  const JTriggerInput trigger_input(timesliceNB);
1030 
1031  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1032 
1033  if (parameters.triggerNB.write()) {
1034 
1035  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1036  getTriggerMask(triggerNB->getTriggerBit()),
1037  *hit,
1038  *timesliceRouter,
1039  *moduleRouter,
1040  parameters.TMaxLocal_ns,
1041  parameters.triggerNB.DMax_m,
1042  getTimeRange(parameters.triggerNB));
1043 
1044  this->put(tev);
1045  }
1046  }
1047  }
1048 
1049  JTriggerInput trigger_input(timesliceL2);
1050  JTriggerOutput trigger_output;
1051 
1052  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1053  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1054  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1055 
1056  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1057 
1058  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1059 
1060  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1061 
1062  if (this->put(object)) {
1063  numberOfEvents += 1;
1064  }
1065  }
1066 
1067  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1068 
1069  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1070 
1071  if (parameters.writeL1) { this->put(object); }
1072  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1073  }
1074 
1075  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1076 
1077  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1078 
1079  if (parameters.writeL2) { this->put(object); }
1080  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1081  }
1082 
1083  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1084 
1085  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1086 
1087  if (parameters.writeSN) { this->put(object); }
1088  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1089  }
1090 
1091  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1092 
1093  if (parameters.writeL0) { this->put(timeslice); }
1094  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1095  }
1096  }
1097 
1098  } catch(const exception& error) {
1099  JErrorStream(logger) << "Error = " << error.what() << ";"
1100  << "run = " << timeslice.getRunNumber() << ";"
1101  << "frame index = " << timeslice.getFrameIndex() << ";"
1102  << "time slice not correctly processed!";
1103  }
1104  }
1105 
1106 
1107  /**
1108  * Report status to message logger.
1109  */
1110  void typeout()
1111  {
1112  timer.stop();
1113 
1114  const double T_us = (double) timer.usec_wall;
1115 
1116  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1117  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1118  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1119  try {
1120  JNoticeStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1121  }
1122  catch(const std::exception&) {}
1123  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1124  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1125 
1126  if (number_of_packets_received > 0) {
1127  JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1128  }
1129 
1130  JNoticeStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1131  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1132 
1133  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1134 
1135  if (numberOfTimeslicesProcessed > 0) {
1136  JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1137  JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1138  JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1139  }
1140 
1141  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1142  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1143 
1144  if (processedSlicesTime_us > 0) {
1145  JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1146  }
1147  if (processedDetectorTime_us > 0) {
1148  JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1149  }
1150 
1151  timer.start();
1152  }
1153 
1154 
1155  /**
1156  * Tagged action to handle alerts.
1157  *
1158  * \param tag tag
1159  * \param length number of characters
1160  * \param buffer message
1161  */
1162  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1163  {
1164  using namespace std;
1165 
1166  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1167 
1168  if (tag == RC_ALERT) {
1169 
1170  if (c_buffer.is_open()) {
1171 
1172  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1173 
1174  c_buffer.close();
1175  }
1176 
1177  if (c_buffer.is_enabled()) {
1178 
1179  c_buffer.open(path, getUniqueTag());
1180 
1181  if (c_buffer.is_open()) {
1182 
1183  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1184 
1185  putObject(c_buffer.getFile(), meta);
1186 
1187  } else {
1188 
1189  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1190 
1191  c_buffer.disable();
1192  }
1193  }
1194 
1195  } else {
1196 
1197  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1198  }
1199  }
1200 
1201  JMeta meta; //!< meta data
1202 
1203  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1204 
1205  private:
1206 
1208  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1209  std::string hostname; //!< host name of data server
1210 
1211  /**
1212  * Auxiliary method to send object to data server.
1213  *
1214  * \param object object to be sent
1215  * \return true if sent; else false
1216  */
1217  template<class T>
1218  bool put(const T& object)
1219  {
1220  try {
1221 
1222  datawriter->put(object);
1223 
1224  numberOfBytes += getSizeof(object);
1225 
1226  return true;
1227  }
1228  catch(const std::exception& error) {
1229  JErrorStream(logger) << error.what();
1230  }
1231 
1232  return false;
1233  }
1234 
1235 
1236  int port; //!< server socket port
1237  int backlog;
1239 
1240  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1241  JChannelList_t channelList; //!< connections to data queue
1242 
1244  JQuantile Qt;
1245 
1246  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1248  unsigned int frames_per_slice;
1251 
1252  // trigger
1253 
1256 
1263 
1268 
1273 
1274  // process management
1275 
1278 
1279  // memory management
1280 
1281  long long int totalCPURAM;
1282  unsigned int maxQueueDepth;
1283  long long int maxQueueSize;
1284  long long int queueSize;
1285 
1286  // statistics
1287 
1289 
1290  long long int numberOfEvents;
1291  long long int numberOfBytes;
1294 
1297 
1298  // temporary
1299 
1302  long long int number_of_reads;
1304 
1305  // circular buffer
1306 
1308  std::string path; //!< directory for writing circular buffer
1309  };
1310 }
1311 
1312 /**
1313  * \file
1314  *
1315  * Application for real-time filtering of data.
1316  * For more information, see KM3NETDAQ::JDataFilter.
1317  *
1318  * \author rbruijn and mdejong
1319  */
1320 int main(int argc, char* argv[])
1321 {
1322  using namespace std;
1323  using namespace JPP;
1324  using namespace KM3NETDAQ;
1325 
1326  string server;
1327  string logger;
1328  string hostname;
1329  string client_name;
1330  int port;
1331  int backlog;
1332  int buffer_size;
1333  bool use_cout;
1334  string path;
1335  int debug;
1336 
1337 
1338  try {
1339 
1340  JParser<> zap("Application for real-time filtering of data.");
1341 
1342  zap['H'] = make_field(server) = "localhost";
1343  zap['M'] = make_field(logger) = "localhost";
1344  zap['D'] = make_field(hostname) = "";
1345  zap['u'] = make_field(client_name) = "JDataFilter";
1346  zap['P'] = make_field(port);
1347  zap['q'] = make_field(backlog) = 1024;
1348  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1349  zap['c'] = make_field(use_cout);
1350  zap['p'] = make_field(path) = "/tmp/";
1351  zap['d'] = make_field(debug) = 0;
1352 
1353  zap(argc, argv);
1354  }
1355  catch(const exception& error) {
1356  FATAL(error.what() << endl);
1357  }
1358 
1359 
1360  JLogger* out = NULL;
1361 
1362  if (use_cout)
1363  out = new JStreamLogger(cout);
1364  else
1365  out = new JControlHostLogger(logger);
1366 
1367  JDataFilter dfilter(client_name,
1368  server,
1369  hostname,
1370  out,
1371  debug,
1372  port,
1373  backlog,
1374  buffer_size,
1375  path);
1376 
1377  dfilter.meta = JMeta(argc, argv);
1378 
1379  dfilter.enter();
1380  dfilter.run();
1381 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:804
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1500
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:914
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
Definition: JDataFilter.cc:76
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:182
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
then JLigiers sh continue fi cat driver txt<< EOFprocess dfilter $HOST1 ssh\$HOST\$"JDataFilter -H \$SERVER\$ -M \$LOGGER\$ -d $DEBUG &";enterevent ev_init{RC_CMD%< ev_init.txt > from me< ev_init.txt > event ev_configure
Data structure for a composite optical module.
Definition: JModule.hh:57
JSinglePointer< JTriggerNB > triggerNB
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:270
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:158
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:155
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:150
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:257
Detector data structure.
Definition: JDetector.hh:80
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:289
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:42
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:145
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:128
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:306
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:167
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:157
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:171
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:226
Basic data structure for L0 hit.
std::string index
index in process list
do cat driver txt<< EOFevent ev_configure{RC_EVT%< ev_configure.txt > RC_DWRT path
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:154
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
const_iterator end() const
get iterator to end of data (without end marker)
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:156
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:196
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:634
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:151
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:241
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1961
Byte array binary input.
Definition: JByteArrayIO.hh:25
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:819
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:315
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:63
long long int number_of_packets_received
static const long long int MEGABYTE
Number of bytes in a kilo-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:658
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:259
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:551
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
then echo n User name
Definition: JCookie.sh:45
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:648
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:319
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxililary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:678
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:256
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:54
Data structure for input to trigger algorithm.
Fixed parameters andd ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:640
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:332
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY source JAcoustics sh $DETECTOR_ID CHECK_EXIT_CODE typeset A TRIPODS get_tripods $WORKDIR tripod txt TRIPODS for EMITTER in
Definition: JCanberra.sh:38
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:601
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:258
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:664
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:353
Time slice with calibrated data.
Definition: JTimeslice.hh:26
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.