Jpp  17.2.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "Jeep/JParser.hh"
14 #include "Jeep/JProperties.hh"
15 #include "Jeep/JTimer.hh"
16 #include "Jeep/JTimekeeper.hh"
17 #include "Jeep/JMessage.hh"
18 #include "Jeep/JPrint.hh"
19 #include "Jeep/JeepToolkit.hh"
26 #include "JDAQ/JDAQTags.hh"
27 #include "JDAQ/JDAQEventIO.hh"
28 #include "JDAQ/JDAQTimesliceIO.hh"
30 #include "JDetector/JDetector.hh"
31 #include "JTrigger/JHit.hh"
32 #include "JTrigger/JHitToolkit.hh"
35 #include "JTrigger/JTimeslice.hh"
36 #include "JTrigger/JHitL0.hh"
37 #include "JTrigger/JHitL1.hh"
38 #include "JTrigger/JBuildL1.hh"
39 #include "JTrigger/JBuildL2.hh"
43 #include "JTrigger/JTriggerNB.hh"
44 #include "JTrigger/JTriggerBits.hh"
48 #include "JTrigger/JTimesliceL1.hh"
51 #include "JTrigger/JChecksum.hh"
54 #include "JNet/JControlHost.hh"
56 #include "JNet/JSocket.hh"
57 #include "JNet/JSocketChannel.hh"
58 #include "JNet/JServerSocket.hh"
59 #include "JPhysics/JConstants.hh"
60 #include "JTools/JQuantile.hh"
61 #include "JSupport/JSupport.hh"
63 #include "JSupport/JMeta.hh"
64 #include "JSystem/JStat.hh"
65 #include "JSystem/JTime.hh"
67 #include "JSystem/JNetwork.hh"
68 
69 
70 namespace JNET {
71 
72  /**
73  * Get size of packeet.
74  *
75  * \param preamble DAQ data preamble
76  * \return size [B]
77  */
78  template<>
80  {
81  return preamble.getLength();
82  }
83 }
84 
85 namespace KM3NETDAQ {
86 
87  using namespace JPP;
88 
89  /**
90  * Main class for real-time filtering of data.
91  *
92  * This class implements the action methods for each transition of the state machine.\n
93  * When the state machine is entered, data are continually collected using
94  * custom implementation of virtual methods
95  * setSelect and
96  * actionSelect.
97  *
98  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
99  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
100  * When a JDAQTimeslice is complete,
101  * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
102  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
103  * which, together with other parameters, is parsed in method actionConfigure.\n
104  * <pre>
105  * numberOfFramesPerSlice = frames_per_slice = 1;
106  * </pre>
107  * Note that this parameter can change during operation (see below).
108  *
109  * A timeout may occur when the total amount of data or the number of incomplete time slices
110  * in the queue exceeds one of the corresponding pre-set limits:\n
111  * <pre>
112  * queueSize = <maximal queue size [B]>;
113  * queueDepth = <maximal queue depth>;
114  * </pre>
115  * Note that these values apply per JDataFilter.\n
116  *
117  * When a timeout occurs,
118  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
119  * When data are received with a frame index below the current frame index,
120  * the parameter <tt>frames_per_slice</tt> is incremented by one,
121  * up to the original value set at actionConfigure.\n
122  *
123  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
124  * The following parameters can be used to configure the circular buffer.\n
125  * <pre>
126  * path = <write directory for temporary circular buffer>;
127  * archive = <write directory for archival of circular buffer>;
128  * c_sizeL0 = <L0 buffer size>;
129  * c_sizeL1 = <L1 buffer size>;
130  * c_sizeL2 = <L2 buffer size>;
131  * c_sizeSN = <SN buffer size>;
132  * </pre>
133  *
134  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
135  * - There will always be a file created which is deleted when the state machine is exited;
136  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
137  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
138  * - After archival of the temporary file, a new temporary file will be opened;
139  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
140  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
141  *
142  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
143  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
144  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
145  * (e.g. when there is more than one alert on a given day).
146  *
147  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
148  *
149  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
150  */
151  class JDataFilter :
152  public JDAQClient
153  {
154  public:
155 
158 
159  typedef double hit_type;
165 
166 
167  /**
168  * Circular buffer.
169  */
171  public JTreeRecorder<JDAQTimesliceTypes_t>
172  {
175 
176  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
177 
178 
179  /**
180  * Constructor.
181  *
182  * \param path directory for temporary storage
183  * \param archive directory for permanent archival
184  * \param tag tag
185  */
186  JCircularBuffer_t(const std::string& path,
187  const std::string& archive,
188  const JTag& tag) :
189  path (path),
190  archive(archive),
191  tag (tag)
192  {
193  disable();
194  }
195 
196 
197  /**
198  * Open file.
199  *
200  * If file with same name exists, remove it beforehand.
201  */
202  void open()
203  {
204  using namespace std;
205  using namespace JPP;
206 
207  gErrorIgnoreLevel = kFatal;
208 
209  std::ostringstream os;
210 
211  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
212 
213  if (getFileStatus(os.str().c_str())) {
214  std::remove(os.str().c_str());
215  }
216 
217  this->open(os.str().c_str());
218  }
219 
220 
221  /**
222  * Close file.
223  *
224  * If option is true, archive file; else delete file.
225  *
226  * \param option option
227  */
228  void close(const bool option)
229  {
230  using namespace std;
231  using namespace JPP;
232 
233  const JDateAndTime cal;
234 
235  if (this->is_open()) {
236 
237  const string file_name = this->getFile()->GetName();
238 
239  this->close();
240 
241  if (option) {
242 
243  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
244 
245  ostringstream os;
246 
247  os << getFullPath(this->archive)
248  << "KM3NeT"
249  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
250  << "_" << this->tag;
251 
252  if (i != 0) {
253  os << "_" << i;
254  }
255 
256  os << ".root";
257 
258  if (!getFileStatus(os.str().c_str())) {
259 
260  if (std::rename(file_name.c_str(), os.str().c_str()) == 0)
261  return;
262  else
263  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
264  }
265  }
266 
267  } else {
268 
269  std::remove(file_name.c_str());
270  }
271  }
272  }
273 
274 
275  /**
276  * Disable writing.
277  */
278  void disable()
279  {
280  sizeL0 = 0;
281  sizeL1 = 0;
282  sizeL2 = 0;
283  sizeSN = 0;
284  }
285 
286 
287  /**
288  * Check whether writing of data is enabled.
289  *
290  * \return true if writing enabled; else false
291  */
292  bool is_enabled() const
293  {
294  return (sizeL0 > 0 ||
295  sizeL1 > 0 ||
296  sizeL2 > 0 ||
297  sizeSN > 0);
298  }
299 
300 
301  /**
302  * Write circular buffer to output stream.
303  *
304  * \param out output stream
305  * \param object circular buffer
306  * \return output stream
307  */
308  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
309  {
310  if (object.is_open())
311  out << object.getFile()->GetName();
312  else
313  out << "void";
314 
315  out << ' ';
316 
317  out << object.sizeL0 << '/'
318  << object.sizeL1 << '/'
319  << object.sizeL2 << '/'
320  << object.sizeSN << '/';
321 
322  return out;
323  }
324 
325  Long64_t sizeL0; //!< Number of L0 time slices
326  Long64_t sizeL1; //!< Number of L1 time slices
327  Long64_t sizeL2; //!< Number of L2 time slices
328  Long64_t sizeSN; //!< Number of SN time slices
329 
330  std::string path; //!< Directory for temporary storage
331  std::string archive; //!< Directory for permanent archival
332  JTag tag; //!< Unique tag of this process
333  };
334 
335 
336  /**
337  * Sort DAQ process by index.
338  *
339  * \param first first DAQ process
340  * \param second second DAQ process
341  * \return true if index of first DAQ process less than that of second; else false
342  */
343  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
344  {
345  return first.index < second.index;
346  }
347 
348 
349  /**
350  * Constructor.
351  *
352  * \param name name of client
353  * \param server name of command message server
354  * \param hostname name of data server
355  * \param logger pointer to logger
356  * \param level debug level
357  * \param port server port
358  * \param backlog server backlog
359  * \param buffer_size server buffer
360  * \param path directory for temporary storage
361  * \param archive directory for parmanent archival
362  */
363  JDataFilter(const std::string& name,
364  const std::string& server,
365  const std::string& hostname,
366  JLogger* logger,
367  const int level,
368  const int port,
369  const int backlog,
370  const int buffer_size,
371  const std::string& path,
372  const std::string& archive) :
373  JDAQClient (name,server,logger,level),
374  hostname (hostname),
375  port (port),
376  backlog (backlog),
377  buffer_size(buffer_size),
378  c_buffer (path, archive, getUniqueTag())
379  {
380  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
381 
382  addSubscription(JSubscriptionAll(RC_ALERT));
383 
384  totalCPURAM = getRAM();
385  current_slice_index = -1;
386  reporting = false;
387  }
388 
389 
390  virtual void actionEnter() override
391  {}
392 
393 
394  virtual void actionExit() override
395  {
396  if (c_buffer.is_open()) {
397 
398  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
399 
400  c_buffer.close(false);
401  }
402 
403  datawriter.reset();
404  }
405 
406 
407  virtual void actionInit(int length, const char* buffer) override
408  {
409  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
410 
411  try {
412 
413  JDebugStream(logger) << "Start server.";
414 
415  if (serversocket.is_valid()) {
416  serversocket->shutdown();
417  }
418 
419  serversocket.reset(new JServerSocket(port,backlog));
420  }
421  catch(const std::exception& error) {
422  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
423  ev_error();
424  }
425  }
426 
427 
428  virtual void actionConfigure(int length, const char* buffer) override
429  {
430  using namespace std;
431 
432  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
433 
434  long long int update_s = 10;
435  long long int logger_s = 5;
436 
437  parameters .reset();
438  dataFilters.clear();
439  dataQueues .clear();
440 
441  reporting = false;
442  dumpCount = 0;
443  dumpLimit = numeric_limits<int>::max();
444 
445  detector.clear();
446 
447  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
448 
449  properties["dataWriter"] = hostname;
450  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
451  properties["detector"] = detector;
452  properties["triggerParameters"] = parameters;
453  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
454  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
455  properties["frameIndex"] = maximal_frame_index = 100000;
456  properties["logger_s"] = logger_s;
457  properties["update_s"] = update_s;
458  properties["JDataFilter"] = dataFilters;
459  properties["DataQueue"] = dataQueues;
460  properties["path"] = c_buffer.path;
461  properties["archive"] = c_buffer.archive;
462  properties["c_sizeL0"] = c_buffer.sizeL0;
463  properties["c_sizeL1"] = c_buffer.sizeL1;
464  properties["c_sizeL2"] = c_buffer.sizeL2;
465  properties["c_sizeSN"] = c_buffer.sizeSN;
466  properties["dumpLimit"] = dumpLimit;
467 
468  try {
469  properties.read(string(buffer, length));
470  }
471  catch(const exception& error) {
472  JErrorStream(logger) << error.what();
473  }
474 
475  if (update_s <= 0) { update_s = 1; }
476  if (logger_s <= 0) { logger_s = 1; }
477 
478  setClockInterval(update_s * 1000000LL);
479 
480  hostname = trim(hostname);
481 
482  if (hostname != "")
483  datawriter.reset(new JControlHost_t(hostname));
484  else
485  throw JException("Undefined data writer host name.");
486 
487  maximum_frames_per_slice = frames_per_slice;
488 
489  // process processlist
490 
491  if (dataFilters.empty()) {
492  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
493  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
494  }
495 
496  sort(dataFilters.begin(), dataFilters.end(), compare);
497 
498  unsigned int numberOfDataFiltersOnThisMachine = 0;
499  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
500 
502 
503  {
504  JNoticeStream notice(logger);
505 
506  notice << "My IP addresses:";
507 
508  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
509  notice << ' ' << *i;
510  }
511  }
512 
513  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
514 
515  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
516 
517  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
518 
519  numberOfDataFiltersOnThisMachine++;
520 
521  if (i->port == this->port) {
522  thisProcess = i;
523  }
524  }
525  }
526 
527  if (numberOfDataFiltersOnThisMachine == 0) {
528  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
529  << "Assuming one datafilter on this machine.";
530  numberOfDataFiltersOnThisMachine = 1;
531  }
532 
533  if (thisProcess == dataFilters.end()) {
534  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
535  }
536 
537  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
538  JErrorStream(logger) << "Mismatch between given process names: "
539  << "I am called " << getName()
540  << ", but in the process list I am referred to as " << thisProcess->index;
541  }
542 
543  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
544  reporting = true;
545  }
546 
547  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
548 
549  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
550 
551  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
552  << "Queue size reduced to "
553  << maxQueueSize << " bytes." ;
554  }
555 
556  // detector
557 
558  if (parameters.disableHighRateVeto) {
559 
560  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
561 
562  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
563  }
564 
565  // trigger parameters
566 
568 
569  triggerNB .reset(new JTriggerNB (parameters));
570  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
571  trigger3DShower.reset(new JTrigger3DShower(parameters));
572  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
573 
574  moduleRouter.reset(new JModuleRouter(detector));
575 
576  if (reporting) {
577  JNoticeStream(logger) << "This data filter process will report.";
578  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
579  JDebugStream (logger) << "Trigger parameters: " << parameters;
580  JDebugStream (logger) << "Detector description: " << endl << detector;
581  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
582  }
583 
584  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
585 
586  // set L1, L2 and SN builders
587 
588  buildL1.reset(new JBuildL1_t(parameters));
589  buildL2.reset(new JBuildL2_t(parameters.L2));
590  buildSN.reset(new JBuildL2_t(parameters.SN));
591  buildNB.reset(new JBuildL2_t(parameters.NB));
592 
593  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
594  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
595  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
596  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
597 
598  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
599  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
600  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
601  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
602 
603  if (c_buffer.is_enabled()) {
604 
605  if (!c_buffer.is_open()) {
606 
607  c_buffer.open();
608 
609  if (c_buffer.is_open()) {
610 
611  putObject(c_buffer.getFile(), meta);
612 
613  JStatusStream(logger) << "Created circular buffer " << c_buffer;
614 
615  } else {
616 
617  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
618  }
619 
620  } else {
621 
622  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
623  }
624  }
625 
626  if (c_buffer.is_open()) {
627  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
628  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
629  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
630  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
631  } else {
632  c_buffer.disable();
633  }
634  }
635 
636 
637  virtual void actionStart(int length, const char* buffer) override
638  {
639  using namespace std;
640 
641  if (reporting) {
642  JNoticeStream(logger) << "Start run " << getRunNumber();
643  }
644 
645  timeslices.clear();
646 
647  current_slice_index = -1;
648  queueSize = 0;
649 
650  numberOfEvents = 0;
651  numberOfBytes = 0;
652  numberOfTimeslicesProcessed = 0;
653  numberOfIncompleteTimeslicesProcessed = 0;
654 
655  number_of_packets_received = 0;
656  number_of_packets_discarded = 0;
657  number_of_bytes_received = 0;
658  number_of_reads = 0;
659 
660  minFrameNumber = numeric_limits<int>::max();
661  maxFrameNumber = numeric_limits<int>::min();
662 
663  // Reset global trigger counter.
664 
666 
667  logErrorRun .reset();
668  logErrorDetector .reset();
669  logErrorIndex .reset();
670  logErrorIncomplete.reset();
671 
672  timer.reset();
673  timer.start();
674 
675  Qt.reset();
676 
677  // send trigger parameters to the datawriter
678 
679  ostringstream os;
680 
681  os << getRunNumber() << ' ' << parameters;
682 
683  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
684  }
685 
686 
687  virtual void actionPause(int length, const char* buffer) override
688  {
689  using namespace std;
690 
691  if (!timeslices.empty()) {
692 
693  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
694 
695  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
696  queueSize -= getSizeof(*i);
697  }
698 
699  timeslices.clear();
700  }
701 
702  { // force clearance of memory
703 
704  deque<JDAQTimesliceL0> buffer;
705 
706  timeslices.swap(buffer);
707  }
708 
709  if (queueSize != 0) {
710  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
711  }
712 
713  current_slice_index = -1;
714  queueSize = 0;
715 
716  timer.stop();
717  }
718 
719 
720  virtual void actionContinue(int length, const char* buffer) override
721  {
722  timer.start();
723  }
724 
725 
726  virtual void actionStop(int length, const char* buffer) override
727  {
728  typeout();
729 
730  datawriter.reset();
731  }
732 
733 
734  virtual void actionReset(int length, const char* buffer) override
735  {
736  if (serversocket.is_valid()) {
737  serversocket->shutdown();
738  }
739 
740  serversocket.reset();
741  }
742 
743 
744  virtual void actionQuit(int length, const char* buffer) override
745  {
746  datawriter.reset();
747  }
748 
749 
750  virtual void setSelect(JFileDescriptorMask& mask) const override
751  {
752  if (serversocket.is_valid()) {
753  mask.set(*serversocket);
754  }
755 
756  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
757  if (!channel->isReady()) {
758  mask.set(channel->getFileDescriptor());
759  }
760  }
761  }
762 
763 
764  virtual void actionSelect(const JFileDescriptorMask& mask) override
765  {
766  using namespace std;
767 
768  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
769 
770  try {
771 
772  if (mask.has(channel->getFileDescriptor())) {
773  channel->read();
774  }
775 
776  if (channel->isReady()) {
777 
778  number_of_packets_received += 1;
779  number_of_reads += channel->getCounter();
780  number_of_bytes_received += channel->size();
781 
782  if (isRunning()) {
783 
784  updateFrameQueue(channel);
785 
786  } else {
787 
788  JErrorStream(logErrorRun) << "Receiving data while not running.";
789 
790  number_of_packets_discarded += 1;
791  }
792 
793  channel->reset();
794  }
795 
796  ++channel;
797  }
798  catch(const exception& error) {
799 
800  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
801 
802  channel->shutdown();
803 
804  channel = channelList.erase(channel);
805  }
806  }
807 
808 
809  if (serversocket.is_valid()) {
810 
811  if (mask.has(*serversocket)) {
812 
813  JSocket socket;
814 
815  socket.accept(serversocket->getFileDescriptor());
816 
817  //socket.setSendBufferSize (buffer_size);
819 
820  socket.setKeepAlive (true);
821  socket.setNonBlocking(true);
822 
823  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
824 
825  channelList.push_back(JSocketInputChannel_t(socket));
826  }
827  }
828 
829 
830  if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) ||
831  (timeslices.size() >= 2u &&
832  timeslices[1].size() >= frames_per_slice) ||
833  (timeslices.size() >= maxQueueDepth) ||
834  (queueSize >= maxQueueSize))) {
835 
836  const JDAQTimesliceL0& pending_slice = timeslices.front();
837  queueSize -= getSizeof(pending_slice);
838 
839  current_slice_index = pending_slice.getFrameIndex();
840  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
841  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
842 
843  if (pending_slice.size() > frames_per_slice) {
844 
845  JErrorStream(logger) << "More frames in timeslice than expected "
846  << pending_slice.size() << " > " << frames_per_slice;
847 
848  if (pending_slice.size() <= maximum_frames_per_slice) {
849 
850  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
851 
852  frames_per_slice = pending_slice.size();
853  }
854  }
855 
856  if (!pending_slice.empty()) {
857 
858  const localtime_t t0 = getLocalTime();
859 
860  processTimeSlice(pending_slice);
861 
862  const localtime_t t1 = getLocalTime();
863 
864  numberOfTimeslicesProcessed += 1;
865 
866  Qt.put(t1 - t0);
867 
868  if (pending_slice.size() < frames_per_slice) {
869 
870  numberOfIncompleteTimeslicesProcessed += 1;
871 
872  ostringstream error;
873 
874  error << "Timeout -> processed incomplete timeslice: "
875  << "Frame index = " << pending_slice.getFrameIndex() << ';'
876  << "Size of timeslice = " << pending_slice.size() << ';'
877  << "Queue depth = " << timeslices.size() << ';'
878  << "Queue size = " << queueSize << ';';
879 
880  if ((timeslices.size() >= 2u &&
881  timeslices[1].size() >= frames_per_slice)) {
882 
883  error << " intermittent problem -> continues as-is";
884 
885  } else {
886 
887  error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
888 
889  frames_per_slice = pending_slice.size();
890  }
891 
892  JErrorStream(logErrorIncomplete) << error.str();
893  }
894  }
895 
896  timeslices.pop_front();
897  }
898  }
899 
900 
901  virtual void actionRunning() override
902  {
903  if (reporting) {
904  typeout();
905  }
906  }
907 
908 
909  /**
910  * Update queue with data frames.
911  *
912  * Note that any discarded data will be reported.
913  *
914  * \param channel incoming data channel
915  */
916  void updateFrameQueue(const JChannelList_t::const_iterator channel)
917  {
918  using namespace std;
919 
920  JByteArrayReader in(channel->data(), channel->size());
921 
922  JDAQPreamble preamble;
923  JDAQSuperFrameHeader header;
924 
925  in >> preamble;
926  in >> header;
927 
928  if (preamble.getLength() != channel->size()) {
929 
930  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
931  << "preamble.getLength() = " << preamble.getLength() << ';'
932  << "channel->size(): " << channel->size() << ';';
933 
934  number_of_packets_discarded += 1;
935 
936  return;
937  }
938 
939  if (header.getRunNumber() != getRunNumber()) {
940 
941  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
942  << " unequal to current run " << getRunNumber()
943  << " -> Dropping frame.";
944 
945  number_of_packets_discarded += 1;
946 
947  return;
948  }
949 
950  if (header.getFrameIndex() <= current_slice_index) {
951 
952  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
953  << " -> Dropping frame.";
954 
955  number_of_packets_discarded += 1;
956 
957  if (frames_per_slice < maximum_frames_per_slice) {
958 
959  frames_per_slice++;
960 
961  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
962  }
963 
964  return;
965  }
966 
967  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
968 
969  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
970  << " -> Dropping frame.";
971 
972  number_of_packets_discarded += 1;
973 
974  return;
975  }
976 
977  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
978 
979  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
980  ++timesliceIterator;
981  }
982 
983  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
984 
985  // The corresponding time slice already exists
986 
987  } else {
988 
989  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
990 
991  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
992 
993  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
994 
995  queueSize += getSizeof(*timesliceIterator);
996  }
997 
998  timesliceIterator->push_back(JDAQSuperFrame(header));
999 
1000  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1001 
1002  queueSize += getSizeof(*timesliceIterator->rbegin());
1003  }
1004 
1005 
1006  /**
1007  * Process time slice.
1008  *
1009  * \param timeslice time slice
1010  */
1011  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1012  {
1013  using namespace std;
1014 
1015  try {
1016 
1017  timesliceRouter->configure(timeslice);
1018 
1019  if (parameters.writeSummary()) {
1020  this->put(JDAQSummaryslice(timeslice));
1021  }
1022 
1023  if (parameters.trigger3DMuon.enabled ||
1024  parameters.trigger3DShower.enabled ||
1025  parameters.triggerMXShower.enabled ||
1026  parameters.triggerNB.enabled ||
1027  parameters.writeL0.prescale ||
1028  parameters.writeL1.prescale ||
1029  parameters.writeL2.prescale ||
1030  parameters.writeSN.prescale ||
1031  c_buffer.is_enabled()) {
1032 
1033  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1034  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1035  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1036  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1037  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1038  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1039 
1040  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1041 
1042  if (moduleRouter->hasModule(frame->getModuleID())) {
1043 
1044  if (!checksum(*frame)) {
1045 
1046  JErrorStream(logger) << "Invalid data at "
1047  << "run = " << timeslice.getRunNumber() << ";"
1048  << "frame index = " << timeslice.getFrameIndex() << ";"
1049  << "module = " << frame->getModuleID() << ";"
1050  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1051 
1052  timesliceTX.push_back(*frame);
1053 
1054  continue;
1055  }
1056 
1057  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1058  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1059 
1060  // Apply high-rate veto
1061 
1062  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1063 
1064  // L0
1065 
1066  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1067 
1068  // Nano-beacon trigger
1069 
1070  if (parameters.triggerNB.enabled) {
1071 
1072  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1073 
1074  if (buffer.begin() != __end) {
1075 
1076  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1077  frame->getModuleIdentifier(),
1078  module.getPosition()));
1079 
1080  JSuperFrame1D_t zbuf;
1081 
1082  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1083 
1084  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1085  }
1086  }
1087 
1088  // L1
1089 
1090  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1091  frame->getModuleIdentifier(),
1092  module.getPosition()));
1093 
1094  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1095 
1096  // L2
1097 
1098  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1099  frame->getModuleIdentifier(),
1100  module.getPosition()));
1101 
1102  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1103 
1104  // SN
1105 
1106  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1107  frame->getModuleIdentifier(),
1108  module.getPosition()));
1109 
1110  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1111 
1112  } else {
1113 
1114  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1115  }
1116  }
1117 
1118  if (!timesliceTX.empty()) {
1119 
1120  if (dumpCount < dumpLimit) {
1121 
1122  this->put(timesliceTX);
1123 
1124  dumpCount += 1;
1125  }
1126  }
1127 
1128  // Trigger
1129 
1130  if (parameters.triggerNB.enabled) {
1131 
1132  const JTriggerInput trigger_input(timesliceNB);
1133 
1134  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1135 
1136  if (parameters.triggerNB.write()) {
1137 
1138  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1139  getTriggerMask(triggerNB->getTriggerBit()),
1140  *hit,
1141  *timesliceRouter,
1142  *moduleRouter,
1143  parameters.TMaxLocal_ns,
1144  parameters.triggerNB.DMax_m,
1145  getTimeRange(parameters.triggerNB));
1146 
1147  this->put(tev);
1148  }
1149  }
1150  }
1151 
1152  JTriggerInput trigger_input(timesliceL2);
1153  JTriggerOutput trigger_output;
1154 
1155  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1156  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1157  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1158 
1159  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1160 
1161  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1162 
1163  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1164 
1165  if (this->put(object)) {
1166  numberOfEvents += 1;
1167  }
1168  }
1169 
1170  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1171 
1172  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1173 
1174  if (parameters.writeL1) { this->put(object); }
1175  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1176  }
1177 
1178  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1179 
1180  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1181 
1182  if (parameters.writeL2) { this->put(object); }
1183  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1184  }
1185 
1186  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1187 
1188  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1189 
1190  if (parameters.writeSN) { this->put(object); }
1191  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1192  }
1193 
1194  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1195 
1196  if (parameters.writeL0) { this->put(timeslice); }
1197  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1198  }
1199  }
1200 
1201  } catch(const exception& error) {
1202 
1203  JErrorStream(logger) << "Error = " << error.what() << ";"
1204  << "run = " << timeslice.getRunNumber() << ";"
1205  << "frame index = " << timeslice.getFrameIndex() << ";"
1206  << "time slice not correctly processed;"
1207  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1208 
1209  if (dumpCount < dumpLimit) {
1210 
1211  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1212 
1213  dumpCount += 1;
1214  }
1215  }
1216 
1217  timesliceRouter->reset();
1218  }
1219 
1220 
1221  /**
1222  * Report status to message logger.
1223  */
1224  void typeout()
1225  {
1226  timer.stop();
1227 
1228  const double T_us = (double) timer.usec_wall;
1229 
1230  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1231  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1232  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1233  try {
1234  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1235  }
1236  catch(const std::exception&) {}
1237  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1238  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1239 
1240  if (number_of_packets_received > 0) {
1241  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1242  }
1243 
1244  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1245  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1246 
1247  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1248 
1249  if (numberOfTimeslicesProcessed > 0) {
1250  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1251  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1252  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1253  }
1254 
1255  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1256  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1257 
1258  if (processedSlicesTime_us > 0) {
1259  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1260  }
1261  if (processedDetectorTime_us > 0) {
1262  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1263  }
1264 
1265  timer.start();
1266  }
1267 
1268 
1269  /**
1270  * Tagged action to handle alerts.
1271  *
1272  * \param tag tag
1273  * \param length number of characters
1274  * \param buffer message
1275  */
1276  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1277  {
1278  using namespace std;
1279 
1280  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1281 
1282  if (tag == RC_ALERT) {
1283 
1284  if (c_buffer.is_open()) {
1285 
1286  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1287 
1288  c_buffer.close(true);
1289  }
1290 
1291  if (c_buffer.is_enabled()) {
1292 
1293  c_buffer.open();
1294 
1295  if (c_buffer.is_open()) {
1296 
1297  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1298 
1299  putObject(c_buffer.getFile(), meta);
1300 
1301  } else {
1302 
1303  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1304 
1305  c_buffer.disable();
1306  }
1307  }
1308 
1309  } else {
1310 
1311  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1312  }
1313  }
1314 
1315  JMeta meta; //!< meta data
1316 
1317  private:
1318 
1320  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1321  std::string hostname; //!< host name of data server
1322 
1323  /**
1324  * Auxiliary method to send object to data server.
1325  *
1326  * \param object object to be sent
1327  * \return true if sent; else false
1328  */
1329  template<class T>
1330  bool put(const T& object)
1331  {
1332  try {
1333 
1334  datawriter->put(object);
1335 
1336  numberOfBytes += getSizeof(object);
1337 
1338  return true;
1339  }
1340  catch(const std::exception& error) {
1341  JErrorStream(logger) << error.what();
1342  }
1343 
1344  return false;
1345  }
1346 
1347 
1348  int port; //!< server socket port
1349  int backlog;
1351 
1352  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1353  JChannelList_t channelList; //!< connections to data queue
1354 
1355  JTimer timer;
1357 
1358  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1360  unsigned int frames_per_slice;
1363 
1364  // trigger
1365 
1368 
1375 
1380 
1385 
1388 
1389  // process management
1390 
1393 
1394  // memory management
1395 
1396  long long int totalCPURAM;
1397  unsigned int maxQueueDepth;
1398  long long int maxQueueSize;
1399  long long int queueSize;
1400 
1401  // statistics
1402 
1404 
1405  long long int numberOfEvents;
1406  long long int numberOfBytes;
1409 
1412 
1413  // temporary
1414 
1417  long long int number_of_reads;
1419 
1420  // circular buffer
1421 
1423  };
1424 }
1425 
1426 /**
1427  * \file
1428  *
1429  * Application for real-time filtering of data.
1430  * For more information, see KM3NETDAQ::JDataFilter.
1431  *
1432  * \author rbruijn and mdejong
1433  */
1434 int main(int argc, char* argv[])
1435 {
1436  using namespace std;
1437  using namespace JPP;
1438  using namespace KM3NETDAQ;
1439 
1440  string server;
1441  string logger;
1442  string hostname;
1443  string client_name;
1444  int port;
1445  int backlog;
1446  int buffer_size;
1447  bool use_cout;
1448  string path;
1449  string archive;
1450  int debug;
1451 
1452 
1453  try {
1454 
1455  JParser<> zap("Application for real-time filtering of data.");
1456 
1457  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1458  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1459  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1460  zap['u'] = make_field(client_name, "client name") = "%";
1461  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1462  zap['q'] = make_field(backlog, "back log") = 1024;
1463  zap['s'] = make_field(buffer_size, "TCP buffer size") = 32 * MEGABYTE; // TCP buffer of 32 MB
1464  zap['c'] = make_field(use_cout, "print to terminal");
1465  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1466  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1467  zap['d'] = make_field(debug, "debug level") = 0;
1468 
1469  zap(argc, argv);
1470  }
1471  catch(const exception& error) {
1472  FATAL(error.what() << endl);
1473  }
1474 
1475 
1476  JLogger* out = NULL;
1477 
1478  if (use_cout)
1479  out = new JStreamLogger(cout);
1480  else
1481  out = new JControlHostLogger(logger);
1482 
1483  JDataFilter dfilter(client_name,
1484  server,
1485  hostname,
1486  out,
1487  debug,
1488  port,
1489  backlog,
1490  buffer_size,
1491  path,
1492  archive);
1493 
1494  dfilter.meta = JMeta(argc, argv);
1495 
1496  dfilter.enter();
1497  dfilter.run();
1498 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:901
Exception for opening of file.
Definition: JException.hh:342
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:343
Utility class to parse command line options.
Definition: JParser.hh:1517
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
Definition: JDataFilter.cc:79
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:228
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:164
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:161
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:156
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:326
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Definition: JException.hh:696
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:43
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:151
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:662
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:186
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:330
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:163
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:292
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:160
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:162
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:373
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to give file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:720
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:157
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:308
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1993
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
static struct JACOUSTICS::@4 compare
Auxiliary data structure to sort transmissions.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:916
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:390
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:332
Level specific message streamers.
long long int number_of_packets_received
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
static const long long int MEGABYTE
Number of bytes in a kilo-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:744
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DOM<$WORKDIR/ev_configure_domsimulator.txt > RC_DWRT path
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:328
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:251
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:637
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:734
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:394
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:273
Auxililary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:764
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:325
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:110
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:364
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:363
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters andd ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:152
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:726
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
double u[N+1]
Definition: JPolint.hh:776
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:407
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:687
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:327
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:331
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:750
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:428
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.