Jpp  18.6.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "JLang/JVectorize.hh"
14 #include "Jeep/JParser.hh"
15 #include "Jeep/JProperties.hh"
16 #include "Jeep/JTimer.hh"
17 #include "Jeep/JTimekeeper.hh"
18 #include "Jeep/JMessage.hh"
19 #include "Jeep/JPrint.hh"
20 #include "Jeep/JeepToolkit.hh"
21 #include "Jeep/JStatus.hh"
28 #include "JDAQ/JDAQTags.hh"
29 #include "JDAQ/JDAQEventIO.hh"
30 #include "JDAQ/JDAQTimesliceIO.hh"
32 #include "JDetector/JDetector.hh"
33 #include "JTrigger/JHit.hh"
34 #include "JTrigger/JHitToolkit.hh"
37 #include "JTrigger/JTimeslice.hh"
38 #include "JTrigger/JHitL0.hh"
39 #include "JTrigger/JHitL1.hh"
40 #include "JTrigger/JBuildL1.hh"
41 #include "JTrigger/JBuildL2.hh"
45 #include "JTrigger/JTriggerNB.hh"
46 #include "JTrigger/JTriggerBits.hh"
50 #include "JTrigger/JTimesliceL1.hh"
53 #include "JTrigger/JChecksum.hh"
56 #include "JNet/JControlHost.hh"
58 #include "JNet/JTCPSocket.hh"
59 #include "JNet/JSocketChannel.hh"
60 #include "JNet/JServerSocket.hh"
61 #include "JPhysics/JConstants.hh"
62 #include "JTools/JQuantile.hh"
63 #include "JSupport/JSupport.hh"
65 #include "JSupport/JMeta.hh"
66 #include "JSystem/JStat.hh"
67 #include "JSystem/JTime.hh"
69 #include "JSystem/JNetwork.hh"
70 #include "JSystem/JFilesystem.hh"
71 
72 
73 namespace JNET {
74 
75  /**
76  * Get size of packet.
77  *
78  * \param preamble DAQ data preamble
79  * \return size [B]
80  */
81  template<>
83  {
84  return preamble.getLength();
85  }
86 }
87 
88 namespace KM3NETDAQ {
89 
90  using namespace JPP;
91 
92 
/**
 * Get expected number of frames according to a given allowed fraction of active modules.
 *
 * For a factor of one or more, the given number of frames is returned unchanged.\n
 * In all other cases, the result is the truncated product of the two arguments, but at least one.\n
 * A factor which is zero, negative or not-a-number yields one.
 *
 * \param number_of_frames number of frames
 * \param factor factor
 * \return number of frames
 */
inline size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
{
  // The negated comparison is also true for NaN, which would otherwise pass both
  // range checks and end up in an undefined floating-point to integer conversion.
  if (!(factor > 0.0)) {
    return 1;
  }

  if (factor >= 1.0) {
    return number_of_frames;
  }

  // Here 0 < factor < 1, so the product is finite and fits in size_t.
  const size_t n = static_cast<size_t>(number_of_frames * factor);

  return (n == 0 ? 1 : n);
}
114 
115 
116  /**
117  * Main class for real-time filtering of data.
118  *
119  * This class implements the action methods for each transition of the state machine.\n
120  * When the state machine is entered, data are continually collected using
121  * custom implementation of virtual methods
122  * setSelect and
123  * actionSelect.
124  *
125  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
126  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
127  * When a JDAQTimeslice is complete,
128  * it is processed in the same way as in the applications JTriggerProcessor.cc and JTriggerEfficiency.cc.
129  *
130  * The completeness of a time slice is defined by two parameters, namely
131  * <tt>frames_per_slice</tt> and
132  * <tt>factor_per_slice</tt>.\n
133  * These are parsed in method actionConfigure as follows.
134  * <pre>
135  * numberOfFramesPerSlice = <frames per slice> = 1;
136  * factorOfFramesPerSlice = <factor per slice> = 1.0;
137  * </pre>
138  * The expected number of frames in a time slice is defined by their product.\n
139  * Note that the first parameter can change during operation (see below).
140  *
141  * A timeout may occur when the total amount of data or the number of incomplete time slices
142  * in the queue exceeds one of the limits <tt>queueSize</tt> or <tt>queueDepth</tt>.\n
143  * These are parsed in method actionConfigure as follows.
144  * <pre>
145  * queueSize = <maximal queue size [B]>;
146  * queueDepth = <maximal queue depth>;
147  * </pre>
148  * Note that these values apply per JDataFilter.\n
149  *
150  * The parameter <tt>frames_per_slice</tt> is subject to a servo mechanism.\n
151  * When a timeout occurs,
152  * it is set to the number of frames in the oldest time slice.\n
153  * When data are received with a frame index below the current frame index,
154  * it is incremented by one.
155  *
156  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
157  * The following parameters can be used to configure the circular buffer.\n
158  * <pre>
159  * path = <write directory for temporary circular buffer>;
160  * archive = <write directory for archival of circular buffer>;
161  * c_sizeL0 = <L0 buffer size>;
162  * c_sizeL1 = <L1 buffer size>;
163  * c_sizeL2 = <L2 buffer size>;
164  * c_sizeSN = <SN buffer size>;
165  * </pre>
166  *
167  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
168  * - There will always be a file created which is deleted when the state machine is exited;
169  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
170  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
171  * - After archival of the temporary file, a new temporary file will be opened;
172  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
173  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
174  *
175  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
176  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
177  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
178  * (e.g. when there is more than one alert on a given day).
179  *
180  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
181  *
182  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
183  */
184  class JDataFilter :
185  public JDAQClient
186  {
187  public:
188 
191 
192  typedef double hit_type;
198 
199 
200 
201  /**
202  * Circular buffer.
203  */
205  public JTreeRecorder<JDAQTimesliceTypes_t>
206  {
209 
210 
211  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
212 
213 
214  /**
215  * Constructor.
216  *
217  * \param path directory for temporary storage
218  * \param archive directory for permanent archival
219  * \param tag tag
220  */
221  JCircularBuffer_t(const std::string& path,
222  const std::string& archive,
223  const JTag& tag) :
224  path (path),
225  archive(archive),
226  tag (tag)
227  {
228  disable();
229  }
230 
231 
232  /**
233  * Open file.
234  *
235  * If file with same name exists, remove it beforehand.
236  */
237  void open()
238  {
239  using namespace std;
240  using namespace JPP;
241 
242  gErrorIgnoreLevel = kFatal;
243 
244  std::ostringstream os;
245 
246  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
247 
248  if (getFileStatus(os.str().c_str())) {
249  std::remove(os.str().c_str());
250  }
251 
252  this->open(os.str().c_str());
253  }
254 
255 
256  /**
257  * Close file.
258  *
259  * If option is true, archive file; else delete file.
260  *
261  * \param option option
262  */
263  void close(const bool option)
264  {
265  using namespace std;
266  using namespace JPP;
267 
268  const JDateAndTime cal;
269 
270  if (this->is_open()) {
271 
272  const string file_name = this->getFile()->GetName();
273 
274  this->close();
275 
276  if (option) {
277 
278  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
279 
280  ostringstream os;
281 
282  os << getFullPath(this->archive)
283  << "KM3NeT"
284  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
285  << "_" << this->tag;
286 
287  if (i != 0) {
288  os << "_" << i;
289  }
290 
291  os << ".root";
292 
293  if (!getFileStatus(os.str().c_str())) {
294 
295  if (JSYSTEM::rename(file_name, os.str()) == 0)
296  return;
297  else
298  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
299  }
300  }
301 
302  } else {
303 
304  std::remove(file_name.c_str());
305  }
306  }
307  }
308 
309 
310  /**
311  * Disable writing.
312  */
313  void disable()
314  {
315  sizeL0 = 0;
316  sizeL1 = 0;
317  sizeL2 = 0;
318  sizeSN = 0;
319  }
320 
321 
322  /**
323  * Check whether writing of data is enabled.
324  *
325  * \return true if writing enabled; else false
326  */
327  bool is_enabled() const
328  {
329  return (sizeL0 > 0 ||
330  sizeL1 > 0 ||
331  sizeL2 > 0 ||
332  sizeSN > 0);
333  }
334 
335 
336  /**
337  * Write circular buffer to output stream.
338  *
339  * \param out output stream
340  * \param object circular buffer
341  * \return output stream
342  */
343  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
344  {
345  if (object.is_open())
346  out << object.getFile()->GetName();
347  else
348  out << "void";
349 
350  out << ' ';
351 
352  out << object.sizeL0 << '/'
353  << object.sizeL1 << '/'
354  << object.sizeL2 << '/'
355  << object.sizeSN << '/';
356 
357  return out;
358  }
359 
360  Long64_t sizeL0; //!< Number of L0 time slices
361  Long64_t sizeL1; //!< Number of L1 time slices
362  Long64_t sizeL2; //!< Number of L2 time slices
363  Long64_t sizeSN; //!< Number of SN time slices
364 
365  std::string path; //!< Directory for temporary storage
366  std::string archive; //!< Directory for permanent archival
367  JTag tag; //!< Unique tag of this process
368  };
369 
370 
371  /**
372  * Sort DAQ process by index.
373  *
374  * \param first first DAQ process
375  * \param second second DAQ process
376  * \return true if index of first DAQ process less than that of second; else false
377  */
378  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
379  {
380  return first.index < second.index;
381  }
382 
383 
384  /**
385  * Constructor.
386  *
387  * \param name name of client
388  * \param server name of command message server
389  * \param hostname name of data server
390  * \param logger pointer to logger
391  * \param level debug level
392  * \param port server port
393  * \param backlog server backlog
394  * \param path directory for temporary storage
395  * \param archive directory for parmanent archival
396  */
397  JDataFilter(const std::string& name,
398  const std::string& server,
399  const std::string& hostname,
400  JLogger* logger,
401  const int level,
402  const int port,
403  const int backlog,
404  const std::string& path,
405  const std::string& archive) :
406  JDAQClient (name,server,logger,level),
407  hostname (hostname),
408  port (port),
409  backlog (backlog),
410  c_buffer (path, archive, getUniqueTag())
411  {
412  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
413 
414  addSubscription(JSubscriptionAll(RC_ALERT));
415 
416  totalCPURAM = getRAM();
417  current_slice_index = -1;
418  dataqueue_slice_index.clear();
419  reporting = false;
420 
421  this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
422  }
423 
424 
425  virtual void actionEnter() override
426  {}
427 
428 
429  virtual void actionExit() override
430  {
431  if (c_buffer.is_open()) {
432 
433  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
434 
435  c_buffer.close(false);
436  }
437 
438  datawriter.reset();
439  }
440 
441 
442  virtual void actionInit(int length, const char* buffer) override
443  {
444  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
445 
446  try {
447 
448  JDebugStream(logger) << "Start server.";
449 
450  if (serversocket.is_valid()) {
451  serversocket->shutdown();
452  }
453 
454  serversocket.reset(new JServerSocket(port,backlog));
455  }
456  catch(const std::exception& error) {
457  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
458  ev_error();
459  }
460  }
461 
462 
463  virtual void actionConfigure(int length, const char* buffer) override
464  {
465  using namespace std;
466 
467  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
468 
469  string _hostname_ = "";
470 
471  long long int update_s = 20;
472  long long int logger_s = 10;
473 
474  parameters .reset();
475  dataFilters.clear();
476  dataQueues .clear();
477 
478  reporting = false;
479  dumpCount = 0;
480  dumpLimit = numeric_limits<int>::max();
481  dumpMask = 0;
482  dumpMask.set(JChecksum::EPMT_t);
483  dumpMask.set(JChecksum::ETDC_t);
484  dumpMask.set(JChecksum::TIME_t);
485  //dumpMask.set(JChecksum::EUDP_t);
486  dumpMask.set(JChecksum::SIZE_t);
487 
488  detector.comment.clear();
489  detector.clear();
490 
491  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
492 
493  properties["dataWriter"] = _hostname_;
494  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
495  properties["factorOfFramesPerSlice"] = factor_per_slice = 1.0;
496  properties["detector"] = detector;
497  properties["triggerParameters"] = parameters;
498  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
499  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
500  properties["frameIndex"] = maximal_frame_index = 100000;
501  properties["logger_s"] = logger_s;
502  properties["update_s"] = update_s;
503  properties["JDataFilter"] = dataFilters;
504  properties["DataQueue"] = dataQueues;
505  properties["path"] = c_buffer.path;
506  properties["archive"] = c_buffer.archive;
507  properties["c_sizeL0"] = c_buffer.sizeL0;
508  properties["c_sizeL1"] = c_buffer.sizeL1;
509  properties["c_sizeL2"] = c_buffer.sizeL2;
510  properties["c_sizeSN"] = c_buffer.sizeSN;
511  properties["dumpLimit"] = dumpLimit;
512  properties["dumpMask"] = dumpMask;
513 
514  try {
515  properties.read(string(buffer, length));
516  }
517  catch(const std::exception& error) {
518  JErrorStream(logger) << error.what();
519  }
520 
521  if (update_s <= 0) { update_s = 20; }
522  if (logger_s <= 0) { logger_s = 10; }
523 
524  setClockInterval(update_s * 1000000LL);
525 
526  _hostname_ = trim(_hostname_);
527 
528  if (_hostname_ != "" && _hostname_ != hostname) {
529 
530  datawriter.reset();
531 
532  hostname = _hostname_;
533  }
534 
535  bool status = datawriter.is_valid();
536 
537  if (status) {
538 
539  try {
540  status = datawriter->Connected() == 0;
541  }
542  catch (const exception&) {
543  status = false;
544  }
545  }
546 
547  if (!status) {
548 
549  datawriter.reset(new JControlHost_t(hostname));
550 
551  datawriter->MyId(getFullName());
552  }
553 
554  datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
555 
556  // process processlist
557 
558  if (dataFilters.empty()) {
559  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
560  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
561  }
562 
563  sort(dataFilters.begin(), dataFilters.end(), compare);
564 
565  unsigned int numberOfDataFiltersOnThisMachine = 0;
566  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
567 
569 
570  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
571 
572  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
573 
574  numberOfDataFiltersOnThisMachine++;
575 
576  if (i->port == this->port) {
577  thisProcess = i;
578  }
579  }
580  }
581 
582  if (numberOfDataFiltersOnThisMachine == 0) {
583  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
584  << "Assuming one datafilter on this machine.";
585  numberOfDataFiltersOnThisMachine = 1;
586  }
587 
588  if (thisProcess == dataFilters.end()) {
589 
590  JErrorStream error(logger);
591 
592  error << "This process cannot be found in the process list. Why do I exist?";
593  error << " my IP addresses:";
594 
595  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
596  error << ' ' << *i;
597  }
598 
599  error << " my port: " << this->port;
600  error << " process list";
601 
602  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
603  error << ' ' << i->hostname << ':' << i->port;
604  }
605  }
606 
607  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
608  JErrorStream(logger) << "Mismatch between given process names: "
609  << "I am called " << getName()
610  << ", but in the process list I am referred to as " << thisProcess->index;
611  }
612 
613  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
614  reporting = true;
615  }
616 
617  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
618 
619  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
620 
621  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
622  << "Queue size reduced to "
623  << maxQueueSize << " bytes." ;
624  }
625 
626  // detector
627 
628  if (parameters.disableHighRateVeto) {
629 
630  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
631 
632  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
633  }
634 
635  // trigger parameters
636 
638 
639  triggerNB .reset(new JTriggerNB (parameters));
640  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
641  trigger3DShower.reset(new JTrigger3DShower(parameters));
642  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
643 
644  moduleRouter.reset(new JModuleRouter(detector));
645 
646  if (reporting) {
647  JNoticeStream(logger) << "This data filter process will report.";
648  JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
649  JDebugStream (logger) << "Trigger parameters: " << parameters;
650  JDebugStream (logger) << "Detector description: " << endl << detector;
651  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
652  }
653 
654  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
655 
656  // set L1, L2 and SN builders
657 
658  buildL1.reset(new JBuildL1_t(parameters));
659  buildL2.reset(new JBuildL2_t(parameters.L2));
660  buildSN.reset(new JBuildL2_t(parameters.SN));
661  buildNB.reset(new JBuildL2_t(parameters.NB));
662 
663  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
664  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
665  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
666  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
667 
668  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
669  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
670  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
671  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
672  logErrorOvercomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
673 
674  if (c_buffer.is_enabled()) {
675 
676  if (!c_buffer.is_open()) {
677 
678  c_buffer.open();
679 
680  if (c_buffer.is_open()) {
681 
682  putObject(c_buffer.getFile(), meta);
683 
684  JStatusStream(logger) << "Created circular buffer " << c_buffer;
685 
686  } else {
687 
688  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
689  }
690 
691  } else {
692 
693  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
694  }
695  }
696 
697  if (c_buffer.is_open()) {
698  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
699  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
700  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
701  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
702  } else {
703  c_buffer.disable();
704  }
705  }
706 
707 
708  virtual void actionStart(int length, const char* buffer) override
709  {
710  using namespace std;
711 
712  if (reporting) {
713  JNoticeStream(logger) << "Start run " << getRunNumber();
714  }
715 
716  timeslices.clear();
717 
718  current_slice_index = -1;
719  dataqueue_slice_index.clear();
720  queueSize = 0;
721 
722  numberOfEvents = 0;
723  numberOfBytes = 0;
724  numberOfTimeslicesProcessed = 0;
725  numberOfIncompleteTimeslicesProcessed = 0;
726 
727  number_of_packets_received = 0;
728  number_of_packets_discarded = 0;
729  number_of_bytes_received = 0;
730  number_of_reads = 0;
731 
732  minFrameNumber = numeric_limits<int>::max();
733  maxFrameNumber = numeric_limits<int>::min();
734 
735  // Reset global trigger counter.
736 
738 
739  logErrorRun .reset();
740  logErrorDetector .reset();
741  logErrorIndex .reset();
742  logErrorIncomplete .reset();
743  logErrorOvercomplete.reset();
744 
745  timer.reset();
746  timer.start();
747 
748  Qt.reset();
749  Qx.reset();
750 
751  // send trigger parameters to the datawriter
752 
753  ostringstream os;
754 
755  os << getRunNumber() << ' ' << parameters;
756 
757  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
758  }
759 
760 
761  virtual void actionPause(int length, const char* buffer) override
762  {
763  using namespace std;
764 
765  if (!timeslices.empty()) {
766 
767  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
768 
769  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
770  queueSize -= getSizeof(*i);
771  }
772 
773  timeslices.clear();
774  }
775 
776  { // force clearance of memory
777 
778  deque<JDAQTimesliceL0> buffer;
779 
780  timeslices.swap(buffer);
781  }
782 
783  if (queueSize != 0) {
784  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
785  }
786 
787  current_slice_index = -1;
788  dataqueue_slice_index.clear();
789  queueSize = 0;
790 
791  timer.stop();
792  }
793 
794 
795  virtual void actionContinue(int length, const char* buffer) override
796  {
797  timer.start();
798  }
799 
800 
801  virtual void actionStop(int length, const char* buffer) override
802  {
803  typeout();
804  }
805 
806 
807  virtual void actionReset(int length, const char* buffer) override
808  {
809  if (serversocket.is_valid()) {
810  serversocket->shutdown();
811  }
812 
813  serversocket.reset();
814  }
815 
816 
817  virtual void actionQuit(int length, const char* buffer) override
818  {
819  datawriter.reset();
820  }
821 
822 
823  virtual void setSelect(JFileDescriptorMask& mask) const override
824  {
825  if (serversocket.is_valid()) {
826  mask.set(*serversocket);
827  }
828 
829  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
830  if (!channel->isReady()) {
831  mask.set(channel->getFileDescriptor());
832  }
833  }
834  }
835 
836 
837  virtual void actionSelect(const JFileDescriptorMask& mask) override
838  {
839  using namespace std;
840 
841  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
842 
843  try {
844 
845  if (mask.has(channel->getFileDescriptor())) {
846  channel->read();
847  }
848 
849  if (channel->isReady()) {
850 
851  number_of_packets_received += 1;
852  number_of_reads += channel->getCounter();
853  number_of_bytes_received += channel->size();
854 
855  if (isRunning()) {
856 
857  try {
858  updateFrameQueue(*channel);
859  }
860  catch(const std::exception& error) {
861 
862  JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
863 
864  number_of_packets_discarded += 1;
865  }
866 
867  } else {
868 
869  JErrorStream(logErrorRun) << "Receiving data while not running.";
870 
871  number_of_packets_discarded += 1;
872  }
873 
874  channel->reset();
875  }
876 
877  ++channel;
878  }
879  catch(const std::exception& error) {
880 
881  if (isRunning()) {
882  JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
883  }
884 
885  channel->shutdown();
886 
887  channel = channelList.erase(channel);
888  }
889  }
890 
891 
892  if (serversocket.is_valid()) {
893 
894  if (mask.has(*serversocket)) {
895 
896  JTCPSocket socket(serversocket->getFileDescriptor());
897 
899 
900  socket.setKeepAlive (true);
901  socket.setNonBlocking(false);
902 
903  JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']' << ' ' << socket.getReceiveBufferSize();
904 
905  channelList.push_back(JSocketInputChannel_t(socket));
906  }
907  }
908 
909 
910  if (!timeslices.empty()) {
911 
912  const size_t number_of_frames = getNumberOfFrames(frames_per_slice, factor_per_slice);
913  const size_t maximum_in_queue = getMaximum(make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (size_t) 0);
914 
915  if (((timeslices[0].size() >= number_of_frames && // normal
916  timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
917 
918  (maximum_in_queue >= number_of_frames && // intermittent problem
919  timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
920 
921  (timeslices.size() >= maxQueueDepth) || // timeout
922  (queueSize >= maxQueueSize))) {
923 
924 
925  const JDAQTimesliceL0& pending_slice = timeslices.front();
926  queueSize -= getSizeof(pending_slice);
927 
928  current_slice_index = pending_slice.getFrameIndex();
929  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
930  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
931 
932  for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
933  modules.insert(i->getModuleID());
934  }
935 
936 
937  if (isRunning()) {
938 
939  const localtime_t t0 = getLocalTime();
940 
941  if (!pending_slice.empty()) {
942  processTimeSlice(pending_slice);
943  }
944 
945  const localtime_t t1 = getLocalTime();
946 
947  numberOfTimeslicesProcessed += 1;
948 
949  Qt.put(t1 - t0);
950 
951  } else {
952 
953  JErrorStream(logErrorRun) << "Skip processing of data while not running.";
954  }
955 
956 
957  if (modules.size() > frames_per_slice) {
958 
959  JErrorStream(logErrorOvercomplete) << "More active modules than expected "
960  << modules.size() << " > " << frames_per_slice
961  << " adjusting frames per slice to " << modules.size();
962 
963  frames_per_slice = modules.size();
964  }
965 
966 
967  if (pending_slice.size() < number_of_frames) {
968 
969  numberOfIncompleteTimeslicesProcessed += 1;
970 
971  ostringstream error;
972 
973  error << "Timeout -> processed incomplete timeslice: "
974  << "Frame index = " << pending_slice.getFrameIndex() << ';'
975  << "Size of timeslice = " << pending_slice.size() << ';'
976  << "Queue depth = " << timeslices.size() << ';'
977  << "Queue size = " << queueSize << ';'
978  << "DataQueue min = " << dataqueue_slice_index.min() << ';'
979  << "DataQueue max = " << dataqueue_slice_index.max() << ';';
980 
981  if (maximum_in_queue >= number_of_frames) {
982 
983  error << " intermittent problem -> continues as-is";
984 
985  } else {
986 
987  modules.clear(); // remove history
988 
989  for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
990  modules.insert(i->getModuleID());
991  }
992 
993  error << " adjusting frames per timeslice from " << frames_per_slice << " to " << modules.size();
994 
995  frames_per_slice = modules.size();
996  }
997 
998  JErrorStream(logErrorIncomplete) << error.str();
999  }
1000 
1001 
1002  timeslices.pop_front();
1003  }
1004  }
1005  }
1006 
1007 
1008  virtual void actionRunning() override
1009  {
1010  if (reporting) {
1011  typeout();
1012  }
1013  }
1014 
1015 
1016  /**
1017  * Update queue with data frames.
1018  *
1019  * Note that any discarded data will be reported.
1020  *
1021  * \param channel incoming data channel
1022  */
1024  {
1025  using namespace std;
1026 
1027  JByteArrayReader in(channel.data(), channel.size());
1028 
1029  JDAQPreamble preamble;
1030  JDAQSuperFrameHeader header;
1031 
1032  in >> preamble;
1033  in >> header;
1034 
1035  if (preamble.getLength() != channel.size()) {
1036 
1037  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
1038  << "preamble.getLength() = " << preamble.getLength() << ';'
1039  << "channel.size(): " << channel.size() << ';';
1040 
1041  number_of_packets_discarded += 1;
1042 
1043  return;
1044  }
1045 
1046  if (header.getRunNumber() != getRunNumber()) {
1047 
1048  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
1049  << " != " << getRunNumber()
1050  << " -> Dropping frame.";
1051 
1052  number_of_packets_discarded += 1;
1053 
1054  return;
1055  }
1056 
1057  if (header.getFrameIndex() <= current_slice_index) {
1058 
1059  number_of_packets_discarded += 1;
1060 
1061  if (modules.insert(header.getModuleID()).second) {
1062 
1063  frames_per_slice = modules.size();
1064 
1065  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
1066  << " module " << header.getModuleID()
1067  << " -> dropping frame;"
1068  << " increase number of frames expected to: " << frames_per_slice;
1069  }
1070 
1071  return;
1072  }
1073 
1074  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
1075 
1076  number_of_packets_discarded += 1;
1077 
1078  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1079  << " module " << header.getModuleID()
1080  << " -> Dropping frame.";
1081 
1082  return;
1083  }
1084 
1085  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1086 
1087  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1088  ++timesliceIterator;
1089  }
1090 
1091  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1092 
1093  // The corresponding time slice already exists
1094 
1095  } else {
1096 
1097  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1098 
1099  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1100 
1101  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1102 
1103  queueSize += getSizeof(*timesliceIterator);
1104  }
1105 
1106  timesliceIterator->push_back(JDAQSuperFrame(header));
1107 
1108  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1109 
1110  queueSize += getSizeof(*timesliceIterator->rbegin());
1111 
1112  dataqueue_slice_index[channel.getFileDescriptor()] = header.getFrameIndex();
1113  }
1114 
1115 
1116  /**
1117  * Process time slice: build the L0, L1, L2, SN and nano-beacon data per module, run the triggers and send the requested output to the data writer and/or the circular buffer.
1118  * A std::exception raised during processing is logged and the raw time slice is dumped as long as dumpCount < dumpLimit; the time slice router is reset at the end.
1119  * \param timeslice time slice
1120  */
1121  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1122  {
1123  using namespace std;
1124 
1125  try {
1126 
1127  timesliceRouter->configure(timeslice); // configure router for this time slice (reset at the end of this method)
1128 
1129  if (parameters.writeSummary()) {
1130  this->put(JDAQSummaryslice(timeslice)); // summary slice is sent before and independently of the trigger/write conditions below
1131  }
1132 
1133  if (parameters.trigger3DMuon.enabled || // continue only if a trigger, a prescaled output or the circular buffer requires the data
1134  parameters.trigger3DShower.enabled ||
1135  parameters.triggerMXShower.enabled ||
1136  parameters.triggerNB.enabled ||
1137  parameters.writeL0.prescale ||
1138  parameters.writeL1.prescale ||
1139  parameters.writeL2.prescale ||
1140  parameters.writeSN.prescale ||
1141  c_buffer.is_enabled()) {
1142 
1143  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1144  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1145  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1146  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1147  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1148  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer()); // collects frames with invalid data to be dumped
1149 
1150  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1151 
1152  if (moduleRouter->hasModule(frame->getModuleID())) { // frames of modules without detector information are only reported (see else branch)
1153 
1154  const JChecksum::result_type& result = checksum(*frame);
1155 
1156  if (!result) { // check-sum failed: log, optionally keep the frame for dumping and skip it
1157 
1158  JWarningStream(logger) << "Invalid data at "
1159  << "run = " << timeslice.getRunNumber() << ";"
1160  << "frame index = " << timeslice.getFrameIndex() << ";"
1161  << "module = " << frame->getModuleID() << ";"
1162  << "discard" << (dumpCount < dumpLimit ? " and dump" : ""); // NOTE(review): text ignores the additional dumpMask condition applied below
1163 
1164  if (dumpCount < dumpLimit && result.has(dumpMask)) { // keep frame for dump only if the error matches the dump mask
1165  timesliceTX.push_back(*frame);
1166  }
1167 
1168  continue; // no L0/L1/L2/SN/NB data are built from this frame
1169  }
1170 
1171  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1172  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module); // NOTE(review): returned by reference; presumably an internal buffer overwritten for each frame -- confirm
1173 
1174  // Apply high-rate veto
1175 
1176  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1177 
1178  // L0
1179 
1180  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1181 
1182  // Nano-beacon trigger
1183 
1184  if (parameters.triggerNB.enabled) {
1185 
1186  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts); // presumably std::partition: entries selected by triggerNB.pmts are moved in front, reordering buffer in place
1187 
1188  if (buffer.begin() != __end) { // at least one entry selected
1189 
1190  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1191  frame->getModuleIdentifier(),
1192  module.getPosition()));
1193 
1194  JSuperFrame1D_t zbuf; // temporary L1 data of the selected entries only
1195 
1196  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1197 
1198  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1199  }
1200  }
1201 
1202  // L1: built from the L0 data appended above for this module
1203 
1204  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1205  frame->getModuleIdentifier(),
1206  module.getPosition()));
1207 
1208  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1209 
1210  // L2: built from the 2D buffer and the L1 data of this module
1211 
1212  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1213  frame->getModuleIdentifier(),
1214  module.getPosition()));
1215 
1216  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1217 
1218  // SN: same inputs as L2, using the buildSN builder
1219 
1220  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1221  frame->getModuleIdentifier(),
1222  module.getPosition()));
1223 
1224  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1225 
1226  } else {
1227 
1228  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1229  }
1230  }
1231 
1232  if (!timesliceTX.empty()) { // send the frames with invalid data as a separate time slice
1233 
1234  if (dumpCount < dumpLimit) {
1235 
1236  this->put(timesliceTX);
1237 
1238  dumpCount += 1; // counts dumps (per time slice), not dumped frames
1239  }
1240  }
1241 
1242  // Trigger
1243 
1244  if (parameters.triggerNB.enabled) {
1245 
1246  const JTriggerInput trigger_input(timesliceNB);
1247 
1248  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1249 
1250  if (parameters.triggerNB.write()) { // evaluated per hit; NOTE(review): presumably a prescaler -- confirm in JTriggerNB parameters
1251 
1252  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1253  getTriggerMask(triggerNB->getTriggerBit()),
1254  *hit,
1255  *timesliceRouter,
1256  *moduleRouter,
1257  parameters.TMaxLocal_ns,
1258  parameters.triggerNB.DMax_m,
1259  getTimeRange(parameters.triggerNB));
1260 
1261  this->put(tev);
1262  }
1263  }
1264  }
1265 
1266  JTriggerInput trigger_input(timesliceL2); // 3D muon, 3D shower and MX shower triggers use the L2 data (MX shower also the L0 data)
1267  JTriggerOutput trigger_output;
1268 
1269  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1270  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1271  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1272 
1273  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns)); // merge events using the time overlap criterion
1274 
1275  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1276 
1277  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1278 
1279  this->put(object);
1280 
1281  numberOfEvents += 1;
1282  }
1283 
1284  if (parameters.writeL1() || c_buffer.sizeL1 > 0) { // NOTE(review): writeL1() (call) here versus writeL1 (conversion) below; semantics are defined by the type of writeL1, not visible here -- same pattern for L2, SN and L0
1285 
1286  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1287 
1288  if (parameters.writeL1) { this->put(object); } // to data writer
1289  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); } // to circular buffer
1290  }
1291 
1292  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1293 
1294  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1295 
1296  if (parameters.writeL2) { this->put(object); }
1297  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1298  }
1299 
1300  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1301 
1302  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1303 
1304  if (parameters.writeSN) { this->put(object); }
1305  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1306  }
1307 
1308  if (parameters.writeL0() || c_buffer.sizeL0 > 0) { // L0 output is the unmodified input time slice
1309 
1310  if (parameters.writeL0) { this->put(timeslice); }
1311  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1312  }
1313  }
1314 
1315  } catch(const std::exception& error) { // processing failed: report and dump the input time slice while the dump limit is not reached
1316 
1317  JErrorStream(logger) << "Error = " << error.what() << ";"
1318  << "run = " << timeslice.getRunNumber() << ";"
1319  << "frame index = " << timeslice.getFrameIndex() << ";"
1320  << "time slice not correctly processed;"
1321  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1322 
1323  if (dumpCount < dumpLimit) {
1324 
1325  this->put(static_cast<const JDAQTimeslice&>(timeslice)); // sent as plain JDAQTimeslice rather than JDAQTimesliceL0
1326 
1327  dumpCount += 1;
1328  }
1329  }
1330 
1331  timesliceRouter->reset(); // executed after normal processing as well as after a caught std::exception
1332  }
1333 
1334 
1335  /**
1336  * Report status to message logger: elapsed times, packet, event and data counters, queue status and performance estimates. The timer is stopped for the report and restarted afterwards.
1337  */
1338  void typeout()
1339  {
1340  timer.stop(); // stop timer before reading its values; restarted at the end of this method
1341 
1342  const double T_us = (double) timer.usec_wall; // elapsed wall time [us]
1343 
1344  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1345  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1346  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1347  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3; // scaled by 1e-3 for display in ms; presumably Qt holds values in us -- confirm
1348  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1349  JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0); // Qx holds the time spent per call to put()
1350 
1351  if (number_of_packets_received > 0) { // avoid division by zero
1352  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1353  }
1354 
1355  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1356  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice << ' ' << FIXED(5,3) << factor_per_slice;
1357 
1358  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1359 
1360  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000; // presumably getFrameTime() is in ns, hence /1000 to obtain us -- confirm in JDAQClock.hh
1361  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000; // time spanned by the range of frame numbers
1362 
1363  if (processedSlicesTime_us > 0) { // ratio of elapsed wall time to data time of the processed time slices
1364  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1365  }
1366  if (processedDetectorTime_us > 0) { // ratio of elapsed wall time to data time spanned by the frame number range
1367  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1368  }
1369 
1370  timer.start();
1371  }
1372 
1373 
1374  /**
1375  * Tagged action to handle alerts. On RC_ALERT an open circular buffer is closed and, if the circular buffer is enabled, a new one is opened; any other tag is reported as not implemented.
1376  *
1377  * \param tag tag
1378  * \param length number of characters
1379  * \param buffer message
1380  */
1381  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1382  {
1383  using namespace std;
1384 
1385  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\""; // buffer is used with explicit length, no terminating zero is assumed
1386 
1387  if (tag == RC_ALERT) {
1388 
1389  if (c_buffer.is_open()) {
1390 
1391  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1392 
1393  c_buffer.close(true); // per the status message above, option true presumably triggers archival -- confirm in JCircularBuffer_t::close
1394  }
1395 
1396  if (c_buffer.is_enabled()) { // start a new circular buffer
1397 
1398  c_buffer.open();
1399 
1400  if (c_buffer.is_open()) {
1401 
1402  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1403 
1404  putObject(c_buffer.getFile(), meta); // store meta data of this application in the new file
1405 
1406  } else {
1407 
1408  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1409 
1410  c_buffer.disable(); // no further attempts after a failed open
1411  }
1412  }
1413 
1414  } else {
1415 
1416  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1417  }
1418  }
1419 
1420  JMeta meta; //!< meta data
1421 
1422  private:
1423 
1425  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1426  std::string hostname; //!< host name of data server
1427 
1428  /**
1429  * Auxiliary method to send object to data server. The size of the object is added to numberOfBytes and the time spent in the transfer is recorded in Qx; a std::exception is logged and followed by ev_error().
1430  *
1431  * \param object object to be sent
1432  */
1433  template<class T>
1434  void put(const T& object)
1435  {
1436  try {
1437 
1438  const localtime_t t0 = getLocalTime(); // time before transfer
1439 
1440  datawriter->put(object);
1441 
1442  const localtime_t t1 = getLocalTime(); // time after transfer
1443 
1444  numberOfBytes += getSizeof(object); // only counted if the transfer did not throw
1445 
1446  Qx.put(t1 - t0); // duration of the transfer
1447  }
1448  catch(const std::exception& error) { // exception is not propagated to the caller
1449  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1450  ev_error();
1451  }
1452  }
1453 
1454 
1455  int port; //!< server socket port
1456  int backlog; //!< back log (command line option -q); presumably for the server socket -- confirm
1457 
1458  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1459  JChannelList_t channelList; //!< connections to data queue
1460 
1461  JTimer timer; //!< timer for the elapsed wall and CPU times reported by typeout()
1463 
1464  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1470 
1471  struct : // map with the frame index of the last frame stored per connection; key is the file descriptor of the channel
1472  public std::map<int, int>
1473  {
1474  /**
1475  * Get minimal frame index.
1476  *
1477  * \return minimal frame index over all entries; -1 if there are no entries
1478  */
1479  int min() const
1480  {
1481  if (this->empty()) {
1482 
1483  return -1; // no entries
1484 
1485  } else {
1486 
1487  int min = std::numeric_limits<int>::max(); // start value above any possible frame index
1488 
1489  for (const_iterator i = this->begin(); i != this->end(); ++i) {
1490  if (i->second < min) {
1491  min = i->second;
1492  }
1493  }
1494 
1495  return min;
1496  }
1497  }
1498 
1499  /**
1500  * Get maximal frame index.
1501  *
1502  * \return maximal frame index over all entries; -1 if there are no entries
1503  */
1504  int max() const
1505  {
1506  if (this->empty()) {
1507 
1508  return -1; // no entries
1509 
1510  } else {
1511 
1512  int max = std::numeric_limits<int>::lowest(); // start value below any possible frame index
1513 
1514  for (const_iterator i = this->begin(); i != this->end(); ++i) {
1515  if (i->second > max) {
1516  max = i->second;
1517  }
1518  }
1519 
1520  return max;
1521  }
1522  }
1523 
1524  } dataqueue_slice_index;
1525 
1526  // trigger
1527 
1530 
1537 
1542 
1548 
1552 
1553  // process management
1554 
1557 
1558  // memory management
1559 
1560  long long int totalCPURAM; //!< presumably the RAM of this CPU (cf. getRAM()) -- confirm
1562  long long int maxQueueSize; //!< presumably the upper limit for queueSize -- confirm
1563  long long int queueSize; //!< accumulated size of the buffered time slices and frames (see updateFrameQueue)
1564 
1565  // statistics
1566 
1568 
1569  long long int numberOfEvents; //!< number of triggered events sent (incremented in processTimeSlice)
1570  long long int numberOfBytes; //!< accumulated size of the objects sent via put()
1573 
1576 
1577  // temporary
1578 
1581  long long int number_of_reads; //!< number of reads; reported per received packet in typeout()
1583 
1584  // circular buffer
1585 
1587  };
1588 }
1589 
1590 /**
1591  * \file
1592  *
1593  * Application for real-time filtering of data.
1594  * For more information, see KM3NETDAQ::JDataFilter.
1595  *
1596  * \author rbruijn and mdejong
1597  */
1598 int main(int argc, char* argv[]) // parse the command line, create the logger and run the data filter as run control client
1599 {
1600  using namespace std;
1601  using namespace JPP;
1602  using namespace KM3NETDAQ;
1603 
1604  string server;
1605  string logger;
1606  string hostname;
1607  string client_name;
1608  int port;
1609  int backlog;
1610  bool use_cout;
1611  string path;
1612  string archive;
1613  int debug;
1614 
1615 
1616  try {
1617 
1618  JParser<> zap("Application for real-time filtering of data.");
1619 
1620  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1621  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1622  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1623  zap['u'] = make_field(client_name, "client name") = "%";
1624  zap['P'] = make_field(port, "port to listen for incoming data from data queue"); // no default value given
1625  zap['q'] = make_field(backlog, "back log") = 1024;
1626  zap['c'] = make_field(use_cout, "print to terminal");
1627  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1628  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1629  zap['d'] = make_field(debug, "debug level") = 0;
1630 
1631  zap(argc, argv);
1632  }
1633  catch(const std::exception& error) {
1634  FATAL(error.what() << endl); // presumably terminates the program -- confirm in JMessage.hh
1635  }
1636 
1637 
1638  JLogger* out = NULL; // NOTE(review): raw pointer to new'ed logger; ownership is presumably taken over by JDataFilter -- confirm
1639 
1640  if (use_cout)
1641  out = new JStreamLogger(cout); // log to terminal
1642  else
1643  out = new JControlHostLogger(logger); // log to the server for logger messages
1644 
1645  JDataFilter dfilter(client_name,
1646  server,
1647  hostname,
1648  out,
1649  debug,
1650  port,
1651  backlog,
1652  path,
1653  archive);
1654 
1655  dfilter.meta = JMeta(argc, argv); // meta data; written to each newly created circular buffer file (see actionTagged)
1656 
1657  dfilter.enter();
1658  dfilter.run();
1659 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Exception for opening of file.
Definition: JException.hh:358
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:378
Utility class to parse command line options.
Definition: JParser.hh:1711
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:82
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:263
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:67
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:197
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:194
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:189
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:361
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:73
Match of two events considering overlap in time.
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Definition: JDAQTags.hh:38
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
Utility class to parse parameter values.
Definition: JProperties.hh:497
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
Definition: pmt_status.hh:13
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:90
JTriggerParameters parameters
Auxiliary data structure for floating point format specification.
Definition: JManip.hh:446
Main class for real-time filtering of data.
Definition: JDataFilter.cc:184
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:351
Tools for handling different hit types.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
Definition: JFilesystem.hh:49
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:200
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:221
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:365
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:196
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:327
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:193
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
const int n
Definition: JPolint.hh:786
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
Definition: getArchive.sh:42
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
Definition: JDAQTags.hh:37
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:195
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:795
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:190
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:343
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
bool has(const JStatus &mask) const
Check for errors with given error mask.
Definition: JChecksum.hh:107
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
int getFrameIndex(const double t_ns)
Get frame index for a given time in ns.
Definition: JDAQClock.hh:251
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2158
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:391
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
Definition: JDAQTags.hh:36
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
Definition: JVectorize.hh:54
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for handling status.
Definition: JStatus.hh:37
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:65
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:425
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:367
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:817
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:363
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according a given allowed fraction of active modules.
Definition: JDataFilter.cc:100
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:148
then fatal The output file must have the wildcard in the name
Definition: JCanberra.sh:31
std::string hostname
host name of data server
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
TCP socket.
Definition: JTCPSocket.hh:28
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:708
const char * data() const
Get data.
Definition: JByteArrayIO.hh:68
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:807
long long int queueSize
TDC value error.
Definition: JChecksum.hh:47
Time order error.
Definition: JChecksum.hh:48
virtual void actionExit() override
Definition: JDataFilter.cc:429
JSinglePointer< JTimesliceRouter > timesliceRouter
PMT number error.
Definition: JChecksum.hh:46
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:298
then fatal The output file must have the wildcard in the e g root fi eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
Auxiliary class for date and time.
Definition: JDateAndTime.hh:78
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:79
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:837
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:360
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JMessageScheduler logErrorOvercomplete
System auxiliaries.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
std::set< int > modules
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:60
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:66
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
Definition: JVectorize.hh:218
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:801
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:397
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:442
Auxiliary data structure for result of checksum.
Definition: JChecksum.hh:88
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:761
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:362
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:366
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:823
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:463
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.
int size() const
Get size.
Definition: JByteArrayIO.hh:57