Jpp  17.2.1-pre0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "Jeep/JParser.hh"
14 #include "Jeep/JProperties.hh"
15 #include "Jeep/JTimer.hh"
16 #include "Jeep/JTimekeeper.hh"
17 #include "Jeep/JMessage.hh"
18 #include "Jeep/JPrint.hh"
19 #include "Jeep/JeepToolkit.hh"
26 #include "JDAQ/JDAQTags.hh"
27 #include "JDAQ/JDAQEventIO.hh"
28 #include "JDAQ/JDAQTimesliceIO.hh"
30 #include "JDetector/JDetector.hh"
31 #include "JTrigger/JHit.hh"
32 #include "JTrigger/JHitToolkit.hh"
35 #include "JTrigger/JTimeslice.hh"
36 #include "JTrigger/JHitL0.hh"
37 #include "JTrigger/JHitL1.hh"
38 #include "JTrigger/JBuildL1.hh"
39 #include "JTrigger/JBuildL2.hh"
43 #include "JTrigger/JTriggerNB.hh"
44 #include "JTrigger/JTriggerBits.hh"
48 #include "JTrigger/JTimesliceL1.hh"
51 #include "JTrigger/JChecksum.hh"
54 #include "JNet/JControlHost.hh"
56 #include "JNet/JSocket.hh"
57 #include "JNet/JSocketChannel.hh"
58 #include "JNet/JServerSocket.hh"
59 #include "JPhysics/JConstants.hh"
60 #include "JTools/JQuantile.hh"
61 #include "JSupport/JSupport.hh"
63 #include "JSupport/JMeta.hh"
64 #include "JSystem/JStat.hh"
65 #include "JSystem/JTime.hh"
67 #include "JSystem/JNetwork.hh"
68 
69 
70 namespace JNET {
71 
72  /**
73  * Get size of packeet.
74  *
75  * \param preamble DAQ data preamble
76  * \return size [B]
77  */
78  template<>
80  {
81  return preamble.getLength();
82  }
83 }
84 
85 namespace KM3NETDAQ {
86 
87  using namespace JPP;
88 
89  /**
90  * Main class for real-time filtering of data.
91  *
92  * This class implements the action methods for each transition of the state machine.\n
93  * When the state machine is entered, data are continually collected using
94  * custom implementation of virtual methods
95  * setSelect and
96  * actionSelect.
97  *
98  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
99  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
100  * When a JDAQTimeslice is complete,
101  * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
102  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
103  * which, together with other parameters, is parsed in method actionConfigure.\n
104  * <pre>
105  * numberOfFramesPerSlice = frames_per_slice = 1;
106  * </pre>
107  * Note that this parameter can change during operation (see below).
108  *
109  * A timeout may occur when the total amount of data or the number of incomplete time slices
110  * in the queue exceeds one of the corresponding pre-set limits:\n
111  * <pre>
112  * queueSize = <maximal queue size [B]>;
113  * queueDepth = <maximal queue depth>;
114  * </pre>
115  * Note that these values apply per JDataFilter.\n
116  *
117  * When a timeout occurs,
118  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
119  * When data are received with a frame index below the current frame index,
120  * the parameter <tt>frames_per_slice</tt> is incremented by one,
121  * up to the original value set at actionConfigure.\n
122  *
123  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
124  * The following parameters can be used to configure the circular buffer.\n
125  * <pre>
126  * path = <write directory for temporary circular buffer>;
127  * archive = <write directory for archival of circular buffer>;
128  * c_sizeL0 = <L0 buffer size>;
129  * c_sizeL1 = <L1 buffer size>;
130  * c_sizeL2 = <L2 buffer size>;
131  * c_sizeSN = <SN buffer size>;
132  * </pre>
133  *
134  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
135  * - There will always be a file created which is deleted when the state machine is exited;
136  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
137  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
138  * - After archival of the temporary file, a new temporary file will be opened;
139  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
140  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
141  *
142  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
143  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
144  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
145  * (e.g. when there is more than one alert on a given day).
146  *
147  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
148  *
149  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
150  */
151  class JDataFilter :
152  public JDAQClient
153  {
154  public:
155 
158 
159  typedef double hit_type;
165 
166 
167  /**
168  * Circular buffer.
169  */
171  public JTreeRecorder<JDAQTimesliceTypes_t>
172  {
175 
176  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
177 
178 
179  /**
180  * Constructor.
181  *
182  * \param path directory for temporary storage
183  * \param archive directory for permanent archival
184  * \param tag tag
185  */
187  const std::string& archive,
188  const JTag& tag) :
189  path (path),
190  archive(archive),
191  tag (tag)
192  {
193  disable();
194  }
195 
196 
197  /**
198  * Open file.
199  *
200  * If file with same name exists, remove it beforehand.
201  */
202  void open()
203  {
204  using namespace std;
205  using namespace JPP;
206 
207  gErrorIgnoreLevel = kFatal;
208 
209  std::ostringstream os;
210 
211  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
212 
213  if (getFileStatus(os.str().c_str())) {
214  std::remove(os.str().c_str());
215  }
216 
217  this->open(os.str().c_str());
218  }
219 
220 
221  /**
222  * Close file.
223  *
224  * If option is true, archive file; else delete file.
225  *
226  * \param option option
227  */
228  void close(const bool option)
229  {
230  using namespace std;
231  using namespace JPP;
232 
233  const JDateAndTime cal;
234 
235  if (this->is_open()) {
236 
237  const string file_name = this->getFile()->GetName();
238 
239  this->close();
240 
241  if (option) {
242 
243  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
244 
245  ostringstream os;
246 
247  os << getFullPath(this->archive)
248  << "KM3NeT"
249  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
250  << "_" << this->tag;
251 
252  if (i != 0) {
253  os << "_" << i;
254  }
255 
256  os << ".root";
257 
258  if (!getFileStatus(os.str().c_str())) {
259 
260  if (std::rename(file_name.c_str(), os.str().c_str()) == 0)
261  return;
262  else
263  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
264  }
265  }
266 
267  } else {
268 
269  std::remove(file_name.c_str());
270  }
271  }
272  }
273 
274 
275  /**
276  * Disable writing.
277  */
278  void disable()
279  {
280  sizeL0 = 0;
281  sizeL1 = 0;
282  sizeL2 = 0;
283  sizeSN = 0;
284  }
285 
286 
287  /**
288  * Check whether writing of data is enabled.
289  *
290  * \return true if writing enabled; else false
291  */
292  bool is_enabled() const
293  {
294  return (sizeL0 > 0 ||
295  sizeL1 > 0 ||
296  sizeL2 > 0 ||
297  sizeSN > 0);
298  }
299 
300 
301  /**
302  * Write circular buffer to output stream.
303  *
304  * \param out output stream
305  * \param object circular buffer
306  * \return output stream
307  */
308  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
309  {
310  if (object.is_open())
311  out << object.getFile()->GetName();
312  else
313  out << "void";
314 
315  out << ' ';
316 
317  out << object.sizeL0 << '/'
318  << object.sizeL1 << '/'
319  << object.sizeL2 << '/'
320  << object.sizeSN << '/';
321 
322  return out;
323  }
324 
325  Long64_t sizeL0; //!< Number of L0 time slices
326  Long64_t sizeL1; //!< Number of L1 time slices
327  Long64_t sizeL2; //!< Number of L2 time slices
328  Long64_t sizeSN; //!< Number of SN time slices
329 
330  std::string path; //!< Directory for temporary storage
331  std::string archive; //!< Directory for permanent archival
332  JTag tag; //!< Unique tag of this process
333  };
334 
335 
336  /**
337  * Sort DAQ process by index.
338  *
339  * \param first first DAQ process
340  * \param second second DAQ process
341  * \return true if index of first DAQ process less than that of second; else false
342  */
343  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
344  {
345  return first.index < second.index;
346  }
347 
348 
349  /**
350  * Constructor.
351  *
352  * \param name name of client
353  * \param server name of command message server
354  * \param hostname name of data server
355  * \param logger pointer to logger
356  * \param level debug level
357  * \param port server port
358  * \param backlog server backlog
359  * \param buffer_size server buffer
360  * \param path directory for temporary storage
361  * \param archive directory for parmanent archival
362  */
364  const std::string& server,
365  const std::string& hostname,
366  JLogger* logger,
367  const int level,
368  const int port,
369  const int backlog,
370  const int buffer_size,
371  const std::string& path,
372  const std::string& archive) :
373  JDAQClient (name,server,logger,level),
374  hostname (hostname),
375  port (port),
376  backlog (backlog),
377  buffer_size(buffer_size),
378  c_buffer (path, archive, getUniqueTag())
379  {
380  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
381 
382  addSubscription(JSubscriptionAll(RC_ALERT));
383 
384  totalCPURAM = getRAM();
385  current_slice_index = -1;
386  reporting = false;
387  }
388 
389 
390  virtual void actionEnter() override
391  {}
392 
393 
394  virtual void actionExit() override
395  {
396  if (c_buffer.is_open()) {
397 
398  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
399 
400  c_buffer.close(false);
401  }
402 
403  datawriter.reset();
404  }
405 
406 
407  virtual void actionInit(int length, const char* buffer) override
408  {
409  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
410 
411  try {
412 
413  JDebugStream(logger) << "Start server.";
414 
415  if (serversocket.is_valid()) {
416  serversocket->shutdown();
417  }
418 
419  serversocket.reset(new JServerSocket(port,backlog));
420  }
421  catch(const std::exception& error) {
422  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
423  ev_error();
424  }
425  }
426 
427 
428  virtual void actionConfigure(int length, const char* buffer) override
429  {
430  using namespace std;
431 
432  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
433 
434  long long int update_s = 10;
435  long long int logger_s = 5;
436 
437  parameters .reset();
438  dataFilters.clear();
439  dataQueues .clear();
440 
441  reporting = false;
442  dumpCount = 0;
443  dumpLimit = numeric_limits<int>::max();
444 
445  detector.clear();
446 
447  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
448 
449  properties["dataWriter"] = hostname;
450  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
451  properties["detector"] = detector;
452  properties["triggerParameters"] = parameters;
453  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
454  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
455  properties["frameIndex"] = maximal_frame_index = 100000;
456  properties["logger_s"] = logger_s;
457  properties["update_s"] = update_s;
458  properties["JDataFilter"] = dataFilters;
459  properties["DataQueue"] = dataQueues;
460  properties["path"] = c_buffer.path;
461  properties["archive"] = c_buffer.archive;
462  properties["c_sizeL0"] = c_buffer.sizeL0;
463  properties["c_sizeL1"] = c_buffer.sizeL1;
464  properties["c_sizeL2"] = c_buffer.sizeL2;
465  properties["c_sizeSN"] = c_buffer.sizeSN;
466  properties["dumpLimit"] = dumpLimit;
467 
468  try {
469  properties.read(string(buffer, length));
470  }
471  catch(const exception& error) {
472  JErrorStream(logger) << error.what();
473  }
474 
475  if (update_s <= 0) { update_s = 1; }
476  if (logger_s <= 0) { logger_s = 1; }
477 
478  setClockInterval(update_s * 1000000LL);
479 
480  hostname = trim(hostname);
481 
482  if (hostname != "")
483  datawriter.reset(new JControlHost_t(hostname));
484  else
485  throw JException("Undefined data writer host name.");
486 
487  maximum_frames_per_slice = frames_per_slice;
488 
489  // process processlist
490 
491  if (dataFilters.empty()) {
492  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
493  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
494  }
495 
496  sort(dataFilters.begin(), dataFilters.end(), compare);
497 
498  unsigned int numberOfDataFiltersOnThisMachine = 0;
499  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
500 
502 
503  {
504  JNoticeStream notice(logger);
505 
506  notice << "My IP addresses:";
507 
508  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
509  notice << ' ' << *i;
510  }
511  }
512 
513  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
514 
515  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
516 
517  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
518 
519  numberOfDataFiltersOnThisMachine++;
520 
521  if (i->port == this->port) {
522  thisProcess = i;
523  }
524  }
525  }
526 
527  if (numberOfDataFiltersOnThisMachine == 0) {
528  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
529  << "Assuming one datafilter on this machine.";
530  numberOfDataFiltersOnThisMachine = 1;
531  }
532 
533  if (thisProcess == dataFilters.end()) {
534  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
535  }
536 
537  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
538  JErrorStream(logger) << "Mismatch between given process names: "
539  << "I am called " << getName()
540  << ", but in the process list I am referred to as " << thisProcess->index;
541  }
542 
543  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
544  reporting = true;
545  }
546 
547  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
548 
549  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
550 
551  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
552  << "Queue size reduced to "
553  << maxQueueSize << " bytes." ;
554  }
555 
556  // detector
557 
558  if (parameters.disableHighRateVeto) {
559 
560  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
561 
562  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
563  }
564 
565  // trigger parameters
566 
568 
569  triggerNB .reset(new JTriggerNB (parameters));
570  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
571  trigger3DShower.reset(new JTrigger3DShower(parameters));
572  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
573 
574  moduleRouter.reset(new JModuleRouter(detector));
575 
576  if (reporting) {
577  JNoticeStream(logger) << "This data filter process will report.";
578  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
579  JDebugStream (logger) << "Trigger parameters: " << parameters;
580  JDebugStream (logger) << "Detector description: " << endl << detector;
581  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
582  }
583 
584  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
585 
586  // set L1, L2 and SN builders
587 
588  buildL1.reset(new JBuildL1_t(parameters));
589  buildL2.reset(new JBuildL2_t(parameters.L2));
590  buildSN.reset(new JBuildL2_t(parameters.SN));
591  buildNB.reset(new JBuildL2_t(parameters.NB));
592 
593  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
594  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
595  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
596  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
597 
598  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
599  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
600  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
601  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
602 
603  if (c_buffer.is_enabled()) {
604 
605  if (!c_buffer.is_open()) {
606 
607  c_buffer.open();
608 
609  if (c_buffer.is_open()) {
610 
611  putObject(c_buffer.getFile(), meta);
612 
613  JStatusStream(logger) << "Created circular buffer " << c_buffer;
614 
615  } else {
616 
617  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
618  }
619 
620  } else {
621 
622  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
623  }
624  }
625 
626  if (c_buffer.is_open()) {
627  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
628  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
629  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
630  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
631  } else {
632  c_buffer.disable();
633  }
634  }
635 
636 
637  virtual void actionStart(int length, const char* buffer) override
638  {
639  using namespace std;
640 
641  if (reporting) {
642  JNoticeStream(logger) << "Start run " << getRunNumber();
643  }
644 
645  timeslices.clear();
646 
647  current_slice_index = -1;
648  queueSize = 0;
649 
650  numberOfEvents = 0;
651  numberOfBytes = 0;
652  numberOfTimeslicesProcessed = 0;
653  numberOfIncompleteTimeslicesProcessed = 0;
654 
655  number_of_packets_received = 0;
656  number_of_packets_discarded = 0;
657  number_of_bytes_received = 0;
658  number_of_reads = 0;
659 
660  minFrameNumber = numeric_limits<int>::max();
661  maxFrameNumber = numeric_limits<int>::min();
662 
663  // Reset global trigger counter.
664 
666 
667  logErrorRun .reset();
668  logErrorDetector .reset();
669  logErrorIndex .reset();
670  logErrorIncomplete.reset();
671 
672  timer.reset();
673  timer.start();
674 
675  Qt.reset();
676 
677  // send trigger parameters to the datawriter
678 
679  ostringstream os;
680 
681  os << getRunNumber() << ' ' << parameters;
682 
683  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
684  }
685 
686 
687  virtual void actionPause(int length, const char* buffer) override
688  {
689  using namespace std;
690 
691  if (!timeslices.empty()) {
692 
693  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
694 
695  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
696  queueSize -= getSizeof(*i);
697  }
698 
699  timeslices.clear();
700  }
701 
702  { // force clearance of memory
703 
704  deque<JDAQTimesliceL0> buffer;
705 
706  timeslices.swap(buffer);
707  }
708 
709  if (queueSize != 0) {
710  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
711  }
712 
713  current_slice_index = -1;
714  queueSize = 0;
715 
716  timer.stop();
717  }
718 
719 
720  virtual void actionContinue(int length, const char* buffer) override
721  {
722  timer.start();
723  }
724 
725 
726  virtual void actionStop(int length, const char* buffer) override
727  {
728  typeout();
729 
730  datawriter.reset();
731  }
732 
733 
734  virtual void actionReset(int length, const char* buffer) override
735  {
736  if (serversocket.is_valid()) {
737  serversocket->shutdown();
738  }
739 
740  serversocket.reset();
741  }
742 
743 
744  virtual void actionQuit(int length, const char* buffer) override
745  {
746  datawriter.reset();
747  }
748 
749 
750  virtual void setSelect(JFileDescriptorMask& mask) const override
751  {
752  if (serversocket.is_valid()) {
753  mask.set(*serversocket);
754  }
755 
756  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
757  if (!channel->isReady()) {
758  mask.set(channel->getFileDescriptor());
759  }
760  }
761  }
762 
763 
764  virtual void actionSelect(const JFileDescriptorMask& mask) override
765  {
766  using namespace std;
767 
768  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
769 
770  try {
771 
772  if (mask.has(channel->getFileDescriptor())) {
773  channel->read();
774  }
775 
776  if (channel->isReady()) {
777 
778  number_of_packets_received += 1;
779  number_of_reads += channel->getCounter();
780  number_of_bytes_received += channel->size();
781 
782  if (isRunning()) {
783 
784  updateFrameQueue(channel);
785 
786  } else {
787 
788  JErrorStream(logErrorRun) << "Receiving data while not running.";
789 
790  number_of_packets_discarded += 1;
791  }
792 
793  channel->reset();
794  }
795 
796  ++channel;
797  }
798  catch(const exception& error) {
799 
800  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
801 
802  channel->shutdown();
803 
804  channel = channelList.erase(channel);
805  }
806  }
807 
808 
809  if (serversocket.is_valid()) {
810 
811  if (mask.has(*serversocket)) {
812 
813  JSocket socket;
814 
815  socket.accept(serversocket->getFileDescriptor());
816 
817  //socket.setSendBufferSize (buffer_size);
819 
820  socket.setKeepAlive (true);
821  socket.setNonBlocking(true);
822 
823  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
824 
825  channelList.push_back(JSocketInputChannel_t(socket));
826  }
827  }
828 
829 
830  if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) ||
831  (timeslices.size() >= 2u &&
832  timeslices[1].size() >= frames_per_slice) ||
833  (timeslices.size() >= maxQueueDepth) ||
834  (queueSize >= maxQueueSize))) {
835 
836  const JDAQTimesliceL0& pending_slice = timeslices.front();
837  queueSize -= getSizeof(pending_slice);
838 
839  current_slice_index = pending_slice.getFrameIndex();
840  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
841  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
842 
843  if (pending_slice.size() > frames_per_slice) {
844 
845  JErrorStream(logger) << "More frames in timeslice than expected "
846  << pending_slice.size() << " > " << frames_per_slice;
847 
848  if (pending_slice.size() <= maximum_frames_per_slice) {
849 
850  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
851 
852  frames_per_slice = pending_slice.size();
853  }
854  }
855 
856  if (!pending_slice.empty()) {
857 
858  const localtime_t t0 = getLocalTime();
859 
860  processTimeSlice(pending_slice);
861 
862  const localtime_t t1 = getLocalTime();
863 
864  numberOfTimeslicesProcessed += 1;
865 
866  Qt.put(t1 - t0);
867 
868  if (pending_slice.size() < frames_per_slice) {
869 
870  numberOfIncompleteTimeslicesProcessed += 1;
871 
872  ostringstream error;
873 
874  error << "Timeout -> processed incomplete timeslice: "
875  << "Frame index = " << pending_slice.getFrameIndex() << ';'
876  << "Size of timeslice = " << pending_slice.size() << ';'
877  << "Queue depth = " << timeslices.size() << ';'
878  << "Queue size = " << queueSize << ';';
879 
880  if ((timeslices.size() >= 2u &&
881  timeslices[1].size() >= frames_per_slice)) {
882 
883  error << " intermittent problem -> continues as-is";
884 
885  } else {
886 
887  error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
888 
889  frames_per_slice = pending_slice.size();
890  }
891 
892  JErrorStream(logErrorIncomplete) << error.str();
893  }
894  }
895 
896  timeslices.pop_front();
897  }
898  }
899 
900 
901  virtual void actionRunning() override
902  {
903  if (reporting) {
904  typeout();
905  }
906  }
907 
908 
909  /**
910  * Update queue with data frames.
911  *
912  * Note that any discarded data will be reported.
913  *
914  * \param channel incoming data channel
915  */
916  void updateFrameQueue(const JChannelList_t::const_iterator channel)
917  {
918  using namespace std;
919 
920  JByteArrayReader in(channel->data(), channel->size());
921 
922  JDAQPreamble preamble;
923  JDAQSuperFrameHeader header;
924 
925  in >> preamble;
926  in >> header;
927 
928  if (preamble.getLength() != channel->size()) {
929 
930  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
931  << "preamble.getLength() = " << preamble.getLength() << ';'
932  << "channel->size(): " << channel->size() << ';';
933 
934  number_of_packets_discarded += 1;
935 
936  return;
937  }
938 
939  if (header.getRunNumber() != getRunNumber()) {
940 
941  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
942  << " != " << getRunNumber()
943  << " -> Dropping frame.";
944 
945  number_of_packets_discarded += 1;
946 
947  return;
948  }
949 
950  if (header.getFrameIndex() <= current_slice_index) {
951 
952  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
953  << " module " << header.getModuleID()
954  << " -> Dropping frame.";
955 
956  number_of_packets_discarded += 1;
957 
958  if (frames_per_slice < maximum_frames_per_slice) {
959 
960  frames_per_slice++;
961 
962  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
963  }
964 
965  return;
966  }
967 
968  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
969 
970  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
971  << " module " << header.getModuleID()
972  << " -> Dropping frame.";
973 
974  number_of_packets_discarded += 1;
975 
976  return;
977  }
978 
979  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
980 
981  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
982  ++timesliceIterator;
983  }
984 
985  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
986 
987  // The corresponding time slice already exists
988 
989  } else {
990 
991  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
992 
993  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
994 
995  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
996 
997  queueSize += getSizeof(*timesliceIterator);
998  }
999 
1000  timesliceIterator->push_back(JDAQSuperFrame(header));
1001 
1002  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1003 
1004  queueSize += getSizeof(*timesliceIterator->rbegin());
1005  }
1006 
1007 
1008  /**
1009  * Process time slice.
1010  *
1011  * \param timeslice time slice
1012  */
1013  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1014  {
1015  using namespace std;
1016 
1017  try {
1018 
1019  timesliceRouter->configure(timeslice);
1020 
1021  if (parameters.writeSummary()) {
1022  this->put(JDAQSummaryslice(timeslice));
1023  }
1024 
1025  if (parameters.trigger3DMuon.enabled ||
1026  parameters.trigger3DShower.enabled ||
1027  parameters.triggerMXShower.enabled ||
1028  parameters.triggerNB.enabled ||
1029  parameters.writeL0.prescale ||
1030  parameters.writeL1.prescale ||
1031  parameters.writeL2.prescale ||
1032  parameters.writeSN.prescale ||
1033  c_buffer.is_enabled()) {
1034 
1035  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1036  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1037  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1038  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1039  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1040  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1041 
1042  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1043 
1044  if (moduleRouter->hasModule(frame->getModuleID())) {
1045 
1046  if (!checksum(*frame)) {
1047 
1048  JErrorStream(logger) << "Invalid data at "
1049  << "run = " << timeslice.getRunNumber() << ";"
1050  << "frame index = " << timeslice.getFrameIndex() << ";"
1051  << "module = " << frame->getModuleID() << ";"
1052  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1053 
1054  timesliceTX.push_back(*frame);
1055 
1056  continue;
1057  }
1058 
1059  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1060  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1061 
1062  // Apply high-rate veto
1063 
1064  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1065 
1066  // L0
1067 
1068  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1069 
1070  // Nano-beacon trigger
1071 
1072  if (parameters.triggerNB.enabled) {
1073 
1074  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1075 
1076  if (buffer.begin() != __end) {
1077 
1078  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1079  frame->getModuleIdentifier(),
1080  module.getPosition()));
1081 
1082  JSuperFrame1D_t zbuf;
1083 
1084  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1085 
1086  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1087  }
1088  }
1089 
1090  // L1
1091 
1092  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1093  frame->getModuleIdentifier(),
1094  module.getPosition()));
1095 
1096  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1097 
1098  // L2
1099 
1100  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1101  frame->getModuleIdentifier(),
1102  module.getPosition()));
1103 
1104  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1105 
1106  // SN
1107 
1108  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1109  frame->getModuleIdentifier(),
1110  module.getPosition()));
1111 
1112  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1113 
1114  } else {
1115 
1116  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1117  }
1118  }
1119 
1120  if (!timesliceTX.empty()) {
1121 
1122  if (dumpCount < dumpLimit) {
1123 
1124  this->put(timesliceTX);
1125 
1126  dumpCount += 1;
1127  }
1128  }
1129 
1130  // Trigger
1131 
1132  if (parameters.triggerNB.enabled) {
1133 
1134  const JTriggerInput trigger_input(timesliceNB);
1135 
1136  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1137 
1138  if (parameters.triggerNB.write()) {
1139 
1140  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1141  getTriggerMask(triggerNB->getTriggerBit()),
1142  *hit,
1143  *timesliceRouter,
1144  *moduleRouter,
1145  parameters.TMaxLocal_ns,
1146  parameters.triggerNB.DMax_m,
1147  getTimeRange(parameters.triggerNB));
1148 
1149  this->put(tev);
1150  }
1151  }
1152  }
1153 
1154  JTriggerInput trigger_input(timesliceL2);
1155  JTriggerOutput trigger_output;
1156 
1157  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1158  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1159  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1160 
1161  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1162 
1163  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1164 
1165  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1166 
1167  this->put(object);
1168 
1169  numberOfEvents += 1;
1170  }
1171 
1172  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1173 
1174  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1175 
1176  if (parameters.writeL1) { this->put(object); }
1177  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1178  }
1179 
1180  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1181 
1182  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1183 
1184  if (parameters.writeL2) { this->put(object); }
1185  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1186  }
1187 
1188  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1189 
1190  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1191 
1192  if (parameters.writeSN) { this->put(object); }
1193  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1194  }
1195 
1196  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1197 
1198  if (parameters.writeL0) { this->put(timeslice); }
1199  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1200  }
1201  }
1202 
1203  } catch(const exception& error) {
1204 
1205  JErrorStream(logger) << "Error = " << error.what() << ";"
1206  << "run = " << timeslice.getRunNumber() << ";"
1207  << "frame index = " << timeslice.getFrameIndex() << ";"
1208  << "time slice not correctly processed;"
1209  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1210 
1211  if (dumpCount < dumpLimit) {
1212 
1213  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1214 
1215  dumpCount += 1;
1216  }
1217  }
1218 
1219  timesliceRouter->reset();
1220  }
1221 
1222 
1223  /**
1224  * Report status to message logger.
1225  */
1226  void typeout()
1227  {
1228  timer.stop();
1229 
1230  const double T_us = (double) timer.usec_wall;
1231 
1232  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1233  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1234  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1235  try {
1236  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1237  }
1238  catch(const std::exception&) {}
1239  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1240  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1241 
1242  if (number_of_packets_received > 0) {
1243  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1244  }
1245 
1246  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1247  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1248 
1249  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1250 
1251  if (numberOfTimeslicesProcessed > 0) {
1252  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1253  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1254  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1255  }
1256 
1257  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1258  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1259 
1260  if (processedSlicesTime_us > 0) {
1261  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1262  }
1263  if (processedDetectorTime_us > 0) {
1264  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1265  }
1266 
1267  timer.start();
1268  }
1269 
1270 
1271  /**
1272  * Tagged action to handle alerts.
1273  *
1274  * \param tag tag
1275  * \param length number of characters
1276  * \param buffer message
1277  */
1278  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1279  {
1280  using namespace std;
1281 
1282  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1283 
1284  if (tag == RC_ALERT) {
1285 
1286  if (c_buffer.is_open()) {
1287 
1288  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1289 
1290  c_buffer.close(true);
1291  }
1292 
1293  if (c_buffer.is_enabled()) {
1294 
1295  c_buffer.open();
1296 
1297  if (c_buffer.is_open()) {
1298 
1299  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1300 
1301  putObject(c_buffer.getFile(), meta);
1302 
1303  } else {
1304 
1305  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1306 
1307  c_buffer.disable();
1308  }
1309  }
1310 
1311  } else {
1312 
1313  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1314  }
1315  }
1316 
1317  JMeta meta; //!< meta data
1318 
1319  private:
1320 
1322  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1323  std::string hostname; //!< host name of data server
1324 
1325  /**
1326  * Auxiliary method to send object to data server.
1327  *
1328  * \param object object to be sent
1329  */
1330  template<class T>
1331  void put(const T& object)
1332  {
1333  try {
1334 
1335  datawriter->put(object);
1336 
1337  numberOfBytes += getSizeof(object);
1338  }
1339  catch(const std::exception& error) {
1340  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1341  ev_error();
1342  }
1343  }
1344 
1345 
1346  int port; //!< server socket port
1347  int backlog;
1349 
1350  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1351  JChannelList_t channelList; //!< connections to data queue
1352 
1353  JTimer timer;
1355 
1356  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1358  unsigned int frames_per_slice;
1361 
1362  // trigger
1363 
1366 
1373 
1378 
1383 
1386 
1387  // process management
1388 
1391 
1392  // memory management
1393 
1394  long long int totalCPURAM;
1395  unsigned int maxQueueDepth;
1396  long long int maxQueueSize;
1397  long long int queueSize;
1398 
1399  // statistics
1400 
1402 
1403  long long int numberOfEvents;
1404  long long int numberOfBytes;
1407 
1410 
1411  // temporary
1412 
1415  long long int number_of_reads;
1417 
1418  // circular buffer
1419 
1421  };
1422 }
1423 
1424 /**
1425  * \file
1426  *
1427  * Application for real-time filtering of data.
1428  * For more information, see KM3NETDAQ::JDataFilter.
1429  *
1430  * \author rbruijn and mdejong
1431  */
1432 int main(int argc, char* argv[])
1433 {
1434  using namespace std;
1435  using namespace JPP;
1436  using namespace KM3NETDAQ;
1437 
1438  string server;
1439  string logger;
1440  string hostname;
1441  string client_name;
1442  int port;
1443  int backlog;
1444  int buffer_size;
1445  bool use_cout;
1446  string path;
1447  string archive;
1448  int debug;
1449 
1450 
1451  try {
1452 
1453  JParser<> zap("Application for real-time filtering of data.");
1454 
1455  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1456  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1457  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1458  zap['u'] = make_field(client_name, "client name") = "%";
1459  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1460  zap['q'] = make_field(backlog, "back log") = 1024;
1461  zap['s'] = make_field(buffer_size, "TCP buffer size") = 32 * MEGABYTE; // TCP buffer of 32 MB
1462  zap['c'] = make_field(use_cout, "print to terminal");
1463  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1464  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1465  zap['d'] = make_field(debug, "debug level") = 0;
1466 
1467  zap(argc, argv);
1468  }
1469  catch(const exception& error) {
1470  FATAL(error.what() << endl);
1471  }
1472 
1473 
1474  JLogger* out = NULL;
1475 
1476  if (use_cout)
1477  out = new JStreamLogger(cout);
1478  else
1479  out = new JControlHostLogger(logger);
1480 
1481  JDataFilter dfilter(client_name,
1482  server,
1483  hostname,
1484  out,
1485  debug,
1486  port,
1487  backlog,
1488  buffer_size,
1489  path,
1490  archive);
1491 
1492  dfilter.meta = JMeta(argc, argv);
1493 
1494  dfilter.enter();
1495  dfilter.run();
1496 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:901
Exception for opening of file.
Definition: JException.hh:342
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:343
Utility class to parse command line options.
Definition: JParser.hh:1517
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
Definition: JDataFilter.cc:79
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:228
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:164
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:161
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:156
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:326
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Definition: JException.hh:696
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:43
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:151
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:662
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:186
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:330
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:163
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:292
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:160
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:162
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:373
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to give file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:720
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:157
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:308
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1993
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
then awk string
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
static struct JACOUSTICS::@4 compare
Auxiliary data structure to sort transmissions.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:916
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:390
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:332
Level specific message streamers.
long long int number_of_packets_received
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
static const long long int MEGABYTE
Number of bytes in a kilo-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:744
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DOM<$WORKDIR/ev_configure_domsimulator.txt > RC_DWRT path
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:328
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:251
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:637
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:734
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:394
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:273
Auxililary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:764
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:325
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:110
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:364
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:363
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters andd ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:152
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:726
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
double u[N+1]
Definition: JPolint.hh:776
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:407
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:687
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:327
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:331
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:750
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:428
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.