Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JPhysics/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packeet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * JDataFilter::setSelect and
93  * JDataFilter::actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
96  * When a JDAQTimeslice is complete,
97  * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
98  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
99  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
100  * <pre>
101  * numberOfFramesPerSlice = frames_per_slice = 1;
102  * </pre>
103  * Note that this parameter can change during operation (see below).
104  *
105  * A timeout may occur when the total amount of data or the number of incomplete time slices
106  * in the queue exceeds the corresponding pre-set limit:\n
107  * <pre>
108  * queueSize = <maximal queue size [B]>;
109  * queueDepth = <maximal queue depth>;
110  * </pre>
111  * Note that these values apply per JDataFilter.\n
112  *
113  * When a timeout occurs,
114  * the parameter <tt>frames_per_slice<</tt> is set to the number of frames in the oldest time slice.\n
115  * When data are received with a frame index below the current frame index,
116  * the parameter <tt>frames_per_slice</tt> is incremented by one,
117  * upto the original value set at JDataFilter::actionConfigure.\n
118  *
119  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
120  * The following parameters can be used to configure the circular buffer.\n
121  * <pre>
122  * path = <write directory for circular buffer>;
123  * c_sizeL0 = <L0 buffer size>;
124  * c_sizeL1 = <L1 buffer size>;
125  * c_sizeL2 = <L2 buffer size>;
126  * c_sizeSN = <SN buffer size>;
127  * </pre>
128  *
129  * Note that when path is set to a valid director:
130  * - There will always be a file created which is closed when the state machine is exited;
131  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
132  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
133  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
134  * - After closure of the file, a new file will be opened if path is set;
135  *
136  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
137  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
138  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
139  * (e.g. when there is more than one alert on a given day).
140  *
141  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
142  *
143  * The script JDOMandDataFilter.sh can be used to test this application.
144  */
145  class JDataFilter :
146  public JDAQClient
147  {
148  public:
149 
152 
153  typedef double hit_type;
159 
160  /**
161  * Circular buffer.
162  */
164  public JTreeRecorder<JDAQTimesliceTypes_t>
165  {
167 
168  /**
169  * Default constructor.
170  */
172  {
173  disable();
174  }
175 
176  /**
177  * Open file.
178  *
179  * \param path directory
180  * \param tag unique process tag
181  */
182  void open(const std::string& path, const JTag& tag)
183  {
184  using namespace std;
185 
186  const JDateAndTime cal;
187 
188  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
189 
190  ostringstream os;
191 
192  os << getFullPath(path)
193  << "KM3NeT"
194  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
195  << "_" << tag;
196 
197  if (i != 0) {
198  os << "_" << i;
199  }
200 
201  os << ".root";
202 
203  try {
204  this->open(os.str().c_str());
205  }
206  catch(const exception& error) {}
207  }
208  }
209 
210  /**
211  * Disable writing.
212  */
213  void disable()
214  {
215  sizeL0 = 0;
216  sizeL1 = 0;
217  sizeL2 = 0;
218  sizeSN = 0;
219  }
220 
221  /**
222  * Check whether writing of data is enabled.
223  *
224  * \return true if writing enabled; else false
225  */
226  bool is_enabled() const
227  {
228  return (sizeL0 > 0 ||
229  sizeL1 > 0 ||
230  sizeL2 > 0 ||
231  sizeSN > 0);
232  }
233 
234  /**
235  * Write circular buffer to output stream.
236  *
237  * \param out output stream
238  * \param object circular buffer
239  * \return output stream
240  */
241  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
242  {
243  if (object.is_open())
244  out << object.getFile()->GetName();
245  else
246  out << "void";
247 
248  out << ' ';
249 
250  return out << object.sizeL0 << '/'
251  << object.sizeL1 << '/'
252  << object.sizeL2 << '/'
253  << object.sizeSN << '/';
254  }
255 
256  Long64_t sizeL0; //!< Number of L0 time slices
257  Long64_t sizeL1; //!< Number of L1 time slices
258  Long64_t sizeL2; //!< Number of L2 time slices
259  Long64_t sizeSN; //!< Number of SN time slices
260  };
261 
262 
263  /**
264  * Sort DAQ process by index.
265  *
266  * \param first first DAQ process
267  * \param second second DAQ process
268  * \return true if index of first DAQ process less than that of second; else false
269  */
270  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
271  {
272  return first.index < second.index;
273  }
274 
275 
276  /**
277  * Constructor.
278  *
279  * \param name name of client
280  * \param server name of command message server
281  * \param hostname name of data server
282  * \param logger pointer to logger
283  * \param level debug level
284  * \param port server port
285  * \param backlog server backlog
286  * \param buffer_size server buffer
287  * \param path directory for writing circular buffer
288  */
289  JDataFilter(const std::string& name,
290  const std::string& server,
291  const std::string& hostname,
292  JLogger* logger,
293  const int level,
294  const int port,
295  const int backlog,
296  const int buffer_size,
297  const std::string& path) :
298  JDAQClient (name,server,logger,level),
299  hostname (hostname),
300  port (port),
301  backlog (backlog),
302  buffer_size(buffer_size),
303  path (path)
304  {
305  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
306 
307  addSubscription(JSubscriptionAll(RC_ALERT));
308 
309  totalCPURAM = getRAM();
310  current_slice_index = -1;
311  reporting = false;
312  }
313 
314 
315  virtual void actionEnter() override
316  {}
317 
318 
319  virtual void actionExit() override
320  {
321  if (c_buffer.is_open()) {
322 
323  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
324 
325  c_buffer.close();
326  }
327 
328  datawriter.reset();
329  }
330 
331 
332  virtual void actionInit(int length, const char* buffer) override
333  {
334  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
335 
336  try {
337 
338  JDebugStream(logger) << "Start server.";
339 
340  serversocket.reset(new JServerSocket(port,backlog));
341  }
342  catch(const std::exception& error) {
343  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
344  ev_error();
345  }
346  }
347 
348 
349  virtual void actionConfigure(int length, const char* buffer) override
350  {
351  using namespace std;
352 
353  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
354 
355  long long int update_s = 10;
356  long long int logger_s = 5;
357 
358  parameters .reset();
359  dataFilters.clear();
360  dataQueues .clear();
361 
362  reporting = false;
363 
364  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
365 
366  properties["dataWriter"] = hostname;
367  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
368  properties["detector"] = detector;
369  properties["triggerParameters"] = parameters;
370  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
371  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
372  properties["frameIndex"] = maximal_frame_index = 100000;
373  properties["logger_s"] = logger_s;
374  properties["update_s"] = update_s;
375  properties["JDataFilter"] = dataFilters;
376  properties["DataQueue"] = dataQueues;
377  properties["path"] = path;
378  properties["c_sizeL0"] = c_buffer.sizeL0;
379  properties["c_sizeL1"] = c_buffer.sizeL1;
380  properties["c_sizeL2"] = c_buffer.sizeL2;
381  properties["c_sizeSN"] = c_buffer.sizeSN;
382 
383  try {
384  properties.read(string(buffer, length));
385  }
386  catch(const exception& error) {
387  JErrorStream(logger) << error.what();
388  }
389 
390  if (update_s <= 0) { update_s = 1; }
391  if (logger_s <= 0) { logger_s = 1; }
392 
393  setClockInterval(update_s * 1000000LL);
394 
395  hostname = trim(hostname);
396 
397  if (hostname != "")
398  datawriter.reset(new JControlHost_t(hostname));
399  else
400  throw JException("Undefined data writer host name.");
401 
402  maximum_frames_per_slice = frames_per_slice;
403 
404  // process processlist
405 
406  if (dataFilters.empty()) {
407  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
408  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
409  }
410 
411  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
412 
413  unsigned int numberOfDataFiltersOnThisMachine = 0;
414  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
415 
417 
418  {
419  JNoticeStream notice(logger);
420 
421  notice << "My IP addresses:";
422 
423  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
424  notice << ' ' << *i;
425  }
426  }
427 
428  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
429 
430  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
431 
432  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
433 
434  numberOfDataFiltersOnThisMachine++;
435 
436  if (i->port == this->port) {
437  thisProcess = i;
438  }
439  }
440  }
441 
442  if (numberOfDataFiltersOnThisMachine == 0) {
443  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
444  << "Assuming one datafilter on this machine.";
445  numberOfDataFiltersOnThisMachine = 1;
446  }
447 
448  if (thisProcess == dataFilters.end()) {
449  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
450  }
451 
452  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
453  JErrorStream(logger) << "Mismatch between given process names: "
454  << "I am called " << getName()
455  << ", but in the process list I am referred to as " << thisProcess->index;
456  }
457 
458  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
459  reporting = true;
460  }
461 
462  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
463 
464  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
465 
466  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
467  << "Queue size reduced to "
468  << maxQueueSize << " bytes." ;
469  }
470 
471  // trigger parameters
472 
474 
475  triggerNB .reset(new JTriggerNB (parameters));
476  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
477  trigger3DShower.reset(new JTrigger3DShower(parameters));
478  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
479 
480  moduleRouter.reset(new JModuleRouter(detector));
481 
482  if (reporting) {
483  JNoticeStream(logger) << "This data filter process will report.";
484  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
485  JDebugStream (logger) << "Trigger parameters: " << parameters;
486  JDebugStream (logger) << "Detector description: " << endl << detector;
487  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
488  }
489 
490  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
491 
492  // set L1, L2 and SN builders
493 
494  buildL1.reset(new JBuildL1_t(parameters));
495  buildL2.reset(new JBuildL2_t(parameters.L2));
496  buildSN.reset(new JBuildL2_t(parameters.SN));
497  buildNB.reset(new JBuildL2_t(parameters.NB));
498 
499  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
500  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
501  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
502  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
503 
504  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
505  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
506  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
507  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
508 
509  if (c_buffer.is_enabled()) {
510 
511  if (!c_buffer.is_open()) {
512 
513  c_buffer.open(path, getUniqueTag());
514 
515  if (c_buffer.is_open()) {
516 
517  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
518 
519  putObject(c_buffer.getFile(), meta);
520 
521  } else {
522 
523  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
524 
525  c_buffer.disable();
526  }
527 
528  } else {
529 
530  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
531  }
532 
533  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
534  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
535  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
536  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
537 
538  } else {
539 
540  if (c_buffer.is_open()) {
541  c_buffer.close();
542  }
543  }
544  }
545 
546 
547  virtual void actionStart(int length, const char* buffer) override
548  {
549  using namespace std;
550 
551  if (reporting) {
552  JNoticeStream(logger) << "Start run " << getRunNumber();
553  }
554 
555  timeslices.clear();
556 
557  current_slice_index = -1;
558  queueSize = 0;
559 
560  numberOfEvents = 0;
561  numberOfBytes = 0;
562  numberOfTimeslicesProcessed = 0;
563  numberOfIncompleteTimeslicesProcessed = 0;
564 
565  number_of_packets_received = 0;
566  number_of_packets_discarded = 0;
567  number_of_bytes_received = 0;
568  number_of_reads = 0;
569 
570  minFrameNumber = numeric_limits<int>::max();
571  maxFrameNumber = numeric_limits<int>::min();
572 
573  // Reset global trigger counter.
574 
576 
577  logErrorRun .reset();
578  logErrorDetector .reset();
579  logErrorIndex .reset();
580  logErrorIncomplete.reset();
581 
582  timer.reset();
583  timer.start();
584 
585  Qt.reset();
586 
587  // send trigger parameters to the datawriter
588 
589  ostringstream os;
590 
591  os << getRunNumber() << ' ' << parameters;
592 
593  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
594  }
595 
596 
597  virtual void actionPause(int length, const char* buffer) override
598  {
599  using namespace std;
600 
601  if (!timeslices.empty()) {
602 
603  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
604 
605  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
606  queueSize -= getSizeof(*i);
607  }
608 
609  timeslices.clear();
610  }
611 
612  { // force clearance of memory
613 
615 
616  timeslices.swap(buffer);
617  }
618 
619  if (queueSize != 0) {
620  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
621  }
622 
623  current_slice_index = -1;
624  queueSize = 0;
625 
626  timer.stop();
627  }
628 
629 
630  virtual void actionContinue(int length, const char* buffer) override
631  {
632  timer.start();
633  }
634 
635 
636  virtual void actionStop(int length, const char* buffer) override
637  {
638  typeout();
639 
640  datawriter.reset();
641  }
642 
643 
644  virtual void actionReset(int length, const char* buffer) override
645  {
646  if (serversocket.is_valid()) {
647  serversocket->shutdown();
648  }
649 
650  serversocket.reset();
651  }
652 
653 
654  virtual void actionQuit(int length, const char* buffer) override
655  {
656  datawriter.reset();
657  }
658 
659 
660  virtual void setSelect(JFileDescriptorMask& mask) const override
661  {
662  if (serversocket.is_valid()) {
663  mask.set(*serversocket);
664  }
665 
666  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
667  if (!channel->isReady()) {
668  mask.set(channel->getFileDescriptor());
669  }
670  }
671  }
672 
673 
674  virtual void actionSelect(const JFileDescriptorMask& mask) override
675  {
676  using namespace std;
677 
678  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
679 
680  try {
681 
682  if (mask.has(channel->getFileDescriptor())) {
683  channel->read();
684  }
685 
686  if (channel->isReady()) {
687 
688  number_of_packets_received += 1;
689  number_of_reads += channel->getCounter();
690  number_of_bytes_received += channel->size();
691 
692  if (isRunning()) {
693 
694  updateFrameQueue(channel);
695 
696  } else {
697 
698  JErrorStream(logErrorRun) << "Receiving data while not running.";
699 
700  number_of_packets_discarded += 1;
701  }
702 
703  channel->reset();
704  }
705 
706  ++channel;
707  }
708  catch(const exception& error) {
709 
710  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
711 
712  channel->shutdown();
713 
714  channel = channelList.erase(channel);
715  }
716  }
717 
718 
719  if (serversocket.is_valid()) {
720 
721  if (mask.has(*serversocket)) {
722 
723  JSocket socket;
724 
725  socket.accept(serversocket->getFileDescriptor());
726 
727  //socket.setSendBufferSize (buffer_size);
729 
730  socket.setKeepAlive (true);
731  socket.setNonBlocking(true);
732 
733  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
734 
735  channelList.push_back(JSocketInputChannel_t(socket));
736  }
737  }
738 
739 
740  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
741  timeslices.size() >= maxQueueDepth ||
742  queueSize >= maxQueueSize)) {
743 
744  const JDAQTimesliceL0& pending_slice = timeslices.front();
745  queueSize -= getSizeof(pending_slice);
746 
747  current_slice_index = pending_slice.getFrameIndex();
748  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
749  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
750 
751  if (pending_slice.size() > frames_per_slice) {
752 
753  JErrorStream(logger) << "More frames in timeslice than expected "
754  << pending_slice.size() << " > " << frames_per_slice;
755 
756  if (pending_slice.size() <= maximum_frames_per_slice) {
757 
758  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
759 
760  frames_per_slice = pending_slice.size();
761  }
762  }
763 
764  if (!pending_slice.empty()) {
765 
766  const localtime_t t0 = getLocalTime();
767 
768  processTimeSlice(pending_slice);
769 
770  const localtime_t t1 = getLocalTime();
771 
772  numberOfTimeslicesProcessed += 1;
773 
774  Qt.put(t1 - t0);
775 
776  if (pending_slice.size() < frames_per_slice) {
777 
778  numberOfIncompleteTimeslicesProcessed += 1;
779 
780  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
781  << "Frame index = " << pending_slice.getFrameIndex() << ';'
782  << "Size of timeslice = " << pending_slice.size() << ';'
783  << "Queue depth = " << timeslices.size() << ';'
784  << "Queue size = " << queueSize;
785 
786  if (!timeslices.empty()) {
787 
788  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
789 
790  frames_per_slice = pending_slice.size();
791  }
792  }
793  }
794 
795  timeslices.pop_front();
796  }
797  }
798 
799 
800  virtual void actionRunning() override
801  {
802  if (reporting) {
803  typeout();
804  }
805  }
806 
807 
808  /**
809  * Update queue with data frames.
810  *
811  * Note that any discarded data will be reported.
812  *
813  * \param channel incoming data channel
814  */
815  void updateFrameQueue(const JChannelList_t::const_iterator channel)
816  {
817  using namespace std;
818 
819  JByteArrayReader in(channel->data(), channel->size());
820 
821  JDAQPreamble preamble;
822  JDAQSuperFrameHeader header;
823 
824  in >> preamble;
825  in >> header;
826 
827  if (preamble.getLength() != channel->size()) {
828 
829  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
830  << "preamble.getLength() = " << preamble.getLength() << ';'
831  << "channel->size(): " << channel->size() << ';';
832 
833  number_of_packets_discarded += 1;
834 
835  return;
836  }
837 
838  if (header.getRunNumber() != getRunNumber()) {
839 
840  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
841  << " unequal to current run " << getRunNumber()
842  << " -> Dropping frame.";
843 
844  number_of_packets_discarded += 1;
845 
846  return;
847  }
848 
849  if (header.getFrameIndex() <= current_slice_index) {
850 
851  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
852  << " -> Dropping frame.";
853 
854  number_of_packets_discarded += 1;
855 
856  if (frames_per_slice < maximum_frames_per_slice) {
857 
858  frames_per_slice++;
859 
860  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
861  }
862 
863  return;
864  }
865 
866  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
867 
868  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
869  << " -> Dropping frame.";
870 
871  number_of_packets_discarded += 1;
872 
873  return;
874  }
875 
876  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
877 
878  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
879  ++timesliceIterator;
880  }
881 
882  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
883 
884  // The corresponding time slice already exists
885 
886  } else {
887 
888  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
889 
890  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
891 
892  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
893 
894  queueSize += getSizeof(*timesliceIterator);
895  }
896 
897  timesliceIterator->push_back(JDAQSuperFrame(header));
898 
899  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
900 
901  queueSize += getSizeof(*timesliceIterator->rbegin());
902  }
903 
904 
905  /**
906  * Process time slice.
907  *
908  * \param timeslice time slice
909  */
910  void processTimeSlice(const JDAQTimesliceL0& timeslice)
911  {
912  using namespace std;
913 
914  try {
915 
916  if (parameters.writeSummary()) {
917  this->put(JDAQSummaryslice(timeslice));
918  }
919 
920  if (parameters.trigger3DMuon.enabled ||
921  parameters.trigger3DShower.enabled ||
922  parameters.triggerMXShower.enabled ||
923  parameters.triggerNB.enabled ||
924  parameters.writeL0.prescale ||
925  parameters.writeL1.prescale ||
926  parameters.writeL2.prescale ||
927  parameters.writeSN.prescale ||
928  c_buffer.is_enabled()) {
929 
930  timesliceRouter->configure(timeslice);
931 
932  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
933  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
934  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
935  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
936  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
937  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
938 
939  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
940 
941  if (moduleRouter->hasModule(frame->getModuleID())) {
942 
943  if (!checksum(*frame)) {
944 
945  JErrorStream(logger) << "Invalid data at "
946  << "run = " << timeslice.getRunNumber() << ";"
947  << "frame index = " << timeslice.getFrameIndex() << ";"
948  << "module = " << frame->getModuleID() << ";"
949  << "discard and dump";
950 
951  timesliceTX.push_back(*frame);
952 
953  continue;
954  }
955 
956  const JModule& module = moduleRouter->getModule(frame->getModuleID());
957  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
958 
959  // Apply high-rate veto
960 
961  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
962 
963  // L0
964 
965  timesliceL0.push_back(JSuperFrame1D_t(buffer));
966 
967  // Nano-beacon trigger
968 
969  if (parameters.triggerNB.enabled) {
970 
971  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
972 
973  if (buffer.begin() != __end) {
974 
975  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
976  frame->getModuleIdentifier(),
977  module.getPosition()));
978 
979  JSuperFrame1D_t zbuf;
980 
981  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
982 
983  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
984  }
985  }
986 
987  // L1
988 
989  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
990  frame->getModuleIdentifier(),
991  module.getPosition()));
992 
993  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
994 
995  // L2
996 
997  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
998  frame->getModuleIdentifier(),
999  module.getPosition()));
1000 
1001  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1002 
1003  // SN
1004 
1005  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1006  frame->getModuleIdentifier(),
1007  module.getPosition()));
1008 
1009  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1010 
1011  } else {
1012 
1013  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1014  }
1015  }
1016 
1017  if (!timesliceTX.empty()) {
1018  this->put(timesliceTX);
1019  }
1020 
1021  // Trigger
1022 
1023  if (parameters.triggerNB.enabled) {
1024 
1025  const JTriggerInput trigger_input(timesliceNB);
1026 
1027  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1028 
1029  if (parameters.triggerNB.write()) {
1030 
1031  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1032  getTriggerMask(triggerNB->getTriggerBit()),
1033  *hit,
1034  *timesliceRouter,
1035  *moduleRouter,
1036  parameters.TMaxLocal_ns,
1037  parameters.triggerNB.DMax_m,
1038  getTimeRange(parameters.triggerNB));
1039 
1040  this->put(tev);
1041  }
1042  }
1043  }
1044 
1045  JTriggerInput trigger_input(timesliceL2);
1046  JTriggerOutput trigger_output;
1047 
1048  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1049  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1050  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1051 
1052  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1053 
1054  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1055 
1056  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1057 
1058  if (this->put(object)) {
1059  numberOfEvents += 1;
1060  }
1061  }
1062 
1063  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1064 
1065  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1066 
1067  if (parameters.writeL1) { this->put(object); }
1068  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1069  }
1070 
1071  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1072 
1073  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1074 
1075  if (parameters.writeL2) { this->put(object); }
1076  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1077  }
1078 
1079  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1080 
1081  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1082 
1083  if (parameters.writeSN) { this->put(object); }
1084  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1085  }
1086 
1087  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1088 
1089  if (parameters.writeL0) { this->put(timeslice); }
1090  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1091  }
1092  }
1093 
1094  } catch(const exception& error) {
1095  JErrorStream(logger) << "Error = " << error.what() << ";"
1096  << "run = " << timeslice.getRunNumber() << ";"
1097  << "frame index = " << timeslice.getFrameIndex() << ";"
1098  << "time slice not correctly processed!";
1099  }
1100  }
1101 
1102 
1103  /**
1104  * Report status to message logger.
1105  */
1106  void typeout()
1107  {
1108  timer.stop();
1109 
1110  const double T_us = (double) timer.usec_wall;
1111 
1112  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1113  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1114  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1115  try {
1116  JNoticeStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1117  }
1118  catch(const std::exception&) {}
1119  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1120  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1121 
1122  if (number_of_packets_received > 0) {
1123  JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1124  }
1125 
1126  JNoticeStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1127  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1128 
1129  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1130 
1131  if (numberOfTimeslicesProcessed > 0) {
1132  JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1133  JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1134  JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1135  }
1136 
1137  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1138  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1139 
1140  if (processedSlicesTime_us > 0) {
1141  JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1142  }
1143  if (processedDetectorTime_us > 0) {
1144  JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1145  }
1146 
1147  timer.start();
1148  }
1149 
1150 
1151  /**
1152  * Tagged action to handle alerts.
1153  *
1154  * \param tag tag
1155  * \param length number of characters
1156  * \param buffer message
1157  */
1158  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1159  {
1160  using namespace std;
1161 
1162  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1163 
1164  if (tag == RC_ALERT) {
1165 
1166  if (c_buffer.is_open()) {
1167 
1168  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1169 
1170  c_buffer.close();
1171  }
1172 
1173  if (c_buffer.is_enabled()) {
1174 
1175  c_buffer.open(path, getUniqueTag());
1176 
1177  if (c_buffer.is_open()) {
1178 
1179  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1180 
1181  putObject(c_buffer.getFile(), meta);
1182 
1183  } else {
1184 
1185  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1186 
1187  c_buffer.disable();
1188  }
1189  }
1190 
1191  } else {
1192 
1193  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1194  }
1195  }
1196 
1197  JMeta meta; //!< meta data
1198 
1199  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1200 
1201  private:
1202 
1204  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1205  std::string hostname; //!< host name of data server
1206 
1207  /**
1208  * Auxiliary method to send object to data server.
1209  *
1210  * \param object object to be sent
1211  * \return true if sent; else false
1212  */
1213  template<class T>
1214  bool put(const T& object)
1215  {
1216  try {
1217 
1218  datawriter->put(object);
1219 
1220  numberOfBytes += getSizeof(object);
1221 
1222  return true;
1223  }
1224  catch(const std::exception& error) {
1225  JErrorStream(logger) << error.what();
1226  }
1227 
1228  return false;
1229  }
1230 
1231 
1232  int port; //!< server socket port
1233  int backlog;
1235 
1236  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1237  JChannelList_t channelList; //!< connections to data queue
1238 
1240  JQuantile Qt;
1241 
1242  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1244  unsigned int frames_per_slice;
1247 
1248  // trigger
1249 
1252 
1259 
1264 
1269 
1270  // process management
1271 
1274 
1275  // memory management
1276 
1277  long long int totalCPURAM;
1278  unsigned int maxQueueDepth;
1279  long long int maxQueueSize;
1280  long long int queueSize;
1281 
1282  // statistics
1283 
1285 
1286  long long int numberOfEvents;
1287  long long int numberOfBytes;
1290 
1293 
1294  // temporary
1295 
1298  long long int number_of_reads;
1300 
1301  // circular buffer
1302 
1304  std::string path; //!< directory for writing circular buffer
1305  };
1306 }
1307 
1308 /**
1309  * \file
1310  *
1311  * Application for real-time filtering of data.
1312  * For more information, see KM3NETDAQ::JDataFilter.
1313  *
1314  * \author rbruijn and mdejong
1315  */
1316 int main(int argc, char* argv[])
1317 {
1318  using namespace std;
1319  using namespace JPP;
1320  using namespace KM3NETDAQ;
1321 
1322  string server;
1323  string logger;
1324  string hostname;
1325  string client_name;
1326  int port;
1327  int backlog;
1328  int buffer_size;
1329  bool use_cout;
1330  string path;
1331  int debug;
1332 
1333 
1334  try {
1335 
1336  JParser<> zap("Application for real-time filtering of data.");
1337 
1338  zap['H'] = make_field(server) = "localhost";
1339  zap['M'] = make_field(logger) = "localhost";
1340  zap['D'] = make_field(hostname) = "";
1341  zap['u'] = make_field(client_name) = "JDataFilter";
1342  zap['P'] = make_field(port);
1343  zap['q'] = make_field(backlog) = 1024;
1344  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1345  zap['c'] = make_field(use_cout);
1346  zap['p'] = make_field(path) = "/tmp/";
1347  zap['d'] = make_field(debug) = 0;
1348 
1349  zap(argc, argv);
1350  }
1351  catch(const exception& error) {
1352  FATAL(error.what() << endl);
1353  }
1354 
1355 
1356  JLogger* out = NULL;
1357 
1358  if (use_cout)
1359  out = new JStreamLogger(cout);
1360  else
1361  out = new JControlHostLogger(logger);
1362 
1363  JDataFilter dfilter(client_name,
1364  server,
1365  hostname,
1366  out,
1367  debug,
1368  port,
1369  backlog,
1370  buffer_size,
1371  path);
1372 
1373  dfilter.meta = JMeta(argc, argv);
1374 
1375  dfilter.enter();
1376  dfilter.run();
1377 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:800
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1500
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:910
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
Definition: JDataFilter.cc:76
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:182
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
ROOT TTree parameter settings of various packages.
then JLigiers sh continue fi cat driver txt<< EOFprocess dfilter $HOST1 ssh\$HOST\$"JDataFilter -H \$SERVER\$ -M \$LOGGER\$ -d $DEBUG &";enterevent ev_init{RC_CMD%< ev_init.txt > from me< ev_init.txt > event ev_configure
Data structure for a composite optical module.
Definition: JModule.hh:57
JSinglePointer< JTriggerNB > triggerNB
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:270
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:158
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:155
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:150
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:257
Detector data structure.
Definition: JDetector.hh:80
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:289
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:42
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:145
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:128
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:306
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:167
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:157
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:171
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:226
Basic data structure for L0 hit.
std::string index
index in process list
do cat driver txt<< EOFevent ev_configure{RC_EVT%< ev_configure.txt > RC_DWRT path
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:154
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
const_iterator end() const
get iterator to end of data (without end marker)
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:156
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:196
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:630
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:151
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:241
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1961
Byte array binary input.
Definition: JByteArrayIO.hh:25
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:815
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:315
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:63
long long int number_of_packets_received
static const long long int MEGABYTE
Number of bytes in a kilo-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:654
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:259
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:327
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:547
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
then echo n User name
Definition: JCookie.sh:45
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:644
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:319
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxililary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:674
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:256
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:53
Data structure for input to trigger algorithm.
Fixed parameters andd ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:636
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:332
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY source JAcoustics sh $DETECTOR_ID typeset A TRIPODS get_tripods $WORKDIR tripod txt TRIPODS for EMITTER in
Definition: JCanberra.sh:36
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:597
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:258
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:660
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:349
Time slice with calibrated data.
Definition: JTimeslice.hh:26
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.
int main(int argc, char *argv[])
Definition: Main.cpp:15