Jpp  17.0.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JDetector/JDetector.hh"
30 #include "JTrigger/JHit.hh"
31 #include "JTrigger/JHitToolkit.hh"
34 #include "JTrigger/JTimeslice.hh"
35 #include "JTrigger/JHitL0.hh"
36 #include "JTrigger/JHitL1.hh"
37 #include "JTrigger/JBuildL1.hh"
38 #include "JTrigger/JBuildL2.hh"
42 #include "JTrigger/JTriggerNB.hh"
43 #include "JTrigger/JTriggerBits.hh"
47 #include "JTrigger/JTimesliceL1.hh"
50 #include "JTrigger/JChecksum.hh"
53 #include "JNet/JControlHost.hh"
55 #include "JNet/JSocket.hh"
56 #include "JNet/JSocketChannel.hh"
57 #include "JNet/JServerSocket.hh"
58 #include "JPhysics/JConstants.hh"
59 #include "JTools/JQuantile.hh"
60 #include "JSupport/JSupport.hh"
62 #include "JSupport/JMeta.hh"
63 #include "JSystem/JTime.hh"
65 #include "JSystem/JNetwork.hh"
66 
67 
68 namespace JNET {
69 
70  /**
71  * Get size of packet.
72  *
73  * \param preamble DAQ data preamble
74  * \return size [B]
75  */
76  template<>
78  {
79  return preamble.getLength();
80  }
81 }
82 
83 namespace KM3NETDAQ {
84 
85  using namespace JPP;
86 
87  /**
88  * Main class for real-time filtering of data.
89  *
90  * This class implements the action methods for each transition of the state machine.\n
91  * When the state machine is entered, data are continually collected using
92  * custom implementation of virtual methods
93  * setSelect and
94  * actionSelect.
95  *
96  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
97  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
98  * When a JDAQTimeslice is complete,
99  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
100  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
101  * which, together with other parameters, is parsed in method actionConfigure.\n
102  * <pre>
103  * numberOfFramesPerSlice = frames_per_slice = 1;
104  * </pre>
105  * Note that this parameter can change during operation (see below).
106  *
107  * A timeout may occur when the total amount of data or the number of incomplete time slices
108  * in the queue exceeds one of the corresponding pre-set limits:\n
109  * <pre>
110  * queueSize = <maximal queue size [B]>;
111  * queueDepth = <maximal queue depth>;
112  * </pre>
113  * Note that these values apply per JDataFilter.\n
114  *
115  * When a timeout occurs,
116  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
117  * When data are received with a frame index below the current frame index,
118  * the parameter <tt>frames_per_slice</tt> is incremented by one,
119  * up to the original value set at actionConfigure.\n
120  *
121  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
122  * The following parameters can be used to configure the circular buffer.\n
123  * <pre>
124  * path = <write directory for circular buffer>;
125  * c_sizeL0 = <L0 buffer size>;
126  * c_sizeL1 = <L1 buffer size>;
127  * c_sizeL2 = <L2 buffer size>;
128  * c_sizeSN = <SN buffer size>;
129  * </pre>
130  *
131  * Note that when path is set to a valid directory:
132  * - There will always be a file created which is closed when the state machine is exited;
133  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
134  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
135  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
136  * - After closure of the file, a new file will be opened if path is set;
137  *
138  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
139  * the configuration or alert and <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
140  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
141  * (e.g. when there is more than one alert on a given day).
142  *
143  * The application JConvert.cc can be used to readily convert multiple input files to a single output file.
144  *
145  * The script JDataFilter.sh can be used to test this application.
146  */
147  class JDataFilter :
148  public JDAQClient
149  {
150  public:
151 
154 
155  typedef double hit_type;
161 
162  /**
163  * Circular buffer.
164  */
166  public JTreeRecorder<JDAQTimesliceTypes_t>
167  {
169 
170  /**
171  * Default constructor.
172  */
174  {
175  disable();
176  }
177 
178  /**
179  * Open file.
180  *
181  * \param path directory
182  * \param tag unique process tag
183  */
184  void open(const std::string& path, const JTag& tag)
185  {
186  using namespace std;
187 
188  const JDateAndTime cal;
189 
190  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
191 
192  ostringstream os;
193 
194  os << getFullPath(path)
195  << "KM3NeT"
196  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
197  << "_" << tag;
198 
199  if (i != 0) {
200  os << "_" << i;
201  }
202 
203  os << ".root";
204 
205  try {
206  this->open(os.str().c_str());
207  }
208  catch(const exception& error) {}
209  }
210  }
211 
212  /**
213  * Disable writing.
214  */
215  void disable()
216  {
217  sizeL0 = 0;
218  sizeL1 = 0;
219  sizeL2 = 0;
220  sizeSN = 0;
221  }
222 
223  /**
224  * Check whether writing of data is enabled.
225  *
226  * \return true if writing enabled; else false
227  */
228  bool is_enabled() const
229  {
230  return (sizeL0 > 0 ||
231  sizeL1 > 0 ||
232  sizeL2 > 0 ||
233  sizeSN > 0);
234  }
235 
236  /**
237  * Write circular buffer to output stream.
238  *
239  * \param out output stream
240  * \param object circular buffer
241  * \return output stream
242  */
243  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
244  {
245  if (object.is_open())
246  out << object.getFile()->GetName();
247  else
248  out << "void";
249 
250  out << ' ';
251 
252  return out << object.sizeL0 << '/'
253  << object.sizeL1 << '/'
254  << object.sizeL2 << '/'
255  << object.sizeSN << '/';
256  }
257 
258  Long64_t sizeL0; //!< Number of L0 time slices
259  Long64_t sizeL1; //!< Number of L1 time slices
260  Long64_t sizeL2; //!< Number of L2 time slices
261  Long64_t sizeSN; //!< Number of SN time slices
262  };
263 
264 
265  /**
266  * Sort DAQ process by index.
267  *
268  * \param first first DAQ process
269  * \param second second DAQ process
270  * \return true if index of first DAQ process less than that of second; else false
271  */
272  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
273  {
274  return first.index < second.index;
275  }
276 
277 
278  /**
279  * Constructor.
280  *
281  * \param name name of client
282  * \param server name of command message server
283  * \param hostname name of data server
284  * \param logger pointer to logger
285  * \param level debug level
286  * \param port server port
287  * \param backlog server backlog
288  * \param buffer_size server buffer
289  * \param path directory for writing circular buffer
290  */
291  JDataFilter(const std::string& name,
292  const std::string& server,
293  const std::string& hostname,
294  JLogger* logger,
295  const int level,
296  const int port,
297  const int backlog,
298  const int buffer_size,
299  const std::string& path) :
300  JDAQClient (name,server,logger,level),
301  hostname (hostname),
302  port (port),
303  backlog (backlog),
304  buffer_size(buffer_size),
305  path (path)
306  {
307  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
308 
309  addSubscription(JSubscriptionAll(RC_ALERT));
310 
311  totalCPURAM = getRAM();
312  current_slice_index = -1;
313  reporting = false;
314  }
315 
316 
317  virtual void actionEnter() override
318  {}
319 
320 
321  virtual void actionExit() override
322  {
323  if (c_buffer.is_open()) {
324 
325  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
326 
327  c_buffer.close();
328  }
329 
330  datawriter.reset();
331  }
332 
333 
334  virtual void actionInit(int length, const char* buffer) override
335  {
336  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
337 
338  try {
339 
340  JDebugStream(logger) << "Start server.";
341 
342  if (serversocket.is_valid()) {
343  serversocket->shutdown();
344  }
345 
346  serversocket.reset(new JServerSocket(port,backlog));
347  }
348  catch(const std::exception& error) {
349  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
350  ev_error();
351  }
352  }
353 
354 
355  virtual void actionConfigure(int length, const char* buffer) override
356  {
357  using namespace std;
358 
359  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
360 
361  long long int update_s = 10;
362  long long int logger_s = 5;
363 
364  parameters .reset();
365  dataFilters.clear();
366  dataQueues .clear();
367 
368  reporting = false;
369  dumpCount = 0;
370  dumpLimit = numeric_limits<int>::max();
371 
372  detector.clear();
373 
374  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
375 
376  properties["dataWriter"] = hostname;
377  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
378  properties["detector"] = detector;
379  properties["triggerParameters"] = parameters;
380  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
381  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
382  properties["frameIndex"] = maximal_frame_index = 100000;
383  properties["logger_s"] = logger_s;
384  properties["update_s"] = update_s;
385  properties["JDataFilter"] = dataFilters;
386  properties["DataQueue"] = dataQueues;
387  properties["path"] = path;
388  properties["c_sizeL0"] = c_buffer.sizeL0;
389  properties["c_sizeL1"] = c_buffer.sizeL1;
390  properties["c_sizeL2"] = c_buffer.sizeL2;
391  properties["c_sizeSN"] = c_buffer.sizeSN;
392  properties["dumpLimit"] = dumpLimit;
393 
394  try {
395  properties.read(string(buffer, length));
396  }
397  catch(const exception& error) {
398  JErrorStream(logger) << error.what();
399  }
400 
401  if (update_s <= 0) { update_s = 1; }
402  if (logger_s <= 0) { logger_s = 1; }
403 
404  setClockInterval(update_s * 1000000LL);
405 
406  hostname = trim(hostname);
407 
408  if (hostname != "")
409  datawriter.reset(new JControlHost_t(hostname));
410  else
411  throw JException("Undefined data writer host name.");
412 
413  maximum_frames_per_slice = frames_per_slice;
414 
415  // process processlist
416 
417  if (dataFilters.empty()) {
418  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
419  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
420  }
421 
422  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
423 
424  unsigned int numberOfDataFiltersOnThisMachine = 0;
425  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
426 
428 
429  {
430  JNoticeStream notice(logger);
431 
432  notice << "My IP addresses:";
433 
434  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
435  notice << ' ' << *i;
436  }
437  }
438 
439  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
440 
441  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
442 
443  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
444 
445  numberOfDataFiltersOnThisMachine++;
446 
447  if (i->port == this->port) {
448  thisProcess = i;
449  }
450  }
451  }
452 
453  if (numberOfDataFiltersOnThisMachine == 0) {
454  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
455  << "Assuming one datafilter on this machine.";
456  numberOfDataFiltersOnThisMachine = 1;
457  }
458 
459  if (thisProcess == dataFilters.end()) {
460  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
461  }
462 
463  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
464  JErrorStream(logger) << "Mismatch between given process names: "
465  << "I am called " << getName()
466  << ", but in the process list I am referred to as " << thisProcess->index;
467  }
468 
469  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
470  reporting = true;
471  }
472 
473  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
474 
475  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
476 
477  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
478  << "Queue size reduced to "
479  << maxQueueSize << " bytes." ;
480  }
481 
482  // detector
483 
484  if (parameters.disableHighRateVeto) {
485 
486  JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
487 
488  detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
489  }
490 
491  // trigger parameters
492 
494 
495  triggerNB .reset(new JTriggerNB (parameters));
496  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
497  trigger3DShower.reset(new JTrigger3DShower(parameters));
498  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
499 
500  moduleRouter.reset(new JModuleRouter(detector));
501 
502  if (reporting) {
503  JNoticeStream(logger) << "This data filter process will report.";
504  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
505  JDebugStream (logger) << "Trigger parameters: " << parameters;
506  JDebugStream (logger) << "Detector description: " << endl << detector;
507  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
508  }
509 
510  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
511 
512  // set L1, L2 and SN builders
513 
514  buildL1.reset(new JBuildL1_t(parameters));
515  buildL2.reset(new JBuildL2_t(parameters.L2));
516  buildSN.reset(new JBuildL2_t(parameters.SN));
517  buildNB.reset(new JBuildL2_t(parameters.NB));
518 
519  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
520  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
521  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
522  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
523 
524  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
525  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
526  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
527  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
528 
529  if (c_buffer.is_enabled()) {
530 
531  if (!c_buffer.is_open()) {
532 
533  c_buffer.open(path, getUniqueTag());
534 
535  if (c_buffer.is_open()) {
536 
537  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
538 
539  putObject(c_buffer.getFile(), meta);
540 
541  } else {
542 
543  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
544 
545  c_buffer.disable();
546  }
547 
548  } else {
549 
550  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
551  }
552 
553  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
554  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
555  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
556  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
557 
558  } else {
559 
560  if (c_buffer.is_open()) {
561  c_buffer.close();
562  }
563  }
564  }
565 
566 
/**
 * Action for the start transition.
 *
 * The queue of time slices, the counters, the frame number range,
 * the message schedulers, the timer and the quantile Qt are reset.\n
 * Finally, the run number and the trigger parameters are sent to the data writer.\n
 * The arguments are not used.
 *
 * \param  length          number of characters
 * \param  buffer          text
 */
567  virtual void actionStart(int length, const char* buffer) override
568  {
569  using namespace std;
570 
571  if (reporting) {
572  JNoticeStream(logger) << "Start run " << getRunNumber();
573  }
574 
   // empty queue
575  timeslices.clear();
576 
577  current_slice_index = -1;
578  queueSize = 0;
579 
   // output counters
580  numberOfEvents = 0;
581  numberOfBytes = 0;
582  numberOfTimeslicesProcessed = 0;
583  numberOfIncompleteTimeslicesProcessed = 0;
584 
   // input counters
585  number_of_packets_received = 0;
586  number_of_packets_discarded = 0;
587  number_of_bytes_received = 0;
588  number_of_reads = 0;
589 
   // inverted range, so that the first processed frame index sets both limits
590  minFrameNumber = numeric_limits<int>::max();
591  maxFrameNumber = numeric_limits<int>::min();
592 
593  // Reset global trigger counter.
594 
   // NOTE(review): the statement announced by the comment above is not visible in this excerpt of the listing -- confirm against the complete source.
596 
597  logErrorRun .reset();
598  logErrorDetector .reset();
599  logErrorIndex .reset();
600  logErrorIncomplete.reset();
601 
602  timer.reset();
603  timer.start();
604 
605  Qt.reset();
606 
607  // send trigger parameters to the datawriter
608 
609  ostringstream os;
610 
611  os << getRunNumber() << ' ' << parameters;
612 
613  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
614  }
615 
616 
/**
 * Action for the pause transition.
 *
 * All time slices pending in the queue are discarded (not processed),
 * whereby the queue size is reduced by the size of each discarded time slice.\n
 * A warning is issued if the queue size is non-zero afterwards.\n
 * The current slice index and the queue size are then reset and the timer is stopped.
 *
 * \param  length          number of characters
 * \param  buffer          text
 */
617  virtual void actionPause(int length, const char* buffer) override
618  {
619  using namespace std;
620 
621  if (!timeslices.empty()) {
622 
623  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
624 
625  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
626  queueSize -= getSizeof(*i);
627  }
628 
629  timeslices.clear();
630  }
631 
632  { // force clearance of memory
633 
   // NOTE(review): the declaration of the local object named buffer that is swapped below is not visible in this excerpt of the listing; presumably an empty list of time slices -- confirm against the complete source.
635 
636  timeslices.swap(buffer);
637  }
638 
   // the bookkeeping above should have reduced the queue size to zero
639  if (queueSize != 0) {
640  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
641  }
642 
643  current_slice_index = -1;
644  queueSize = 0;
645 
646  timer.stop();
647  }
648 
649 
650  virtual void actionContinue(int length, const char* buffer) override
651  {
652  timer.start();
653  }
654 
655 
656  virtual void actionStop(int length, const char* buffer) override
657  {
658  typeout();
659 
660  datawriter.reset();
661  }
662 
663 
664  virtual void actionReset(int length, const char* buffer) override
665  {
666  if (serversocket.is_valid()) {
667  serversocket->shutdown();
668  }
669 
670  serversocket.reset();
671  }
672 
673 
674  virtual void actionQuit(int length, const char* buffer) override
675  {
676  datawriter.reset();
677  }
678 
679 
680  virtual void setSelect(JFileDescriptorMask& mask) const override
681  {
682  if (serversocket.is_valid()) {
683  mask.set(*serversocket);
684  }
685 
686  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
687  if (!channel->isReady()) {
688  mask.set(channel->getFileDescriptor());
689  }
690  }
691  }
692 
693 
/**
 * Action following the select call.
 *
 * This method consists of three consecutive parts:
 * -# data are read from each channel whose file descriptor is set in the mask;
 *    when a channel is ready, the data are put in the queue (if running) or discarded (otherwise)
 *    and the channel is reset;
 *    a channel for which an exception is thrown is shut down and removed from the list of channels;
 * -# a new connection is accepted and added to the list of channels if the server socket is set in the mask;
 * -# the oldest time slice in the queue is processed and removed
 *    if it contains at least the expected number of frames or
 *    if the queue depth or queue size has reached the corresponding limit.
 *
 * \param  mask            file descriptor mask
 */
694  virtual void actionSelect(const JFileDescriptorMask& mask) override
695  {
696  using namespace std;
697 
   // the iterator is advanced inside the loop, either by increment or by erase
698  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
699 
700  try {
701 
702  if (mask.has(channel->getFileDescriptor())) {
703  channel->read();
704  }
705 
706  if (channel->isReady()) {
707 
708  number_of_packets_received += 1;
709  number_of_reads += channel->getCounter();
710  number_of_bytes_received += channel->size();
711 
712  if (isRunning()) {
713 
714  updateFrameQueue(channel);
715 
716  } else {
717 
718  JErrorStream(logErrorRun) << "Receiving data while not running.";
719 
720  number_of_packets_discarded += 1;
721  }
722 
723  channel->reset();
724  }
725 
726  ++channel;
727  }
728  catch(const exception& error) {
729 
730  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
731 
732  channel->shutdown();
733 
   // erase returns the iterator to the next channel
734  channel = channelList.erase(channel);
735  }
736  }
737 
738 
739  if (serversocket.is_valid()) {
740 
741  if (mask.has(*serversocket)) {
742 
743  JSocket socket;
744 
745  socket.accept(serversocket->getFileDescriptor());
746 
747  //socket.setSendBufferSize (buffer_size);
   // NOTE(review): one line of the listing is not visible at this point in the excerpt -- confirm against the complete source.
749 
750  socket.setKeepAlive (true);
751  socket.setNonBlocking(true);
752 
753  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
754 
755  channelList.push_back(JSocketInputChannel_t(socket));
756  }
757  }
758 
759 
   // at most one time slice is taken from the queue per call
760  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
761  timeslices.size() >= maxQueueDepth ||
762  queueSize >= maxQueueSize)) {
763 
764  const JDAQTimesliceL0& pending_slice = timeslices.front();
765  queueSize -= getSizeof(pending_slice);
766 
   // frames with an index up to and including this value are dropped from now on (see updateFrameQueue)
767  current_slice_index = pending_slice.getFrameIndex();
768  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
769  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
770 
771  if (pending_slice.size() > frames_per_slice) {
772 
773  JErrorStream(logger) << "More frames in timeslice than expected "
774  << pending_slice.size() << " > " << frames_per_slice;
775 
   // increase the expectation, but never beyond the configured value
776  if (pending_slice.size() <= maximum_frames_per_slice) {
777 
778  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
779 
780  frames_per_slice = pending_slice.size();
781  }
782  }
783 
784  if (!pending_slice.empty()) {
785 
786  const localtime_t t0 = getLocalTime();
787 
788  processTimeSlice(pending_slice);
789 
790  const localtime_t t1 = getLocalTime();
791 
792  numberOfTimeslicesProcessed += 1;
793 
   // elapsed time of processing this time slice
794  Qt.put(t1 - t0);
795 
   // fewer frames than expected, i.e. the time slice was taken because of the queue depth or queue size limit
796  if (pending_slice.size() < frames_per_slice) {
797 
798  numberOfIncompleteTimeslicesProcessed += 1;
799 
800  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
801  << "Frame index = " << pending_slice.getFrameIndex() << ';'
802  << "Size of timeslice = " << pending_slice.size() << ';'
803  << "Queue depth = " << timeslices.size() << ';'
804  << "Queue size = " << queueSize;
805 
   // the pending time slice is still in the queue at this point, so this test is true
806  if (!timeslices.empty()) {
807 
808  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
809 
810  frames_per_slice = pending_slice.size();
811  }
812  }
813  }
814 
   // this invalidates the reference pending_slice
815  timeslices.pop_front();
816  }
817  }
818 
819 
820  virtual void actionRunning() override
821  {
822  if (reporting) {
823  typeout();
824  }
825  }
826 
827 
828  /**
829  * Update queue with data frames.
830  *
831  * Note that any discarded data will be reported.
832  *
833  * \param channel incoming data channel
834  */
835  void updateFrameQueue(const JChannelList_t::const_iterator channel)
836  {
837  using namespace std;
838 
839  JByteArrayReader in(channel->data(), channel->size());
840 
841  JDAQPreamble preamble;
842  JDAQSuperFrameHeader header;
843 
844  in >> preamble;
845  in >> header;
846 
847  if (preamble.getLength() != channel->size()) {
848 
849  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
850  << "preamble.getLength() = " << preamble.getLength() << ';'
851  << "channel->size(): " << channel->size() << ';';
852 
853  number_of_packets_discarded += 1;
854 
855  return;
856  }
857 
858  if (header.getRunNumber() != getRunNumber()) {
859 
860  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
861  << " unequal to current run " << getRunNumber()
862  << " -> Dropping frame.";
863 
864  number_of_packets_discarded += 1;
865 
866  return;
867  }
868 
869  if (header.getFrameIndex() <= current_slice_index) {
870 
871  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
872  << " -> Dropping frame.";
873 
874  number_of_packets_discarded += 1;
875 
876  if (frames_per_slice < maximum_frames_per_slice) {
877 
878  frames_per_slice++;
879 
880  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
881  }
882 
883  return;
884  }
885 
886  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
887 
888  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
889  << " -> Dropping frame.";
890 
891  number_of_packets_discarded += 1;
892 
893  return;
894  }
895 
896  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
897 
898  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
899  ++timesliceIterator;
900  }
901 
902  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
903 
904  // The corresponding time slice already exists
905 
906  } else {
907 
908  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
909 
910  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
911 
912  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
913 
914  queueSize += getSizeof(*timesliceIterator);
915  }
916 
917  timesliceIterator->push_back(JDAQSuperFrame(header));
918 
919  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
920 
921  queueSize += getSizeof(*timesliceIterator->rbegin());
922  }
923 
924 
925  /**
926  * Process time slice.
927  *
928  * \param timeslice time slice
929  */
930  void processTimeSlice(const JDAQTimesliceL0& timeslice)
931  {
932  using namespace std;
933 
934  try {
935 
936  timesliceRouter->configure(timeslice);
937 
938  if (parameters.writeSummary()) {
939  this->put(JDAQSummaryslice(timeslice));
940  }
941 
942  if (parameters.trigger3DMuon.enabled ||
943  parameters.trigger3DShower.enabled ||
944  parameters.triggerMXShower.enabled ||
945  parameters.triggerNB.enabled ||
946  parameters.writeL0.prescale ||
947  parameters.writeL1.prescale ||
948  parameters.writeL2.prescale ||
949  parameters.writeSN.prescale ||
950  c_buffer.is_enabled()) {
951 
952  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
953  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
954  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
955  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
956  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
957  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
958 
959  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
960 
961  if (moduleRouter->hasModule(frame->getModuleID())) {
962 
963  if (!checksum(*frame)) {
964 
965  JErrorStream(logger) << "Invalid data at "
966  << "run = " << timeslice.getRunNumber() << ";"
967  << "frame index = " << timeslice.getFrameIndex() << ";"
968  << "module = " << frame->getModuleID() << ";"
969  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
970 
971  timesliceTX.push_back(*frame);
972 
973  continue;
974  }
975 
976  const JModule& module = moduleRouter->getModule(frame->getModuleID());
977  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
978 
979  // Apply high-rate veto
980 
981  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
982 
983  // L0
984 
985  timesliceL0.push_back(JSuperFrame1D_t(buffer));
986 
987  // Nano-beacon trigger
988 
989  if (parameters.triggerNB.enabled) {
990 
991  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
992 
993  if (buffer.begin() != __end) {
994 
995  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
996  frame->getModuleIdentifier(),
997  module.getPosition()));
998 
999  JSuperFrame1D_t zbuf;
1000 
1001  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1002 
1003  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1004  }
1005  }
1006 
1007  // L1
1008 
1009  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1010  frame->getModuleIdentifier(),
1011  module.getPosition()));
1012 
1013  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1014 
1015  // L2
1016 
1017  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1018  frame->getModuleIdentifier(),
1019  module.getPosition()));
1020 
1021  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1022 
1023  // SN
1024 
1025  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1026  frame->getModuleIdentifier(),
1027  module.getPosition()));
1028 
1029  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1030 
1031  } else {
1032 
1033  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1034  }
1035  }
1036 
1037  if (!timesliceTX.empty()) {
1038 
1039  if (dumpCount < dumpLimit) {
1040 
1041  this->put(timesliceTX);
1042 
1043  dumpCount += 1;
1044  }
1045  }
1046 
1047  // Trigger
1048 
1049  if (parameters.triggerNB.enabled) {
1050 
1051  const JTriggerInput trigger_input(timesliceNB);
1052 
1053  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1054 
1055  if (parameters.triggerNB.write()) {
1056 
1057  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1058  getTriggerMask(triggerNB->getTriggerBit()),
1059  *hit,
1060  *timesliceRouter,
1061  *moduleRouter,
1062  parameters.TMaxLocal_ns,
1063  parameters.triggerNB.DMax_m,
1064  getTimeRange(parameters.triggerNB));
1065 
1066  this->put(tev);
1067  }
1068  }
1069  }
1070 
1071  JTriggerInput trigger_input(timesliceL2);
1072  JTriggerOutput trigger_output;
1073 
1074  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1075  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1076  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1077 
1078  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1079 
1080  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1081 
1082  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1083 
1084  if (this->put(object)) {
1085  numberOfEvents += 1;
1086  }
1087  }
1088 
1089  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1090 
1091  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1092 
1093  if (parameters.writeL1) { this->put(object); }
1094  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1095  }
1096 
1097  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1098 
1099  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1100 
1101  if (parameters.writeL2) { this->put(object); }
1102  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1103  }
1104 
1105  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1106 
1107  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1108 
1109  if (parameters.writeSN) { this->put(object); }
1110  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1111  }
1112 
1113  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1114 
1115  if (parameters.writeL0) { this->put(timeslice); }
1116  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1117  }
1118  }
1119 
1120  } catch(const exception& error) {
1121 
1122  JErrorStream(logger) << "Error = " << error.what() << ";"
1123  << "run = " << timeslice.getRunNumber() << ";"
1124  << "frame index = " << timeslice.getFrameIndex() << ";"
1125  << "time slice not correctly processed;"
1126  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1127 
1128  if (dumpCount < dumpLimit) {
1129 
1130  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1131 
1132  dumpCount += 1;
1133  }
1134  }
1135 
1136  timesliceRouter->reset();
1137  }
1138 
1139 
1140  /**
1141  * Report status to message logger.
1142  */
1143  void typeout()
1144  {
1145  timer.stop();
1146 
1147  const double T_us = (double) timer.usec_wall;
1148 
1149  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1150  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1151  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1152  try {
1153  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1154  }
1155  catch(const std::exception&) {}
1156  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1157  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1158 
1159  if (number_of_packets_received > 0) {
1160  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1161  }
1162 
1163  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1164  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1165 
1166  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1167 
1168  if (numberOfTimeslicesProcessed > 0) {
1169  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1170  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1171  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1172  }
1173 
1174  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1175  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1176 
1177  if (processedSlicesTime_us > 0) {
1178  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1179  }
1180  if (processedDetectorTime_us > 0) {
1181  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1182  }
1183 
1184  timer.start();
1185  }
1186 
1187 
1188  /**
1189  * Tagged action to handle alerts.
1190  *
1191  * \param tag tag
1192  * \param length number of characters
1193  * \param buffer message
1194  */
1195  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1196  {
1197  using namespace std;
1198 
1199  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1200 
1201  if (tag == RC_ALERT) {
1202 
1203  if (c_buffer.is_open()) {
1204 
1205  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1206 
1207  c_buffer.close();
1208  }
1209 
1210  if (c_buffer.is_enabled()) {
1211 
1212  c_buffer.open(path, getUniqueTag());
1213 
1214  if (c_buffer.is_open()) {
1215 
1216  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1217 
1218  putObject(c_buffer.getFile(), meta);
1219 
1220  } else {
1221 
1222  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1223 
1224  c_buffer.disable();
1225  }
1226  }
1227 
1228  } else {
1229 
1230  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1231  }
1232  }
1233 
1234  JMeta meta; //!< meta data
1235 
1236  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1237 
1238  private:
1239 
1241  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1242  std::string hostname; //!< host name of data server
1243 
1244  /**
1245  * Auxiliary method to send object to data server.
1246  *
1247  * \param object object to be sent
1248  * \return true if sent; else false
1249  */
1250  template<class T>
1251  bool put(const T& object)
1252  {
1253  try {
1254 
1255  datawriter->put(object);
1256 
1257  numberOfBytes += getSizeof(object);
1258 
1259  return true;
1260  }
1261  catch(const std::exception& error) {
1262  JErrorStream(logger) << error.what();
1263  }
1264 
1265  return false;
1266  }
1267 
1268 
1269  int port; //!< server socket port
1270  int backlog;
1272 
1273  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1274  JChannelList_t channelList; //!< connections to data queue
1275 
1277  JQuantile Qt;
1278 
1279  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1281  unsigned int frames_per_slice;
1284 
1285  // trigger
1286 
1289 
1296 
1301 
1306 
1309 
1310  // process management
1311 
1314 
1315  // memory management
1316 
1317  long long int totalCPURAM;
1318  unsigned int maxQueueDepth;
1319  long long int maxQueueSize;
1320  long long int queueSize;
1321 
1322  // statistics
1323 
1325 
1326  long long int numberOfEvents;
1327  long long int numberOfBytes;
1330 
1333 
1334  // temporary
1335 
1338  long long int number_of_reads;
1340 
1341  // circular buffer
1342 
1344  std::string path; //!< directory for writing circular buffer
1345  };
1346 }
1347 
1348 /**
1349  * \file
1350  *
1351  * Application for real-time filtering of data.
1352  * For more information, see KM3NETDAQ::JDataFilter.
1353  *
1354  * \author rbruijn and mdejong
1355  */
1356 int main(int argc, char* argv[])
1357 {
1358  using namespace std;
1359  using namespace JPP;
1360  using namespace KM3NETDAQ;
1361 
1362  string server;
1363  string logger;
1364  string hostname;
1365  string client_name;
1366  int port;
1367  int backlog;
1368  int buffer_size;
1369  bool use_cout;
1370  string path;
1371  int debug;
1372 
1373 
1374  try {
1375 
1376  JParser<> zap("Application for real-time filtering of data.");
1377 
1378  zap['H'] = make_field(server) = "localhost";
1379  zap['M'] = make_field(logger) = "localhost";
1380  zap['D'] = make_field(hostname) = "";
1381  zap['u'] = make_field(client_name) = "JDataFilter";
1382  zap['P'] = make_field(port);
1383  zap['q'] = make_field(backlog) = 1024;
1384  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1385  zap['c'] = make_field(use_cout);
1386  zap['p'] = make_field(path) = "/tmp/";
1387  zap['d'] = make_field(debug) = 0;
1388 
1389  zap(argc, argv);
1390  }
1391  catch(const exception& error) {
1392  FATAL(error.what() << endl);
1393  }
1394 
1395 
1396  JLogger* out = NULL;
1397 
1398  if (use_cout)
1399  out = new JStreamLogger(cout);
1400  else
1401  out = new JControlHostLogger(logger);
1402 
1403  JDataFilter dfilter(client_name,
1404  server,
1405  hostname,
1406  out,
1407  debug,
1408  port,
1409  backlog,
1410  buffer_size,
1411  path);
1412 
1413  dfilter.meta = JMeta(argc, argv);
1414 
1415  dfilter.enter();
1416  dfilter.run();
1417 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:820
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1500
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:930
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:77
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:184
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:272
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:160
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:157
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:152
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:259
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:291
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition: pmt_status.hh:13
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:43
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:147
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:346
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:159
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:173
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:228
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:156
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:158
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:373
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:224
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:650
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:153
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:243
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1961
Byte array binary input.
Definition: JByteArrayIO.hh:25
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:835
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:317
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:66
long long int number_of_packets_received
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
static const long long int MEGABYTE
Number of bytes in a mega-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:674
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DOM<$WORKDIR/ev_configure_domsimulator.txt > RC_DWRT path
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:261
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:251
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:567
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:664
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:321
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxiliary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:694
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:258
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:110
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:152
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:656
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:334
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:617
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:260
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:680
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:355
Time slice with calibrated data.
Definition: JTimeslice.hh:26