Jpp  18.2.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "JLang/JException.hh"
13 #include "Jeep/JParser.hh"
14 #include "Jeep/JProperties.hh"
15 #include "Jeep/JTimer.hh"
16 #include "Jeep/JTimekeeper.hh"
17 #include "Jeep/JMessage.hh"
18 #include "Jeep/JPrint.hh"
19 #include "Jeep/JeepToolkit.hh"
26 #include "JDAQ/JDAQTags.hh"
27 #include "JDAQ/JDAQEventIO.hh"
28 #include "JDAQ/JDAQTimesliceIO.hh"
30 #include "JDetector/JDetector.hh"
31 #include "JTrigger/JHit.hh"
32 #include "JTrigger/JHitToolkit.hh"
35 #include "JTrigger/JTimeslice.hh"
36 #include "JTrigger/JHitL0.hh"
37 #include "JTrigger/JHitL1.hh"
38 #include "JTrigger/JBuildL1.hh"
39 #include "JTrigger/JBuildL2.hh"
43 #include "JTrigger/JTriggerNB.hh"
44 #include "JTrigger/JTriggerBits.hh"
48 #include "JTrigger/JTimesliceL1.hh"
51 #include "JTrigger/JChecksum.hh"
54 #include "JNet/JControlHost.hh"
56 #include "JNet/JTCPSocket.hh"
57 #include "JNet/JSocketChannel.hh"
58 #include "JNet/JServerSocket.hh"
59 #include "JPhysics/JConstants.hh"
60 #include "JTools/JQuantile.hh"
61 #include "JSupport/JSupport.hh"
63 #include "JSupport/JMeta.hh"
64 #include "JSystem/JStat.hh"
65 #include "JSystem/JTime.hh"
67 #include "JSystem/JNetwork.hh"
68 #include "JSystem/JFilesystem.hh"
69 
70 
71 namespace JNET {
72 
73  /**
74 * Get size of packet.
75  *
76  * \param preamble DAQ data preamble
77  * \return size [B]
78  */
79  template<>
81  {
82  return preamble.getLength();
83  }
84 }
85 
86 namespace KM3NETDAQ {
87 
88  using namespace JPP;
89 
90  /**
91  * Main class for real-time filtering of data.
92  *
93  * This class implements the action methods for each transition of the state machine.\n
94  * When the state machine is entered, data are continually collected using
95  * custom implementation of virtual methods
96  * setSelect and
97  * actionSelect.
98  *
99  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
100  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
101  * When a JDAQTimeslice is complete,
102 * it is processed in the same way as in applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
103  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
104  * which, together with other parameters, is parsed in method actionConfigure.\n
105  * <pre>
106  * numberOfFramesPerSlice = frames_per_slice = 1;
107  * </pre>
108  * Note that this parameter can change during operation (see below).
109  *
110  * A timeout may occur when the total amount of data or the number of incomplete time slices
111  * in the queue exceeds one of the corresponding pre-set limits:\n
112  * <pre>
113  * queueSize = <maximal queue size [B]>;
114  * queueDepth = <maximal queue depth>;
115  * </pre>
116  * Note that these values apply per JDataFilter.\n
117  *
118  * When a timeout occurs,
119  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
120  * When data are received with a frame index below the current frame index,
121  * the parameter <tt>frames_per_slice</tt> is incremented by one,
122  * up to the original value set at actionConfigure.\n
123  *
124  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
125  * The following parameters can be used to configure the circular buffer.\n
126  * <pre>
127  * path = <write directory for temporary circular buffer>;
128  * archive = <write directory for archival of circular buffer>;
129  * c_sizeL0 = <L0 buffer size>;
130  * c_sizeL1 = <L1 buffer size>;
131  * c_sizeL2 = <L2 buffer size>;
132  * c_sizeSN = <SN buffer size>;
133  * </pre>
134  *
135  * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
136  * - There will always be a file created which is deleted when the state machine is exited;
137  * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
138  * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
139  * - After archival of the temporary file, a new temporary file will be opened;
140  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
141  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
142  *
143  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
144  * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
145  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
146  * (e.g. when there is more than one alert on a given day).
147  *
148  * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
149  *
150  * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
151  */
152  class JDataFilter :
153  public JDAQClient
154  {
155  public:
156 
159 
160  typedef double hit_type;
166 
167 
168 
169  /**
170  * Circular buffer.
171  */
173  public JTreeRecorder<JDAQTimesliceTypes_t>
174  {
177 
178 
179  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
180 
181 
182  /**
183  * Constructor.
184  *
185  * \param path directory for temporary storage
186  * \param archive directory for permanent archival
187  * \param tag tag
188  */
190  const std::string& archive,
191  const JTag& tag) :
192  path (path),
193  archive(archive),
194  tag (tag)
195  {
196  disable();
197  }
198 
199 
200  /**
201  * Open file.
202  *
203  * If file with same name exists, remove it beforehand.
204  */
205  void open()
206  {
207  using namespace std;
208  using namespace JPP;
209 
210  gErrorIgnoreLevel = kFatal;
211 
212  std::ostringstream os;
213 
214  os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
215 
216  if (getFileStatus(os.str().c_str())) {
217  std::remove(os.str().c_str());
218  }
219 
220  this->open(os.str().c_str());
221  }
222 
223 
224  /**
225  * Close file.
226  *
227  * If option is true, archive file; else delete file.
228  *
229  * \param option option
230  */
231  void close(const bool option)
232  {
233  using namespace std;
234  using namespace JPP;
235 
236  const JDateAndTime cal;
237 
238  if (this->is_open()) {
239 
240  const string file_name = this->getFile()->GetName();
241 
242  this->close();
243 
244  if (option) {
245 
246  for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
247 
248  ostringstream os;
249 
250  os << getFullPath(this->archive)
251  << "KM3NeT"
252  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
253  << "_" << this->tag;
254 
255  if (i != 0) {
256  os << "_" << i;
257  }
258 
259  os << ".root";
260 
261  if (!getFileStatus(os.str().c_str())) {
262 
263  if (JSYSTEM::rename(file_name, os.str()) == 0)
264  return;
265  else
266  THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
267  }
268  }
269 
270  } else {
271 
272  std::remove(file_name.c_str());
273  }
274  }
275  }
276 
277 
278  /**
279  * Disable writing.
280  */
281  void disable()
282  {
283  sizeL0 = 0;
284  sizeL1 = 0;
285  sizeL2 = 0;
286  sizeSN = 0;
287  }
288 
289 
290  /**
291  * Check whether writing of data is enabled.
292  *
293  * \return true if writing enabled; else false
294  */
295  bool is_enabled() const
296  {
297  return (sizeL0 > 0 ||
298  sizeL1 > 0 ||
299  sizeL2 > 0 ||
300  sizeSN > 0);
301  }
302 
303 
304  /**
305  * Write circular buffer to output stream.
306  *
307  * \param out output stream
308  * \param object circular buffer
309  * \return output stream
310  */
311  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
312  {
313  if (object.is_open())
314  out << object.getFile()->GetName();
315  else
316  out << "void";
317 
318  out << ' ';
319 
320  out << object.sizeL0 << '/'
321  << object.sizeL1 << '/'
322  << object.sizeL2 << '/'
323  << object.sizeSN << '/';
324 
325  return out;
326  }
327 
328  Long64_t sizeL0; //!< Number of L0 time slices
329  Long64_t sizeL1; //!< Number of L1 time slices
330  Long64_t sizeL2; //!< Number of L2 time slices
331  Long64_t sizeSN; //!< Number of SN time slices
332 
333  std::string path; //!< Directory for temporary storage
334  std::string archive; //!< Directory for permanent archival
335  JTag tag; //!< Unique tag of this process
336  };
337 
338 
339  /**
340  * Sort DAQ process by index.
341  *
342  * \param first first DAQ process
343  * \param second second DAQ process
344  * \return true if index of first DAQ process less than that of second; else false
345  */
346  static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
347  {
348  return first.index < second.index;
349  }
350 
351 
352  /**
353  * Constructor.
354  *
355  * \param name name of client
356  * \param server name of command message server
357  * \param hostname name of data server
358  * \param logger pointer to logger
359  * \param level debug level
360  * \param port server port
361  * \param backlog server backlog
362  * \param path directory for temporary storage
363 * \param archive directory for permanent archival
364  */
366  const std::string& server,
367  const std::string& hostname,
368  JLogger* logger,
369  const int level,
370  const int port,
371  const int backlog,
372  const std::string& path,
373  const std::string& archive) :
374  JDAQClient (name,server,logger,level),
375  hostname (hostname),
376  port (port),
377  backlog (backlog),
378  c_buffer (path, archive, getUniqueTag())
379  {
380  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
381 
382  addSubscription(JSubscriptionAll(RC_ALERT));
383 
384  totalCPURAM = getRAM();
385  current_slice_index = -1;
386  reporting = false;
387 
388  this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
389  }
390 
391 
392  virtual void actionEnter() override
393  {}
394 
395 
396  virtual void actionExit() override
397  {
398  if (c_buffer.is_open()) {
399 
400  JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
401 
402  c_buffer.close(false);
403  }
404 
405  datawriter.reset();
406  }
407 
408 
409  virtual void actionInit(int length, const char* buffer) override
410  {
411  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
412 
413  try {
414 
415  JDebugStream(logger) << "Start server.";
416 
417  if (serversocket.is_valid()) {
418  serversocket->shutdown();
419  }
420 
421  serversocket.reset(new JServerSocket(port,backlog));
422  }
423  catch(const std::exception& error) {
424  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
425  ev_error();
426  }
427  }
428 
429 
430  virtual void actionConfigure(int length, const char* buffer) override
431  {
432  using namespace std;
433 
434  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
435 
436  long long int update_s = 20;
437  long long int logger_s = 10;
438 
439  parameters .reset();
440  dataFilters.clear();
441  dataQueues .clear();
442 
443  reporting = false;
444  dumpCount = 0;
445  dumpLimit = numeric_limits<int>::max();
446 
447  detector.clear();
448 
449  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
450 
451  properties["dataWriter"] = hostname;
452  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
453  properties["detector"] = detector;
454  properties["triggerParameters"] = parameters;
455  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
456  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
457  properties["frameIndex"] = maximal_frame_index = 100000;
458  properties["logger_s"] = logger_s;
459  properties["update_s"] = update_s;
460  properties["JDataFilter"] = dataFilters;
461  properties["DataQueue"] = dataQueues;
462  properties["path"] = c_buffer.path;
463  properties["archive"] = c_buffer.archive;
464  properties["c_sizeL0"] = c_buffer.sizeL0;
465  properties["c_sizeL1"] = c_buffer.sizeL1;
466  properties["c_sizeL2"] = c_buffer.sizeL2;
467  properties["c_sizeSN"] = c_buffer.sizeSN;
468  properties["dumpLimit"] = dumpLimit;
469 
470  try {
471  properties.read(string(buffer, length));
472  }
473  catch(const std::exception& error) {
474  JErrorStream(logger) << error.what();
475  }
476 
477  if (update_s <= 0) { update_s = 20; }
478  if (logger_s <= 0) { logger_s = 10; }
479 
480  setClockInterval(update_s * 1000000LL);
481 
482  hostname = trim(hostname);
483 
484  if (hostname != "")
485  datawriter.reset(new JControlHost_t(hostname));
486  else
487  throw JException("Undefined data writer host name.");
488 
489  datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
490 
491  maximum_frames_per_slice = frames_per_slice;
492 
493  // process processlist
494 
495  if (dataFilters.empty()) {
496  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
497  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
498  }
499 
500  sort(dataFilters.begin(), dataFilters.end(), compare);
501 
502  unsigned int numberOfDataFiltersOnThisMachine = 0;
503  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
504 
506 
507  {
508  JNoticeStream notice(logger);
509 
510  notice << "My IP addresses:";
511 
512  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
513  notice << ' ' << *i;
514  }
515  }
516 
517  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
518 
519  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
520 
521  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
522 
523  numberOfDataFiltersOnThisMachine++;
524 
525  if (i->port == this->port) {
526  thisProcess = i;
527  }
528  }
529  }
530 
531  if (numberOfDataFiltersOnThisMachine == 0) {
532  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
533  << "Assuming one datafilter on this machine.";
534  numberOfDataFiltersOnThisMachine = 1;
535  }
536 
537  if (thisProcess == dataFilters.end()) {
538  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
539  }
540 
541  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
542  JErrorStream(logger) << "Mismatch between given process names: "
543  << "I am called " << getName()
544  << ", but in the process list I am referred to as " << thisProcess->index;
545  }
546 
547  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
548  reporting = true;
549  }
550 
551  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
552 
553  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
554 
555  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
556  << "Queue size reduced to "
557  << maxQueueSize << " bytes." ;
558  }
559 
560  // detector
561 
562  if (parameters.disableHighRateVeto) {
563 
564  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
565 
566  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
567  }
568 
569  // trigger parameters
570 
572 
573  triggerNB .reset(new JTriggerNB (parameters));
574  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
575  trigger3DShower.reset(new JTrigger3DShower(parameters));
576  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
577 
578  moduleRouter.reset(new JModuleRouter(detector));
579 
580  if (reporting) {
581  JNoticeStream(logger) << "This data filter process will report.";
582  JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
583  JDebugStream (logger) << "Trigger parameters: " << parameters;
584  JDebugStream (logger) << "Detector description: " << endl << detector;
585  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
586  }
587 
588  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
589 
590  // set L1, L2 and SN builders
591 
592  buildL1.reset(new JBuildL1_t(parameters));
593  buildL2.reset(new JBuildL2_t(parameters.L2));
594  buildSN.reset(new JBuildL2_t(parameters.SN));
595  buildNB.reset(new JBuildL2_t(parameters.NB));
596 
597  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
598  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
599  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
600  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
601 
602  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
603  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
604  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
605  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
606 
607  if (c_buffer.is_enabled()) {
608 
609  if (!c_buffer.is_open()) {
610 
611  c_buffer.open();
612 
613  if (c_buffer.is_open()) {
614 
615  putObject(c_buffer.getFile(), meta);
616 
617  JStatusStream(logger) << "Created circular buffer " << c_buffer;
618 
619  } else {
620 
621  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
622  }
623 
624  } else {
625 
626  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
627  }
628  }
629 
630  if (c_buffer.is_open()) {
631  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
632  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
633  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
634  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
635  } else {
636  c_buffer.disable();
637  }
638  }
639 
640 
// Action for transition start.
//
// Clears the queue of pending time slices, resets all counters, message schedulers,
// the timer and the quantile accumulators Qt and Qx and
// sends the run number and the trigger parameters to the data writer.
//
// The parameters length and buffer are not used.
641 virtual void actionStart(int length, const char* buffer) override
642 {
643 using namespace std;
644 
645 if (reporting) {
646 JNoticeStream(logger) << "Start run " << getRunNumber();
647 }
648 
649 timeslices.clear();
650 
// -1 = no time slice processed yet (see check in updateFrameQueue)
651 current_slice_index = -1;
652 queueSize = 0;
653 
654 numberOfEvents = 0;
655 numberOfBytes = 0;
656 numberOfTimeslicesProcessed = 0;
657 numberOfIncompleteTimeslicesProcessed = 0;
658 
659 number_of_packets_received = 0;
660 number_of_packets_discarded = 0;
661 number_of_bytes_received = 0;
662 number_of_reads = 0;
663 
// start with an empty range, so that the first processed time slice sets both limits
664 minFrameNumber = numeric_limits<int>::max();
665 maxFrameNumber = numeric_limits<int>::min();
666 
667 // Reset global trigger counter.
668 
// NOTE(review): the statement belonging to the comment above (original source line 669)
// is absent from this listing -- restore it from the original file.
670 
671 logErrorRun .reset();
672 logErrorDetector .reset();
673 logErrorIndex .reset();
674 logErrorIncomplete.reset();
675 
676 timer.reset();
677 timer.start();
678 
679 Qt.reset();
680 Qx.reset();
681 
682 // send trigger parameters to the datawriter
683 
684 ostringstream os;
685 
686 os << getRunNumber() << ' ' << parameters;
687 
688 datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
689 }
690 
691 
692  virtual void actionPause(int length, const char* buffer) override
693  {
694  using namespace std;
695 
696  if (!timeslices.empty()) {
697 
698  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
699 
700  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
701  queueSize -= getSizeof(*i);
702  }
703 
704  timeslices.clear();
705  }
706 
707  { // force clearance of memory
708 
709  deque<JDAQTimesliceL0> buffer;
710 
711  timeslices.swap(buffer);
712  }
713 
714  if (queueSize != 0) {
715  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
716  }
717 
718  current_slice_index = -1;
719  queueSize = 0;
720 
721  timer.stop();
722  }
723 
724 
725  virtual void actionContinue(int length, const char* buffer) override
726  {
727  timer.start();
728  }
729 
730 
731  virtual void actionStop(int length, const char* buffer) override
732  {
733  typeout();
734 
735  datawriter.reset();
736  }
737 
738 
739  virtual void actionReset(int length, const char* buffer) override
740  {
741  if (serversocket.is_valid()) {
742  serversocket->shutdown();
743  }
744 
745  serversocket.reset();
746  }
747 
748 
749  virtual void actionQuit(int length, const char* buffer) override
750  {
751  datawriter.reset();
752  }
753 
754 
755  virtual void setSelect(JFileDescriptorMask& mask) const override
756  {
757  if (serversocket.is_valid()) {
758  mask.set(*serversocket);
759  }
760 
761  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
762  if (!channel->isReady()) {
763  mask.set(channel->getFileDescriptor());
764  }
765  }
766  }
767 
768 
// Action following the select call.
//
// This method does three things in sequence:
//  1. reads data from the channels flagged in the mask and passes completed packets to updateFrameQueue;
//  2. accepts a new connection when the server socket is flagged in the mask; and
//  3. processes and removes at most one time slice from the front of the queue.
//
// mask = file descriptor mask
769 virtual void actionSelect(const JFileDescriptorMask& mask) override
770 {
771 using namespace std;
772 
// The iterator is advanced inside the loop body:
// either by the increment at the end of the try block or by the erase in the catch block.
773 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
774 
775 try {
776 
777 if (mask.has(channel->getFileDescriptor())) {
778 channel->read();
779 }
780 
781 if (channel->isReady()) {
782 
783 number_of_packets_received += 1;
784 number_of_reads += channel->getCounter();
785 number_of_bytes_received += channel->size();
786 
787 if (isRunning()) {
788 
// An error in the handling of a packet discards this packet but keeps the channel.
789 try {
790 updateFrameQueue(channel);
791 }
792 catch(const std::exception& error) {
793 
794 JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
795 
796 number_of_packets_discarded += 1;
797 }
798 
799 } else {
800 
801 JErrorStream(logErrorRun) << "Receiving data while not running.";
802 
803 number_of_packets_discarded += 1;
804 }
805 
806 channel->reset();
807 }
808 
809 ++channel;
810 }
811 catch(const std::exception& error) {
812 
// Any other error (e.g. from reading the channel) removes the channel from the list.
813 if (isRunning()) {
814 JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
815 }
816 
817 channel->shutdown();
818 
819 channel = channelList.erase(channel);
820 }
821 }
822 
823 
// Accept new connection.
824 if (serversocket.is_valid()) {
825 
826 if (mask.has(*serversocket)) {
827 
828 JTCPSocket socket;
829 
830 socket.accept(serversocket->getFileDescriptor());
831 
// NOTE(review): one statement (original source line 832) is absent from this listing
// -- restore it from the original file.
833 
834 socket.setKeepAlive (true);
835 socket.setNonBlocking(true);
836 
837 JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
838 
839 channelList.push_back(JSocketInputChannel_t(socket));
840 }
841 }
842 
843 
// The oldest time slice is released when it is complete, when the next time slice is complete
// or when the depth or size of the queue reaches the corresponding limit.
844 if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) || // normal
845 
846 (timeslices.size() >= 2u && // intermittent problem
847 timeslices[1].size() >= frames_per_slice) ||
848 
849 (timeslices.size() >= maxQueueDepth) || // timeout
850 (queueSize >= maxQueueSize))) {
851 
// The reference remains valid until the pop_front at the end of this block.
852 const JDAQTimesliceL0& pending_slice = timeslices.front();
853 queueSize -= getSizeof(pending_slice);
854 
// Frames with an index up to and including this value are dropped from now on (see updateFrameQueue).
855 current_slice_index = pending_slice.getFrameIndex();
856 minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
857 maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
858 
859 if (pending_slice.size() > frames_per_slice) {
860 
861 ostringstream error;
862 
863 error << "More frames in timeslice than expected "
864 << pending_slice.size() << " > " << frames_per_slice;
865 
// The expected number of frames is only raised up to the value set at actionConfigure.
866 if (pending_slice.size() <= maximum_frames_per_slice) {
867 
868 error << " adjusting expected frames per timeslice";
869 
870 frames_per_slice = pending_slice.size();
871 }
872 
873 JErrorStream(logErrorIncomplete) << error.str();
874 }
875 
876 if (!pending_slice.empty()) {
877 
878 if (isRunning()) {
879 
880 const localtime_t t0 = getLocalTime();
881 
882 processTimeSlice(pending_slice);
883 
884 const localtime_t t1 = getLocalTime();
885 
886 numberOfTimeslicesProcessed += 1;
887 
// Accumulate the time spent in processTimeSlice.
888 Qt.put(t1 - t0);
889 
890 } else {
891 
892 JErrorStream(logErrorRun) << "Skip processing of data while not running.";
893 }
894 
895 if (pending_slice.size() < frames_per_slice) {
896 
897 numberOfIncompleteTimeslicesProcessed += 1;
898 
899 ostringstream error;
900 
901 error << "Timeout -> processed incomplete timeslice: "
902 << "Frame index = " << pending_slice.getFrameIndex() << ';'
903 << "Size of timeslice = " << pending_slice.size() << ';'
904 << "Queue depth = " << timeslices.size() << ';'
905 << "Queue size = " << queueSize << ';';
906 
// If the next time slice is complete, the expected number of frames is kept;
// otherwise it is lowered to the size of the time slice just handled.
907 if ((timeslices.size() >= 2u &&
908 timeslices[1].size() >= frames_per_slice)) {
909 
910 error << " intermittent problem -> continues as-is";
911 
912 } else {
913 
914 error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
915 
916 frames_per_slice = pending_slice.size();
917 }
918 
919 JErrorStream(logErrorIncomplete) << error.str();
920 }
921 }
922 
923 timeslices.pop_front();
924 }
925 }
926 
927 
928  virtual void actionRunning() override
929  {
930  if (reporting) {
931  typeout();
932  }
933  }
934 
935 
936  /**
937  * Update queue with data frames.
938  *
939  * Note that any discarded data will be reported.
940  *
941  * \param channel incoming data channel
942  */
943  void updateFrameQueue(const JChannelList_t::const_iterator channel)
944  {
945  using namespace std;
946 
947  JByteArrayReader in(channel->data(), channel->size());
948 
949  JDAQPreamble preamble;
950  JDAQSuperFrameHeader header;
951 
952  in >> preamble;
953  in >> header;
954 
955  if (preamble.getLength() != channel->size()) {
956 
957  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
958  << "preamble.getLength() = " << preamble.getLength() << ';'
959  << "channel->size(): " << channel->size() << ';';
960 
961  number_of_packets_discarded += 1;
962 
963  return;
964  }
965 
966  if (header.getRunNumber() != getRunNumber()) {
967 
968  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
969  << " != " << getRunNumber()
970  << " -> Dropping frame.";
971 
972  number_of_packets_discarded += 1;
973 
974  return;
975  }
976 
977  if (header.getFrameIndex() <= current_slice_index) {
978 
979  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
980  << " module " << header.getModuleID()
981  << " -> Dropping frame.";
982 
983  number_of_packets_discarded += 1;
984 
985  if (frames_per_slice < maximum_frames_per_slice) {
986 
987  frames_per_slice++;
988 
989  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
990  }
991 
992  return;
993  }
994 
995  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
996 
997  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
998  << " module " << header.getModuleID()
999  << " -> Dropping frame.";
1000 
1001  number_of_packets_discarded += 1;
1002 
1003  return;
1004  }
1005 
1006  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1007 
1008  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1009  ++timesliceIterator;
1010  }
1011 
1012  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1013 
1014  // The corresponding time slice already exists
1015 
1016  } else {
1017 
1018  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1019 
1020  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1021 
1022  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1023 
1024  queueSize += getSizeof(*timesliceIterator);
1025  }
1026 
1027  timesliceIterator->push_back(JDAQSuperFrame(header));
1028 
1029  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1030 
1031  queueSize += getSizeof(*timesliceIterator->rbegin());
1032  }
1033 
1034 
1035  /**
1036  * Process time slice.
1037  *
1038  * \param timeslice time slice
1039  */
1040  void processTimeSlice(const JDAQTimesliceL0& timeslice)
1041  {
1042  using namespace std;
1043 
1044  try {
1045 
1046  timesliceRouter->configure(timeslice);
1047 
1048  if (parameters.writeSummary()) {
1049  this->put(JDAQSummaryslice(timeslice));
1050  }
1051 
1052  if (parameters.trigger3DMuon.enabled ||
1053  parameters.trigger3DShower.enabled ||
1054  parameters.triggerMXShower.enabled ||
1055  parameters.triggerNB.enabled ||
1056  parameters.writeL0.prescale ||
1057  parameters.writeL1.prescale ||
1058  parameters.writeL2.prescale ||
1059  parameters.writeSN.prescale ||
1060  c_buffer.is_enabled()) {
1061 
1062  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1063  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1064  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1065  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1066  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1067  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1068 
1069  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1070 
1071  if (moduleRouter->hasModule(frame->getModuleID())) {
1072 
1073  if (!checksum(*frame)) {
1074 
1075  JWarningStream(logger) << "Invalid data at "
1076  << "run = " << timeslice.getRunNumber() << ";"
1077  << "frame index = " << timeslice.getFrameIndex() << ";"
1078  << "module = " << frame->getModuleID() << ";"
1079  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1080 
1081  if (dumpCount < dumpLimit) {
1082  timesliceTX.push_back(*frame);
1083  }
1084 
1085  continue;
1086  }
1087 
1088  const JModule& module = moduleRouter->getModule(frame->getModuleID());
1089  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1090 
1091  // Apply high-rate veto
1092 
1093  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1094 
1095  // L0
1096 
1097  timesliceL0.push_back(JSuperFrame1D_t(buffer));
1098 
1099  // Nano-beacon trigger
1100 
1101  if (parameters.triggerNB.enabled) {
1102 
1103  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1104 
1105  if (buffer.begin() != __end) {
1106 
1107  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1108  frame->getModuleIdentifier(),
1109  module.getPosition()));
1110 
1111  JSuperFrame1D_t zbuf;
1112 
1113  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1114 
1115  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1116  }
1117  }
1118 
1119  // L1
1120 
1121  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1122  frame->getModuleIdentifier(),
1123  module.getPosition()));
1124 
1125  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1126 
1127  // L2
1128 
1129  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1130  frame->getModuleIdentifier(),
1131  module.getPosition()));
1132 
1133  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1134 
1135  // SN
1136 
1137  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1138  frame->getModuleIdentifier(),
1139  module.getPosition()));
1140 
1141  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1142 
1143  } else {
1144 
1145  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1146  }
1147  }
1148 
1149  if (!timesliceTX.empty()) {
1150 
1151  if (dumpCount < dumpLimit) {
1152 
1153  this->put(timesliceTX);
1154 
1155  dumpCount += 1;
1156  }
1157  }
1158 
1159  // Trigger
1160 
1161  if (parameters.triggerNB.enabled) {
1162 
1163  const JTriggerInput trigger_input(timesliceNB);
1164 
1165  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1166 
1167  if (parameters.triggerNB.write()) {
1168 
1169  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1170  getTriggerMask(triggerNB->getTriggerBit()),
1171  *hit,
1172  *timesliceRouter,
1173  *moduleRouter,
1174  parameters.TMaxLocal_ns,
1175  parameters.triggerNB.DMax_m,
1176  getTimeRange(parameters.triggerNB));
1177 
1178  this->put(tev);
1179  }
1180  }
1181  }
1182 
1183  JTriggerInput trigger_input(timesliceL2);
1184  JTriggerOutput trigger_output;
1185 
1186  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1187  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1188  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1189 
1190  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1191 
1192  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1193 
1194  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1195 
1196  this->put(object);
1197 
1198  numberOfEvents += 1;
1199  }
1200 
1201  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1202 
1203  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1204 
1205  if (parameters.writeL1) { this->put(object); }
1206  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1207  }
1208 
1209  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1210 
1211  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1212 
1213  if (parameters.writeL2) { this->put(object); }
1214  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1215  }
1216 
1217  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1218 
1219  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1220 
1221  if (parameters.writeSN) { this->put(object); }
1222  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1223  }
1224 
1225  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1226 
1227  if (parameters.writeL0) { this->put(timeslice); }
1228  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1229  }
1230  }
1231 
1232  } catch(const std::exception& error) {
1233 
1234  JErrorStream(logger) << "Error = " << error.what() << ";"
1235  << "run = " << timeslice.getRunNumber() << ";"
1236  << "frame index = " << timeslice.getFrameIndex() << ";"
1237  << "time slice not correctly processed;"
1238  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1239 
1240  if (dumpCount < dumpLimit) {
1241 
1242  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1243 
1244  dumpCount += 1;
1245  }
1246  }
1247 
1248  timesliceRouter->reset();
1249  }
1250 
1251 
1252  /**
1253  * Report status to message logger.
1254  */
1255  void typeout()
1256  {
1257  timer.stop();
1258 
1259  const double T_us = (double) timer.usec_wall;
1260 
1261  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1262  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1263  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1264  try {
1265  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1266  }
1267  catch(const std::exception&) {}
1268  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1269  JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean();
1270 
1271  if (number_of_packets_received > 0) {
1272  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1273  }
1274 
1275  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1276  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1277 
1278  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1279 
1280  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1281  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1282 
1283  if (processedSlicesTime_us > 0) {
1284  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1285  }
1286  if (processedDetectorTime_us > 0) {
1287  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1288  }
1289 
1290  timer.start();
1291  }
1292 
1293 
1294  /**
1295  * Tagged action to handle alerts.
1296  *
1297  * \param tag tag
1298  * \param length number of characters
1299  * \param buffer message
1300  */
1301  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1302  {
1303  using namespace std;
1304 
1305  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1306 
1307  if (tag == RC_ALERT) {
1308 
1309  if (c_buffer.is_open()) {
1310 
1311  JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1312 
1313  c_buffer.close(true);
1314  }
1315 
1316  if (c_buffer.is_enabled()) {
1317 
1318  c_buffer.open();
1319 
1320  if (c_buffer.is_open()) {
1321 
1322  JStatusStream(logger) << "Created circular buffer " << c_buffer;
1323 
1324  putObject(c_buffer.getFile(), meta);
1325 
1326  } else {
1327 
1328  JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1329 
1330  c_buffer.disable();
1331  }
1332  }
1333 
1334  } else {
1335 
1336  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1337  }
1338  }
1339 
1340  JMeta meta; //!< meta data
1341 
1342  private:
1343 
1345  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1346  std::string hostname; //!< host name of data server
1347 
1348  /**
1349  * Auxiliary method to send object to data server.
1350  *
1351  * \param object object to be sent
1352  */
1353  template<class T>
1354  void put(const T& object)
1355  {
1356  try {
1357 
1358  const localtime_t t0 = getLocalTime();
1359 
1360  datawriter->put(object);
1361 
1362  const localtime_t t1 = getLocalTime();
1363 
1364  numberOfBytes += getSizeof(object);
1365 
1366  Qx.put(t1 - t0);
1367  }
1368  catch(const std::exception& error) {
1369  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1370  ev_error();
1371  }
1372  }
1373 
1374 
1375  int port; //!< server socket port
1376  int backlog;
1377 
1378  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1379  JChannelList_t channelList; //!< connections to data queue
1380 
1381  JTimer timer;
1383 
1384  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1386  unsigned int frames_per_slice;
1389 
1390  // trigger
1391 
1394 
1401 
1406 
1411 
1414 
1415  // process management
1416 
1419 
1420  // memory management
1421 
1422  long long int totalCPURAM;
1423  unsigned int maxQueueDepth;
1424  long long int maxQueueSize;
1425  long long int queueSize;
1426 
1427  // statistics
1428 
1430 
1431  long long int numberOfEvents;
1432  long long int numberOfBytes;
1435 
1438 
1439  // temporary
1440 
1443  long long int number_of_reads;
1445 
1446  // circular buffer
1447 
1449  };
1450 }
1451 
1452 /**
1453  * \file
1454  *
1455  * Application for real-time filtering of data.
1456  * For more information, see KM3NETDAQ::JDataFilter.
1457  *
1458  * \author rbruijn and mdejong
1459  */
1460 int main(int argc, char* argv[])
1461 {
1462  using namespace std;
1463  using namespace JPP;
1464  using namespace KM3NETDAQ;
1465 
1466  string server;
1467  string logger;
1468  string hostname;
1469  string client_name;
1470  int port;
1471  int backlog;
1472  bool use_cout;
1473  string path;
1474  string archive;
1475  int debug;
1476 
1477 
1478  try {
1479 
1480  JParser<> zap("Application for real-time filtering of data.");
1481 
1482  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1483  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1484  zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1485  zap['u'] = make_field(client_name, "client name") = "%";
1486  zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1487  zap['q'] = make_field(backlog, "back log") = 1024;
1488  zap['c'] = make_field(use_cout, "print to terminal");
1489  zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1490  zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1491  zap['d'] = make_field(debug, "debug level") = 0;
1492 
1493  zap(argc, argv);
1494  }
1495  catch(const std::exception& error) {
1496  FATAL(error.what() << endl);
1497  }
1498 
1499 
1500  JLogger* out = NULL;
1501 
1502  if (use_cout)
1503  out = new JStreamLogger(cout);
1504  else
1505  out = new JControlHostLogger(logger);
1506 
1507  JDataFilter dfilter(client_name,
1508  server,
1509  hostname,
1510  out,
1511  debug,
1512  port,
1513  backlog,
1514  path,
1515  archive);
1516 
1517  dfilter.meta = JMeta(argc, argv);
1518 
1519  dfilter.enter();
1520  dfilter.run();
1521 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:928
Exception for opening of file.
Definition: JException.hh:358
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:346
Utility class to parse command line options.
Definition: JParser.hh:1514
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
General exception.
Definition: JException.hh:24
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:80
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:678
Exceptions.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
void close(const bool option)
Close file.
Definition: JDataFilter.cc:231
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:165
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:162
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:157
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:329
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:67
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -&gt; JDataWriter.cc
Definition: JDAQTags.hh:32
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:497
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:84
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:152
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
Definition: JFilesystem.hh:49
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
Definition: JDataFilter.cc:189
std::string path
Directory for temporary storage.
Definition: JDataFilter.cc:333
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:164
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:295
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:161
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
Definition: getArchive.sh:42
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc &lt;- JLigier.cc
Definition: JDAQTags.hh:31
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:163
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
void accept(const int server)
Accept connection from a server.
Definition: JTCPSocket.hh:139
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:725
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:158
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:311
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
void close(std::istream *pf)
Close file.
Definition: JeepToolkit.hh:386
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc &lt;- DataQueue.cc
Definition: JDAQTags.hh:30
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:65
then awk string
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:943
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:392
JTag tag
Unique tag of this process.
Definition: JDataFilter.cc:335
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:749
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:331
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:131
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
static JStat getFileStatus
Function object for file status.
Definition: JStat.hh:173
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
TCP socket.
Definition: JTCPSocket.hh:25
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:641
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:739
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:396
JSinglePointer< JTimesliceRouter > timesliceRouter
Auxiliary data structure for average.
Definition: JKatoomba_t.hh:75
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:298
Auxiliary class for date and time.
Definition: JDateAndTime.hh:78
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:73
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:769
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:328
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JTCPSocket.hh:42
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:60
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:87
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:731
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
double u[N+1]
Definition: JPolint.hh:776
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
Definition: JDataFilter.cc:365
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:409
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:692
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:330
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
std::string archive
Directory for permanent archival.
Definition: JDataFilter.cc:334
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:755
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:430
Time slice with calibrated data.
Definition: JTimeslice.hh:26
File status.