Jpp  17.1.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <deque>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JDetector/JDetector.hh"
30 #include "JTrigger/JHit.hh"
31 #include "JTrigger/JHitToolkit.hh"
34 #include "JTrigger/JTimeslice.hh"
35 #include "JTrigger/JHitL0.hh"
36 #include "JTrigger/JHitL1.hh"
37 #include "JTrigger/JBuildL1.hh"
38 #include "JTrigger/JBuildL2.hh"
42 #include "JTrigger/JTriggerNB.hh"
43 #include "JTrigger/JTriggerBits.hh"
47 #include "JTrigger/JTimesliceL1.hh"
50 #include "JTrigger/JChecksum.hh"
53 #include "JNet/JControlHost.hh"
55 #include "JNet/JSocket.hh"
56 #include "JNet/JSocketChannel.hh"
57 #include "JNet/JServerSocket.hh"
58 #include "JPhysics/JConstants.hh"
59 #include "JTools/JQuantile.hh"
60 #include "JSupport/JSupport.hh"
62 #include "JSupport/JMeta.hh"
63 #include "JSystem/JTime.hh"
65 #include "JSystem/JNetwork.hh"
66 
67 
68 namespace JNET {
69 
70  /**
71  * Get size of packeet.
72  *
73  * \param preamble DAQ data preamble
74  * \return size [B]
75  */
76  template<>
78  {
79  return preamble.getLength();
80  }
81 }
82 
83 namespace KM3NETDAQ {
84 
85  using namespace JPP;
86 
87  /**
88  * Main class for real-time filtering of data.
89  *
90  * This class implements the action methods for each transition of the state machine.\n
91  * When the state machine is entered, data are continually collected using
92  * custom implementation of virtual methods
93  * setSelect and
94  * actionSelect.
95  *
96  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
97  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
98  * When a JDAQTimeslice is complete,
99  * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
100  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
101  * which, together with other parameters, is parsed in method actionConfigure.\n
102  * <pre>
103  * numberOfFramesPerSlice = frames_per_slice = 1;
104  * </pre>
105  * Note that this parameter can change during operation (see below).
106  *
107  * A timeout may occur when the total amount of data or the number of incomplete time slices
108  * in the queue exceeds one of the corresponding pre-set limits:\n
109  * <pre>
110  * queueSize = <maximal queue size [B]>;
111  * queueDepth = <maximal queue depth>;
112  * </pre>
113  * Note that these values apply per JDataFilter.\n
114  *
115  * When a timeout occurs,
116  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
117  * When data are received with a frame index below the current frame index,
118  * the parameter <tt>frames_per_slice</tt> is incremented by one,
119  * up to the original value set at actionConfigure.\n
120  *
121  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
122  * The following parameters can be used to configure the circular buffer.\n
123  * <pre>
124  * path = <write directory for circular buffer>;
125  * c_sizeL0 = <L0 buffer size>;
126  * c_sizeL1 = <L1 buffer size>;
127  * c_sizeL2 = <L2 buffer size>;
128  * c_sizeSN = <SN buffer size>;
129  * </pre>
130  *
131  * Note that when path is set to a valid directory:
132  * - There will always be a file created which is closed when the state machine is exited;
133  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
134  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
135  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
136  * - After closure of the file, a new file will be opened if path is set;
137  *
138  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
139  * the configuration or alert and <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
140  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
141  * (e.g. when there is more than one alert on a given day).
142  *
143  * The application JConvert.cc can be used to readily convert multiple input files to a single output file.
144  *
145  * The script JDataFilter.sh can be used to test this application.
146  */
147  class JDataFilter :
148  public JDAQClient
149  {
150  public:
151 
154 
155  typedef double hit_type;
161 
162  /**
163  * Circular buffer.
164  */
166  public JTreeRecorder<JDAQTimesliceTypes_t>
167  {
169 
170  /**
171  * Default constructor.
172  */
174  {
175  disable();
176  }
177 
178  /**
179  * Open file.
180  *
181  * \param path directory
182  * \param tag unique process tag
183  */
184  void open(const std::string& path, const JTag& tag)
185  {
186  using namespace std;
187 
188  const JDateAndTime cal;
189 
190  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
191 
192  ostringstream os;
193 
194  os << getFullPath(path)
195  << "KM3NeT"
196  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
197  << "_" << tag;
198 
199  if (i != 0) {
200  os << "_" << i;
201  }
202 
203  os << ".root";
204 
205  try {
206  this->open(os.str().c_str());
207  }
208  catch(const exception& error) {}
209  }
210  }
211 
212  /**
213  * Disable writing.
214  */
215  void disable()
216  {
217  sizeL0 = 0;
218  sizeL1 = 0;
219  sizeL2 = 0;
220  sizeSN = 0;
221  }
222 
223  /**
224  * Check whether writing of data is enabled.
225  *
226  * \return true if writing enabled; else false
227  */
228  bool is_enabled() const
229  {
230  return (sizeL0 > 0 ||
231  sizeL1 > 0 ||
232  sizeL2 > 0 ||
233  sizeSN > 0);
234  }
235 
236  /**
237  * Write circular buffer to output stream.
238  *
239  * \param out output stream
240  * \param object circular buffer
241  * \return output stream
242  */
243  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
244  {
245  if (object.is_open())
246  out << object.getFile()->GetName();
247  else
248  out << "void";
249 
250  out << ' ';
251 
252  return out << object.sizeL0 << '/'
253  << object.sizeL1 << '/'
254  << object.sizeL2 << '/'
255  << object.sizeSN << '/';
256  }
257 
258  Long64_t sizeL0; //!< Number of L0 time slices
259  Long64_t sizeL1; //!< Number of L1 time slices
260  Long64_t sizeL2; //!< Number of L2 time slices
261  Long64_t sizeSN; //!< Number of SN time slices
262  };
263 
264 
265  /**
266  * Sort DAQ process by index.
267  *
268  * \param first first DAQ process
269  * \param second second DAQ process
270  * \return true if index of first DAQ process less than that of second; else false
271  */
272  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
273  {
274  return first.index < second.index;
275  }
276 
277 
278  /**
279  * Constructor.
280  *
281  * \param name name of client
282  * \param server name of command message server
283  * \param hostname name of data server
284  * \param logger pointer to logger
285  * \param level debug level
286  * \param port server port
287  * \param backlog server backlog
288  * \param buffer_size server buffer
289  * \param path directory for writing circular buffer
290  */
291  JDataFilter(const std::string& name,
292  const std::string& server,
293  const std::string& hostname,
294  JLogger* logger,
295  const int level,
296  const int port,
297  const int backlog,
298  const int buffer_size,
299  const std::string& path) :
300  JDAQClient (name,server,logger,level),
301  hostname (hostname),
302  port (port),
303  backlog (backlog),
304  buffer_size(buffer_size),
305  path (path)
306  {
307  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
308 
309  addSubscription(JSubscriptionAll(RC_ALERT));
310 
311  totalCPURAM = getRAM();
312  current_slice_index = -1;
313  reporting = false;
314  }
315 
316 
317  virtual void actionEnter() override
318  {}
319 
320 
321  virtual void actionExit() override
322  {
323  if (c_buffer.is_open()) {
324 
325  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
326 
327  c_buffer.close();
328  }
329 
330  datawriter.reset();
331  }
332 
333 
334  virtual void actionInit(int length, const char* buffer) override
335  {
336  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
337 
338  try {
339 
340  JDebugStream(logger) << "Start server.";
341 
342  if (serversocket.is_valid()) {
343  serversocket->shutdown();
344  }
345 
346  serversocket.reset(new JServerSocket(port,backlog));
347  }
348  catch(const std::exception& error) {
349  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
350  ev_error();
351  }
352  }
353 
354 
355  virtual void actionConfigure(int length, const char* buffer) override
356  {
357  using namespace std;
358 
359  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
360 
361  long long int update_s = 10;
362  long long int logger_s = 5;
363 
364  parameters .reset();
365  dataFilters.clear();
366  dataQueues .clear();
367 
368  reporting = false;
369  dumpCount = 0;
370  dumpLimit = numeric_limits<int>::max();
371 
372  detector.clear();
373 
374  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
375 
376  properties["dataWriter"] = hostname;
377  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
378  properties["detector"] = detector;
379  properties["triggerParameters"] = parameters;
380  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
381  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
382  properties["frameIndex"] = maximal_frame_index = 100000;
383  properties["logger_s"] = logger_s;
384  properties["update_s"] = update_s;
385  properties["JDataFilter"] = dataFilters;
386  properties["DataQueue"] = dataQueues;
387  properties["path"] = path;
388  properties["c_sizeL0"] = c_buffer.sizeL0;
389  properties["c_sizeL1"] = c_buffer.sizeL1;
390  properties["c_sizeL2"] = c_buffer.sizeL2;
391  properties["c_sizeSN"] = c_buffer.sizeSN;
392  properties["dumpLimit"] = dumpLimit;
393 
394  try {
395  properties.read(string(buffer, length));
396  }
397  catch(const exception& error) {
398  JErrorStream(logger) << error.what();
399  }
400 
401  if (update_s <= 0) { update_s = 1; }
402  if (logger_s <= 0) { logger_s = 1; }
403 
404  setClockInterval(update_s * 1000000LL);
405 
406  hostname = trim(hostname);
407 
408  if (hostname != "")
409  datawriter.reset(new JControlHost_t(hostname));
410  else
411  throw JException("Undefined data writer host name.");
412 
413  maximum_frames_per_slice = frames_per_slice;
414 
415  // process processlist
416 
417  if (dataFilters.empty()) {
418  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
419  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
420  }
421 
422  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
423 
424  unsigned int numberOfDataFiltersOnThisMachine = 0;
425  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
426 
428 
429  {
430  JNoticeStream notice(logger);
431 
432  notice << "My IP addresses:";
433 
434  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
435  notice << ' ' << *i;
436  }
437  }
438 
439  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
440 
441  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
442 
443  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
444 
445  numberOfDataFiltersOnThisMachine++;
446 
447  if (i->port == this->port) {
448  thisProcess = i;
449  }
450  }
451  }
452 
453  if (numberOfDataFiltersOnThisMachine == 0) {
454  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
455  << "Assuming one datafilter on this machine.";
456  numberOfDataFiltersOnThisMachine = 1;
457  }
458 
459  if (thisProcess == dataFilters.end()) {
460  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
461  }
462 
463  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
464  JErrorStream(logger) << "Mismatch between given process names: "
465  << "I am called " << getName()
466  << ", but in the process list I am referred to as " << thisProcess->index;
467  }
468 
469  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
470  reporting = true;
471  }
472 
473  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
474 
475  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
476 
477  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
478  << "Queue size reduced to "
479  << maxQueueSize << " bytes." ;
480  }
481 
482  // detector
483 
484  if (parameters.disableHighRateVeto) {
485 
486  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
487 
488  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
489  }
490 
491  // trigger parameters
492 
494 
495  triggerNB .reset(new JTriggerNB (parameters));
496  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
497  trigger3DShower.reset(new JTrigger3DShower(parameters));
498  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
499 
500  moduleRouter.reset(new JModuleRouter(detector));
501 
502  if (reporting) {
503  JNoticeStream(logger) << "This data filter process will report.";
504  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
505  JDebugStream (logger) << "Trigger parameters: " << parameters;
506  JDebugStream (logger) << "Detector description: " << endl << detector;
507  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
508  }
509 
510  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
511 
512  // set L1, L2 and SN builders
513 
514  buildL1.reset(new JBuildL1_t(parameters));
515  buildL2.reset(new JBuildL2_t(parameters.L2));
516  buildSN.reset(new JBuildL2_t(parameters.SN));
517  buildNB.reset(new JBuildL2_t(parameters.NB));
518 
519  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
520  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
521  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
522  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
523 
524  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
525  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
526  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
527  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
528 
529  if (c_buffer.is_enabled()) {
530 
531  if (!c_buffer.is_open()) {
532 
533  c_buffer.open(path, getUniqueTag());
534 
535  if (c_buffer.is_open()) {
536 
537  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
538 
539  putObject(c_buffer.getFile(), meta);
540 
541  } else {
542 
543  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
544 
545  c_buffer.disable();
546  }
547 
548  } else {
549 
550  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
551  }
552 
553  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
554  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
555  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
556  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
557 
558  } else {
559 
560  if (c_buffer.is_open()) {
561  c_buffer.close();
562  }
563  }
564  }
565 
566 
567  virtual void actionStart(int length, const char* buffer) override
568  {
569  using namespace std;
570 
571  if (reporting) {
572  JNoticeStream(logger) << "Start run " << getRunNumber();
573  }
574 
575  timeslices.clear();
576 
577  current_slice_index = -1;
578  queueSize = 0;
579 
580  numberOfEvents = 0;
581  numberOfBytes = 0;
582  numberOfTimeslicesProcessed = 0;
583  numberOfIncompleteTimeslicesProcessed = 0;
584 
585  number_of_packets_received = 0;
586  number_of_packets_discarded = 0;
587  number_of_bytes_received = 0;
588  number_of_reads = 0;
589 
590  minFrameNumber = numeric_limits<int>::max();
591  maxFrameNumber = numeric_limits<int>::min();
592 
593  // Reset global trigger counter.
594 
596 
597  logErrorRun .reset();
598  logErrorDetector .reset();
599  logErrorIndex .reset();
600  logErrorIncomplete.reset();
601 
602  timer.reset();
603  timer.start();
604 
605  Qt.reset();
606 
607  // send trigger parameters to the datawriter
608 
609  ostringstream os;
610 
611  os << getRunNumber() << ' ' << parameters;
612 
613  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
614  }
615 
616 
617  virtual void actionPause(int length, const char* buffer) override
618  {
619  using namespace std;
620 
621  if (!timeslices.empty()) {
622 
623  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
624 
625  for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
626  queueSize -= getSizeof(*i);
627  }
628 
629  timeslices.clear();
630  }
631 
632  { // force clearance of memory
633 
634  deque<JDAQTimesliceL0> buffer;
635 
636  timeslices.swap(buffer);
637  }
638 
639  if (queueSize != 0) {
640  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
641  }
642 
643  current_slice_index = -1;
644  queueSize = 0;
645 
646  timer.stop();
647  }
648 
649 
650  virtual void actionContinue(int length, const char* buffer) override
651  {
652  timer.start();
653  }
654 
655 
656  virtual void actionStop(int length, const char* buffer) override
657  {
658  typeout();
659 
660  datawriter.reset();
661  }
662 
663 
664  virtual void actionReset(int length, const char* buffer) override
665  {
666  if (serversocket.is_valid()) {
667  serversocket->shutdown();
668  }
669 
670  serversocket.reset();
671  }
672 
673 
674  virtual void actionQuit(int length, const char* buffer) override
675  {
676  datawriter.reset();
677  }
678 
679 
680  virtual void setSelect(JFileDescriptorMask& mask) const override
681  {
682  if (serversocket.is_valid()) {
683  mask.set(*serversocket);
684  }
685 
686  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
687  if (!channel->isReady()) {
688  mask.set(channel->getFileDescriptor());
689  }
690  }
691  }
692 
693 
694  virtual void actionSelect(const JFileDescriptorMask& mask) override
695  {
696  using namespace std;
697 
698  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
699 
700  try {
701 
702  if (mask.has(channel->getFileDescriptor())) {
703  channel->read();
704  }
705 
706  if (channel->isReady()) {
707 
708  number_of_packets_received += 1;
709  number_of_reads += channel->getCounter();
710  number_of_bytes_received += channel->size();
711 
712  if (isRunning()) {
713 
714  updateFrameQueue(channel);
715 
716  } else {
717 
718  JErrorStream(logErrorRun) << "Receiving data while not running.";
719 
720  number_of_packets_discarded += 1;
721  }
722 
723  channel->reset();
724  }
725 
726  ++channel;
727  }
728  catch(const exception& error) {
729 
730  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
731 
732  channel->shutdown();
733 
734  channel = channelList.erase(channel);
735  }
736  }
737 
738 
739  if (serversocket.is_valid()) {
740 
741  if (mask.has(*serversocket)) {
742 
743  JSocket socket;
744 
745  socket.accept(serversocket->getFileDescriptor());
746 
747  //socket.setSendBufferSize (buffer_size);
749 
750  socket.setKeepAlive (true);
751  socket.setNonBlocking(true);
752 
753  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
754 
755  channelList.push_back(JSocketInputChannel_t(socket));
756  }
757  }
758 
759 
760  if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) ||
761  (timeslices.size() >= 2u &&
762  timeslices[1].size() >= frames_per_slice) ||
763  (timeslices.size() >= maxQueueDepth) ||
764  (queueSize >= maxQueueSize))) {
765 
766  const JDAQTimesliceL0& pending_slice = timeslices.front();
767  queueSize -= getSizeof(pending_slice);
768 
769  current_slice_index = pending_slice.getFrameIndex();
770  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
771  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
772 
773  if (pending_slice.size() > frames_per_slice) {
774 
775  JErrorStream(logger) << "More frames in timeslice than expected "
776  << pending_slice.size() << " > " << frames_per_slice;
777 
778  if (pending_slice.size() <= maximum_frames_per_slice) {
779 
780  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
781 
782  frames_per_slice = pending_slice.size();
783  }
784  }
785 
786  if (!pending_slice.empty()) {
787 
788  const localtime_t t0 = getLocalTime();
789 
790  processTimeSlice(pending_slice);
791 
792  const localtime_t t1 = getLocalTime();
793 
794  numberOfTimeslicesProcessed += 1;
795 
796  Qt.put(t1 - t0);
797 
798  if (pending_slice.size() < frames_per_slice) {
799 
800  numberOfIncompleteTimeslicesProcessed += 1;
801 
802  ostringstream error;
803 
804  error << "Timeout -> processed incomplete timeslice: "
805  << "Frame index = " << pending_slice.getFrameIndex() << ';'
806  << "Size of timeslice = " << pending_slice.size() << ';'
807  << "Queue depth = " << timeslices.size() << ';'
808  << "Queue size = " << queueSize << ';';
809 
810  if ((timeslices.size() >= 2u &&
811  timeslices[1].size() >= frames_per_slice)) {
812 
813  error << " intermittent problem -> continues as-is";
814 
815  } else {
816 
817  error << " adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
818 
819  frames_per_slice = pending_slice.size();
820  }
821 
822  JErrorStream(logErrorIncomplete) << error.str();
823  }
824  }
825 
826  timeslices.pop_front();
827  }
828  }
829 
830 
831  virtual void actionRunning() override
832  {
833  if (reporting) {
834  typeout();
835  }
836  }
837 
838 
839  /**
840  * Update queue with data frames.
841  *
842  * Note that any discarded data will be reported.
843  *
844  * \param channel incoming data channel
845  */
846  void updateFrameQueue(const JChannelList_t::const_iterator channel)
847  {
848  using namespace std;
849 
850  JByteArrayReader in(channel->data(), channel->size());
851 
852  JDAQPreamble preamble;
853  JDAQSuperFrameHeader header;
854 
855  in >> preamble;
856  in >> header;
857 
858  if (preamble.getLength() != channel->size()) {
859 
860  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
861  << "preamble.getLength() = " << preamble.getLength() << ';'
862  << "channel->size(): " << channel->size() << ';';
863 
864  number_of_packets_discarded += 1;
865 
866  return;
867  }
868 
869  if (header.getRunNumber() != getRunNumber()) {
870 
871  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
872  << " unequal to current run " << getRunNumber()
873  << " -> Dropping frame.";
874 
875  number_of_packets_discarded += 1;
876 
877  return;
878  }
879 
880  if (header.getFrameIndex() <= current_slice_index) {
881 
882  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
883  << " -> Dropping frame.";
884 
885  number_of_packets_discarded += 1;
886 
887  if (frames_per_slice < maximum_frames_per_slice) {
888 
889  frames_per_slice++;
890 
891  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
892  }
893 
894  return;
895  }
896 
897  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
898 
899  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
900  << " -> Dropping frame.";
901 
902  number_of_packets_discarded += 1;
903 
904  return;
905  }
906 
907  deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
908 
909  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
910  ++timesliceIterator;
911  }
912 
913  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
914 
915  // The corresponding time slice already exists
916 
917  } else {
918 
919  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
920 
921  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
922 
923  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
924 
925  queueSize += getSizeof(*timesliceIterator);
926  }
927 
928  timesliceIterator->push_back(JDAQSuperFrame(header));
929 
930  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
931 
932  queueSize += getSizeof(*timesliceIterator->rbegin());
933  }
934 
935 
936  /**
937  * Process time slice.
938  *
939  * \param timeslice time slice
940  */
941  void processTimeSlice(const JDAQTimesliceL0& timeslice)
942  {
943  using namespace std;
944 
945  try {
946 
947  timesliceRouter->configure(timeslice);
948 
949  if (parameters.writeSummary()) {
950  this->put(JDAQSummaryslice(timeslice));
951  }
952 
953  if (parameters.trigger3DMuon.enabled ||
954  parameters.trigger3DShower.enabled ||
955  parameters.triggerMXShower.enabled ||
956  parameters.triggerNB.enabled ||
957  parameters.writeL0.prescale ||
958  parameters.writeL1.prescale ||
959  parameters.writeL2.prescale ||
960  parameters.writeSN.prescale ||
961  c_buffer.is_enabled()) {
962 
963  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
964  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
965  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
966  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
967  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
968  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
969 
970  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
971 
972  if (moduleRouter->hasModule(frame->getModuleID())) {
973 
974  if (!checksum(*frame)) {
975 
976  JErrorStream(logger) << "Invalid data at "
977  << "run = " << timeslice.getRunNumber() << ";"
978  << "frame index = " << timeslice.getFrameIndex() << ";"
979  << "module = " << frame->getModuleID() << ";"
980  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
981 
982  timesliceTX.push_back(*frame);
983 
984  continue;
985  }
986 
987  const JModule& module = moduleRouter->getModule(frame->getModuleID());
988  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
989 
990  // Apply high-rate veto
991 
992  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
993 
994  // L0
995 
996  timesliceL0.push_back(JSuperFrame1D_t(buffer));
997 
998  // Nano-beacon trigger
999 
1000  if (parameters.triggerNB.enabled) {
1001 
1002  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1003 
1004  if (buffer.begin() != __end) {
1005 
1006  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1007  frame->getModuleIdentifier(),
1008  module.getPosition()));
1009 
1010  JSuperFrame1D_t zbuf;
1011 
1012  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1013 
1014  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1015  }
1016  }
1017 
1018  // L1
1019 
1020  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1021  frame->getModuleIdentifier(),
1022  module.getPosition()));
1023 
1024  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1025 
1026  // L2
1027 
1028  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1029  frame->getModuleIdentifier(),
1030  module.getPosition()));
1031 
1032  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1033 
1034  // SN
1035 
1036  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1037  frame->getModuleIdentifier(),
1038  module.getPosition()));
1039 
1040  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1041 
1042  } else {
1043 
1044  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1045  }
1046  }
1047 
1048  if (!timesliceTX.empty()) {
1049 
1050  if (dumpCount < dumpLimit) {
1051 
1052  this->put(timesliceTX);
1053 
1054  dumpCount += 1;
1055  }
1056  }
1057 
1058  // Trigger
1059 
1060  if (parameters.triggerNB.enabled) {
1061 
1062  const JTriggerInput trigger_input(timesliceNB);
1063 
1064  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1065 
1066  if (parameters.triggerNB.write()) {
1067 
1068  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1069  getTriggerMask(triggerNB->getTriggerBit()),
1070  *hit,
1071  *timesliceRouter,
1072  *moduleRouter,
1073  parameters.TMaxLocal_ns,
1074  parameters.triggerNB.DMax_m,
1075  getTimeRange(parameters.triggerNB));
1076 
1077  this->put(tev);
1078  }
1079  }
1080  }
1081 
1082  JTriggerInput trigger_input(timesliceL2);
1083  JTriggerOutput trigger_output;
1084 
1085  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1086  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1087  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1088 
1089  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1090 
1091  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1092 
1093  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1094 
1095  if (this->put(object)) {
1096  numberOfEvents += 1;
1097  }
1098  }
1099 
1100  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1101 
1102  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1103 
1104  if (parameters.writeL1) { this->put(object); }
1105  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1106  }
1107 
1108  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1109 
1110  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1111 
1112  if (parameters.writeL2) { this->put(object); }
1113  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1114  }
1115 
1116  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1117 
1118  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1119 
1120  if (parameters.writeSN) { this->put(object); }
1121  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1122  }
1123 
1124  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1125 
1126  if (parameters.writeL0) { this->put(timeslice); }
1127  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1128  }
1129  }
1130 
1131  } catch(const exception& error) {
1132 
1133  JErrorStream(logger) << "Error = " << error.what() << ";"
1134  << "run = " << timeslice.getRunNumber() << ";"
1135  << "frame index = " << timeslice.getFrameIndex() << ";"
1136  << "time slice not correctly processed;"
1137  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1138 
1139  if (dumpCount < dumpLimit) {
1140 
1141  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1142 
1143  dumpCount += 1;
1144  }
1145  }
1146 
1147  timesliceRouter->reset();
1148  }
1149 
1150 
1151  /**
1152  * Report status to message logger.
1153  */
1154  void typeout()
1155  {
1156  timer.stop();
1157 
1158  const double T_us = (double) timer.usec_wall;
1159 
1160  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1161  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1162  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1163  try {
1164  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1165  }
1166  catch(const std::exception&) {}
1167  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1168  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1169 
1170  if (number_of_packets_received > 0) {
1171  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1172  }
1173 
1174  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1175  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1176 
1177  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1178 
1179  if (numberOfTimeslicesProcessed > 0) {
1180  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1181  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1182  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1183  }
1184 
1185  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1186  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1187 
1188  if (processedSlicesTime_us > 0) {
1189  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1190  }
1191  if (processedDetectorTime_us > 0) {
1192  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1193  }
1194 
1195  timer.start();
1196  }
1197 
1198 
1199  /**
1200  * Tagged action to handle alerts.
1201  *
1202  * \param tag tag
1203  * \param length number of characters
1204  * \param buffer message
1205  */
1206  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1207  {
1208  using namespace std;
1209 
1210  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1211 
1212  if (tag == RC_ALERT) {
1213 
1214  if (c_buffer.is_open()) {
1215 
1216  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1217 
1218  c_buffer.close();
1219  }
1220 
1221  if (c_buffer.is_enabled()) {
1222 
1223  c_buffer.open(path, getUniqueTag());
1224 
1225  if (c_buffer.is_open()) {
1226 
1227  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1228 
1229  putObject(c_buffer.getFile(), meta);
1230 
1231  } else {
1232 
1233  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1234 
1235  c_buffer.disable();
1236  }
1237  }
1238 
1239  } else {
1240 
1241  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1242  }
1243  }
1244 
1245  JMeta meta; //!< meta data
1246 
1247  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1248 
1249  private:
1250 
1252  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1253  std::string hostname; //!< host name of data server
1254 
1255  /**
1256  * Auxiliary method to send object to data server.
1257  *
1258  * \param object object to be sent
1259  * \return true if sent; else false
1260  */
1261  template<class T>
1262  bool put(const T& object)
1263  {
1264  try {
1265 
1266  datawriter->put(object);
1267 
1268  numberOfBytes += getSizeof(object);
1269 
1270  return true;
1271  }
1272  catch(const std::exception& error) {
1273  JErrorStream(logger) << error.what();
1274  }
1275 
1276  return false;
1277  }
1278 
1279 
1280  int port; //!< server socket port
1281  int backlog;
1283 
1284  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1285  JChannelList_t channelList; //!< connections to data queue
1286 
1287  JTimer timer;
1289 
1290  std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1292  unsigned int frames_per_slice;
1295 
1296  // trigger
1297 
1300 
1307 
1312 
1317 
1320 
1321  // process management
1322 
1325 
1326  // memory management
1327 
1328  long long int totalCPURAM;
1329  unsigned int maxQueueDepth;
1330  long long int maxQueueSize;
1331  long long int queueSize;
1332 
1333  // statistics
1334 
1336 
1337  long long int numberOfEvents;
1338  long long int numberOfBytes;
1341 
1344 
1345  // temporary
1346 
1349  long long int number_of_reads;
1351 
1352  // circular buffer
1353 
1355  std::string path; //!< directory for writing circular buffer
1356  };
1357 }
1358 
1359 /**
1360  * \file
1361  *
1362  * Application for real-time filtering of data.
1363  * For more information, see KM3NETDAQ::JDataFilter.
1364  *
1365  * \author rbruijn and mdejong
1366  */
1367 int main(int argc, char* argv[])
1368 {
1369  using namespace std;
1370  using namespace JPP;
1371  using namespace KM3NETDAQ;
1372 
1373  string server;
1374  string logger;
1375  string hostname;
1376  string client_name;
1377  int port;
1378  int backlog;
1379  int buffer_size;
1380  bool use_cout;
1381  string path;
1382  int debug;
1383 
1384 
1385  try {
1386 
1387  JParser<> zap("Application for real-time filtering of data.");
1388 
1389  zap['H'] = make_field(server) = "localhost";
1390  zap['M'] = make_field(logger) = "localhost";
1391  zap['D'] = make_field(hostname) = "";
1392  zap['u'] = make_field(client_name) = "JDataFilter";
1393  zap['P'] = make_field(port);
1394  zap['q'] = make_field(backlog) = 1024;
1395  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1396  zap['c'] = make_field(use_cout);
1397  zap['p'] = make_field(path) = "/tmp/";
1398  zap['d'] = make_field(debug) = 0;
1399 
1400  zap(argc, argv);
1401  }
1402  catch(const exception& error) {
1403  FATAL(error.what() << endl);
1404  }
1405 
1406 
1407  JLogger* out = NULL;
1408 
1409  if (use_cout)
1410  out = new JStreamLogger(cout);
1411  else
1412  out = new JControlHostLogger(logger);
1413 
1414  JDataFilter dfilter(client_name,
1415  server,
1416  hostname,
1417  out,
1418  debug,
1419  port,
1420  backlog,
1421  buffer_size,
1422  path);
1423 
1424  dfilter.meta = JMeta(argc, argv);
1425 
1426  dfilter.enter();
1427  dfilter.run();
1428 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:831
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1517
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:941
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
Definition: JDataFilter.cc:77
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:184
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:272
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:160
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:157
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:152
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:259
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:291
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:43
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:147
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:159
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:173
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:228
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:156
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:158
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:373
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:226
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:650
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:153
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:243
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1993
Byte array binary input.
Definition: JByteArrayIO.hh:25
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:846
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:317
Level specific message streamers.
long long int number_of_packets_received
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
static const long long int MEGABYTE
Number of bytes in a kilo-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:674
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DOM<$WORKDIR/ev_configure_domsimulator.txt > RC_DWRT path
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:261
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:251
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:567
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:664
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:321
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxililary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:694
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:258
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:110
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters andd ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:152
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:656
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
double u[N+1]
Definition: JPolint.hh:776
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:334
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:617
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:260
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
int debug
debug level
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:680
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:355
Time slice with calibrated data.
Definition: JTimeslice.hh:26