Jpp - the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JPhysics/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * JDataFilter::setSelect and
93  * JDataFilter::actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
96  * When a JDAQTimeslice is complete,
97  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
98  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
99  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
100  * <pre>
101  * numberOfFramesPerSlice = frames_per_slice = 1;
102  * </pre>
103  * Note that this parameter can change during operation (see below).
104  *
105  * A timeout may occur when the total amount of data or the number of incomplete time slices
106  * in the queue exceeds the corresponding pre-set limit:\n
107  * <pre>
108  * queueSize = <maximal queue size [B]>;
109  * queueDepth = <maximal queue depth>;
110  * </pre>
111  * Note that these values apply per JDataFilter.\n
112  *
113  * When a timeout occurs,
114  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
115  * When data are received with a frame index below the current frame index,
116  * the parameter <tt>frames_per_slice</tt> is incremented by one,
117  * up to the original value set at JDataFilter::actionConfigure.\n
118  *
119  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
120  * The following parameters can be used to configure the circular buffer.\n
121  * <pre>
122  * path = <write directory for circular buffer>;
123  * c_sizeL0 = <L0 buffer size>;
124  * c_sizeL1 = <L1 buffer size>;
125  * c_sizeL2 = <L2 buffer size>;
126  * c_sizeSN = <SN buffer size>;
127  * </pre>
128  *
129  * Note that when path is set to a valid directory:
130  * - There will always be a file created which is closed when the state machine is exited;
131  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
132  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
133  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
134  * - After closure of the file, a new file will be opened if path is set;
135  *
136  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
137  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
138  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
139  * (e.g. when there is more than one alert on a given day).
140  *
141  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
142  *
143  * The script JDOMandDataFilter.sh can be used to test this application.
144  */
145  class JDataFilter :
146  public JDAQClient
147  {
148  public:
149 
152 
153  typedef double hit_type;
159 
160  /**
161  * Circular buffer.
162  */
164  public JTreeRecorder<JDAQTimesliceTypes_t>
165  {
167 
168  /**
169  * Default constructor.
170  */
172  {
173  disable();
174  }
175 
176  /**
177  * Open file.
178  *
179  * \param path directory
180  * \param tag unique process tag
181  */
182  void open(const std::string& path, const JTag& tag)
183  {
184  using namespace std;
185 
186  const JDateAndTime cal;
187 
188  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
189 
190  ostringstream os;
191 
192  os << getFullPath(path)
193  << "KM3NeT"
194  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
195  << "_" << tag;
196 
197  if (i != 0) {
198  os << "_" << i;
199  }
200 
201  os << ".root";
202 
203  try {
204  this->open(os.str().c_str());
205  }
206  catch(const exception& error) {}
207  }
208  }
209 
210  /**
211  * Disable writing.
212  */
213  void disable()
214  {
215  sizeL0 = 0;
216  sizeL1 = 0;
217  sizeL2 = 0;
218  sizeSN = 0;
219  }
220 
221  /**
222  * Check whether writing of data is enabled.
223  *
224  * \return true if writing enabled; else false
225  */
226  bool is_enabled() const
227  {
228  return (sizeL0 > 0 ||
229  sizeL1 > 0 ||
230  sizeL2 > 0 ||
231  sizeSN > 0);
232  }
233 
234  /**
235  * Write circular buffer to output stream.
236  *
237  * \param out output stream
238  * \param object circular buffer
239  * \return output stream
240  */
241  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
242  {
243  if (object.is_open())
244  out << object.getFile()->GetName();
245  else
246  out << "void";
247 
248  out << ' ';
249 
250  return out << object.sizeL0 << '/'
251  << object.sizeL1 << '/'
252  << object.sizeL2 << '/'
253  << object.sizeSN << '/';
254  }
255 
256  Long64_t sizeL0; //!< Number of L0 time slices
257  Long64_t sizeL1; //!< Number of L1 time slices
258  Long64_t sizeL2; //!< Number of L2 time slices
259  Long64_t sizeSN; //!< Number of SN time slices
260  };
261 
262 
263  /**
264  * Sort DAQ process by index.
265  *
266  * \param first first DAQ process
267  * \param second second DAQ process
268  * \return true if index of first DAQ process less than that of second; else false
269  */
270  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
271  {
272  return first.index < second.index;
273  }
274 
275 
276  /**
277  * Constructor.
278  *
279  * \param name name of client
280  * \param server name of command message server
281  * \param hostname name of data server
282  * \param logger pointer to logger
283  * \param level debug level
284  * \param port server port
285  * \param backlog server backlog
286  * \param buffer_size server buffer
287  * \param path directory for writing circular buffer
288  */
289  JDataFilter(const std::string& name,
290  const std::string& server,
291  const std::string& hostname,
292  JLogger* logger,
293  const int level,
294  const int port,
295  const int backlog,
296  const int buffer_size,
297  const std::string& path) :
298  JDAQClient (name,server,logger,level),
299  hostname (hostname),
300  port (port),
301  backlog (backlog),
302  buffer_size(buffer_size),
303  path (path)
304  {
305  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
306 
307  addSubscription(JSubscriptionAll(RC_ALERT));
308 
309  totalCPURAM = getRAM();
310  current_slice_index = -1;
311  reporting = false;
312  }
313 
314 
315  virtual void actionEnter() override
316  {}
317 
318 
319  virtual void actionExit() override
320  {
321  if (c_buffer.is_open()) {
322 
323  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
324 
325  c_buffer.close();
326  }
327 
328  datawriter.reset();
329  }
330 
331 
332  virtual void actionInit(int length, const char* buffer) override
333  {
334  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
335 
336  try {
337 
338  JDebugStream(logger) << "Start server.";
339 
340  if (serversocket.is_valid()) {
341  serversocket->shutdown();
342  }
343 
344  serversocket.reset(new JServerSocket(port,backlog));
345  }
346  catch(const std::exception& error) {
347  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
348  ev_error();
349  }
350  }
351 
352 
353  virtual void actionConfigure(int length, const char* buffer) override
354  {
355  using namespace std;
356 
357  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
358 
359  long long int update_s = 10;
360  long long int logger_s = 5;
361 
362  parameters .reset();
363  dataFilters.clear();
364  dataQueues .clear();
365 
366  reporting = false;
367  dumpCount = 0;
368  dumpLimit = 1;
369 
370  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
371 
372  properties["dataWriter"] = hostname;
373  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
374  properties["detector"] = detector;
375  properties["triggerParameters"] = parameters;
376  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
377  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
378  properties["frameIndex"] = maximal_frame_index = 100000;
379  properties["logger_s"] = logger_s;
380  properties["update_s"] = update_s;
381  properties["JDataFilter"] = dataFilters;
382  properties["DataQueue"] = dataQueues;
383  properties["path"] = path;
384  properties["c_sizeL0"] = c_buffer.sizeL0;
385  properties["c_sizeL1"] = c_buffer.sizeL1;
386  properties["c_sizeL2"] = c_buffer.sizeL2;
387  properties["c_sizeSN"] = c_buffer.sizeSN;
388  properties["dumpLimit"] = dumpLimit;
389 
390  try {
391  properties.read(string(buffer, length));
392  }
393  catch(const exception& error) {
394  JErrorStream(logger) << error.what();
395  }
396 
397  if (update_s <= 0) { update_s = 1; }
398  if (logger_s <= 0) { logger_s = 1; }
399 
400  setClockInterval(update_s * 1000000LL);
401 
402  hostname = trim(hostname);
403 
404  if (hostname != "")
405  datawriter.reset(new JControlHost_t(hostname));
406  else
407  throw JException("Undefined data writer host name.");
408 
409  maximum_frames_per_slice = frames_per_slice;
410 
411  // process processlist
412 
413  if (dataFilters.empty()) {
414  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
415  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
416  }
417 
418  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
419 
420  unsigned int numberOfDataFiltersOnThisMachine = 0;
421  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
422 
424 
425  {
426  JNoticeStream notice(logger);
427 
428  notice << "My IP addresses:";
429 
430  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
431  notice << ' ' << *i;
432  }
433  }
434 
435  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
436 
437  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
438 
439  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
440 
441  numberOfDataFiltersOnThisMachine++;
442 
443  if (i->port == this->port) {
444  thisProcess = i;
445  }
446  }
447  }
448 
449  if (numberOfDataFiltersOnThisMachine == 0) {
450  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
451  << "Assuming one datafilter on this machine.";
452  numberOfDataFiltersOnThisMachine = 1;
453  }
454 
455  if (thisProcess == dataFilters.end()) {
456  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
457  }
458 
459  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
460  JErrorStream(logger) << "Mismatch between given process names: "
461  << "I am called " << getName()
462  << ", but in the process list I am referred to as " << thisProcess->index;
463  }
464 
465  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
466  reporting = true;
467  }
468 
469  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
470 
471  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
472 
473  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
474  << "Queue size reduced to "
475  << maxQueueSize << " bytes." ;
476  }
477 
478  // trigger parameters
479 
481 
482  triggerNB .reset(new JTriggerNB (parameters));
483  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
484  trigger3DShower.reset(new JTrigger3DShower(parameters));
485  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
486 
487  moduleRouter.reset(new JModuleRouter(detector));
488 
489  if (reporting) {
490  JNoticeStream(logger) << "This data filter process will report.";
491  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
492  JDebugStream (logger) << "Trigger parameters: " << parameters;
493  JDebugStream (logger) << "Detector description: " << endl << detector;
494  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
495  }
496 
497  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
498 
499  // set L1, L2 and SN builders
500 
501  buildL1.reset(new JBuildL1_t(parameters));
502  buildL2.reset(new JBuildL2_t(parameters.L2));
503  buildSN.reset(new JBuildL2_t(parameters.SN));
504  buildNB.reset(new JBuildL2_t(parameters.NB));
505 
506  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
507  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
508  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
509  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
510 
511  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
512  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
513  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
514  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
515 
516  if (c_buffer.is_enabled()) {
517 
518  if (!c_buffer.is_open()) {
519 
520  c_buffer.open(path, getUniqueTag());
521 
522  if (c_buffer.is_open()) {
523 
524  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
525 
526  putObject(c_buffer.getFile(), meta);
527 
528  } else {
529 
530  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
531 
532  c_buffer.disable();
533  }
534 
535  } else {
536 
537  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
538  }
539 
540  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
541  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
542  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
543  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
544 
545  } else {
546 
547  if (c_buffer.is_open()) {
548  c_buffer.close();
549  }
550  }
551  }
552 
553 
 /**
  * Start action.
  *
  * Clears the queue of time slices, resets the counters, the message schedulers,
  * the timer and the quantile of processing times and
  * sends the run number and trigger parameters to the data writer.
  *
  * \param length number of characters in buffer (not used in the visible code)
  * \param buffer message text (not used in the visible code)
  */
554  virtual void actionStart(int length, const char* buffer) override
555  {
556  using namespace std;
557 
558  if (reporting) {
559  JNoticeStream(logger) << "Start run " << getRunNumber();
560  }
561 
562  timeslices.clear();
563 
564  current_slice_index = -1;
565  queueSize = 0;
566 
567  numberOfEvents = 0;
568  numberOfBytes = 0;
569  numberOfTimeslicesProcessed = 0;
570  numberOfIncompleteTimeslicesProcessed = 0;
571 
572  number_of_packets_received = 0;
573  number_of_packets_discarded = 0;
574  number_of_bytes_received = 0;
575  number_of_reads = 0;
576 
  // start from an empty range so that the first processed frame index sets both limits
577  minFrameNumber = numeric_limits<int>::max();
578  maxFrameNumber = numeric_limits<int>::min();
579 
580  // Reset global trigger counter.
581 
  // NOTE(review): the statement belonging to the comment above (listing line 582) is absent from this extract.
583 
584  logErrorRun .reset();
585  logErrorDetector .reset();
586  logErrorIndex .reset();
587  logErrorIncomplete.reset();
588 
589  timer.reset();
590  timer.start();
591 
592  Qt.reset();
593 
594  // send trigger parameters to the datawriter
595 
596  ostringstream os;
597 
598  os << getRunNumber() << ' ' << parameters;
599 
600  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
601  }
602 
603 
 /**
  * Pause action.
  *
  * Discards all time slices that are still in the queue,
  * resets the current slice index and the queue size and stops the timer.
  *
  * \param length number of characters in buffer (not used in the visible code)
  * \param buffer message text (not used in the visible code)
  */
604  virtual void actionPause(int length, const char* buffer) override
605  {
606  using namespace std;
607 
608  if (!timeslices.empty()) {
609 
610  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
611 
  // subtract the size of each discarded time slice so that the check below can detect a mismatch
612  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
613  queueSize -= getSizeof(*i);
614  }
615 
616  timeslices.clear();
617  }
618 
619  { // force clearance of memory
620 
  // NOTE(review): listing line 621 is absent from this extract; presumably it declares the local
  // `buffer` that is swapped below (which would hide the parameter of the same name) - confirm.
622 
623  timeslices.swap(buffer);
624  }
625 
  // after flushing, the book-kept size is expected to be zero; anything else is reported
626  if (queueSize != 0) {
627  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
628  }
629 
630  current_slice_index = -1;
631  queueSize = 0;
632 
633  timer.stop();
634  }
635 
636 
637  virtual void actionContinue(int length, const char* buffer) override
638  {
639  timer.start();
640  }
641 
642 
643  virtual void actionStop(int length, const char* buffer) override
644  {
645  typeout();
646 
647  datawriter.reset();
648  }
649 
650 
651  virtual void actionReset(int length, const char* buffer) override
652  {
653  if (serversocket.is_valid()) {
654  serversocket->shutdown();
655  }
656 
657  serversocket.reset();
658  }
659 
660 
661  virtual void actionQuit(int length, const char* buffer) override
662  {
663  datawriter.reset();
664  }
665 
666 
667  virtual void setSelect(JFileDescriptorMask& mask) const override
668  {
669  if (serversocket.is_valid()) {
670  mask.set(*serversocket);
671  }
672 
673  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
674  if (!channel->isReady()) {
675  mask.set(channel->getFileDescriptor());
676  }
677  }
678  }
679 
680 
 /**
  * Action following the select call.
  *
  * - Each channel flagged in the mask is read; a channel that is ready is counted and,
  *   when running, its data are passed to updateFrameQueue(), otherwise the packet is counted as discarded.
  * - A channel for which a std::exception is caught is shut down and removed from the list of channels.
  * - When the server socket is flagged in the mask, the new connection is accepted and added to the list of channels.
  * - The oldest time slice in the queue is processed and removed when it has the expected number of frames or
  *   when the queue depth or the queue size reaches its limit.
  *
  * \param mask file descriptor mask
  */
681  virtual void actionSelect(const JFileDescriptorMask& mask) override
682  {
683  using namespace std;
684 
  // the iterator is advanced inside the loop body because erase() below already yields the next channel
685  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
686 
687  try {
688 
689  if (mask.has(channel->getFileDescriptor())) {
690  channel->read();
691  }
692 
693  if (channel->isReady()) {
694 
695  number_of_packets_received += 1;
696  number_of_reads += channel->getCounter();
697  number_of_bytes_received += channel->size();
698 
699  if (isRunning()) {
700 
701  updateFrameQueue(channel);
702 
703  } else {
704 
705  JErrorStream(logErrorRun) << "Receiving data while not running.";
706 
707  number_of_packets_discarded += 1;
708  }
709 
710  channel->reset();
711  }
712 
713  ++channel;
714  }
715  catch(const exception& error) {
716 
717  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
718 
719  channel->shutdown();
720 
721  channel = channelList.erase(channel);
722  }
723  }
724 
725 
726  if (serversocket.is_valid()) {
727 
728  if (mask.has(*serversocket)) {
729 
730  JSocket socket;
731 
732  socket.accept(serversocket->getFileDescriptor());
733 
734  //socket.setSendBufferSize (buffer_size);
  // NOTE(review): listing line 735 is absent from this extract.
736 
737  socket.setKeepAlive (true);
738  socket.setNonBlocking(true);
739 
740  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
741 
742  channelList.push_back(JSocketInputChannel_t(socket));
743  }
744  }
745 
746 
  // process the oldest time slice when it is complete or when one of the queue limits is reached (timeout)
747  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
748  timeslices.size() >= maxQueueDepth ||
749  queueSize >= maxQueueSize)) {
750 
  // the slice stays in the list until pop_front() at the end of this block, so the reference remains valid
751  const JDAQTimesliceL0& pending_slice = timeslices.front();
752  queueSize -= getSizeof(pending_slice);
753 
754  current_slice_index = pending_slice.getFrameIndex();
755  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
756  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
757 
758  if (pending_slice.size() > frames_per_slice) {
759 
760  JErrorStream(logger) << "More frames in timeslice than expected "
761  << pending_slice.size() << " > " << frames_per_slice;
762 
763  if (pending_slice.size() <= maximum_frames_per_slice) {
764 
765  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
766 
767  frames_per_slice = pending_slice.size();
768  }
769  }
770 
771  if (!pending_slice.empty()) {
772 
  // the elapsed time of the processing is accumulated in Qt
773  const localtime_t t0 = getLocalTime();
774 
775  processTimeSlice(pending_slice);
776 
777  const localtime_t t1 = getLocalTime();
778 
779  numberOfTimeslicesProcessed += 1;
780 
781  Qt.put(t1 - t0);
782 
783  if (pending_slice.size() < frames_per_slice) {
784 
785  numberOfIncompleteTimeslicesProcessed += 1;
786 
787  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
788  << "Frame index = " << pending_slice.getFrameIndex() << ';'
789  << "Size of timeslice = " << pending_slice.size() << ';'
790  << "Queue depth = " << timeslices.size() << ';'
791  << "Queue size = " << queueSize;
792 
  // NOTE(review): pending_slice is still the front element here (pop_front() follows below),
  // so this condition looks always true - confirm the intent.
793  if (!timeslices.empty()) {
794 
795  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
796 
797  frames_per_slice = pending_slice.size();
798  }
799  }
800  }
801 
802  timeslices.pop_front();
803  }
804  }
805 
806 
807  virtual void actionRunning() override
808  {
809  if (reporting) {
810  typeout();
811  }
812  }
813 
814 
815  /**
816  * Update queue with data frames.
817  *
818  * Note that any discarded data will be reported.
819  *
820  * \param channel incoming data channel
821  */
822  void updateFrameQueue(const JChannelList_t::const_iterator channel)
823  {
824  using namespace std;
825 
826  JByteArrayReader in(channel->data(), channel->size());
827 
828  JDAQPreamble preamble;
829  JDAQSuperFrameHeader header;
830 
831  in >> preamble;
832  in >> header;
833 
834  if (preamble.getLength() != channel->size()) {
835 
836  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
837  << "preamble.getLength() = " << preamble.getLength() << ';'
838  << "channel->size(): " << channel->size() << ';';
839 
840  number_of_packets_discarded += 1;
841 
842  return;
843  }
844 
845  if (header.getRunNumber() != getRunNumber()) {
846 
847  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
848  << " unequal to current run " << getRunNumber()
849  << " -> Dropping frame.";
850 
851  number_of_packets_discarded += 1;
852 
853  return;
854  }
855 
856  if (header.getFrameIndex() <= current_slice_index) {
857 
858  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
859  << " -> Dropping frame.";
860 
861  number_of_packets_discarded += 1;
862 
863  if (frames_per_slice < maximum_frames_per_slice) {
864 
865  frames_per_slice++;
866 
867  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
868  }
869 
870  return;
871  }
872 
873  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
874 
875  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
876  << " -> Dropping frame.";
877 
878  number_of_packets_discarded += 1;
879 
880  return;
881  }
882 
883  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
884 
885  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
886  ++timesliceIterator;
887  }
888 
889  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
890 
891  // The corresponding time slice already exists
892 
893  } else {
894 
895  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
896 
897  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
898 
899  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
900 
901  queueSize += getSizeof(*timesliceIterator);
902  }
903 
904  timesliceIterator->push_back(JDAQSuperFrame(header));
905 
906  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
907 
908  queueSize += getSizeof(*timesliceIterator->rbegin());
909  }
910 
911 
912  /**
913  * Process time slice.
914  *
915  * \param timeslice time slice
916  */
917  void processTimeSlice(const JDAQTimesliceL0& timeslice)
918  {
919  using namespace std;
920 
921  try {
922 
923  timesliceRouter->configure(timeslice);
924 
925  if (parameters.writeSummary()) {
926  this->put(JDAQSummaryslice(timeslice));
927  }
928 
929  if (parameters.trigger3DMuon.enabled ||
930  parameters.trigger3DShower.enabled ||
931  parameters.triggerMXShower.enabled ||
932  parameters.triggerNB.enabled ||
933  parameters.writeL0.prescale ||
934  parameters.writeL1.prescale ||
935  parameters.writeL2.prescale ||
936  parameters.writeSN.prescale ||
937  c_buffer.is_enabled()) {
938 
939  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
940  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
941  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
942  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
943  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
944  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
945 
946  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
947 
948  if (moduleRouter->hasModule(frame->getModuleID())) {
949 
950  if (!checksum(*frame)) {
951 
952  JErrorStream(logger) << "Invalid data at "
953  << "run = " << timeslice.getRunNumber() << ";"
954  << "frame index = " << timeslice.getFrameIndex() << ";"
955  << "module = " << frame->getModuleID() << ";"
956  << "discard and dump";
957 
958  timesliceTX.push_back(*frame);
959 
960  continue;
961  }
962 
963  const JModule& module = moduleRouter->getModule(frame->getModuleID());
964  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
965 
966  // Apply high-rate veto
967 
968  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
969 
970  // L0
971 
972  timesliceL0.push_back(JSuperFrame1D_t(buffer));
973 
974  // Nano-beacon trigger
975 
976  if (parameters.triggerNB.enabled) {
977 
978  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
979 
980  if (buffer.begin() != __end) {
981 
982  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
983  frame->getModuleIdentifier(),
984  module.getPosition()));
985 
986  JSuperFrame1D_t zbuf;
987 
988  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
989 
990  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
991  }
992  }
993 
994  // L1
995 
996  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
997  frame->getModuleIdentifier(),
998  module.getPosition()));
999 
1000  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1001 
1002  // L2
1003 
1004  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1005  frame->getModuleIdentifier(),
1006  module.getPosition()));
1007 
1008  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1009 
1010  // SN
1011 
1012  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1013  frame->getModuleIdentifier(),
1014  module.getPosition()));
1015 
1016  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1017 
1018  } else {
1019 
1020  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1021  }
1022  }
1023 
1024  if (!timesliceTX.empty()) {
1025  this->put(timesliceTX);
1026  }
1027 
1028  // Trigger
1029 
1030  if (parameters.triggerNB.enabled) {
1031 
1032  const JTriggerInput trigger_input(timesliceNB);
1033 
1034  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1035 
1036  if (parameters.triggerNB.write()) {
1037 
1038  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1039  getTriggerMask(triggerNB->getTriggerBit()),
1040  *hit,
1041  *timesliceRouter,
1042  *moduleRouter,
1043  parameters.TMaxLocal_ns,
1044  parameters.triggerNB.DMax_m,
1045  getTimeRange(parameters.triggerNB));
1046 
1047  this->put(tev);
1048  }
1049  }
1050  }
1051 
1052  JTriggerInput trigger_input(timesliceL2);
1053  JTriggerOutput trigger_output;
1054 
1055  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1056  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1057  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1058 
1059  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1060 
1061  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1062 
1063  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1064 
1065  if (this->put(object)) {
1066  numberOfEvents += 1;
1067  }
1068  }
1069 
1070  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1071 
1072  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1073 
1074  if (parameters.writeL1) { this->put(object); }
1075  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1076  }
1077 
1078  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1079 
1080  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1081 
1082  if (parameters.writeL2) { this->put(object); }
1083  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1084  }
1085 
1086  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1087 
1088  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1089 
1090  if (parameters.writeSN) { this->put(object); }
1091  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1092  }
1093 
1094  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1095 
1096  if (parameters.writeL0) { this->put(timeslice); }
1097  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1098  }
1099  }
1100 
1101  } catch(const exception& error) {
1102 
1103  JErrorStream(logger) << "Error = " << error.what() << ";"
1104  << "run = " << timeslice.getRunNumber() << ";"
1105  << "frame index = " << timeslice.getFrameIndex() << ";"
1106  << "time slice not correctly processed!";
1107 
1108  if (dumpCount < dumpLimit) {
1109 
1110  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1111 
1112  dumpCount += 1;
1113  }
1114  }
1115 
1116  timesliceRouter->reset();
1117  }
1118 
1119 
1120  /**
1121  * Report status to message logger.
1122  */
1123  void typeout()
1124  {
1125  timer.stop();
1126 
1127  const double T_us = (double) timer.usec_wall;
1128 
1129  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1130  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1131  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1132  try {
1133  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1134  }
1135  catch(const std::exception&) {}
1136  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1137  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1138 
1139  if (number_of_packets_received > 0) {
1140  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1141  }
1142 
1143  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1144  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1145 
1146  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1147 
1148  if (numberOfTimeslicesProcessed > 0) {
1149  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1150  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1151  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1152  }
1153 
1154  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1155  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1156 
1157  if (processedSlicesTime_us > 0) {
1158  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1159  }
1160  if (processedDetectorTime_us > 0) {
1161  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1162  }
1163 
1164  timer.start();
1165  }
1166 
1167 
1168  /**
1169  * Tagged action to handle alerts.
1170  *
1171  * \param tag tag
1172  * \param length number of characters
1173  * \param buffer message
1174  */
1175  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1176  {
1177  using namespace std;
1178 
1179  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1180 
1181  if (tag == RC_ALERT) {
1182 
1183  if (c_buffer.is_open()) {
1184 
1185  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1186 
1187  c_buffer.close();
1188  }
1189 
1190  if (c_buffer.is_enabled()) {
1191 
1192  c_buffer.open(path, getUniqueTag());
1193 
1194  if (c_buffer.is_open()) {
1195 
1196  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1197 
1198  putObject(c_buffer.getFile(), meta);
1199 
1200  } else {
1201 
1202  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1203 
1204  c_buffer.disable();
1205  }
1206  }
1207 
1208  } else {
1209 
1210  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1211  }
1212  }
1213 
1214  JMeta meta; //!< meta data
1215 
1216  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1217 
1218  private:
1219 
1221  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1222  std::string hostname; //!< host name of data server
1223 
1224  /**
1225  * Auxiliary method to send object to data server.
1226  *
1227  * \param object object to be sent
1228  * \return true if sent; else false
1229  */
1230  template<class T>
1231  bool put(const T& object)
1232  {
1233  try {
1234 
1235  datawriter->put(object);
1236 
1237  numberOfBytes += getSizeof(object);
1238 
1239  return true;
1240  }
1241  catch(const std::exception& error) {
1242  JErrorStream(logger) << error.what();
1243  }
1244 
1245  return false;
1246  }
1247 
1248 
1249  int port; //!< server socket port
1250  int backlog;
1252 
1253  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1254  JChannelList_t channelList; //!< connections to data queue
1255 
1257  JQuantile Qt;
1258 
1259  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1261  unsigned int frames_per_slice;
1264 
1265  // trigger
1266 
1269 
1276 
1281 
1286 
1289 
1290  // process management
1291 
1294 
1295  // memory management
1296 
1297  long long int totalCPURAM;
1298  unsigned int maxQueueDepth;
1299  long long int maxQueueSize;
1300  long long int queueSize;
1301 
1302  // statistics
1303 
1305 
1306  long long int numberOfEvents;
1307  long long int numberOfBytes;
1310 
1313 
1314  // temporary
1315 
1318  long long int number_of_reads;
1320 
1321  // circular buffer
1322 
1324  std::string path; //!< directory for writing circular buffer
1325  };
1326 }
1327 
1328 /**
1329  * \file
1330  *
1331  * Application for real-time filtering of data.
1332  * For more information, see KM3NETDAQ::JDataFilter.
1333  *
1334  * \author rbruijn and mdejong
1335  */
1336 int main(int argc, char* argv[])
1337 {
1338  using namespace std;
1339  using namespace JPP;
1340  using namespace KM3NETDAQ;
1341 
1342  string server;
1343  string logger;
1344  string hostname;
1345  string client_name;
1346  int port;
1347  int backlog;
1348  int buffer_size;
1349  bool use_cout;
1350  string path;
1351  int debug;
1352 
1353 
1354  try {
1355 
1356  JParser<> zap("Application for real-time filtering of data.");
1357 
1358  zap['H'] = make_field(server) = "localhost";
1359  zap['M'] = make_field(logger) = "localhost";
1360  zap['D'] = make_field(hostname) = "";
1361  zap['u'] = make_field(client_name) = "JDataFilter";
1362  zap['P'] = make_field(port);
1363  zap['q'] = make_field(backlog) = 1024;
1364  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1365  zap['c'] = make_field(use_cout);
1366  zap['p'] = make_field(path) = "/tmp/";
1367  zap['d'] = make_field(debug) = 0;
1368 
1369  zap(argc, argv);
1370  }
1371  catch(const exception& error) {
1372  FATAL(error.what() << endl);
1373  }
1374 
1375 
1376  JLogger* out = NULL;
1377 
1378  if (use_cout)
1379  out = new JStreamLogger(cout);
1380  else
1381  out = new JControlHostLogger(logger);
1382 
1383  JDataFilter dfilter(client_name,
1384  server,
1385  hostname,
1386  out,
1387  debug,
1388  port,
1389  backlog,
1390  buffer_size,
1391  path);
1392 
1393  dfilter.meta = JMeta(argc, argv);
1394 
1395  dfilter.enter();
1396  dfilter.run();
1397 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:807
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1500
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:917
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:76
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:182
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
then JLigiers sh continue fi cat driver txt<< EOFprocess dfilter $HOST1 ssh\$HOST\$"JDataFilter -H \$SERVER\$ -M \$LOGGER\$ -d $DEBUG &";enterevent ev_init{RC_CMD%< ev_init.txt > from me< ev_init.txt > event ev_configure
Data structure for a composite optical module.
Definition: JModule.hh:57
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:270
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:158
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:155
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:150
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:257
Detector data structure.
Definition: JDetector.hh:80
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:289
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:40
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:42
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:145
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:128
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:306
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:157
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:171
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:226
Basic data structure for L0 hit.
std::string index
index in process list
do cat driver txt<< EOFevent ev_configure{RC_EVT%< ev_configure.txt > RC_DWRT path
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:154
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
const_iterator end() const
get iterator to end of data (without end marker)
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:156
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:196
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:637
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:151
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:241
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1961
Byte array binary input.
Definition: JByteArrayIO.hh:25
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:822
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:315
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:63
long long int number_of_packets_received
static const long long int MEGABYTE
Number of bytes in a mega-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:661
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:259
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:554
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:651
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:319
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxiliary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:681
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:256
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:54
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:643
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:332
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY source JAcoustics sh $DETECTOR_ID CHECK_EXIT_CODE typeset A TRIPODS get_tripods $WORKDIR tripod txt TRIPODS for EMITTER in
Definition: JCanberra.sh:38
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:604
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:258
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:667
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:353
Time slice with calibrated data.
Definition: JTimeslice.hh:26
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.