Jpp  18.0.0-rc.4
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "Jeep/JParser.hh"
14 #include "Jeep/JProperties.hh"
15 #include "Jeep/JTimer.hh"
16 #include "Jeep/JTimekeeper.hh"
17 #include "Jeep/JMessage.hh"
18 #include "Jeep/JPrint.hh"
19 #include "Jeep/JeepToolkit.hh"
26 #include "JDAQ/JDAQTags.hh"
27 #include "JDAQ/JDAQEventIO.hh"
28 #include "JDAQ/JDAQTimesliceIO.hh"
30 #include "JDetector/JDetector.hh"
31 #include "JTrigger/JHit.hh"
32 #include "JTrigger/JHitToolkit.hh"
35 #include "JTrigger/JTimeslice.hh"
36 #include "JTrigger/JHitL0.hh"
37 #include "JTrigger/JHitL1.hh"
38 #include "JTrigger/JBuildL1.hh"
39 #include "JTrigger/JBuildL2.hh"
43 #include "JTrigger/JTriggerNB.hh"
44 #include "JTrigger/JTriggerBits.hh"
48 #include "JTrigger/JTimesliceL1.hh"
51 #include "JTrigger/JChecksum.hh"
54 #include "JNet/JControlHost.hh"
56 #include "JNet/JTCPSocket.hh"
57 #include "JNet/JSocketChannel.hh"
58 #include "JNet/JServerSocket.hh"
59 #include "JPhysics/JConstants.hh"
60 #include "JTools/JQuantile.hh"
61 #include "JSupport/JSupport.hh"
63 #include "JSupport/JMeta.hh"
64 #include "JSystem/JStat.hh"
65 #include "JSystem/JTime.hh"
67 #include "JSystem/JNetwork.hh"
68 
69 
70 namespace JNET {
71 
72  /**
73  * Get size of packet.
74  *
75  * \param preamble DAQ data preamble
76  * \return size [B]
77  */
78  template<>
80  {
81  return preamble.getLength();
82  }
83 }
84 
85 namespace KM3NETDAQ {
86 
87  using namespace JPP;
88 
89  /**
90  * Main class for real-time filtering of data.
91  *
92  * This class implements the action methods for each transition of the state machine.\n
93  * When the state machine is entered, data are continually collected using
94  * custom implementation of virtual methods
95  * setSelect and
96  * actionSelect.
97  *
98  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
99  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
100  * When a JDAQTimeslice is complete,
101  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
102  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
103  * which, together with other parameters, is parsed in method actionConfigure.\n
104  * <pre>
105  * numberOfFramesPerSlice = frames_per_slice = 1;
106  * </pre>
107  * Note that this parameter can change during operation (see below).
108  *
109  * A timeout may occur when the total amount of data or the number of incomplete time slices
110  * in the queue exceeds one of the corresponding pre-set limits:\n
111  * <pre>
112  * queueSize = <maximal queue size [B]>;
113  * queueDepth = <maximal queue depth>;
114  * </pre>
115  * Note that these values apply per JDataFilter.\n
116  *
117  * When a timeout occurs,
118  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
119  * When data are received with a frame index below the current frame index,
120  * the parameter <tt>frames_per_slice</tt> is incremented by one,
121  * up to the original value set at actionConfigure.\n
122  *
123  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
124  * The following parameters can be used to configure the circular buffer.\n
125  * <pre>
126  * path = <write directory for temporary circular buffer>;
127  * archive = <write directory for archival of circular buffer>;
128  * c_sizeL0 = <L0 buffer size>;
129  * c_sizeL1 = <L1 buffer size>;
130  * c_sizeL2 = <L2 buffer size>;
131  * c_sizeSN = <SN buffer size>;
132  * </pre>
133  *
134  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
135  * - There will always be a file created which is deleted when the state machine is exited;
136  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
137  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
138  * - After archival of the temporary file, a new temporary file will be opened;
139  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
140  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
141  *
142  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
143  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
144  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
145  * (e.g. when there is more than one alert on a given day).
146  *
147  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
148  *
149  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
150  */
151  class JDataFilter :
152  public JDAQClient
153  {
154  public:
155 
158 
159  typedef double hit_type;
165 
166 
167 
168  /**
169  * Circular buffer.
170  */
172  public JTreeRecorder<JDAQTimesliceTypes_t>
173  {
176 
177 
178  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
179 
180 
181  /**
182  * Constructor.
183  *
184  * \param path directory for temporary storage
185  * \param archive directory for permanent archival
186  * \param tag tag
187  */
189  const std::string& archive,
190  const JTag& tag) :
191  path (path),
192  archive(archive),
193  tag (tag)
194  {
195  disable();
196  }
197 
198 
199  /**
200  * Open file.
201  *
202  * If file with same name exists, remove it beforehand.
203  */
204  void open()
205  {
206  using namespace std;
207  using namespace JPP;
208 
209  gErrorIgnoreLevel = kFatal;
210 
211  std::ostringstream os;
212 
213  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
214 
215  if (getFileStatus(os.str().c_str())) {
216  std::remove(os.str().c_str());
217  }
218 
219  this->open(os.str().c_str());
220  }
221 
222 
223  /**
224  * Close file.
225  *
226  * If option is true, archive file; else delete file.
227  *
228  * \param option option
229  */
230  void close(const bool option)
231  {
232  using namespace std;
233  using namespace JPP;
234 
235  const JDateAndTime cal;
236 
237  if (this->is_open()) {
238 
239  const string file_name = this->getFile()->GetName();
240 
241  this->close();
242 
243  if (option) {
244 
245  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
246 
247  ostringstream os;
248 
249  os << getFullPath(this->archive)
250  << "KM3NeT"
251  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
252  << "_" << this->tag;
253 
254  if (i != 0) {
255  os << "_" << i;
256  }
257 
258  os << ".root";
259 
260  if (!getFileStatus(os.str().c_str())) {
261 
262  if (std::rename(file_name.c_str(), os.str().c_str()) == 0)
263  return;
264  else
265  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
266  }
267  }
268 
269  } else {
270 
271  std::remove(file_name.c_str());
272  }
273  }
274  }
275 
276 
277  /**
278  * Disable writing.
279  */
280  void disable()
281  {
282  sizeL0 = 0;
283  sizeL1 = 0;
284  sizeL2 = 0;
285  sizeSN = 0;
286  }
287 
288 
289  /**
290  * Check whether writing of data is enabled.
291  *
292  * \return true if writing enabled; else false
293  */
294  bool is_enabled() const
295  {
296  return (sizeL0 > 0 ||
297  sizeL1 > 0 ||
298  sizeL2 > 0 ||
299  sizeSN > 0);
300  }
301 
302 
303  /**
304  * Write circular buffer to output stream.
305  *
306  * \param out output stream
307  * \param object circular buffer
308  * \return output stream
309  */
310  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
311  {
312  if (object.is_open())
313  out << object.getFile()->GetName();
314  else
315  out << "void";
316 
317  out << ' ';
318 
319  out << object.sizeL0 << '/'
320  << object.sizeL1 << '/'
321  << object.sizeL2 << '/'
322  << object.sizeSN << '/';
323 
324  return out;
325  }
326 
327  Long64_t sizeL0; //!< Number of L0 time slices
328  Long64_t sizeL1; //!< Number of L1 time slices
329  Long64_t sizeL2; //!< Number of L2 time slices
330  Long64_t sizeSN; //!< Number of SN time slices
331 
332  std::string path; //!< Directory for temporary storage
333  std::string archive; //!< Directory for permanent archival
334  JTag tag; //!< Unique tag of this process
335  };
336 
337 
338  /**
339  * Sort DAQ process by index.
340  *
341  * \param first first DAQ process
342  * \param second second DAQ process
343  * \return true if index of first DAQ process less than that of second; else false
344  */
345  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
346  {
347  return first.index < second.index;
348  }
349 
350 
351  /**
352  * Constructor.
353  *
354  * \param name name of client
355  * \param server name of command message server
356  * \param hostname name of data server
357  * \param logger pointer to logger
358  * \param level debug level
359  * \param port server port
360  * \param backlog server backlog
361  * \param path directory for temporary storage
362  * \param archive directory for permanent archival
363  */
365  const std::string& server,
366  const std::string& hostname,
367  JLogger* logger,
368  const int level,
369  const int port,
370  const int backlog,
371  const std::string& path,
372  const std::string& archive) :
373  JDAQClient (name,server,logger,level),
374  hostname (hostname),
375  port (port),
376  backlog (backlog),
377  c_buffer (path, archive, getUniqueTag())
378  {
379  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
380 
381  addSubscription(JSubscriptionAll(RC_ALERT));
382 
383  totalCPURAM = getRAM();
384  current_slice_index = -1;
385  reporting = false;
386 
387  this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
388  }
389 
390 
391  virtual void actionEnter() override
392  {}
393 
394 
395  virtual void actionExit() override
396  {
397  if (c_buffer.is_open()) {
398 
399  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
400 
401  c_buffer.close(false);
402  }
403 
404  datawriter.reset();
405  }
406 
407 
408  virtual void actionInit(int length, const char* buffer) override
409  {
410  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
411 
412  try {
413 
414  JDebugStream(logger) << "Start server.";
415 
416  if (serversocket.is_valid()) {
417  serversocket->shutdown();
418  }
419 
420  serversocket.reset(new JServerSocket(port,backlog));
421  }
422  catch(const std::exception& error) {
423  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
424  ev_error();
425  }
426  }
427 
428 
429  virtual void actionConfigure(int length, const char* buffer) override
430  {
431  using namespace std;
432 
433  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
434 
435  long long int update_s = 10;
436  long long int logger_s = 5;
437 
438  parameters .reset();
439  dataFilters.clear();
440  dataQueues .clear();
441 
442  reporting = false;
443  dumpCount = 0;
444  dumpLimit = numeric_limits<int>::max();
445 
446  detector.clear();
447 
448  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
449 
450  properties["dataWriter"] = hostname;
451  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
452  properties["detector"] = detector;
453  properties["triggerParameters"] = parameters;
454  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
455  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
456  properties["frameIndex"] = maximal_frame_index = 100000;
457  properties["logger_s"] = logger_s;
458  properties["update_s"] = update_s;
459  properties["JDataFilter"] = dataFilters;
460  properties["DataQueue"] = dataQueues;
461  properties["path"] = c_buffer.path;
462  properties["archive"] = c_buffer.archive;
463  properties["c_sizeL0"] = c_buffer.sizeL0;
464  properties["c_sizeL1"] = c_buffer.sizeL1;
465  properties["c_sizeL2"] = c_buffer.sizeL2;
466  properties["c_sizeSN"] = c_buffer.sizeSN;
467  properties["dumpLimit"] = dumpLimit;
468 
469  try {
470  properties.read(string(buffer, length));
471  }
472  catch(const exception& error) {
473  JErrorStream(logger) << error.what();
474  }
475 
476  if (update_s <= 0) { update_s = 1; }
477  if (logger_s <= 0) { logger_s = 1; }
478 
479  setClockInterval(update_s * 1000000LL);
480 
481  hostname = trim(hostname);
482 
483  if (hostname != "")
484  datawriter.reset(new JControlHost_t(hostname));
485  else
486  throw JException("Undefined data writer host name.");
487 
488  datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
489 
490  maximum_frames_per_slice = frames_per_slice;
491 
492  // process processlist
493 
494  if (dataFilters.empty()) {
495  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
496  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
497  }
498 
499  sort(dataFilters.begin(), dataFilters.end(), compare);
500 
501  unsigned int numberOfDataFiltersOnThisMachine = 0;
502  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
503 
505 
506  {
507  JNoticeStream notice(logger);
508 
509  notice << "My IP addresses:";
510 
511  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
512  notice << ' ' << *i;
513  }
514  }
515 
516  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
517 
518  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
519 
520  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
521 
522  numberOfDataFiltersOnThisMachine++;
523 
524  if (i->port == this->port) {
525  thisProcess = i;
526  }
527  }
528  }
529 
530  if (numberOfDataFiltersOnThisMachine == 0) {
531  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
532  << "Assuming one datafilter on this machine.";
533  numberOfDataFiltersOnThisMachine = 1;
534  }
535 
536  if (thisProcess == dataFilters.end()) {
537  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
538  }
539 
540  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
541  JErrorStream(logger) << "Mismatch between given process names: "
542  << "I am called " << getName()
543  << ", but in the process list I am referred to as " << thisProcess->index;
544  }
545 
546  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
547  reporting = true;
548  }
549 
550  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
551 
552  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
553 
554  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
555  << "Queue size reduced to "
556  << maxQueueSize << " bytes." ;
557  }
558 
559  // detector
560 
561  if (parameters.disableHighRateVeto) {
562 
563  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
564 
565  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
566  }
567 
568  // trigger parameters
569 
571 
572  triggerNB .reset(new JTriggerNB (parameters));
573  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
574  trigger3DShower.reset(new JTrigger3DShower(parameters));
575  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
576 
577  moduleRouter.reset(new JModuleRouter(detector));
578 
579  if (reporting) {
580  JNoticeStream(logger) << "This data filter process will report.";
581  JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
582  JDebugStream (logger) << "Trigger parameters: " << parameters;
583  JDebugStream (logger) << "Detector description: " << endl << detector;
584  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
585  }
586 
587  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
588 
589  // set L1, L2 and SN builders
590 
591  buildL1.reset(new JBuildL1_t(parameters));
592  buildL2.reset(new JBuildL2_t(parameters.L2));
593  buildSN.reset(new JBuildL2_t(parameters.SN));
594  buildNB.reset(new JBuildL2_t(parameters.NB));
595 
596  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
597  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
598  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
599  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
600 
601  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
602  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
603  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
604  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
605 
606  if (c_buffer.is_enabled()) {
607 
608  if (!c_buffer.is_open()) {
609 
610  c_buffer.open();
611 
612  if (c_buffer.is_open()) {
613 
614  putObject(c_buffer.getFile(), meta);
615 
616  JStatusStream(logger) << "Created circular buffer " << c_buffer;
617 
618  } else {
619 
620  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
621  }
622 
623  } else {
624 
625  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
626  }
627  }
628 
629  if (c_buffer.is_open()) {
630  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
631  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
632  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
633  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
634  } else {
635  c_buffer.disable();
636  }
637  }
638 
639 
640  virtual void actionStart(int length, const char* buffer) override
641  {
642  using namespace std;
643 
644  if (reporting) {
645  JNoticeStream(logger) << "Start run " << getRunNumber();
646  }
647 
648  timeslices.clear();
649 
650  current_slice_index = -1;
651  queueSize = 0;
652 
653  numberOfEvents = 0;
654  numberOfBytes = 0;
655  numberOfTimeslicesProcessed = 0;
656  numberOfIncompleteTimeslicesProcessed = 0;
657 
658  number_of_packets_received = 0;
659  number_of_packets_discarded = 0;
660  number_of_bytes_received = 0;
661  number_of_reads = 0;
662 
663  minFrameNumber = numeric_limits<int>::max();
664  maxFrameNumber = numeric_limits<int>::min();
665 
666  // Reset global trigger counter.
667 
669 
670  logErrorRun .reset();
671  logErrorDetector .reset();
672  logErrorIndex .reset();
673  logErrorIncomplete.reset();
674 
675  timer.reset();
676  timer.start();
677 
678  Qt.reset();
679 
680  // send trigger parameters to the datawriter
681 
682  ostringstream os;
683 
684  os << getRunNumber() << ' ' << parameters;
685 
686  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
687  }
688 
689 
690  virtual void actionPause(int length, const char* buffer) override
691  {
692  using namespace std;
693 
694  if (!timeslices.empty()) {
695 
696  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
697 
698  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
699  queueSize -= getSizeof(*i);
700  }
701 
702  timeslices.clear();
703  }
704 
705  { // force clearance of memory
706 
707  deque<JDAQTimesliceL0> buffer;
708 
709  timeslices.swap(buffer);
710  }
711 
712  if (queueSize != 0) {
713  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
714  }
715 
716  current_slice_index = -1;
717  queueSize = 0;
718 
719  timer.stop();
720  }
721 
722 
723  virtual void actionContinue(int length, const char* buffer) override
724  {
725  timer.start();
726  }
727 
728 
729  virtual void actionStop(int length, const char* buffer) override
730  {
731  typeout();
732 
733  datawriter.reset();
734  }
735 
736 
737  virtual void actionReset(int length, const char* buffer) override
738  {
739  if (serversocket.is_valid()) {
740  serversocket->shutdown();
741  }
742 
743  serversocket.reset();
744  }
745 
746 
747  virtual void actionQuit(int length, const char* buffer) override
748  {
749  datawriter.reset();
750  }
751 
752 
753  virtual void setSelect(JFileDescriptorMask& mask) const override
754  {
755  if (serversocket.is_valid()) {
756  mask.set(*serversocket);
757  }
758 
759  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
760  if (!channel->isReady()) {
761  mask.set(channel->getFileDescriptor());
762  }
763  }
764  }
765 
766 
767  virtual void actionSelect(const JFileDescriptorMask& mask) override
768  {
769  using namespace std;
770 
771  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
772 
773  try {
774 
775  if (mask.has(channel->getFileDescriptor())) {
776  channel->read();
777  }
778 
779  if (channel->isReady()) {
780 
781  number_of_packets_received += 1;
782  number_of_reads += channel->getCounter();
783  number_of_bytes_received += channel->size();
784 
785  if (isRunning()) {
786 
787  updateFrameQueue(channel);
788 
789  } else {
790 
791  JErrorStream(logErrorRun) << "Receiving data while not running.";
792 
793  number_of_packets_discarded += 1;
794  }
795 
796  channel->reset();
797  }
798 
799  ++channel;
800  }
801  catch(const exception& error) {
802 
803  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
804 
805  channel->shutdown();
806 
807  channel = channelList.erase(channel);
808  }
809  }
810 
811 
812  if (serversocket.is_valid()) {
813 
814  if (mask.has(*serversocket)) {
815 
816  JTCPSocket socket;
817 
818  socket.accept(serversocket->getFileDescriptor());
819 
821 
822  socket.setKeepAlive (true);
823  socket.setNonBlocking(true);
824 
825  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
826 
827  channelList.push_back(JSocketInputChannel_t(socket));
828  }
829  }
830 
831 
832  if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) ||
833  (timeslices.size() >= 2u &&
834  timeslices[1].size() >= frames_per_slice) ||
835  (timeslices.size() >= maxQueueDepth) ||
836  (queueSize >= maxQueueSize))) {
837 
838  const JDAQTimesliceL0& pending_slice = timeslices.front();
839  queueSize -= getSizeof(pending_slice);
840 
841  current_slice_index = pending_slice.getFrameIndex();
842  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
843  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
844 
845  if (pending_slice.size() > frames_per_slice) {
846 
847  JErrorStream(logger) << "More frames in timeslice than expected "
848  << pending_slice.size() << " > " << frames_per_slice;
849 
850  if (pending_slice.size() <= maximum_frames_per_slice) {
851 
852  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
853 
854  frames_per_slice = pending_slice.size();
855  }
856  }
857 
858  if (!pending_slice.empty()) {
859 
860  const localtime_t t0 = getLocalTime();
861 
862  processTimeSlice(pending_slice);
863 
864  const localtime_t t1 = getLocalTime();
865 
866  numberOfTimeslicesProcessed += 1;
867 
868  Qt.put(t1 - t0);
869 
870  if (pending_slice.size() < frames_per_slice) {
871 
872  numberOfIncompleteTimeslicesProcessed += 1;
873 
874  ostringstream error;
875 
876  error << "Timeout -> processed incomplete timeslice: "
877  << "Frame index = " << pending_slice.getFrameIndex() << ';'
878  << "Size of timeslice = " << pending_slice.size() << ';'
879  << "Queue depth = " << timeslices.size() << ';'
880  << "Queue size = " << queueSize << ';';
881 
882  if ((timeslices.size() >= 2u &&
883  timeslices[1].size() >= frames_per_slice)) {
884 
885  error << " intermittent problem -> continues as-is";
886 
887  } else {
888 
889  error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
890 
891  frames_per_slice = pending_slice.size();
892  }
893 
894  JErrorStream(logErrorIncomplete) << error.str();
895  }
896  }
897 
898  timeslices.pop_front();
899  }
900  }
901 
902 
903  virtual void actionRunning() override
904  {
905  if (reporting) {
906  typeout();
907  }
908  }
909 
910 
911  /**
912  * Update queue with data frames.
913  *
914  * Note that any discarded data will be reported.
915  *
916  * \param channel incoming data channel
917  */
918  void updateFrameQueue(const JChannelList_t::const_iterator channel)
919  {
920  using namespace std;
921 
922  JByteArrayReader in(channel->data(), channel->size());
923 
924  JDAQPreamble preamble;
925  JDAQSuperFrameHeader header;
926 
927  in >> preamble;
928  in >> header;
929 
930  if (preamble.getLength() != channel->size()) {
931 
932  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
933  << "preamble.getLength() = " << preamble.getLength() << ';'
934  << "channel->size(): " << channel->size() << ';';
935 
936  number_of_packets_discarded += 1;
937 
938  return;
939  }
940 
941  if (header.getRunNumber() != getRunNumber()) {
942 
943  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
944  << " != " << getRunNumber()
945  << " -> Dropping frame.";
946 
947  number_of_packets_discarded += 1;
948 
949  return;
950  }
951 
952  if (header.getFrameIndex() <= current_slice_index) {
953 
954  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
955  << " module " << header.getModuleID()
956  << " -> Dropping frame.";
957 
958  number_of_packets_discarded += 1;
959 
960  if (frames_per_slice < maximum_frames_per_slice) {
961 
962  frames_per_slice++;
963 
964  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
965  }
966 
967  return;
968  }
969 
970  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
971 
972  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
973  << " module " << header.getModuleID()
974  << " -> Dropping frame.";
975 
976  number_of_packets_discarded += 1;
977 
978  return;
979  }
980 
981  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
982 
983  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
984  ++timesliceIterator;
985  }
986 
987  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
988 
989  // The corresponding time slice already exists
990 
991  } else {
992 
993  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
994 
995  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
996 
997  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
998 
999  queueSize += getSizeof(*timesliceIterator);
1000  }
1001 
1002  timesliceIterator->push_back(JDAQSuperFrame(header));
1003 
1004  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1005 
1006  queueSize += getSizeof(*timesliceIterator->rbegin());
1007  }
1008 
1009 
1010  /**
1011  * Process time slice.
1012  *
1013  * \param timeslice time slice
1014  */
1015  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1016  {
1017  using namespace std;
1018 
1019  try {
1020 
1021  timesliceRouter->configure(timeslice);
1022 
1023  if (parameters.writeSummary()) {
1024  this->put(JDAQSummaryslice(timeslice));
1025  }
1026 
1027  if (parameters.trigger3DMuon.enabled ||
1028  parameters.trigger3DShower.enabled ||
1029  parameters.triggerMXShower.enabled ||
1030  parameters.triggerNB.enabled ||
1031  parameters.writeL0.prescale ||
1032  parameters.writeL1.prescale ||
1033  parameters.writeL2.prescale ||
1034  parameters.writeSN.prescale ||
1035  c_buffer.is_enabled()) {
1036 
1037  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1038  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1039  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1040  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1041  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1042  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1043 
1044  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1045 
1046  if (moduleRouter->hasModule(frame->getModuleID())) {
1047 
1048  if (!checksum(*frame)) {
1049 
1050  JErrorStream(logger) << "Invalid data at "
1051  << "run = " << timeslice.getRunNumber() << ";"
1052  << "frame index = " << timeslice.getFrameIndex() << ";"
1053  << "module = " << frame->getModuleID() << ";"
1054  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1055 
1056  timesliceTX.push_back(*frame);
1057 
1058  continue;
1059  }
1060 
1061  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1062  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1063 
1064  // Apply high-rate veto
1065 
1066  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1067 
1068  // L0
1069 
1070  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1071 
1072  // Nano-beacon trigger
1073 
1074  if (parameters.triggerNB.enabled) {
1075 
1076  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1077 
1078  if (buffer.begin() != __end) {
1079 
1080  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1081  frame->getModuleIdentifier(),
1082  module.getPosition()));
1083 
1084  JSuperFrame1D_t zbuf;
1085 
1086  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1087 
1088  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1089  }
1090  }
1091 
1092  // L1
1093 
1094  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1095  frame->getModuleIdentifier(),
1096  module.getPosition()));
1097 
1098  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1099 
1100  // L2
1101 
1102  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1103  frame->getModuleIdentifier(),
1104  module.getPosition()));
1105 
1106  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1107 
1108  // SN
1109 
1110  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1111  frame->getModuleIdentifier(),
1112  module.getPosition()));
1113 
1114  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1115 
1116  } else {
1117 
1118  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1119  }
1120  }
1121 
1122  if (!timesliceTX.empty()) {
1123 
1124  if (dumpCount < dumpLimit) {
1125 
1126  this->put(timesliceTX);
1127 
1128  dumpCount += 1;
1129  }
1130  }
1131 
1132  // Trigger
1133 
1134  if (parameters.triggerNB.enabled) {
1135 
1136  const JTriggerInput trigger_input(timesliceNB);
1137 
1138  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1139 
1140  if (parameters.triggerNB.write()) {
1141 
1142  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1143  getTriggerMask(triggerNB->getTriggerBit()),
1144  *hit,
1145  *timesliceRouter,
1146  *moduleRouter,
1147  parameters.TMaxLocal_ns,
1148  parameters.triggerNB.DMax_m,
1149  getTimeRange(parameters.triggerNB));
1150 
1151  this->put(tev);
1152  }
1153  }
1154  }
1155 
1156  JTriggerInput trigger_input(timesliceL2);
1157  JTriggerOutput trigger_output;
1158 
1159  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1160  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1161  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1162 
1163  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1164 
1165  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1166 
1167  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1168 
1169  this->put(object);
1170 
1171  numberOfEvents += 1;
1172  }
1173 
1174  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1175 
1176  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1177 
1178  if (parameters.writeL1) { this->put(object); }
1179  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1180  }
1181 
1182  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1183 
1184  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1185 
1186  if (parameters.writeL2) { this->put(object); }
1187  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1188  }
1189 
1190  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1191 
1192  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1193 
1194  if (parameters.writeSN) { this->put(object); }
1195  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1196  }
1197 
1198  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1199 
1200  if (parameters.writeL0) { this->put(timeslice); }
1201  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1202  }
1203  }
1204 
1205  } catch(const exception& error) {
1206 
1207  JErrorStream(logger) << "Error = " << error.what() << ";"
1208  << "run = " << timeslice.getRunNumber() << ";"
1209  << "frame index = " << timeslice.getFrameIndex() << ";"
1210  << "time slice not correctly processed;"
1211  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1212 
1213  if (dumpCount < dumpLimit) {
1214 
1215  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1216 
1217  dumpCount += 1;
1218  }
1219  }
1220 
1221  timesliceRouter->reset();
1222  }
1223 
1224 
1225  /**
1226  * Report status to message logger.
      *
      * The timer is stopped on entry and timer.start() is called again on exit.
      * In between, the timer values and the processing counters of this object
      * are written to the logger as a sequence of status messages.
1227  */
1228  void typeout()
1229  {
1230  timer.stop();
1231 
1232  const double T_us = (double) timer.usec_wall;
1233 
      // elapsed times; the timer values are divided by 1e6 for the printout in [s]
1234  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1235  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1236  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
      // a std::exception thrown while evaluating Qt.getMean() or Qt.getDeviation() is caught and ignored
      // NOTE(review): presumably Qt throws when it has no entries yet -- confirm
1237  try {
1238  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1239  }
1240  catch(const std::exception&) {}
1241  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1242  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1243 
      // the ratio is only printed when at least one packet has been received (no division by zero)
1244  if (number_of_packets_received > 0) {
1245  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1246  }
1247 
1248  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1249  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1250 
1251  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1252 
      // NOTE(review): unlike the elapsed times above, there is no cast to double here;
      // if the timer values and the counter are integral, the quotients are truncated -- confirm
1253  if (numberOfTimeslicesProcessed > 0) {
1254  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1255  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1256  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1257  }
1258 
      // NOTE(review): the division by 1000 presumably converts getFrameTime() from [ns] to [us],
      // matching T_us -- confirm the unit of getFrameTime()
1259  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1260  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1261 
      // performance factor = wall time divided by the corresponding data time; skipped when the data time is not positive
1262  if (processedSlicesTime_us > 0) {
1263  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1264  }
1265  if (processedDetectorTime_us > 0) {
1266  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1267  }
1268 
1269  timer.start();
1270  }
1271 
1272 
1273  /**
1274  * Tagged action to handle alerts.
      *
      * Only the tag RC_ALERT is handled; for any other tag a warning is logged.
      * On RC_ALERT, an open circular buffer is closed and,
      * if the circular buffer is enabled, a new one is opened.
1275  *
1276  * \param tag tag
1277  * \param length number of characters
1278  * \param buffer message
1279  */
1280  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1281  {
1282  using namespace std;
1283 
      // the message is built from exactly length characters of buffer
1284  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1285 
1286  if (tag == RC_ALERT) {
1287 
      // NOTE(review): close(true) presumably moves the file to the archive directory,
      // as the status message suggests -- confirm against JCircularBuffer_t::close
1288  if (c_buffer.is_open()) {
1289 
1290  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1291 
1292  c_buffer.close(true);
1293  }
1294 
1295  if (c_buffer.is_enabled()) {
1296 
1297  c_buffer.open();
1298 
      // success of open() is checked via is_open()
1299  if (c_buffer.is_open()) {
1300 
1301  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1302 
      // write the meta data of this application to the newly opened file
1303  putObject(c_buffer.getFile(), meta);
1304 
1305  } else {
1306 
1307  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1308 
      // after disable(), is_enabled() is presumably false, so later alerts skip the open attempt -- confirm
1309  c_buffer.disable();
1310  }
1311  }
1312 
1313  } else {
1314 
1315  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1316  }
1317  }
1318 
1319  JMeta meta; //!< meta data
1320 
1321  private:
1322 
1324  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1325  std::string hostname; //!< host name of data server
1326 
1327  /**
1328  * Auxiliary method to send object to data server.
      *
      * The object is sent via datawriter and, if that did not throw,
      * its size (getSizeof) is added to numberOfBytes.
      * A std::exception is not propagated to the caller:
      * it is logged as an error and ev_error() is called.
1329  *
1330  * \param object object to be sent
1331  */
1332  template<class T>
1333  void put(const T& object)
1334  {
1335  try {
1336 
1337  datawriter->put(object);
1338 
      // only reached when the object has been sent without exception
1339  numberOfBytes += getSizeof(object);
1340  }
1341  catch(const std::exception& error) {
1342  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1343  ev_error();
1344  }
1345  }
1346 
1347 
1348  int port; //!< server socket port
1349  int backlog; //!< NOTE(review): presumably the listen backlog of the server socket (option -q, "back log") -- confirm
1350 
1351  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1352  JChannelList_t channelList; //!< connections to data queue
1353 
1354  JTimer timer; //!< timer which is stopped, read and started again in typeout()
1356 
1357  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1359  unsigned int frames_per_slice; //!< reported by typeout() as the number of frames per slice currently expected
1362 
1363  // trigger
1364 
1367 
1374 
1379 
1384 
1387 
1388  // process management
1389 
1392 
1393  // memory management
1394 
1395  long long int totalCPURAM;
1396  unsigned int maxQueueDepth;
1397  long long int maxQueueSize;
1398  long long int queueSize; //!< reported by typeout() as the current queue size, next to timeslices.size()
1399 
1400  // statistics
1401 
1403 
1404  long long int numberOfEvents; //!< incremented for each triggered event which is put in processTimeSlice()
1405  long long int numberOfBytes; //!< sum of getSizeof() of the objects sent via put()
1408 
1411 
1412  // temporary
1413 
1416  long long int number_of_reads; //!< divided by number_of_packets_received in typeout() (reads/packet)
1418 
1419  // circular buffer
1420 
1422  };
1423 }
1424 
1425 /**
1426  * \file
1427  *
1428  * Application for real-time filtering of data.
1429  * For more information, see KM3NETDAQ::JDataFilter.
1430  *
1431  * \author rbruijn and mdejong
1432  */
1433 int main(int argc, char* argv[])
1434 {
      // Parses the command line options, selects the message logger and
      // hands control to a JDataFilter object via enter() and run().
1435  using namespace std;
1436  using namespace JPP;
1437  using namespace KM3NETDAQ;
1438 
1439  string server;
1440  string logger;
1441  string hostname;
1442  string client_name;
1443  int port;
1444  int backlog;
1445  bool use_cout;
1446  string path;
1447  string archive;
1448  int debug;
1449 
1450 
1451  try {
1452 
1453  JParser<> zap("Application for real-time filtering of data.");
1454 
      // options -P and -c are declared without a default value
      // NOTE(review): presumably -P is therefore mandatory and -c acts as a flag -- confirm against JParser
1455  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1456  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1457  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1458  zap['u'] = make_field(client_name, "client name") = "%";
1459  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1460  zap['q'] = make_field(backlog, "back log") = 1024;
1461  zap['c'] = make_field(use_cout, "print to terminal");
1462  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1463  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1464  zap['d'] = make_field(debug, "debug level") = 0;
1465 
1466  zap(argc, argv);
1467  }
1468  catch(const exception& error) {
      // NOTE(review): FATAL presumably terminates the program, so the variables above are not used unset -- confirm
1469  FATAL(error.what() << endl);
1470  }
1471 
1472 
      // messages go either to the terminal (option -c) or to the logger server (option -M)
      // NOTE(review): the logger is never deleted here; ownership is presumably taken over by JDataFilter -- confirm
1473  JLogger* out = NULL;
1474 
1475  if (use_cout)
1476  out = new JStreamLogger(cout);
1477  else
1478  out = new JControlHostLogger(logger);
1479 
1480  JDataFilter dfilter(client_name,
1481  server,
1482  hostname,
1483  out,
1484  debug,
1485  port,
1486  backlog,
1487  path,
1488  archive);
1489 
      // the meta data are set before the state machine is entered; actionTagged() writes them to the circular buffer file
1490  dfilter.meta = JMeta(argc, argv);
1491 
1492  dfilter.enter();
1493  dfilter.run();
1494 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:903
Exception for opening of file.
Definition: JException.hh:342
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:345
Utility class to parse command line options.
Definition: JParser.hh:1514
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:79
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:230
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:164
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:161
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:156
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:328
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:65
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -&gt; JDataWriter.cc
Definition: JDAQTags.hh:30
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:696
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:82
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:151
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:687
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:188
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:332
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:163
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:294
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:160
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
Definition: getArchive.sh:42
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc &lt;- JLigier.cc
Definition: JDAQTags.hh:29
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:162
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
void accept(const int server)
Accept connection from a server.
Definition: JTCPSocket.hh:139
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:723
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:157
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:310
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc &lt;- DataQueue.cc
Definition: JDAQTags.hh:28
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
then awk string
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:918
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:391
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:334
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:747
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:330
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:131
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
TCP socket.
Definition: JTCPSocket.hh:25
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:640
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:737
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:395
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:298
Auxiliary class for date and time.
Definition: JDateAndTime.hh:78
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:71
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:767
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:327
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:389
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JTCPSocket.hh:42
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:58
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:87
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:729
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
double u[N+1]
Definition: JPolint.hh:776
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:364
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:408
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:690
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:329
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:333
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:753
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:429
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.