Jpp  18.4.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "Jeep/JParser.hh"
14 #include "Jeep/JProperties.hh"
15 #include "Jeep/JTimer.hh"
16 #include "Jeep/JTimekeeper.hh"
17 #include "Jeep/JMessage.hh"
18 #include "Jeep/JPrint.hh"
19 #include "Jeep/JeepToolkit.hh"
26 #include "JDAQ/JDAQTags.hh"
27 #include "JDAQ/JDAQEventIO.hh"
28 #include "JDAQ/JDAQTimesliceIO.hh"
30 #include "JDetector/JDetector.hh"
31 #include "JTrigger/JHit.hh"
32 #include "JTrigger/JHitToolkit.hh"
35 #include "JTrigger/JTimeslice.hh"
36 #include "JTrigger/JHitL0.hh"
37 #include "JTrigger/JHitL1.hh"
38 #include "JTrigger/JBuildL1.hh"
39 #include "JTrigger/JBuildL2.hh"
43 #include "JTrigger/JTriggerNB.hh"
44 #include "JTrigger/JTriggerBits.hh"
48 #include "JTrigger/JTimesliceL1.hh"
51 #include "JTrigger/JChecksum.hh"
54 #include "JNet/JControlHost.hh"
56 #include "JNet/JTCPSocket.hh"
57 #include "JNet/JSocketChannel.hh"
58 #include "JNet/JServerSocket.hh"
59 #include "JPhysics/JConstants.hh"
60 #include "JTools/JQuantile.hh"
61 #include "JSupport/JSupport.hh"
63 #include "JSupport/JMeta.hh"
64 #include "JSystem/JStat.hh"
65 #include "JSystem/JTime.hh"
67 #include "JSystem/JNetwork.hh"
68 #include "JSystem/JFilesystem.hh"
69 
70 
71 namespace JNET {
72  // NOTE(review): listing line 80 (the signature of the specialisation below) is missing from this extract.
73  /**
74  * Get size of packet.
75  *
76  * \param preamble DAQ data preamble
77  * \return size [B], as reported by preamble.getLength()
78  */
79  template<>
81  {
82  return preamble.getLength();
83  }
84 }
85 
86 namespace KM3NETDAQ {
87 
88  using namespace JPP;
89 
90  /**
91  * Main class for real-time filtering of data.
92  *
93  * This class implements the action methods for each transition of the state machine.\n
94  * When the state machine is entered, data are continually collected using
95  * custom implementation of virtual methods
96  * setSelect and
97  * actionSelect.
98  *
99  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
100  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
101  * When a JDAQTimeslice is complete,
102  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
103  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
104  * which, together with other parameters, is parsed in method actionConfigure.\n
105  * <pre>
106  * numberOfFramesPerSlice = frames_per_slice = 1;
107  * </pre>
108  * Note that this parameter can change during operation (see below).
109  *
110  * A timeout may occur when the total amount of data or the number of incomplete time slices
111  * in the queue exceeds one of the corresponding pre-set limits:\n
112  * <pre>
113  * queueSize = <maximal queue size [B]>;
114  * queueDepth = <maximal queue depth>;
115  * </pre>
116  * Note that these values apply per JDataFilter.\n
117  *
118  * When a timeout occurs,
119  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
120  * When data are received with a frame index below the current frame index,
121  * the parameter <tt>frames_per_slice</tt> is incremented by one,
122  * up to the original value set at actionConfigure.\n
123  *
124  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
125  * The following parameters can be used to configure the circular buffer.\n
126  * <pre>
127  * path = <write directory for temporary circular buffer>;
128  * archive = <write directory for archival of circular buffer>;
129  * c_sizeL0 = <L0 buffer size>;
130  * c_sizeL1 = <L1 buffer size>;
131  * c_sizeL2 = <L2 buffer size>;
132  * c_sizeSN = <SN buffer size>;
133  * </pre>
134  *
135  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
136  * - There will always be a file created which is deleted when the state machine is exited;
137  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
138  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
139  * - After archival of the temporary file, a new temporary file will be opened;
140  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
141  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
142  *
143  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
144  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
145  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
146  * (e.g. when there is more than one alert on a given day).
147  *
148  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
149  *
150  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
151  */
152  class JDataFilter :
153  public JDAQClient
154  {
155  public:
156 
159 
160  typedef double hit_type;
166 
167 
168 
169  /**
170  * Circular buffer.
171  */
173  public JTreeRecorder<JDAQTimesliceTypes_t>
174  {
177 
178 
179  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
180 
181 
182  /**
183  * Constructor.
184  * Stores the two directories and the tag and starts with writing disabled.
185  * \param path directory for temporary storage
186  * \param archive directory for permanent archival
187  * \param tag unique tag of this process
188  */
190  const std::string& archive, // NOTE(review): listing line 189 (start of the signature) is missing from this extract
191  const JTag& tag) :
192  path (path),
193  archive(archive),
194  tag (tag)
195  {
196  disable(); // all four buffer sizes start at zero
197  }
198 
199 
200  /**
201  * Open the temporary file "KM3NeT_<tag>.root" in the directory given by path.
202  *
203  * A file that already exists under this name is removed first.
204  */
205  void open()
206  {
207  using namespace std;
208  using namespace JPP;
209 
210  gErrorIgnoreLevel = kFatal;
211 
212  ostringstream buffer;
213 
214  buffer << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
215  const string file_name = buffer.str(); // build the name once and reuse it below
216  if (getFileStatus(file_name.c_str())) {
217  std::remove(file_name.c_str());
218  }
219 
220  this->open(file_name.c_str());
221  }
222 
223 
224  /**
225  * Close file.
226  *
227  * If option is true, archive file; else delete file.
228  * Nothing is done when no file is open.
229  * \param option true to move the file to the archive directory; false to delete it
230  */
231  void close(const bool option)
232  {
233  using namespace std;
234  using namespace JPP;
235 
236  const JDateAndTime cal;
237 
238  if (this->is_open()) {
239  // remember the name before closing; it is needed for the rename or removal below
240  const string file_name = this->getFile()->GetName();
241 
242  this->close();
243 
244  if (option) {
245  // try KM3NeT_YYYY-MM-DD_<tag>[_<i>].root for i = 0, 1, ... until a name is found that is not yet in use
246  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
247 
248  ostringstream os;
249 
250  os << getFullPath(this->archive)
251  << "KM3NeT"
252  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
253  << "_" << this->tag;
254  // the index suffix is only appended from the second candidate name onwards
255  if (i != 0) {
256  os << "_" << i;
257  }
258 
259  os << ".root";
260 
261  if (!getFileStatus(os.str().c_str())) {
262  // name is free: move the temporary file there, or throw when the rename fails
263  if (JSYSTEM::rename(file_name, os.str()) == 0)
264  return;
265  else
266  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
267  }
268  }
269  // NOTE(review): when all MAXIMUM_FILE_NUMBER candidate names are in use, the loop ends here without renaming, removing or reporting -- confirm this is intended
270  } else {
271 
272  std::remove(file_name.c_str());
273  }
274  }
275  }
276 
277 
278  /**
279  * Switch off writing by setting all four buffer sizes to zero.
280  */
281  void disable()
282  {
283  sizeL0 = sizeL1 = sizeL2 = sizeSN = 0;
287  }
288 
289 
290  /**
291  * Tell whether at least one of the four buffer sizes is positive.
292  *
293  * \return true if writing enabled; else false
294  */
295  bool is_enabled() const
296  {
297  if (sizeL0 > 0) { return true; }
298  if (sizeL1 > 0) { return true; }
299  if (sizeL2 > 0) { return true; }
300  return sizeSN > 0;
301  }
302 
303 
304  /**
305  * Print the file name (or "void" when no file is open), a space, and the four buffer sizes each followed by '/'.
306  *
307  * \param out output stream
308  * \param object circular buffer
309  * \return output stream
310  */
311  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
312  {
313  if (!object.is_open()) {
314  out << "void";
315  } else {
316  out << object.getFile()->GetName();
317  }
318 
319  return out << ' '
320  << object.sizeL0 << '/'
321  << object.sizeL1 << '/'
322  << object.sizeL2 << '/'
323  << object.sizeSN << '/';
326  }
327 
328  Long64_t sizeL0; //!< Number of L0 time slices
329  Long64_t sizeL1; //!< Number of L1 time slices
330  Long64_t sizeL2; //!< Number of L2 time slices
331  Long64_t sizeSN; //!< Number of SN time slices
332 
333  std::string path; //!< Directory for temporary storage
334  std::string archive; //!< Directory for permanent archival
335  JTag tag; //!< Unique tag of this process
336  };
337 
338 
339  /**
340  * Ordering of DAQ processes by their index member.
341  *
342  * \param first first DAQ process
343  * \param second second DAQ process
344  * \return true if index of first DAQ process less than that of second; else false
345  */
346  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second) {
347  const bool first_precedes_second = first.index < second.index;
348  return first_precedes_second;
349  }
350 
351 
352  /**
353  * Constructor.
354  *
355  * \param name name of client
356  * \param server name of command message server
357  * \param hostname name of data server
358  * \param logger pointer to logger
359  * \param level debug level
360  * \param port server port
361  * \param backlog server backlog
362  * \param path directory for temporary storage
363  * \param archive directory for permanent archival
364  */
366  const std::string& server, // NOTE(review): listing line 365 (start of the signature) is missing from this extract
367  const std::string& hostname,
368  JLogger* logger,
369  const int level,
370  const int port,
371  const int backlog,
372  const std::string& path,
373  const std::string& archive) :
374  JDAQClient (name,server,logger,level),
375  hostname (hostname),
376  port (port),
377  backlog (backlog),
378  c_buffer (path, archive, getUniqueTag())
379  {
380  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
381  // additionally subscribe to messages with tag RC_ALERT
382  addSubscription(JSubscriptionAll(RC_ALERT));
383 
384  totalCPURAM = getRAM();
385  current_slice_index = -1; // -1 is the value tested in updateFrameQueue() for "no slice index set yet"
386  reporting = false;
387 
388  this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
389  }
390 
391 
392  virtual void actionEnter() override // action method with an intentionally empty body
393  {}
394 
395 
396  virtual void actionExit() override // removes an open circular buffer file and releases the data writer
397  {
398  if (c_buffer.is_open()) {
399 
400  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
401  // false: delete the temporary file instead of archiving it
402  c_buffer.close(false);
403  }
404 
405  datawriter.reset();
406  }
407 
408 
409  virtual void actionInit(int length, const char* buffer) override
410  {
411  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
412  // (re)create the server socket; any std::exception is logged and followed by ev_error()
413  try {
414 
415  JDebugStream(logger) << "Start server.";
416  // shut down a socket that is still valid before replacing it
417  if (serversocket.is_valid()) {
418  serversocket->shutdown();
419  }
420 
421  serversocket.reset(new JServerSocket(port,backlog));
422  }
423  catch(const std::exception& error) {
424  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
425  ev_error();
426  }
427  }
428 
429 
430  virtual void actionConfigure(int length, const char* buffer) override
431  {
432  using namespace std;
433 
434  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
435 
436  string _hostname_ = "";
437 
438  long long int update_s = 20;
439  long long int logger_s = 10;
440 
441  parameters .reset();
442  dataFilters.clear();
443  dataQueues .clear();
444 
445  reporting = false;
446  dumpCount = 0;
447  dumpLimit = numeric_limits<int>::max();
448 
449  detector.clear();
450 
451  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
452 
453  properties["dataWriter"] = _hostname_;
454  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
455  properties["detector"] = detector;
456  properties["triggerParameters"] = parameters;
457  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
458  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
459  properties["frameIndex"] = maximal_frame_index = 100000;
460  properties["logger_s"] = logger_s;
461  properties["update_s"] = update_s;
462  properties["JDataFilter"] = dataFilters;
463  properties["DataQueue"] = dataQueues;
464  properties["path"] = c_buffer.path;
465  properties["archive"] = c_buffer.archive;
466  properties["c_sizeL0"] = c_buffer.sizeL0;
467  properties["c_sizeL1"] = c_buffer.sizeL1;
468  properties["c_sizeL2"] = c_buffer.sizeL2;
469  properties["c_sizeSN"] = c_buffer.sizeSN;
470  properties["dumpLimit"] = dumpLimit;
471 
472  try {
473  properties.read(string(buffer, length));
474  }
475  catch(const std::exception& error) {
476  JErrorStream(logger) << error.what();
477  }
478 
479  if (update_s <= 0) { update_s = 20; }
480  if (logger_s <= 0) { logger_s = 10; }
481 
482  setClockInterval(update_s * 1000000LL);
483 
484  _hostname_ = trim(_hostname_);
485 
486  if (_hostname_ != "" && _hostname_ != hostname) {
487 
488  datawriter.reset();
489 
490  hostname = _hostname_;
491  }
492 
493  if (!datawriter.is_valid()) {
494 
495  datawriter.reset(new JControlHost_t(hostname));
496 
497  datawriter->MyId(getFullName());
498  }
499 
500  datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
501 
502  maximum_frames_per_slice = frames_per_slice;
503 
504  // process processlist
505 
506  if (dataFilters.empty()) {
507  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
508  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
509  }
510 
511  sort(dataFilters.begin(), dataFilters.end(), compare);
512 
513  unsigned int numberOfDataFiltersOnThisMachine = 0;
514  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
515 
517 
518  {
519  JNoticeStream notice(logger);
520 
521  notice << "My IP addresses:";
522 
523  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
524  notice << ' ' << *i;
525  }
526  }
527 
528  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
529 
530  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
531 
532  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
533 
534  numberOfDataFiltersOnThisMachine++;
535 
536  if (i->port == this->port) {
537  thisProcess = i;
538  }
539  }
540  }
541 
542  if (numberOfDataFiltersOnThisMachine == 0) {
543  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
544  << "Assuming one datafilter on this machine.";
545  numberOfDataFiltersOnThisMachine = 1;
546  }
547 
548  if (thisProcess == dataFilters.end()) {
549  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
550  }
551 
552  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
553  JErrorStream(logger) << "Mismatch between given process names: "
554  << "I am called " << getName()
555  << ", but in the process list I am referred to as " << thisProcess->index;
556  }
557 
558  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
559  reporting = true;
560  }
561 
562  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
563 
564  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
565 
566  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
567  << "Queue size reduced to "
568  << maxQueueSize << " bytes." ;
569  }
570 
571  // detector
572 
573  if (parameters.disableHighRateVeto) {
574 
575  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
576 
577  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
578  }
579 
580  // trigger parameters
581 
583 
584  triggerNB .reset(new JTriggerNB (parameters));
585  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
586  trigger3DShower.reset(new JTrigger3DShower(parameters));
587  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
588 
589  moduleRouter.reset(new JModuleRouter(detector));
590 
591  if (reporting) {
592  JNoticeStream(logger) << "This data filter process will report.";
593  JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
594  JDebugStream (logger) << "Trigger parameters: " << parameters;
595  JDebugStream (logger) << "Detector description: " << endl << detector;
596  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
597  }
598 
599  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
600 
601  // set L1, L2 and SN builders
602 
603  buildL1.reset(new JBuildL1_t(parameters));
604  buildL2.reset(new JBuildL2_t(parameters.L2));
605  buildSN.reset(new JBuildL2_t(parameters.SN));
606  buildNB.reset(new JBuildL2_t(parameters.NB));
607 
608  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
609  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
610  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
611  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
612 
613  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
614  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
615  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
616  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
617 
618  if (c_buffer.is_enabled()) {
619 
620  if (!c_buffer.is_open()) {
621 
622  c_buffer.open();
623 
624  if (c_buffer.is_open()) {
625 
626  putObject(c_buffer.getFile(), meta);
627 
628  JStatusStream(logger) << "Created circular buffer " << c_buffer;
629 
630  } else {
631 
632  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
633  }
634 
635  } else {
636 
637  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
638  }
639  }
640 
641  if (c_buffer.is_open()) {
642  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
643  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
644  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
645  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
646  } else {
647  c_buffer.disable();
648  }
649  }
650 
651 
652  virtual void actionStart(int length, const char* buffer) override
653  {
654  using namespace std;
655  // Resets the queue, all counters, the log schedulers and the timer, then sends the trigger parameters to the data writer.
656  if (reporting) {
657  JNoticeStream(logger) << "Start run " << getRunNumber();
658  }
659 
660  timeslices.clear();
661 
662  current_slice_index = -1;
663  queueSize = 0;
664 
665  numberOfEvents = 0;
666  numberOfBytes = 0;
667  numberOfTimeslicesProcessed = 0;
668  numberOfIncompleteTimeslicesProcessed = 0;
669 
670  number_of_packets_received = 0;
671  number_of_packets_discarded = 0;
672  number_of_bytes_received = 0;
673  number_of_reads = 0;
674  // start from the extreme values so that the first processed slice sets both bounds (see min/max in actionSelect())
675  minFrameNumber = numeric_limits<int>::max();
676  maxFrameNumber = numeric_limits<int>::min();
677 
678  // Reset global trigger counter.
679  // NOTE(review): listing line 680 (the statement announced by the comment above) is missing from this extract
681 
682  logErrorRun .reset();
683  logErrorDetector .reset();
684  logErrorIndex .reset();
685  logErrorIncomplete.reset();
686 
687  timer.reset();
688  timer.start();
689 
690  Qt.reset();
691  Qx.reset();
692 
693  // send trigger parameters to the datawriter
694 
695  ostringstream os;
696  // the message consists of the run number, a space, and the trigger parameters
697  os << getRunNumber() << ' ' << parameters;
698 
699  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
700  }
701 
702 
703  virtual void actionPause(int length, const char* buffer) override
704  {
705  using namespace std;
706  // Discards all queued time slices, resets the queue bookkeeping and stops the timer.
707  if (!timeslices.empty()) {
708 
709  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
710 
711  for (const JDAQTimesliceL0& slice : timeslices) {
712  queueSize -= getSizeof(slice);
713  }
714 
715  timeslices.clear();
716  }
717 
718  // force clearance of memory by swapping with an empty temporary
720  deque<JDAQTimesliceL0>().swap(timeslices);
724 
725  if (queueSize != 0) {
726  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
727  }
728 
729  queueSize = 0;
730  current_slice_index = -1;
731 
732  timer.stop();
733  }
734 
735 
736  virtual void actionContinue(int length, const char* buffer) override // arguments are unused
737  {
738  timer.start(); // counterpart of timer.stop() in actionPause()
739  }
740 
741 
742  virtual void actionStop(int length, const char* buffer) override // arguments are unused
743  {
744  typeout();
745  // release the data writer; actionConfigure() creates a new one when none is valid
746  datawriter.reset();
747  }
748 
749 
750  virtual void actionReset(int length, const char* buffer) override // arguments are unused
751  {
752  if (serversocket.is_valid()) {
753  serversocket->shutdown();
754  }
755  // release the server socket; actionInit() creates a new one
756  serversocket.reset();
757  }
758 
759 
760  virtual void actionQuit(int length, const char* buffer) override // arguments are unused
761  {
762  datawriter.reset(); // release the data writer
763  }
764 
765 
766  virtual void setSelect(JFileDescriptorMask& mask) const override
767  {
768  // Adds the server socket (if valid) and every channel that is not ready to the mask.
769  if (serversocket.is_valid()) {
770  mask.set(*serversocket);
771  }
772  for (const auto& channel : channelList) {
773  if (channel.isReady()) { continue; }
774  mask.set(channel.getFileDescriptor());
776  }
777  }
778 
779 
780  virtual void actionSelect(const JFileDescriptorMask& mask) override
781  {
782  using namespace std;
783  // Step 1: read from every channel set in the mask and hand each ready packet to updateFrameQueue().
784  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
785 
786  try {
787 
788  if (mask.has(channel->getFileDescriptor())) {
789  channel->read();
790  }
791 
792  if (channel->isReady()) {
793 
794  number_of_packets_received += 1;
795  number_of_reads += channel->getCounter();
796  number_of_bytes_received += channel->size();
797 
798  if (isRunning()) {
799 
800  try {
801  updateFrameQueue(channel);
802  }
803  catch(const std::exception& error) {
804 
805  JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
806 
807  number_of_packets_discarded += 1;
808  }
809 
810  } else {
811 
812  JErrorStream(logErrorRun) << "Receiving data while not running.";
813 
814  number_of_packets_discarded += 1;
815  }
816  // the packet has been handled (or discarded): reset the channel
817  channel->reset();
818  }
819 
820  ++channel;
821  }
822  catch(const std::exception& error) {
823  // an exception from the channel itself: shut it down and erase it; erase() yields the next iterator, so no increment here
824  if (isRunning()) {
825  JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
826  }
827 
828  channel->shutdown();
829 
830  channel = channelList.erase(channel);
831  }
832  }
833 
834  // Step 2: when the server socket is set in the mask, add a channel for the new connection.
835  if (serversocket.is_valid()) {
836 
837  if (mask.has(*serversocket)) {
838 
839  JTCPSocket socket(serversocket->getFileDescriptor());
840  // NOTE(review): listing line 841 is missing from this extract
842 
843  socket.setKeepAlive (true);
844  socket.setNonBlocking(false);
845 
846  JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
847 
848  channelList.push_back(JSocketInputChannel_t(socket));
849  }
850  }
851 
852  // Step 3: take at most one time slice from the front of the queue when one of the conditions below holds.
853  if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) || // normal
854 
855  (timeslices.size() >= 2u && // intermittent problem
856  timeslices[1].size() >= frames_per_slice) ||
857 
858  (timeslices.size() >= maxQueueDepth) || // timeout
859  (queueSize >= maxQueueSize))) {
860 
861  const JDAQTimesliceL0& pending_slice = timeslices.front();
862  queueSize -= getSizeof(pending_slice);
863 
864  current_slice_index = pending_slice.getFrameIndex();
865  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
866  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
867  // more frames than expected: raise the expectation, but not beyond the configured maximum
868  if (pending_slice.size() > frames_per_slice) {
869 
870  ostringstream error;
871 
872  error << "More frames in timeslice than expected "
873  << pending_slice.size() << " > " << frames_per_slice;
874 
875  if (pending_slice.size() <= maximum_frames_per_slice) {
876 
877  error << " adjusting expected frames per timeslice";
878 
879  frames_per_slice = pending_slice.size();
880  }
881 
882  JErrorStream(logErrorIncomplete) << error.str();
883  }
884 
885  if (!pending_slice.empty()) {
886 
887  if (isRunning()) {
888  // the processing time (t1 - t0) is accumulated in Qt
889  const localtime_t t0 = getLocalTime();
890 
891  processTimeSlice(pending_slice);
892 
893  const localtime_t t1 = getLocalTime();
894 
895  numberOfTimeslicesProcessed += 1;
896 
897  Qt.put(t1 - t0);
898 
899  } else {
900 
901  JErrorStream(logErrorRun) << "Skip processing of data while not running.";
902  }
903  // fewer frames than expected: lower the expectation, unless the next slice in the queue is already complete
904  if (pending_slice.size() < frames_per_slice) {
905 
906  numberOfIncompleteTimeslicesProcessed += 1;
907 
908  ostringstream error;
909 
910  error << "Timeout -> processed incomplete timeslice: "
911  << "Frame index = " << pending_slice.getFrameIndex() << ';'
912  << "Size of timeslice = " << pending_slice.size() << ';'
913  << "Queue depth = " << timeslices.size() << ';'
914  << "Queue size = " << queueSize << ';';
915 
916  if ((timeslices.size() >= 2u &&
917  timeslices[1].size() >= frames_per_slice)) {
918 
919  error << " intermittent problem -> continues as-is";
920 
921  } else {
922 
923  error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
924 
925  frames_per_slice = pending_slice.size();
926  }
927 
928  JErrorStream(logErrorIncomplete) << error.str();
929  }
930  }
931  // pending_slice refers to the front element and must not be used after this point
932  timeslices.pop_front();
933  }
934  }
935 
936 
937  virtual void actionRunning() override // calls typeout() if this process is the reporting one
938  {
939  if (reporting) {
940  typeout();
941  }
942  }
943 
944 
945  /**
946  * Update queue with data frames.
947  *
948  * Note that any discarded data will be reported.
949  * A packet is discarded when its size, run number or frame index fails the checks below.
950  * \param channel incoming data channel
951  */
952  void updateFrameQueue(const JChannelList_t::const_iterator channel)
953  {
954  using namespace std;
955 
956  JByteArrayReader in(channel->data(), channel->size());
957 
958  JDAQPreamble preamble;
959  JDAQSuperFrameHeader header;
960 
961  in >> preamble;
962  in >> header;
963  // check 1: the length in the preamble must equal the number of bytes received
964  if (preamble.getLength() != channel->size()) {
965 
966  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
967  << "preamble.getLength() = " << preamble.getLength() << ';'
968  << "channel->size(): " << channel->size() << ';';
969 
970  number_of_packets_discarded += 1;
971 
972  return;
973  }
974  // check 2: the frame must belong to the current run
975  if (header.getRunNumber() != getRunNumber()) {
976 
977  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
978  << " != " << getRunNumber()
979  << " -> Dropping frame.";
980 
981  number_of_packets_discarded += 1;
982 
983  return;
984  }
985  // check 3: the frame index must be above the index last taken from the queue in actionSelect()
986  if (header.getFrameIndex() <= current_slice_index) {
987 
988  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
989  << " module " << header.getModuleID()
990  << " -> Dropping frame.";
991 
992  number_of_packets_discarded += 1;
993  // late data: raise the expected number of frames by one, up to the configured maximum
994  if (frames_per_slice < maximum_frames_per_slice) {
995 
996  frames_per_slice++;
997 
998  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
999  }
1000 
1001  return;
1002  }
1003  // check 4: once a slice index is set (not -1), the frame index may exceed it by at most maximal_frame_index
1004  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
1005 
1006  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1007  << " module " << header.getModuleID()
1008  << " -> Dropping frame.";
1009 
1010  number_of_packets_discarded += 1;
1011 
1012  return;
1013  }
1014  // find the first queued slice whose frame index is not below that of the incoming frame
1015  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1016 
1017  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1018  ++timesliceIterator;
1019  }
1020 
1021  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1022 
1023  // The corresponding time slice already exists
1024 
1025  } else {
1026 
1027  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1028 
1029  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1030 
1031  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1032 
1033  queueSize += getSizeof(*timesliceIterator);
1034  }
1035  // append a super frame with this header, then read the frame data into it
1036  timesliceIterator->push_back(JDAQSuperFrame(header));
1037  // NOTE(review): if the read below throws, the super frame appended above stays in the slice without its size being added to queueSize -- confirm this is acceptable
1038  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1039 
1040  queueSize += getSizeof(*timesliceIterator->rbegin());
1041  }
1042 
1043 
1044  /**
1045  * Process time slice.
1046  *
1047  * \param timeslice time slice
1048  */
1049  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1050  {
1051  using namespace std;
1052 
1053  try {
1054 
1055  timesliceRouter->configure(timeslice);
1056 
1057  if (parameters.writeSummary()) {
1058  this->put(JDAQSummaryslice(timeslice));
1059  }
1060 
1061  if (parameters.trigger3DMuon.enabled ||
1062  parameters.trigger3DShower.enabled ||
1063  parameters.triggerMXShower.enabled ||
1064  parameters.triggerNB.enabled ||
1065  parameters.writeL0.prescale ||
1066  parameters.writeL1.prescale ||
1067  parameters.writeL2.prescale ||
1068  parameters.writeSN.prescale ||
1069  c_buffer.is_enabled()) {
1070 
1071  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1072  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1073  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1074  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1075  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1076  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1077 
1078  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1079 
1080  if (moduleRouter->hasModule(frame->getModuleID())) {
1081 
1082  if (!checksum(*frame)) {
1083 
1084  JWarningStream(logger) << "Invalid data at "
1085  << "run = " << timeslice.getRunNumber() << ";"
1086  << "frame index = " << timeslice.getFrameIndex() << ";"
1087  << "module = " << frame->getModuleID() << ";"
1088  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1089 
1090  if (dumpCount < dumpLimit) {
1091  timesliceTX.push_back(*frame);
1092  }
1093 
1094  continue;
1095  }
1096 
1097  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1098  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1099 
1100  // Apply high-rate veto
1101 
1102  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1103 
1104  // L0
1105 
1106  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1107 
1108  // Nano-beacon trigger
1109 
1110  if (parameters.triggerNB.enabled) {
1111 
1112  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1113 
1114  if (buffer.begin() != __end) {
1115 
1116  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1117  frame->getModuleIdentifier(),
1118  module.getPosition()));
1119 
1120  JSuperFrame1D_t zbuf;
1121 
1122  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1123 
1124  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1125  }
1126  }
1127 
1128  // L1
1129 
1130  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1131  frame->getModuleIdentifier(),
1132  module.getPosition()));
1133 
1134  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1135 
1136  // L2
1137 
1138  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1139  frame->getModuleIdentifier(),
1140  module.getPosition()));
1141 
1142  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1143 
1144  // SN
1145 
1146  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1147  frame->getModuleIdentifier(),
1148  module.getPosition()));
1149 
1150  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1151 
1152  } else {
1153 
1154  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1155  }
1156  }
1157 
1158  if (!timesliceTX.empty()) {
1159 
1160  if (dumpCount < dumpLimit) {
1161 
1162  this->put(timesliceTX);
1163 
1164  dumpCount += 1;
1165  }
1166  }
1167 
1168  // Trigger
1169 
1170  if (parameters.triggerNB.enabled) {
1171 
1172  const JTriggerInput trigger_input(timesliceNB);
1173 
1174  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1175 
1176  if (parameters.triggerNB.write()) {
1177 
1178  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1179  getTriggerMask(triggerNB->getTriggerBit()),
1180  *hit,
1181  *timesliceRouter,
1182  *moduleRouter,
1183  parameters.TMaxLocal_ns,
1184  parameters.triggerNB.DMax_m,
1185  getTimeRange(parameters.triggerNB));
1186 
1187  this->put(tev);
1188  }
1189  }
1190  }
1191 
1192  JTriggerInput trigger_input(timesliceL2);
1193  JTriggerOutput trigger_output;
1194 
1195  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1196  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1197  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1198 
1199  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1200 
1201  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1202 
1203  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1204 
1205  this->put(object);
1206 
1207  numberOfEvents += 1;
1208  }
1209 
1210  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1211 
1212  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1213 
1214  if (parameters.writeL1) { this->put(object); }
1215  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1216  }
1217 
1218  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1219 
1220  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1221 
1222  if (parameters.writeL2) { this->put(object); }
1223  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1224  }
1225 
1226  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1227 
1228  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1229 
1230  if (parameters.writeSN) { this->put(object); }
1231  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1232  }
1233 
1234  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1235 
1236  if (parameters.writeL0) { this->put(timeslice); }
1237  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1238  }
1239  }
1240 
1241  } catch(const std::exception& error) {
1242 
1243  JErrorStream(logger) << "Error = " << error.what() << ";"
1244  << "run = " << timeslice.getRunNumber() << ";"
1245  << "frame index = " << timeslice.getFrameIndex() << ";"
1246  << "time slice not correctly processed;"
1247  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1248 
1249  if (dumpCount < dumpLimit) {
1250 
1251  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1252 
1253  dumpCount += 1;
1254  }
1255  }
1256 
1257  timesliceRouter->reset();
1258  }
1259 
1260 
1261  /**
1262  * Report status to message logger.
1263  */
1264  void typeout()
1265  {
1266  timer.stop();
1267 
1268  const double T_us = (double) timer.usec_wall;
1269 
1270  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1271  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1272  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1273  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1274  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1275  JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0);
1276 
1277  if (number_of_packets_received > 0) {
1278  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1279  }
1280 
1281  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1282  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1283 
1284  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1285 
1286  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1287  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1288 
1289  if (processedSlicesTime_us > 0) {
1290  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1291  }
1292  if (processedDetectorTime_us > 0) {
1293  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1294  }
1295 
1296  timer.start();
1297  }
1298 
1299 
1300  /**
1301  * Tagged action to handle alerts.
1302  *
1303  * \param tag tag
1304  * \param length number of characters
1305  * \param buffer message
1306  */
1307  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1308  {
1309  using namespace std;
1310 
1311  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1312 
1313  if (tag == RC_ALERT) {
1314 
1315  if (c_buffer.is_open()) {
1316 
1317  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1318 
1319  c_buffer.close(true);
1320  }
1321 
1322  if (c_buffer.is_enabled()) {
1323 
1324  c_buffer.open();
1325 
1326  if (c_buffer.is_open()) {
1327 
1328  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1329 
1330  putObject(c_buffer.getFile(), meta);
1331 
1332  } else {
1333 
1334  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1335 
1336  c_buffer.disable();
1337  }
1338  }
1339 
1340  } else {
1341 
1342  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1343  }
1344  }
1345 
1346  JMeta meta; //!< meta data
1347 
1348  private:
1349 
1351  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1352  std::string hostname; //!< host name of data server
1353 
1354  /**
1355  * Auxiliary method to send object to data server.
1356  *
1357  * \param object object to be sent
1358  */
1359  template<class T>
1360  void put(const T& object)
1361  {
1362  try {
1363 
1364  const localtime_t t0 = getLocalTime();
1365 
1366  datawriter->put(object);
1367 
1368  const localtime_t t1 = getLocalTime();
1369 
1370  numberOfBytes += getSizeof(object);
1371 
1372  Qx.put(t1 - t0);
1373  }
1374  catch(const std::exception& error) {
1375  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1376  ev_error();
1377  }
1378  }
1379 
1380 
1381  int port; //!< server socket port
1382  int backlog;
1383 
1384  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1385  JChannelList_t channelList; //!< connections to data queue
1386 
1387  JTimer timer;
1389 
1390  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1392  unsigned int frames_per_slice;
1395 
1396  // trigger
1397 
1400 
1407 
1412 
1417 
1420 
1421  // process management
1422 
1425 
1426  // memory management
1427 
1428  long long int totalCPURAM;
1429  unsigned int maxQueueDepth;
1430  long long int maxQueueSize;
1431  long long int queueSize;
1432 
1433  // statistics
1434 
1436 
1437  long long int numberOfEvents;
1438  long long int numberOfBytes;
1441 
1444 
1445  // temporary
1446 
1449  long long int number_of_reads;
1451 
1452  // circular buffer
1453 
1455  };
1456 }
1457 
1458 /**
1459  * \file
1460  *
1461  * Application for real-time filtering of data.
1462  * For more information, see KM3NETDAQ::JDataFilter.
1463  *
1464  * \author rbruijn and mdejong
1465  */
1466 int main(int argc, char* argv[])
1467 {
1468  using namespace std;
1469  using namespace JPP;
1470  using namespace KM3NETDAQ;
1471 
1472  string server;
1473  string logger;
1474  string hostname;
1475  string client_name;
1476  int port;
1477  int backlog;
1478  bool use_cout;
1479  string path;
1480  string archive;
1481  int debug;
1482 
1483 
1484  try {
1485 
1486  JParser<> zap("Application for real-time filtering of data.");
1487 
1488  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1489  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1490  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1491  zap['u'] = make_field(client_name, "client name") = "%";
1492  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1493  zap['q'] = make_field(backlog, "back log") = 1024;
1494  zap['c'] = make_field(use_cout, "print to terminal");
1495  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1496  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1497  zap['d'] = make_field(debug, "debug level") = 0;
1498 
1499  zap(argc, argv);
1500  }
1501  catch(const std::exception& error) {
1502  FATAL(error.what() << endl);
1503  }
1504 
1505 
1506  JLogger* out = NULL;
1507 
1508  if (use_cout)
1509  out = new JStreamLogger(cout);
1510  else
1511  out = new JControlHostLogger(logger);
1512 
1513  JDataFilter dfilter(client_name,
1514  server,
1515  hostname,
1516  out,
1517  debug,
1518  port,
1519  backlog,
1520  path,
1521  archive);
1522 
1523  dfilter.meta = JMeta(argc, argv);
1524 
1525  dfilter.enter();
1526  dfilter.run();
1527 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:937
Exception for opening of file.
Definition: JException.hh:358
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:346
Utility class to parse command line options.
Definition: JParser.hh:1514
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:80
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:231
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:67
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:165
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:162
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:157
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:329
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:67
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -&gt; JDataWriter.cc
Definition: JDAQTags.hh:32
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:497
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:84
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:152
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
Definition: JFilesystem.hh:49
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:189
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:333
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:164
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:295
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:161
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
Definition: getArchive.sh:42
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc &lt;- JLigier.cc
Definition: JDAQTags.hh:31
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:163
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:736
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:158
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:311
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc &lt;- DataQueue.cc
Definition: JDAQTags.hh:30
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:65
then awk string
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:952
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:392
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:335
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:760
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:331
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:133
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
TCP socket.
Definition: JTCPSocket.hh:25
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:652
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:750
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:396
JSinglePointer< JTimesliceRouter > timesliceRouter
Auxiliary data structure for average.
Definition: JKatoomba_t.hh:76
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:298
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
Auxiliary class for date and time.
Definition: JDateAndTime.hh:78
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:73
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:780
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:328
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:60
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:742
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
double u[N+1]
Definition: JPolint.hh:865
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:365
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:409
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:703
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:330
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:334
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:766
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:430
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.