Jpp  18.5.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "JLang/JVectorize.hh"
14 #include "Jeep/JParser.hh"
15 #include "Jeep/JProperties.hh"
16 #include "Jeep/JTimer.hh"
17 #include "Jeep/JTimekeeper.hh"
18 #include "Jeep/JMessage.hh"
19 #include "Jeep/JPrint.hh"
20 #include "Jeep/JeepToolkit.hh"
21 #include "Jeep/JStatus.hh"
28 #include "JDAQ/JDAQTags.hh"
29 #include "JDAQ/JDAQEventIO.hh"
30 #include "JDAQ/JDAQTimesliceIO.hh"
32 #include "JDetector/JDetector.hh"
33 #include "JTrigger/JHit.hh"
34 #include "JTrigger/JHitToolkit.hh"
37 #include "JTrigger/JTimeslice.hh"
38 #include "JTrigger/JHitL0.hh"
39 #include "JTrigger/JHitL1.hh"
40 #include "JTrigger/JBuildL1.hh"
41 #include "JTrigger/JBuildL2.hh"
45 #include "JTrigger/JTriggerNB.hh"
46 #include "JTrigger/JTriggerBits.hh"
50 #include "JTrigger/JTimesliceL1.hh"
53 #include "JTrigger/JChecksum.hh"
56 #include "JNet/JControlHost.hh"
58 #include "JNet/JTCPSocket.hh"
59 #include "JNet/JSocketChannel.hh"
60 #include "JNet/JServerSocket.hh"
61 #include "JPhysics/JConstants.hh"
62 #include "JTools/JQuantile.hh"
63 #include "JSupport/JSupport.hh"
65 #include "JSupport/JMeta.hh"
66 #include "JSystem/JStat.hh"
67 #include "JSystem/JTime.hh"
69 #include "JSystem/JNetwork.hh"
70 #include "JSystem/JFilesystem.hh"
71 
72 
73 namespace JNET {
74 
75  /**
76  * Get size of packet.
77  *
78  * \param preamble DAQ data preamble
79  * \return size [B]
80  */
81  template<>
83  {
84  return preamble.getLength();
85  }
86 }
87 
88 namespace KM3NETDAQ {
89 
90  using namespace JPP;
91 
92 
93  /**
94  * Get maximum of values.
95  *
96  * \param buffer input values
97  * \return maximum value
98  */
99  inline size_t getMaximum(const array_type<size_t>& buffer)
100  {
101  size_t max = 0;
102 
103  for (array_type<size_t>::const_iterator i = buffer.begin(); i != buffer.end(); ++i) {
104  if (*i > max) {
105  max = *i;
106  }
107  }
108 
109  return max;
110  }
111 
112 
/**
 * Get expected number of frames according to a given allowed fraction of active modules.
 *
 * The result is determined as follows:
 * - factor not larger than zero (or not a number): one frame;
 * - factor equal to or larger than one: the given number of frames;
 * - otherwise: the product of number of frames and factor, rounded down, but at least one frame.
 *
 * \param  number_of_frames   number of frames
 * \param  factor             factor
 * \return                    number of frames
 */
inline size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
{
  // The negated comparison also rejects NaN; without it, NaN would reach the
  // conversion to an integer below, which is undefined behaviour.
  if (!(factor > 0.0)) {
    return 1;
  }

  if (factor >= 1.0) {
    return number_of_frames;
  }

  // 0 < factor < 1, so the product is non-negative and does not exceed number_of_frames.
  const size_t n = static_cast<size_t>(number_of_frames * factor);

  return (n == 0 ? 1 : n);
}
134 
135 
136  /**
137  * Main class for real-time filtering of data.
138  *
139  * This class implements the action methods for each transition of the state machine.\n
140  * When the state machine is entered, data are continually collected using
141  * custom implementation of virtual methods
142  * setSelect and
143  * actionSelect.
144  *
145  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
146  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
147  * When a JDAQTimeslice is complete,
148  * it is processed in the same way as in the applications JTriggerProcessor.cc and JTriggerEfficiency.cc.
149  *
150  * The completeness of a time slice is defined by two parameters, namely
151  * <tt>frames_per_slice</tt> and
152  * <tt>factor_per_slice</tt>.\n
153  * These are parsed in method actionConfigure as follows.
154  * <pre>
155  * numberOfFramesPerSlice = <frames per slice> = 1;
156  * factorOfFramesPerSlice = <factor per slice> = 1.0;
157  * </pre>
158  * The expected number of frames in a time slice is defined by their product.\n
159  * Note that the first parameter can change during operation (see below).
160  *
161  * A timeout may occur when the total amount of data or the number of incomplete time slices
162  * in the queue exceeds one of the limits <tt>queueSize</tt> or <tt>queueDepth</tt>.\n
163  * These are parsed in method actionConfigure as follows.
164  * <pre>
165  * queueSize = <maximal queue size [B]>;
166  * queueDepth = <maximal queue depth>;
167  * </pre>
168  * Note that these values apply per JDataFilter.\n
169  *
170  * The parameter <tt>frames_per_slice</tt> is subject to a servo mechanism.\n
171  * When a timeout occurs,
172  * it is set to the number of frames in the oldest time slice.\n
173  * When data are received with a frame index below the current frame index,
174  * it is incremented by one.
175  *
176  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
177  * The following parameters can be used to configure the circular buffer.\n
178  * <pre>
179  * path = <write directory for temporary circular buffer>;
180  * archive = <write directory for archival of circular buffer>;
181  * c_sizeL0 = <L0 buffer size>;
182  * c_sizeL1 = <L1 buffer size>;
183  * c_sizeL2 = <L2 buffer size>;
184  * c_sizeSN = <SN buffer size>;
185  * </pre>
186  *
187  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
188  * - There will always be a file created which is deleted when the state machine is exited;
189  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
190  * - Following an alert, the temporary file is archived to a file with name <tt><archive>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
191  * - After archival of the temporary file, a new temporary file will be opened;
192  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
193  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
194  *
195  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
196  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
197  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
198  * (e.g. when there is more than one alert on a given day).
199  *
200  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
201  *
202  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
203  */
204  class JDataFilter :
205  public JDAQClient
206  {
207  public:
208 
211 
212  typedef double hit_type;
218 
219 
220 
221  /**
222  * Circular buffer.
223  */
225  public JTreeRecorder<JDAQTimesliceTypes_t>
226  {
229 
230 
231  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
232 
233 
234  /**
235  * Constructor.
236  *
237  * \param path directory for temporary storage
238  * \param archive directory for permanent archival
239  * \param tag tag
240  */
242  const std::string& archive,
243  const JTag& tag) :
244  path (path),
245  archive(archive),
246  tag (tag)
247  {
248  disable();
249  }
250 
251 
252  /**
253  * Open file.
254  *
255  * If file with same name exists, remove it beforehand.
256  */
257  void open()
258  {
259  using namespace std;
260  using namespace JPP;
261 
262  gErrorIgnoreLevel = kFatal;
263 
264  std::ostringstream os;
265 
266  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
267 
268  if (getFileStatus(os.str().c_str())) {
269  std::remove(os.str().c_str());
270  }
271 
272  this->open(os.str().c_str());
273  }
274 
275 
276  /**
277  * Close file.
278  *
279  * If option is true, archive file; else delete file.
280  *
281  * \param option option
282  */
283  void close(const bool option)
284  {
285  using namespace std;
286  using namespace JPP;
287 
288  const JDateAndTime cal;
289 
290  if (this->is_open()) {
291 
292  const string file_name = this->getFile()->GetName();
293 
294  this->close();
295 
296  if (option) {
297 
298  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
299 
300  ostringstream os;
301 
302  os << getFullPath(this->archive)
303  << "KM3NeT"
304  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
305  << "_" << this->tag;
306 
307  if (i != 0) {
308  os << "_" << i;
309  }
310 
311  os << ".root";
312 
313  if (!getFileStatus(os.str().c_str())) {
314 
315  if (JSYSTEM::rename(file_name, os.str()) == 0)
316  return;
317  else
318  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
319  }
320  }
321 
322  } else {
323 
324  std::remove(file_name.c_str());
325  }
326  }
327  }
328 
329 
330  /**
331  * Disable writing.
332  */
333  void disable()
334  {
335  sizeL0 = 0;
336  sizeL1 = 0;
337  sizeL2 = 0;
338  sizeSN = 0;
339  }
340 
341 
342  /**
343  * Check whether writing of data is enabled.
344  *
345  * \return true if writing enabled; else false
346  */
347  bool is_enabled() const
348  {
349  return (sizeL0 > 0 ||
350  sizeL1 > 0 ||
351  sizeL2 > 0 ||
352  sizeSN > 0);
353  }
354 
355 
356  /**
357  * Write circular buffer to output stream.
358  *
359  * \param out output stream
360  * \param object circular buffer
361  * \return output stream
362  */
363  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
364  {
365  if (object.is_open())
366  out << object.getFile()->GetName();
367  else
368  out << "void";
369 
370  out << ' ';
371 
372  out << object.sizeL0 << '/'
373  << object.sizeL1 << '/'
374  << object.sizeL2 << '/'
375  << object.sizeSN << '/';
376 
377  return out;
378  }
379 
380  Long64_t sizeL0; //!< Number of L0 time slices
381  Long64_t sizeL1; //!< Number of L1 time slices
382  Long64_t sizeL2; //!< Number of L2 time slices
383  Long64_t sizeSN; //!< Number of SN time slices
384 
385  std::string path; //!< Directory for temporary storage
386  std::string archive; //!< Directory for permanent archival
387  JTag tag; //!< Unique tag of this process
388  };
389 
390 
391  /**
392  * Sort DAQ process by index.
393  *
394  * \param first first DAQ process
395  * \param second second DAQ process
396  * \return true if index of first DAQ process less than that of second; else false
397  */
398  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
399  {
400  return first.index < second.index;
401  }
402 
403 
404  /**
405  * Constructor.
406  *
407  * \param name name of client
408  * \param server name of command message server
409  * \param hostname name of data server
410  * \param logger pointer to logger
411  * \param level debug level
412  * \param port server port
413  * \param backlog server backlog
414  * \param path directory for temporary storage
415  * \param archive directory for permanent archival
416  */
418  const std::string& server,
419  const std::string& hostname,
420  JLogger* logger,
421  const int level,
422  const int port,
423  const int backlog,
424  const std::string& path,
425  const std::string& archive) :
426  JDAQClient (name,server,logger,level),
427  hostname (hostname),
428  port (port),
429  backlog (backlog),
430  c_buffer (path, archive, getUniqueTag())
431  {
432  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
433 
434  addSubscription(JSubscriptionAll(RC_ALERT));
435 
436  totalCPURAM = getRAM();
437  current_slice_index = -1;
438  dataqueue_slice_index.clear();
439  reporting = false;
440 
441  this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
442  }
443 
444 
445  virtual void actionEnter() override
446  {}
447 
448 
449  virtual void actionExit() override
450  {
451  if (c_buffer.is_open()) {
452 
453  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
454 
455  c_buffer.close(false);
456  }
457 
458  datawriter.reset();
459  }
460 
461 
462  virtual void actionInit(int length, const char* buffer) override
463  {
464  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
465 
466  try {
467 
468  JDebugStream(logger) << "Start server.";
469 
470  if (serversocket.is_valid()) {
471  serversocket->shutdown();
472  }
473 
474  serversocket.reset(new JServerSocket(port,backlog));
475  }
476  catch(const std::exception& error) {
477  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
478  ev_error();
479  }
480  }
481 
482 
483  virtual void actionConfigure(int length, const char* buffer) override
484  {
485  using namespace std;
486 
487  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
488 
489  string _hostname_ = "";
490 
491  long long int update_s = 20;
492  long long int logger_s = 10;
493 
494  parameters .reset();
495  dataFilters.clear();
496  dataQueues .clear();
497 
498  reporting = false;
499  dumpCount = 0;
500  dumpLimit = numeric_limits<int>::max();
501  dumpMask = 0;
502  dumpMask.set(JChecksum::EPMT_t);
503  dumpMask.set(JChecksum::ETDC_t);
504  dumpMask.set(JChecksum::TIME_t);
505  //dumpMask.set(JChecksum::EUDP_t);
506  dumpMask.set(JChecksum::SIZE_t);
507 
508  detector.comment.clear();
509  detector.clear();
510 
511  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
512 
513  properties["dataWriter"] = _hostname_;
514  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
515  properties["factorOfFramesPerSlice"] = factor_per_slice = 1.0;
516  properties["detector"] = detector;
517  properties["triggerParameters"] = parameters;
518  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
519  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
520  properties["frameIndex"] = maximal_frame_index = 100000;
521  properties["logger_s"] = logger_s;
522  properties["update_s"] = update_s;
523  properties["JDataFilter"] = dataFilters;
524  properties["DataQueue"] = dataQueues;
525  properties["path"] = c_buffer.path;
526  properties["archive"] = c_buffer.archive;
527  properties["c_sizeL0"] = c_buffer.sizeL0;
528  properties["c_sizeL1"] = c_buffer.sizeL1;
529  properties["c_sizeL2"] = c_buffer.sizeL2;
530  properties["c_sizeSN"] = c_buffer.sizeSN;
531  properties["dumpLimit"] = dumpLimit;
532  properties["dumpMask"] = dumpMask;
533 
534  try {
535  properties.read(string(buffer, length));
536  }
537  catch(const std::exception& error) {
538  JErrorStream(logger) << error.what();
539  }
540 
541  if (update_s <= 0) { update_s = 20; }
542  if (logger_s <= 0) { logger_s = 10; }
543 
544  setClockInterval(update_s * 1000000LL);
545 
546  _hostname_ = trim(_hostname_);
547 
548  if (_hostname_ != "" && _hostname_ != hostname) {
549 
550  datawriter.reset();
551 
552  hostname = _hostname_;
553  }
554 
555  bool status = datawriter.is_valid();
556 
557  if (status) {
558 
559  try {
560  status = datawriter->Connected() == 0;
561  }
562  catch (const exception&) {
563  status = false;
564  }
565  }
566 
567  if (!status) {
568 
569  datawriter.reset(new JControlHost_t(hostname));
570 
571  datawriter->MyId(getFullName());
572  }
573 
574  datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
575 
576  // process processlist
577 
578  if (dataFilters.empty()) {
579  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
580  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
581  }
582 
583  sort(dataFilters.begin(), dataFilters.end(), compare);
584 
585  unsigned int numberOfDataFiltersOnThisMachine = 0;
586  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
587 
589 
590  {
591  JNoticeStream notice(logger);
592 
593  notice << "My IP addresses:";
594 
595  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
596  notice << ' ' << *i;
597  }
598  }
599 
600  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
601 
602  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
603 
604  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
605 
606  numberOfDataFiltersOnThisMachine++;
607 
608  if (i->port == this->port) {
609  thisProcess = i;
610  }
611  }
612  }
613 
614  if (numberOfDataFiltersOnThisMachine == 0) {
615  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
616  << "Assuming one datafilter on this machine.";
617  numberOfDataFiltersOnThisMachine = 1;
618  }
619 
620  if (thisProcess == dataFilters.end()) {
621  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
622  }
623 
624  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
625  JErrorStream(logger) << "Mismatch between given process names: "
626  << "I am called " << getName()
627  << ", but in the process list I am referred to as " << thisProcess->index;
628  }
629 
630  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
631  reporting = true;
632  }
633 
634  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
635 
636  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
637 
638  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
639  << "Queue size reduced to "
640  << maxQueueSize << " bytes." ;
641  }
642 
643  // detector
644 
645  if (parameters.disableHighRateVeto) {
646 
647  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
648 
649  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
650  }
651 
652  // trigger parameters
653 
655 
656  triggerNB .reset(new JTriggerNB (parameters));
657  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
658  trigger3DShower.reset(new JTrigger3DShower(parameters));
659  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
660 
661  moduleRouter.reset(new JModuleRouter(detector));
662 
663  if (reporting) {
664  JNoticeStream(logger) << "This data filter process will report.";
665  JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
666  JDebugStream (logger) << "Trigger parameters: " << parameters;
667  JDebugStream (logger) << "Detector description: " << endl << detector;
668  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
669  }
670 
671  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
672 
673  // set L1, L2 and SN builders
674 
675  buildL1.reset(new JBuildL1_t(parameters));
676  buildL2.reset(new JBuildL2_t(parameters.L2));
677  buildSN.reset(new JBuildL2_t(parameters.SN));
678  buildNB.reset(new JBuildL2_t(parameters.NB));
679 
680  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
681  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
682  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
683  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
684 
685  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
686  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
687  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
688  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
689  logErrorOvercomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
690 
691  if (c_buffer.is_enabled()) {
692 
693  if (!c_buffer.is_open()) {
694 
695  c_buffer.open();
696 
697  if (c_buffer.is_open()) {
698 
699  putObject(c_buffer.getFile(), meta);
700 
701  JStatusStream(logger) << "Created circular buffer " << c_buffer;
702 
703  } else {
704 
705  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
706  }
707 
708  } else {
709 
710  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
711  }
712  }
713 
714  if (c_buffer.is_open()) {
715  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
716  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
717  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
718  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
719  } else {
720  c_buffer.disable();
721  }
722  }
723 
724 
725  virtual void actionStart(int length, const char* buffer) override
726  {
727  using namespace std;
728 
729  if (reporting) {
730  JNoticeStream(logger) << "Start run " << getRunNumber();
731  }
732 
733  timeslices.clear();
734 
735  current_slice_index = -1;
736  dataqueue_slice_index.clear();
737  queueSize = 0;
738 
739  numberOfEvents = 0;
740  numberOfBytes = 0;
741  numberOfTimeslicesProcessed = 0;
742  numberOfIncompleteTimeslicesProcessed = 0;
743 
744  number_of_packets_received = 0;
745  number_of_packets_discarded = 0;
746  number_of_bytes_received = 0;
747  number_of_reads = 0;
748 
749  minFrameNumber = numeric_limits<int>::max();
750  maxFrameNumber = numeric_limits<int>::min();
751 
752  // Reset global trigger counter.
753 
755 
756  logErrorRun .reset();
757  logErrorDetector .reset();
758  logErrorIndex .reset();
759  logErrorIncomplete .reset();
760  logErrorOvercomplete.reset();
761 
762  timer.reset();
763  timer.start();
764 
765  Qt.reset();
766  Qx.reset();
767 
768  // send trigger parameters to the datawriter
769 
770  ostringstream os;
771 
772  os << getRunNumber() << ' ' << parameters;
773 
774  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
775  }
776 
777 
778  virtual void actionPause(int length, const char* buffer) override
779  {
780  using namespace std;
781 
782  if (!timeslices.empty()) {
783 
784  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
785 
786  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
787  queueSize -= getSizeof(*i);
788  }
789 
790  timeslices.clear();
791  }
792 
793  { // force clearance of memory
794 
795  deque<JDAQTimesliceL0> buffer;
796 
797  timeslices.swap(buffer);
798  }
799 
800  if (queueSize != 0) {
801  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
802  }
803 
804  current_slice_index = -1;
805  dataqueue_slice_index.clear();
806  queueSize = 0;
807 
808  timer.stop();
809  }
810 
811 
812  virtual void actionContinue(int length, const char* buffer) override
813  {
814  timer.start();
815  }
816 
817 
818  virtual void actionStop(int length, const char* buffer) override
819  {
820  typeout();
821  }
822 
823 
824  virtual void actionReset(int length, const char* buffer) override
825  {
826  if (serversocket.is_valid()) {
827  serversocket->shutdown();
828  }
829 
830  serversocket.reset();
831  }
832 
833 
834  virtual void actionQuit(int length, const char* buffer) override
835  {
836  datawriter.reset();
837  }
838 
839 
840  virtual void setSelect(JFileDescriptorMask& mask) const override
841  {
842  if (serversocket.is_valid()) {
843  mask.set(*serversocket);
844  }
845 
846  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
847  if (!channel->isReady()) {
848  mask.set(channel->getFileDescriptor());
849  }
850  }
851  }
852 
853 
854  virtual void actionSelect(const JFileDescriptorMask& mask) override
855  {
856  using namespace std;
857 
858  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
859 
860  try {
861 
862  if (mask.has(channel->getFileDescriptor())) {
863  channel->read();
864  }
865 
866  if (channel->isReady()) {
867 
868  number_of_packets_received += 1;
869  number_of_reads += channel->getCounter();
870  number_of_bytes_received += channel->size();
871 
872  if (isRunning()) {
873 
874  try {
875  updateFrameQueue(*channel);
876  }
877  catch(const std::exception& error) {
878 
879  JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
880 
881  number_of_packets_discarded += 1;
882  }
883 
884  } else {
885 
886  JErrorStream(logErrorRun) << "Receiving data while not running.";
887 
888  number_of_packets_discarded += 1;
889  }
890 
891  channel->reset();
892  }
893 
894  ++channel;
895  }
896  catch(const std::exception& error) {
897 
898  if (isRunning()) {
899  JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
900  }
901 
902  channel->shutdown();
903 
904  channel = channelList.erase(channel);
905  }
906  }
907 
908 
909  if (serversocket.is_valid()) {
910 
911  if (mask.has(*serversocket)) {
912 
913  JTCPSocket socket(serversocket->getFileDescriptor());
914 
916 
917  socket.setKeepAlive (true);
918  socket.setNonBlocking(false);
919 
920  JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
921 
922  channelList.push_back(JSocketInputChannel_t(socket));
923  }
924  }
925 
926 
927  if (!timeslices.empty()) {
928 
929  const size_t number_of_frames = getNumberOfFrames(frames_per_slice, factor_per_slice);
930  const size_t maximum_in_queue = getMaximum(make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size));
931 
932  if (((timeslices[0].size() >= number_of_frames && // normal
933  timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
934 
935  (maximum_in_queue >= number_of_frames && // intermittent problem
936  timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
937 
938  (timeslices.size() >= maxQueueDepth) || // timeout
939  (queueSize >= maxQueueSize))) {
940 
941 
942  const JDAQTimesliceL0& pending_slice = timeslices.front();
943  queueSize -= getSizeof(pending_slice);
944 
945  current_slice_index = pending_slice.getFrameIndex();
946  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
947  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
948 
949  for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
950  modules.insert(i->getModuleID());
951  }
952 
953 
954  if (isRunning()) {
955 
956  const localtime_t t0 = getLocalTime();
957 
958  if (!pending_slice.empty()) {
959  processTimeSlice(pending_slice);
960  }
961 
962  const localtime_t t1 = getLocalTime();
963 
964  numberOfTimeslicesProcessed += 1;
965 
966  Qt.put(t1 - t0);
967 
968  } else {
969 
970  JErrorStream(logErrorRun) << "Skip processing of data while not running.";
971  }
972 
973 
974  if (modules.size() > frames_per_slice) {
975 
976  JErrorStream(logErrorOvercomplete) << "More active modules than expected "
977  << modules.size() << " > " << frames_per_slice
978  << " adjusting frames per slice to " << modules.size();
979 
980  frames_per_slice = modules.size();
981  }
982 
983 
984  if (pending_slice.size() < number_of_frames) {
985 
986  numberOfIncompleteTimeslicesProcessed += 1;
987 
988  ostringstream error;
989 
990  error << "Timeout -> processed incomplete timeslice: "
991  << "Frame index = " << pending_slice.getFrameIndex() << ';'
992  << "Size of timeslice = " << pending_slice.size() << ';'
993  << "Queue depth = " << timeslices.size() << ';'
994  << "Queue size = " << queueSize << ';'
995  << "DataQueue min = " << dataqueue_slice_index.min() << ';'
996  << "DataQueue max = " << dataqueue_slice_index.max() << ';';
997 
998  if (maximum_in_queue >= number_of_frames) {
999 
1000  error << " intermittent problem -> continues as-is";
1001 
1002  } else {
1003 
1004  modules.clear(); // remove history
1005 
1006  for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
1007  modules.insert(i->getModuleID());
1008  }
1009 
1010  error << " adjusting frames per timeslice from " << frames_per_slice << " to " << modules.size();
1011 
1012  frames_per_slice = modules.size();
1013  }
1014 
1015  JErrorStream(logErrorIncomplete) << error.str();
1016  }
1017 
1018 
1019  timeslices.pop_front();
1020  }
1021  }
1022  }
1023 
1024 
1025  virtual void actionRunning() override
1026  {
1027  if (reporting) {
1028  typeout();
1029  }
1030  }
1031 
1032 
1033  /**
1034  * Update queue with data frames.
1035  *
1036  * Note that any discarded data will be reported.
1037  *
1038  * \param channel incoming data channel
1039  */
1041  {
1042  using namespace std;
1043 
1044  JByteArrayReader in(channel.data(), channel.size());
1045 
1046  JDAQPreamble preamble;
1047  JDAQSuperFrameHeader header;
1048 
1049  in >> preamble;
1050  in >> header;
1051 
1052  if (preamble.getLength() != channel.size()) {
1053 
1054  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
1055  << "preamble.getLength() = " << preamble.getLength() << ';'
1056  << "channel.size(): " << channel.size() << ';';
1057 
1058  number_of_packets_discarded += 1;
1059 
1060  return;
1061  }
1062 
1063  if (header.getRunNumber() != getRunNumber()) {
1064 
1065  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
1066  << " != " << getRunNumber()
1067  << " -> Dropping frame.";
1068 
1069  number_of_packets_discarded += 1;
1070 
1071  return;
1072  }
1073 
1074  if (header.getFrameIndex() <= current_slice_index) {
1075 
1076  number_of_packets_discarded += 1;
1077 
1078  if (modules.insert(header.getModuleID()).second) {
1079 
1080  frames_per_slice = modules.size();
1081 
1082  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
1083  << " module " << header.getModuleID()
1084  << " -> dropping frame;"
1085  << " increase number of frames expected to: " << frames_per_slice;
1086  }
1087 
1088  return;
1089  }
1090 
1091  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
1092 
1093  number_of_packets_discarded += 1;
1094 
1095  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1096  << " module " << header.getModuleID()
1097  << " -> Dropping frame.";
1098 
1099  return;
1100  }
1101 
1102  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1103 
1104  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1105  ++timesliceIterator;
1106  }
1107 
1108  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1109 
1110  // The corresponding time slice already exists
1111 
1112  } else {
1113 
1114  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1115 
1116  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1117 
1118  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1119 
1120  queueSize += getSizeof(*timesliceIterator);
1121  }
1122 
1123  timesliceIterator->push_back(JDAQSuperFrame(header));
1124 
1125  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1126 
1127  queueSize += getSizeof(*timesliceIterator->rbegin());
1128 
1129  dataqueue_slice_index[channel.getFileDescriptor()] = header.getFrameIndex();
1130  }
1131 
1132 
1133  /**
1134  * Process time slice.
1135  *
1136  * \param timeslice time slice
1137  */
1138  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1139  {
1140  using namespace std;
1141 
1142  try {
1143 
1144  timesliceRouter->configure(timeslice);
1145 
1146  if (parameters.writeSummary()) {
1147  this->put(JDAQSummaryslice(timeslice));
1148  }
1149 
1150  if (parameters.trigger3DMuon.enabled ||
1151  parameters.trigger3DShower.enabled ||
1152  parameters.triggerMXShower.enabled ||
1153  parameters.triggerNB.enabled ||
1154  parameters.writeL0.prescale ||
1155  parameters.writeL1.prescale ||
1156  parameters.writeL2.prescale ||
1157  parameters.writeSN.prescale ||
1158  c_buffer.is_enabled()) {
1159 
1160  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1161  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1162  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1163  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1164  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1165  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1166 
1167  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1168 
1169  if (moduleRouter->hasModule(frame->getModuleID())) {
1170 
1171  const JChecksum::result_type& result = checksum(*frame);
1172 
1173  if (!result) {
1174 
1175  JWarningStream(logger) << "Invalid data at "
1176  << "run = " << timeslice.getRunNumber() << ";"
1177  << "frame index = " << timeslice.getFrameIndex() << ";"
1178  << "module = " << frame->getModuleID() << ";"
1179  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1180 
1181  if (dumpCount < dumpLimit && result.has(dumpMask)) {
1182  timesliceTX.push_back(*frame);
1183  }
1184 
1185  continue;
1186  }
1187 
1188  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1189  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1190 
1191  // Apply high-rate veto
1192 
1193  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1194 
1195  // L0
1196 
1197  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1198 
1199  // Nano-beacon trigger
1200 
1201  if (parameters.triggerNB.enabled) {
1202 
1203  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1204 
1205  if (buffer.begin() != __end) {
1206 
1207  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1208  frame->getModuleIdentifier(),
1209  module.getPosition()));
1210 
1211  JSuperFrame1D_t zbuf;
1212 
1213  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1214 
1215  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1216  }
1217  }
1218 
1219  // L1
1220 
1221  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1222  frame->getModuleIdentifier(),
1223  module.getPosition()));
1224 
1225  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1226 
1227  // L2
1228 
1229  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1230  frame->getModuleIdentifier(),
1231  module.getPosition()));
1232 
1233  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1234 
1235  // SN
1236 
1237  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1238  frame->getModuleIdentifier(),
1239  module.getPosition()));
1240 
1241  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1242 
1243  } else {
1244 
1245  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1246  }
1247  }
1248 
1249  if (!timesliceTX.empty()) {
1250 
1251  if (dumpCount < dumpLimit) {
1252 
1253  this->put(timesliceTX);
1254 
1255  dumpCount += 1;
1256  }
1257  }
1258 
1259  // Trigger
1260 
1261  if (parameters.triggerNB.enabled) {
1262 
1263  const JTriggerInput trigger_input(timesliceNB);
1264 
1265  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1266 
1267  if (parameters.triggerNB.write()) {
1268 
1269  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1270  getTriggerMask(triggerNB->getTriggerBit()),
1271  *hit,
1272  *timesliceRouter,
1273  *moduleRouter,
1274  parameters.TMaxLocal_ns,
1275  parameters.triggerNB.DMax_m,
1276  getTimeRange(parameters.triggerNB));
1277 
1278  this->put(tev);
1279  }
1280  }
1281  }
1282 
1283  JTriggerInput trigger_input(timesliceL2);
1284  JTriggerOutput trigger_output;
1285 
1286  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1287  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1288  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1289 
1290  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1291 
1292  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1293 
1294  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1295 
1296  this->put(object);
1297 
1298  numberOfEvents += 1;
1299  }
1300 
1301  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1302 
1303  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1304 
1305  if (parameters.writeL1) { this->put(object); }
1306  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1307  }
1308 
1309  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1310 
1311  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1312 
1313  if (parameters.writeL2) { this->put(object); }
1314  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1315  }
1316 
1317  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1318 
1319  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1320 
1321  if (parameters.writeSN) { this->put(object); }
1322  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1323  }
1324 
1325  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1326 
1327  if (parameters.writeL0) { this->put(timeslice); }
1328  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1329  }
1330  }
1331 
1332  } catch(const std::exception& error) {
1333 
1334  JErrorStream(logger) << "Error = " << error.what() << ";"
1335  << "run = " << timeslice.getRunNumber() << ";"
1336  << "frame index = " << timeslice.getFrameIndex() << ";"
1337  << "time slice not correctly processed;"
1338  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1339 
1340  if (dumpCount < dumpLimit) {
1341 
1342  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1343 
1344  dumpCount += 1;
1345  }
1346  }
1347 
1348  timesliceRouter->reset();
1349  }
1350 
1351 
1352  /**
1353  * Report status to message logger.
1354  */
1355  void typeout()
1356  {
1357  timer.stop();
1358 
1359  const double T_us = (double) timer.usec_wall;
1360 
1361  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1362  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1363  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1364  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1365  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1366  JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0);
1367 
1368  if (number_of_packets_received > 0) {
1369  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1370  }
1371 
1372  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1373  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice << ' ' << FIXED(5,3) << factor_per_slice;
1374 
1375  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1376 
1377  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1378  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1379 
1380  if (processedSlicesTime_us > 0) {
1381  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1382  }
1383  if (processedDetectorTime_us > 0) {
1384  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1385  }
1386 
1387  timer.start();
1388  }
1389 
1390 
1391  /**
1392  * Tagged action to handle alerts.
1393  *
1394  * \param tag tag
1395  * \param length number of characters
1396  * \param buffer message
1397  */
1398  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1399  {
1400  using namespace std;
1401 
1402  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1403 
1404  if (tag == RC_ALERT) {
1405 
1406  if (c_buffer.is_open()) {
1407 
1408  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1409 
1410  c_buffer.close(true);
1411  }
1412 
1413  if (c_buffer.is_enabled()) {
1414 
1415  c_buffer.open();
1416 
1417  if (c_buffer.is_open()) {
1418 
1419  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1420 
1421  putObject(c_buffer.getFile(), meta);
1422 
1423  } else {
1424 
1425  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1426 
1427  c_buffer.disable();
1428  }
1429  }
1430 
1431  } else {
1432 
1433  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1434  }
1435  }
1436 
1437  JMeta meta; //!< meta data
1438 
1439  private:
1440 
1442  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1443  std::string hostname; //!< host name of data server
1444 
1445  /**
1446  * Auxiliary method to send object to data server.
1447  *
1448  * \param object object to be sent
1449  */
1450  template<class T>
1451  void put(const T& object)
1452  {
1453  try {
1454 
1455  const localtime_t t0 = getLocalTime();
1456 
1457  datawriter->put(object);
1458 
1459  const localtime_t t1 = getLocalTime();
1460 
1461  numberOfBytes += getSizeof(object);
1462 
1463  Qx.put(t1 - t0);
1464  }
1465  catch(const std::exception& error) {
1466  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1467  ev_error();
1468  }
1469  }
1470 
1471 
1472  int port; //!< server socket port
1473  int backlog;
1474 
1475  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1476  JChannelList_t channelList; //!< connections to data queue
1477 
1478  JTimer timer;
1480 
1481  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1487 
/**
 * Map of file descriptor of data queue connection to index of last frame read from it.
 *
 * The extreme values are determined with the standard library algorithms
 * instead of hand-written loops; an empty map yields -1.
 */
struct :
  public std::map<int, int>
{
  /**
   * Get minimal frame index.
   *
   * \return                 frame index; -1 if there are no entries
   */
  int min() const
  {
    if (this->empty()) {
      return -1;
    }

    // compare entries by frame index (mapped value), not by key
    const auto less = [](const value_type& first, const value_type& second) { return first.second < second.second; };

    return std::min_element(this->begin(), this->end(), less)->second;
  }

  /**
   * Get maximal frame index.
   *
   * \return                 frame index; -1 if there are no entries
   */
  int max() const
  {
    if (this->empty()) {
      return -1;
    }

    // compare entries by frame index (mapped value), not by key
    const auto less = [](const value_type& first, const value_type& second) { return first.second < second.second; };

    return std::max_element(this->begin(), this->end(), less)->second;
  }

} dataqueue_slice_index;
1542 
1543  // trigger
1544 
1547 
1554 
1559 
1565 
1569 
1570  // process management
1571 
1574 
1575  // memory management
1576 
1577  long long int totalCPURAM;
1579  long long int maxQueueSize;
1580  long long int queueSize;
1581 
1582  // statistics
1583 
1585 
1586  long long int numberOfEvents;
1587  long long int numberOfBytes;
1590 
1593 
1594  // temporary
1595 
1598  long long int number_of_reads;
1600 
1601  // circular buffer
1602 
1604  };
1605 }
1606 
1607 /**
1608  * \file
1609  *
1610  * Application for real-time filtering of data.
1611  * For more information, see KM3NETDAQ::JDataFilter.
1612  *
1613  * \author rbruijn and mdejong
1614  */
1615 int main(int argc, char* argv[])
1616 {
1617  using namespace std;
1618  using namespace JPP;
1619  using namespace KM3NETDAQ;
1620 
1621  string server;
1622  string logger;
1623  string hostname;
1624  string client_name;
1625  int port;
1626  int backlog;
1627  bool use_cout;
1628  string path;
1629  string archive;
1630  int debug;
1631 
1632 
1633  try {
1634 
1635  JParser<> zap("Application for real-time filtering of data.");
1636 
1637  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1638  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1639  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1640  zap['u'] = make_field(client_name, "client name") = "%";
1641  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1642  zap['q'] = make_field(backlog, "back log") = 1024;
1643  zap['c'] = make_field(use_cout, "print to terminal");
1644  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1645  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1646  zap['d'] = make_field(debug, "debug level") = 0;
1647 
1648  zap(argc, argv);
1649  }
1650  catch(const std::exception& error) {
1651  FATAL(error.what() << endl);
1652  }
1653 
1654 
1655  JLogger* out = NULL;
1656 
1657  if (use_cout)
1658  out = new JStreamLogger(cout);
1659  else
1660  out = new JControlHostLogger(logger);
1661 
1662  JDataFilter dfilter(client_name,
1663  server,
1664  hostname,
1665  out,
1666  debug,
1667  port,
1668  backlog,
1669  path,
1670  archive);
1671 
1672  dfilter.meta = JMeta(argc, argv);
1673 
1674  dfilter.enter();
1675  dfilter.run();
1676 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Exception for opening of file.
Definition: JException.hh:358
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:398
Utility class to parse command line options.
Definition: JParser.hh:1514
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:82
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:283
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:67
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:217
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:214
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:209
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:381
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:67
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Definition: JDAQTags.hh:32
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:497
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
Definition: pmt_status.hh:13
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:84
JTriggerParameters parameters
Auxiliary data structure for floating point format specification.
Definition: JManip.hh:446
Main class for real-time filtering of data.
Definition: JDataFilter.cc:204
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
Definition: JFilesystem.hh:49
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:200
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:241
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:385
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:216
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:347
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:213
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
const int n
Definition: JPolint.hh:786
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
Definition: getArchive.sh:42
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
Definition: JDAQTags.hh:31
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:215
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:812
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:210
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:363
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
bool has(const JStatus &mask) const
Check for errors with given error mask.
Definition: JChecksum.hh:107
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
int getFrameIndex(const double t_ns)
Get frame index for a given time in ns.
Definition: JDAQClock.hh:251
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
Definition: JDAQTags.hh:30
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
Definition: JVectorize.hh:54
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for handling status.
Definition: JStatus.hh:37
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:65
then awk string
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
size_t getMaximum(const array_type< size_t > &buffer)
Get maximum of values.
Definition: JDataFilter.cc:99
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:445
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:387
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:834
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:383
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according to a given allowed fraction of active modules.
Definition: JDataFilter.cc:120
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:133
std::string hostname
host name of data server
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
TCP socket.
Definition: JTCPSocket.hh:25
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:725
const char * data() const
Get data.
Definition: JByteArrayIO.hh:68
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:824
long long int queueSize
TDC value error.
Definition: JChecksum.hh:47
Time order error.
Definition: JChecksum.hh:48
virtual void actionExit() override
Definition: JDataFilter.cc:449
JSinglePointer< JTimesliceRouter > timesliceRouter
PMT number error.
Definition: JChecksum.hh:46
Auxiliary data structure for average.
Definition: JKatoomba_t.hh:76
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:298
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
Auxiliary class for date and time.
Definition: JDateAndTime.hh:78
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:73
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:854
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:380
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JMessageScheduler logErrorOvercomplete
System auxiliaries.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
std::set< int > modules
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:60
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:818
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:417
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:462
Auxiliary data structure for result of checksum.
Definition: JChecksum.hh:88
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:778
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:382
Data frame of one optical module.
long long int number_of_reads
Auxiliary data structure for return type of make methods.
Definition: JVectorize.hh:26
long long int numberOfEvents
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:386
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:840
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:483
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.
int size() const
Get size.
Definition: JByteArrayIO.hh:57