Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JTools/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * JDataFilter::setSelect and
93  * JDataFilter::actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
96  * When a JDAQTimeslice is complete,
97  * it is processed in the same way as in the applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
98  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
99  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
100  * <pre>
101  * numberOfFramesPerSlice = frames_per_slice = 1;
102  * </pre>
103  * Note that this parameter can change during operation (see below).
104  *
105  * A timeout may occur when the total amount of data or the number of incomplete time slices
106  * in the queue exceeds the corresponding pre-set limit:\n
107  * <pre>
108  * queueSize = <maximal queue size [B]>;
109  * queueDepth = <maximal queue depth>;
110  * </pre>
111  * Note that these values apply per JDataFilter.\n
112  *
113  * When a timeout occurs,
114  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
115  * When data are received with a frame index below the current frame index,
116  * the parameter <tt>frames_per_slice</tt> is incremented by one,
117  * up to the original value set at JDataFilter::actionConfigure.\n
118  *
119  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
120  * The following parameters can be used to configure the circular buffer.\n
121  * <pre>
122  * path = <write directory for circular buffer>;
123  * c_sizeL0 = <L0 buffer size>;
124  * c_sizeL1 = <L1 buffer size>;
125  * c_sizeL2 = <L2 buffer size>;
126  * c_sizeSN = <SN buffer size>;
127  * </pre>
128  *
129  * Note that when path is set to a valid directory:
130  * - There will always be a file created which is closed when the state machine is exited;
131  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
132  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
133  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
134  * - After closure of the file, a new file will be opened if path is set;
135  *
136  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
137  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
138  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
139  * (e.g. when there is more than one alert on a given day).
140  *
141  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
142  *
143  * The script JDOMandDataFilter.sh can be used to test this application.
144  */
145  class JDataFilter :
146  public JDAQClient
147  {
148  public:
149 
152 
153  typedef double hit_type;
159 
160  /**
161  * Circular buffer.
162  */
164  public JTreeRecorder<JDAQTimesliceTypes_t>
165  {
167 
168  /**
169  * Default constructor.
170  */
172  {
173  disable();
174  }
175 
176  /**
177  * Open file.
178  *
179  * \param path directory
180  * \param tag unique process tag
181  */
182  void open(const std::string& path, const JTag& tag)
183  {
184  using namespace std;
185 
186  const JDateAndTime cal;
187 
188  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
189 
190  ostringstream os;
191 
192  os << getFullPath(path)
193  << "KM3NeT"
194  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
195  << "_" << tag;
196 
197  if (i != 0) {
198  os << "_" << i;
199  }
200 
201  os << ".root";
202 
203  try {
204  this->open(os.str().c_str());
205  }
206  catch(const exception& error) {}
207  }
208  }
209 
210  /**
211  * Disable writing.
212  */
213  void disable()
214  {
215  sizeL0 = 0;
216  sizeL1 = 0;
217  sizeL2 = 0;
218  sizeSN = 0;
219  }
220 
221  /**
222  * Check whether writing of data is enabled.
223  *
224  * \return true if writing enabled; else false
225  */
226  bool is_enabled() const
227  {
228  return (sizeL0 > 0 ||
229  sizeL1 > 0 ||
230  sizeL2 > 0 ||
231  sizeSN > 0);
232  }
233 
234  /**
235  * Write circular buffer to output stream.
236  *
237  * \param out output stream
238  * \param object circular buffer
239  * \return output stream
240  */
241  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
242  {
243  if (object.is_open())
244  out << object.getFile()->GetName();
245  else
246  out << "void";
247 
248  out << ' ';
249 
250  return out << object.sizeL0 << '/'
251  << object.sizeL1 << '/'
252  << object.sizeL2 << '/'
253  << object.sizeSN << '/';
254  }
255 
256  Long64_t sizeL0; //!< Number of L0 time slices
257  Long64_t sizeL1; //!< Number of L1 time slices
258  Long64_t sizeL2; //!< Number of L2 time slices
259  Long64_t sizeSN; //!< Number of SN time slices
260  };
261 
262 
263  /**
264  * Sort DAQ process by index.
265  *
266  * \param first first DAQ process
267  * \param second second DAQ process
268  * \return true if index of first DAQ process less than that of second; else false
269  */
270  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
271  {
272  return first.index < second.index;
273  }
274 
275 
276  /**
277  * Constructor.
278  *
279  * \param name name of client
280  * \param server name of command message server
281  * \param hostname name of data server
282  * \param logger pointer to logger
283  * \param level debug level
284  * \param port server port
285  * \param backlog server backlog
286  * \param buffer_size server buffer
287  * \param path directory for writing circular buffer
288  */
289  JDataFilter(const std::string& name,
290  const std::string& server,
291  const std::string& hostname,
292  JLogger* logger,
293  const int level,
294  const int port,
295  const int backlog,
296  const int buffer_size,
297  const std::string& path) :
298  JDAQClient (name,server,logger,level),
299  hostname (hostname),
300  port (port),
301  backlog (backlog),
302  buffer_size(buffer_size),
303  path (path)
304  {
305  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
306 
307  addSubscription(JSubscriptionAll(RC_ALERT));
308 
309  totalCPURAM = getRAM();
310  current_slice_index = -1;
311  reporting = false;
312  }
313 
314 
315  virtual void actionEnter()
316  {}
317 
318 
319  virtual void actionExit()
320  {
321  if (c_buffer.is_open()) {
322 
323  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
324 
325  c_buffer.close();
326  }
327 
328  datawriter.reset();
329  }
330 
331 
332  virtual void actionInit(int length, const char* buffer)
333  {
334  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
335 
336  try {
337 
338  JDebugStream(logger) << "Start server.";
339 
340  serversocket.reset(new JServerSocket(port,backlog));
341  }
342  catch(const std::exception& error) {
343  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
344  ev_error();
345  }
346  }
347 
348 
/**
 * Action for the configure transition.
 *
 * The configuration message is parsed into the variables registered in the properties below.
 * Subsequently, this method
 * - sets the clock interval and creates the connection to the data writer;
 * - looks up this process in the list of data filters and decides whether it reports;
 * - limits the maximal queue size according to the number of data filters on this machine;
 * - allocates the trigger algorithms, the routers and the L1, L2 and SN builders;
 * - sets up the message schedulers; and
 * - opens, continues to use or closes the circular buffer.
 *
 * \param length number of characters in buffer
 * \param buffer configuration message
 * \throws JException if the data writer host name is empty after parsing
 */
349  virtual void actionConfigure(int length, const char* buffer)
350  {
351  using namespace std;
352 
353  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
354 
     // default periods; both can be overwritten by the configuration (keys update_s and logger_s)
355  long long int update_s = 10;
356  long long int logger_s = 5;
357 
358  parameters .reset();
359  dataFilters.clear();
360  dataQueues .clear();
361 
362  reporting = false;
363 
364  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
365 
     // Register the configurable variables by key.
     // The nested assignments (e.g. frames_per_slice = 1) set the value that applies before parsing.
366  properties["dataWriter"] = hostname;
367  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
368  properties["detector"] = detector;
369  properties["triggerParameters"] = parameters;
370  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
371  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
372  properties["frameIndex"] = maximal_frame_index = 100000;
373  properties["logger_s"] = logger_s;
374  properties["update_s"] = update_s;
375  properties["JDataFilter"] = dataFilters;
376  properties["DataQueue"] = dataQueues;
377  properties["path"] = path;
378  properties["c_sizeL0"] = c_buffer.sizeL0;
379  properties["c_sizeL1"] = c_buffer.sizeL1;
380  properties["c_sizeL2"] = c_buffer.sizeL2;
381  properties["c_sizeSN"] = c_buffer.sizeSN;
382 
     // a parse error is logged only; the configuration continues
383  try {
384  properties.read(string(buffer, length));
385  }
386  catch(const exception& error) {
387  JErrorStream(logger) << error.what();
388  }
389 
     // protect against non-positive periods
390  if (update_s <= 0) { update_s = 1; }
391  if (logger_s <= 0) { logger_s = 1; }
392 
     // presumably a conversion from seconds to microseconds - TODO confirm unit of setClockInterval
393  setClockInterval(update_s * 1000000LL);
394 
395  hostname = trim(hostname);
396 
397  if (hostname != "")
398  datawriter.reset(new JControlHost_t(hostname));
399  else
400  throw JException("Undefined data writer host name.");
401 
     // keep the configured value as upper limit; frames_per_slice itself is adjusted
     // in actionSelect and updateFrameQueue
402  maximum_frames_per_slice = frames_per_slice;
403 
404  // process processlist
405 
406  if (dataFilters.empty()) {
407  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
408  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
409  }
410 
411  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
412 
413  unsigned int numberOfDataFiltersOnThisMachine = 0;
414  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
415 
     // NOTE(review): the declaration of IP (listing line 416) is absent from this listing;
     // from its use below it is a vector<string> - presumably the IP addresses of this host.
417 
418  {
419  JNoticeStream notice(logger);
420 
421  notice << "My IP addresses:";
422 
423  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
424  notice << ' ' << *i;
425  }
426  }
427 
     // count the data filters whose host name is listed in IP and
     // remember the one that uses the port of this process
428  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
429 
430  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
431 
432  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
433 
434  numberOfDataFiltersOnThisMachine++;
435 
436  if (i->port == this->port) {
437  thisProcess = i;
438  }
439  }
440  }
441 
442  if (numberOfDataFiltersOnThisMachine == 0) {
443  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
444  << "Assuming one datafilter on this machine.";
445  numberOfDataFiltersOnThisMachine = 1;
446  }
447 
448  if (thisProcess == dataFilters.end()) {
449  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
450  }
451 
452  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
453  JErrorStream(logger) << "Mismatch between given process names: "
454  << "I am called " << getName()
455  << ", but in the process list I am referred to as " << thisProcess->index;
456  }
457 
     // the first process of the sorted list reports, as does a process without process list
458  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
459  reporting = true;
460  }
461 
     // each data filter on this machine gets at most an equal share of (totalCPURAM - GIGABYTE)
462  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
463 
464  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
465 
466  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
467  << "Queue size reduced to "
468  << maxQueueSize << " bytes." ;
469  }
470 
471  // trigger parameters
472 
     // NOTE(review): listing line 473 is absent from this listing.
474 
475  triggerNB .reset(new JTriggerNB (parameters));
476  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
477  trigger3DShower.reset(new JTrigger3DShower(parameters));
478  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
479 
480  moduleRouter.reset(new JModuleRouter(detector));
481 
482  if (reporting) {
483  JNoticeStream(logger) << "This data filter process will report.";
484  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
485  JDebugStream (logger) << "Trigger parameters: " << parameters;
486  JDebugStream (logger) << "Detector description: " << endl << detector;
487  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
488  }
489 
490  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
491 
492  // set L1, L2 and SN builders
493 
494  buildL1.reset(new JBuildL1_t(parameters));
495  buildL2.reset(new JBuildL2_t(parameters.L2));
496  buildSN.reset(new JBuildL2_t(parameters.SN));
497 
     // NOTE(review): a plain new expression throws on failure instead of returning NULL,
     // so these checks presumably never fire - confirm.
498  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
499  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
500  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
501 
     // all message schedulers use the same period (logger_s)
502  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
503  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
504  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
505  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
506 
     // circular buffer: open a new file if needed, else continue with the file that is already open;
     // if no buffer size is set, a file left open is closed
507  if (c_buffer.is_enabled()) {
508 
509  if (!c_buffer.is_open()) {
510 
511  c_buffer.open(path, getUniqueTag());
512 
513  if (c_buffer.is_open()) {
514 
515  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
516 
517  putObject(c_buffer.getFile(), meta);
518 
519  } else {
520 
521  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
522 
523  c_buffer.disable();
524  }
525 
526  } else {
527 
528  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
529  }
530 
     // after a failed open, disable() has set all sizes to zero so that none of the following applies
531  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
532  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
533  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
534  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
535 
536  } else {
537 
538  if (c_buffer.is_open()) {
539  c_buffer.close();
540  }
541  }
542  }
543 
544 
/**
 * Action for the start transition.
 *
 * The queue of time slices, the counters, the message schedulers, the timer and
 * the timing statistics are reset and
 * the run number and trigger parameters are sent to the data writer.
 *
 * \param length number of characters in buffer (not used in the visible code)
 * \param buffer message data (not used in the visible code)
 */
545  virtual void actionStart(int length, const char* buffer)
546  {
547  using namespace std;
548 
549  if (reporting) {
550  JNoticeStream(logger) << "Start run " << getRunNumber();
551  }
552 
553  timeslices.clear();
554 
     // -1 so that any frame index >= 0 is accepted by updateFrameQueue
555  current_slice_index = -1;
556  queueSize = 0;
557 
558  numberOfEvents = 0;
559  numberOfBytes = 0;
560  numberOfTimeslicesProcessed = 0;
561  numberOfIncompleteTimeslicesProcessed = 0;
562 
563  number_of_packets_received = 0;
564  number_of_packets_discarded = 0;
565  number_of_bytes_received = 0;
566  number_of_reads = 0;
567 
     // start from the extreme values so that the first processed time slice sets both
568  minFrameNumber = numeric_limits<int>::max();
569  maxFrameNumber = numeric_limits<int>::min();
570 
571  // Reset global trigger counter.
572 
     // NOTE(review): the statement belonging to the comment above (listing line 573) is absent from this listing.
574 
575  logErrorRun .reset();
576  logErrorDetector .reset();
577  logErrorIndex .reset();
578  logErrorIncomplete.reset();
579 
580  timer.reset();
581  timer.start();
582 
583  Qt.reset();
584 
585  // send trigger parameters to the datawriter
586 
587  ostringstream os;
588 
589  os << getRunNumber() << ' ' << parameters;
590 
591  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
592  }
593 
594 
/**
 * Action for the pause transition.
 *
 * All time slices that are still in the queue are discarded without being processed,
 * the queue administration is reset and the timer is stopped.
 *
 * \param length number of characters in buffer (not used in the visible code)
 * \param buffer message data (see note at the swap below)
 */
595  virtual void actionPause(int length, const char* buffer)
596  {
597  using namespace std;
598 
599  if (!timeslices.empty()) {
600 
601  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
602 
     // take the size of each discarded time slice off the queue size
603  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
604  queueSize -= getSizeof(*i);
605  }
606 
607  timeslices.clear();
608  }
609 
610  { // force clearance of memory
611 
     // NOTE(review): listing line 612 is absent from this listing; presumably it declares
     // an empty local list named buffer (hiding the parameter) which is swapped with the queue.
613 
614  timeslices.swap(buffer);
615  }
616 
     // any remainder points to an inconsistency in the bookkeeping of the queue size
617  if (queueSize != 0) {
618  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
619  }
620 
621  current_slice_index = -1;
622  queueSize = 0;
623 
624  timer.stop();
625  }
626 
627 
628  virtual void actionContinue(int length, const char* buffer)
629  {
630  timer.start();
631  }
632 
633 
634  virtual void actionStop(int length, const char* buffer)
635  {
636  typeout();
637 
638  datawriter.reset();
639  }
640 
641 
642  virtual void actionReset(int length, const char* buffer)
643  {
644  if (serversocket.is_valid()) {
645  serversocket->shutdown();
646  }
647 
648  serversocket.reset();
649  }
650 
651 
652  virtual void actionQuit(int length, const char* buffer)
653  {
654  datawriter.reset();
655  }
656 
657 
658  virtual void setSelect(JFileDescriptorMask& mask) const
659  {
660  if (serversocket.is_valid()) {
661  mask.set(*serversocket);
662  }
663 
664  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
665  if (!channel->isReady()) {
666  mask.set(channel->getFileDescriptor());
667  }
668  }
669  }
670 
671 
/**
 * Action following the select call.
 *
 * This method consists of three steps:
 * -# read data from the channels that are set in the mask and pass each complete packet to updateFrameQueue
 *    (or discard it when not running);
 * -# accept a new connection on the server socket and add it to the list of channels;
 * -# process the oldest time slice in the queue when it is complete or when
 *    the queue depth or queue size has reached its limit.
 *
 * \param mask file descriptor mask
 */
672  virtual void actionSelect(const JFileDescriptorMask& mask)
673  {
674  using namespace std;
675 
     // the iterator is advanced inside the loop body because erase() in the catch block
     // already yields the next channel
676  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
677 
678  try {
679 
680  if (mask.has(channel->getFileDescriptor())) {
681  channel->read();
682  }
683 
684  if (channel->isReady()) {
685 
686  number_of_packets_received += 1;
687  number_of_reads += channel->getCounter();
688  number_of_bytes_received += channel->size();
689 
690  if (isRunning()) {
691 
692  updateFrameQueue(channel);
693 
694  } else {
695 
696  JErrorStream(logErrorRun) << "Receiving data while not running.";
697 
698  number_of_packets_discarded += 1;
699  }
700 
701  channel->reset();
702  }
703 
704  ++channel;
705  }
706  catch(const exception& error) {
707 
     // any exception from reading or handling the data ends the connection of this channel
708  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
709 
710  channel->shutdown();
711 
712  channel = channelList.erase(channel);
713  }
714  }
715 
716 
717  if (serversocket.is_valid()) {
718 
719  if (mask.has(*serversocket)) {
720 
721  JSocket socket;
722 
723  socket.accept(serversocket->getFileDescriptor());
724 
725  //socket.setSendBufferSize (buffer_size);
     // NOTE(review): listing line 726 is absent from this listing.
727 
728  socket.setKeepAlive (true);
729  socket.setNonBlocking(true);
730 
731  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
732 
733  channelList.push_back(JSocketInputChannel_t(socket));
734  }
735  }
736 
737 
     // at most one time slice is taken from the queue per call
738  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
739  timeslices.size() >= maxQueueDepth ||
740  queueSize >= maxQueueSize)) {
741 
     // pending_slice refers to the front of the queue; it remains valid until pop_front() below
742  const JDAQTimesliceL0& pending_slice = timeslices.front();
743  queueSize -= getSizeof(pending_slice);
744 
     // frames with an index up to and including this one are dropped by updateFrameQueue from now on
745  current_slice_index = pending_slice.getFrameIndex();
746  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
747  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
748 
749  if (pending_slice.size() > frames_per_slice) {
750 
751  JErrorStream(logger) << "More frames in timeslice than expected "
752  << pending_slice.size() << " > " << frames_per_slice;
753 
754  if (pending_slice.size() <= maximum_frames_per_slice) {
755 
756  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
757 
758  frames_per_slice = pending_slice.size();
759  }
760  }
761 
762  if (!pending_slice.empty()) {
763 
     // the difference t1 - t0 is stored in Qt, from which typeout reports mean and deviation
764  const localtime_t t0 = getLocalTime();
765 
766  processTimeSlice(pending_slice);
767 
768  const localtime_t t1 = getLocalTime();
769 
770  numberOfTimeslicesProcessed += 1;
771 
772  Qt.put(t1 - t0);
773 
     // fewer frames than expected means that the time slice was taken because of a queue limit (timeout)
774  if (pending_slice.size() < frames_per_slice) {
775 
776  numberOfIncompleteTimeslicesProcessed += 1;
777 
778  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
779  << "Frame index = " << pending_slice.getFrameIndex() << ';'
780  << "Size of timeslice = " << pending_slice.size() << ';'
781  << "Queue depth = " << timeslices.size() << ';'
782  << "Queue size = " << queueSize;
783 
     // NOTE(review): the queue is never empty at this point (pop_front() comes later),
     // so this condition is always true here.
784  if (!timeslices.empty()) {
785 
786  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
787 
788  frames_per_slice = pending_slice.size();
789  }
790  }
791  }
792 
793  timeslices.pop_front();
794  }
795  }
796 
797 
798  virtual void actionRunning()
799  {
800  if (reporting) {
801  typeout();
802  }
803  }
804 
805 
806  /**
807  * Update queue with data frames.
808  *
809  * Note that any discarded data will be reported.
810  *
811  * \param channel incoming data channel
812  */
813  void updateFrameQueue(const JChannelList_t::const_iterator channel)
814  {
815  using namespace std;
816 
817  JByteArrayReader in(channel->data(), channel->size());
818 
819  JDAQPreamble preamble;
820  JDAQSuperFrameHeader header;
821 
822  in >> preamble;
823  in >> header;
824 
825  if (preamble.getLength() != channel->size()) {
826 
827  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
828  << "preamble.getLength() = " << preamble.getLength() << ';'
829  << "channel->size(): " << channel->size() << ';';
830 
831  number_of_packets_discarded += 1;
832 
833  return;
834  }
835 
836  if (header.getRunNumber() != getRunNumber()) {
837 
838  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
839  << " unequal to current run " << getRunNumber()
840  << " -> Dropping frame.";
841 
842  number_of_packets_discarded += 1;
843 
844  return;
845  }
846 
847  if (header.getFrameIndex() <= current_slice_index) {
848 
849  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
850  << " -> Dropping frame.";
851 
852  number_of_packets_discarded += 1;
853 
854  if (frames_per_slice < maximum_frames_per_slice) {
855 
856  frames_per_slice++;
857 
858  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
859  }
860 
861  return;
862  }
863 
864  if (header.getFrameIndex() > current_slice_index + maximal_frame_index) {
865 
866  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
867  << " -> Dropping frame.";
868 
869  number_of_packets_discarded += 1;
870 
871  return;
872  }
873 
874  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
875 
876  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
877  ++timesliceIterator;
878  }
879 
880  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
881 
882  // The corresponding time slice already exists
883 
884  } else {
885 
886  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
887 
888  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
889 
890  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
891 
892  queueSize += getSizeof(*timesliceIterator);
893  }
894 
895  timesliceIterator->push_back(JDAQSuperFrame(header));
896 
897  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
898 
899  queueSize += getSizeof(*timesliceIterator->rbegin());
900  }
901 
902 
903  /**
904  * Process time slice.
905  *
  * If enabled in the trigger parameters, a summary slice is written first.
  * When any trigger is enabled, any time slice writing has a prescale or the circular buffer is enabled,
  * the L0, L1, L2, SN and nano-beacon data are built per frame, the triggers are applied and
  * the resulting events and time slices are written to the data writer and/or the circular buffer.
  * Frames failing the checksum are excluded from processing and written out separately.
  * Any exception is caught and logged together with the run number and frame index.
  *
906  * \param timeslice time slice
907  */
908  void processTimeSlice(const JDAQTimesliceL0& timeslice)
909  {
910  using namespace std;
911 
912  try {
913 
914  if (parameters.writeSummary()) {
915  this->put(JDAQSummaryslice(timeslice));
916  }
917 
918  if (parameters.trigger3DMuon.enabled ||
919  parameters.trigger3DShower.enabled ||
920  parameters.triggerMXShower.enabled ||
921  parameters.triggerNB.enabled ||
922  parameters.writeL0.prescale ||
923  parameters.writeL1.prescale ||
924  parameters.writeL2.prescale ||
925  parameters.writeSN.prescale ||
926  c_buffer.is_enabled()) {
927 
928  timesliceRouter->configure(timeslice);
929 
      // intermediate data; TX collects the frames which fail the checksum
930  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
931  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
932  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
933  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
934  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
935  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
936 
937  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
938 
939  if (moduleRouter->hasModule(frame->getModuleID())) {
940 
941  if (!checksum(*frame)) {
942 
943  JErrorStream(logger) << "Invalid data at "
944  << "run = " << timeslice.getRunNumber() << ";"
945  << "frame index = " << timeslice.getFrameIndex() << ";"
946  << "module = " << frame->getModuleID() << ";"
947  << "discard and dump";
948 
949  timesliceTX.push_back(*frame);
950 
951  continue;
952  }
953 
954  const JModule& module = moduleRouter->getModule(frame->getModuleID());
955  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
956 
957  // Apply high-rate veto
958 
959  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
960 
961  // L0
962 
963  timesliceL0.push_back(JSuperFrame1D_t(buffer));
964 
965  // Nano-beacon trigger
966 
967  if (parameters.triggerNB.enabled) {
968 
      // NOTE(review): partition() reorders the contents of buffer, which is used again below for L2 and SN;
      // timesliceL0 has already been filled at this point - confirm that the order is irrelevant downstream.
969  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
970 
971  if (buffer.begin() != __end) {
972 
973  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
974  frame->getModuleIdentifier(),
975  module.getPosition()));
976 
977  (*buildL1)(buffer.begin(), __end , back_inserter(*timesliceNB.rbegin()));
978  }
979  }
980 
981  // L1
982 
      // for each level, an empty frame is appended first and then filled by the corresponding builder
983  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
984  frame->getModuleIdentifier(),
985  module.getPosition()));
986 
987  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
988 
989  // L2
990 
991  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
992  frame->getModuleIdentifier(),
993  module.getPosition()));
994 
995  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
996 
997  // SN
998 
999  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1000  frame->getModuleIdentifier(),
1001  module.getPosition()));
1002 
1003  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1004 
1005  } else {
1006 
1007  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1008  }
1009  }
1010 
      // write out the frames which failed the checksum
1011  if (!timesliceTX.empty()) {
1012  this->put(timesliceTX);
1013  }
1014 
1015  // Trigger
1016 
1017  if (parameters.triggerNB.enabled) {
1018 
1019  const JTriggerInput trigger_input(timesliceNB);
1020 
      // every hit of the nano-beacon trigger input is a candidate event; write() is evaluated per hit
1021  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1022 
1023  if (parameters.triggerNB.write()) {
1024 
1025  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1026  getTriggerMask(triggerNB->getTriggerBit()),
1027  *hit,
1028  *timesliceRouter,
1029  *moduleRouter,
1030  parameters.TMaxLocal_ns,
1031  parameters.triggerNB.DMax_m,
1032  getTimeRange(parameters.triggerNB));
1033 
1034  this->put(tev);
1035  }
1036  }
1037  }
1038 
1039  JTriggerInput trigger_input(timesliceL2);
1040  JTriggerOutput trigger_output;
1041 
1042  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1043  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1044  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1045 
1046  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1047 
      // only events for which put() returns true are counted
1048  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1049 
1050  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1051 
1052  if (this->put(object)) {
1053  numberOfEvents += 1;
1054  }
1055  }
1056 
      // NOTE(review): in the following blocks the outer conditions call writeLx() whereas the inner ones
      // test writeLx without call; presumably the call updates a prescale counter and the plain test
      // reads the resulting state - confirm before changing the order of these statements.
1057  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1058 
1059  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1060 
1061  if (parameters.writeL1) { this->put(object); }
1062  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1063  }
1064 
1065  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1066 
1067  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1068 
1069  if (parameters.writeL2) { this->put(object); }
1070  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1071  }
1072 
1073  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1074 
1075  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1076 
1077  if (parameters.writeSN) { this->put(object); }
1078  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1079  }
1080 
      // L0 data are written as received, i.e. the input time slice itself
1081  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1082 
1083  if (parameters.writeL0) { this->put(timeslice); }
1084  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1085  }
1086  }
1087 
1088  } catch(const exception& error) {
1089  JErrorStream(logger) << "Error = " << error.what() << ";"
1090  << "run = " << timeslice.getRunNumber() << ";"
1091  << "frame index = " << timeslice.getFrameIndex() << ";"
1092  << "time slice not correctly processed!";
1093  }
1094  }
1095 
1096 
1097  /**
1098  * Report status to message logger.
1099  */
1100  void typeout()
1101  {
1102  timer.stop();
1103 
1104  const double T_us = (double) timer.usec_wall;
1105 
1106  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1107  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1108  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1109  try {
1110  JNoticeStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1111  }
1112  catch(const std::exception&) {}
1113  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1114  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1115 
1116  if (number_of_packets_received > 0) {
1117  JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1118  }
1119 
1120  JNoticeStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1121  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1122 
1123  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1124 
1125  if (numberOfTimeslicesProcessed > 0) {
1126  JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1127  JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1128  JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1129  }
1130 
1131  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1132  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1133 
1134  if (processedSlicesTime_us > 0) {
1135  JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1136  }
1137  if (processedDetectorTime_us > 0) {
1138  JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1139  }
1140 
1141  timer.start();
1142  }
1143 
1144 
1145  /**
1146  * Tagged action to handle alerts.
1147  *
1148  * \param tag tag
1149  * \param length number of characters
1150  * \param buffer message
1151  */
1152  virtual void actionTagged(const JTag& tag, int length, const char* buffer)
1153  {
1154  using namespace std;
1155 
1156  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1157 
1158  if (tag == RC_ALERT) {
1159 
1160  if (c_buffer.is_open()) {
1161 
1162  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1163 
1164  c_buffer.close();
1165  }
1166 
1167  if (c_buffer.is_enabled()) {
1168 
1169  c_buffer.open(path, getUniqueTag());
1170 
1171  if (c_buffer.is_open()) {
1172 
1173  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1174 
1175  putObject(c_buffer.getFile(), meta);
1176 
1177  } else {
1178 
1179  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1180 
1181  c_buffer.disable();
1182  }
1183  }
1184 
1185  } else {
1186 
1187  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1188  }
1189  }
1190 
1191  JMeta meta; //!< meta data
1192 
1193  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1194 
1195  private:
1196 
1198  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1199  std::string hostname; //!< host name of data server
1200 
1201  /**
1202  * Auxiliary method to send object to data server.
1203  *
1204  * \param object object to be sent
1205  * \return true if sent; else false
1206  */
1207  template<class T>
1208  bool put(const T& object)
1209  {
1210  try {
1211 
1212  datawriter->put(object);
1213 
1214  numberOfBytes += getSizeof(object);
1215 
1216  return true;
1217  }
1218  catch(const std::exception& error) {
1219  JErrorStream(logger) << error.what();
1220  }
1221 
1222  return false;
1223  }
1224 
1225 
1226  int port; //!< server socket port
1227  int backlog; //!< server socket backlog (command line option -q; presumably the listen queue length -- TODO confirm)
1229 
1230  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1231  JChannelList_t channelList; //!< connections to data queue
1232 
1234  JQuantile Qt; //!< statistics of real time per time slice as reported by typeout() (values presumably in us -- TODO confirm)
1235 
1236  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1238  unsigned int frames_per_slice; //!< number of frames per slice currently expected (reported by typeout())
1241 
1242  // trigger
1243 
1246 
1252 
1257 
1262 
1263  // process management
1264 
1267 
1268  // memory management
1269 
1270  long long int totalCPURAM; //!< presumably total RAM of this CPU -- TODO confirm
1271  unsigned int maxQueueDepth; //!< presumably maximal number of pending time slices -- TODO confirm
1272  long long int maxQueueSize; //!< presumably maximal size of pending data -- TODO confirm
1273  long long int queueSize; //!< current size of queue (reported by typeout() next to the number of pending time slices)
1274 
1275  // statistics
1276 
1278 
1279  long long int numberOfEvents; //!< number of events successfully sent to data server
1280  long long int numberOfBytes; //!< number of bytes sent to data server (accumulated in put())
1283 
1286 
1287  // temporary
1288 
1291  long long int number_of_reads; //!< number of reads (typeout() reports the number of reads per received packet)
1293 
1294  // circular buffer
1295 
1297  std::string path; //!< directory for writing circular buffer
1298  };
1299 }
1300 
1301 /**
1302  * \file
1303  *
1304  * Application for real-time filtering of data.
1305  * For more information, see KM3NETDAQ::JDataFilter.
1306  *
1307  * \author rbruijn and mdejong
1308  */
1309 int main(int argc, char* argv[])
1310 {
1311  using namespace std;
1312  using namespace JPP;
1313  using namespace KM3NETDAQ;
1314 
1315  string server;
1316  string logger;
1317  string hostname;
1318  string client_name;
1319  int port;
1320  int backlog;
1321  int buffer_size;
1322  bool use_cout;
1323  string path;
1324  int debug;
1325 
1326 
1327  try {
1328 
1329  JParser<> zap("Application for real-time filtering of data.");
1330 
1331  zap['H'] = make_field(server) = "localhost";
1332  zap['M'] = make_field(logger) = "localhost";
1333  zap['D'] = make_field(hostname) = "";
1334  zap['u'] = make_field(client_name) = "JDataFilter";
1335  zap['P'] = make_field(port);
1336  zap['q'] = make_field(backlog) = 1024;
1337  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1338  zap['c'] = make_field(use_cout);
1339  zap['p'] = make_field(path) = "/tmp/";
1340  zap['d'] = make_field(debug) = 0;
1341 
1342  zap(argc, argv);
1343  }
1344  catch(const exception& error) {
1345  FATAL(error.what() << endl);
1346  }
1347 
1348 
1349  JLogger* out = NULL;
1350 
1351  if (use_cout)
1352  out = new JStreamLogger(cout);
1353  else
1354  out = new JControlHostLogger(logger);
1355 
1356  JDataFilter dfilter(client_name,
1357  server,
1358  hostname,
1359  out,
1360  debug,
1361  port,
1362  backlog,
1363  buffer_size,
1364  path);
1365 
1366  dfilter.meta = JMeta(argc, argv);
1367 
1368  dfilter.enter();
1369  dfilter.run();
1370 }
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:71
Utility class to parse command line options.
Definition: JParser.hh:1493
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:908
virtual void actionStart(int length, const char *buffer)
Definition: JDataFilter.cc:545
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:76
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:182
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
ROOT TTree parameter settings.
then JLigiers sh continue fi cat driver txt<< EOFprocess dfilter $HOST1 ssh\$HOST\$"JDataFilter -H \$SERVER\$ -M \$LOGGER\$ -d $DEBUG &";enterevent ev_init{RC_CMD%< ev_init.txt > from me< ev_init.txt > event ev_configure
Data structure for a composite optical module.
Definition: JModule.hh:50
JSinglePointer< JTriggerNB > triggerNB
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:270
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:158
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
virtual void actionContinue(int length, const char *buffer)
Definition: JDataFilter.cc:628
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:155
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:150
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:257
Detector data structure.
Definition: JDetector.hh:80
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:289
virtual void actionExit()
Definition: JDataFilter.cc:319
virtual void actionPause(int length, const char *buffer)
Definition: JDataFilter.cc:595
esac print_variable DETECTOR INPUT_FILE OUTPUT_FILE CDF for TYPE in
Definition: JSirene.sh:45
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:42
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:145
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:128
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
static const long long int GIGABYTE
Number of bytes in a gigabyte.
Definition: JConstants.hh:81
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:306
Tools for handling different hit types.
Utility class to parse parameter values.
virtual void setSelect(JFileDescriptorMask &mask) const
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:658
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:170
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:315
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:157
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:171
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:226
Basic data structure for L0 hit.
std::string index
index in process list
do cat driver txt<< EOFevent ev_configure{RC_EVT%< ev_configure.txt > RC_DWRT path
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:154
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
const_iterator end() const
get iterator to end of data (without end marker)
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:156
JSinglePointer< JTrigger3DShower > trigger3DShower
Constants.
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:130
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
static const long long int MEGABYTE
Number of bytes in a megabyte.
Definition: JConstants.hh:80
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:151
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:241
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1954
Byte array binary input.
Definition: JByteArrayIO.hh:25
virtual void actionStop(int length, const char *buffer)
Definition: JDataFilter.cc:634
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:813
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:61
long long int number_of_packets_received
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:129
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:259
Auxiliary class for all subscription.
Definition: JControlHost.hh:96
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JPrint.hh:361
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:167
then echo n User name
Definition: JCookie.sh:45
std::vector< value_type >::iterator iterator
long long int queueSize
virtual void actionConfigure(int length, const char *buffer)
Definition: JDataFilter.cc:349
JSinglePointer< JTimesliceRouter > timesliceRouter
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxiliary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implementation of object output through ControlHost.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
virtual void actionReset(int length, const char *buffer)
Definition: JDataFilter.cc:642
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:256
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System calls via shell interactions.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:53
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
ControlHost tag.
Definition: JTag.hh:35
virtual void actionTagged(const JTag &tag, int length, const char *buffer)
Tagged action to handle alerts.
virtual void actionQuit(int length, const char *buffer)
Definition: JDataFilter.cc:652
KM3NeT DAQ constants, bit handling, etc.
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:798
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:258
Data frame of one optical module.
long long int number_of_reads
virtual void actionSelect(const JFileDescriptorMask &mask)
Action method following last select call.
Definition: JDataFilter.cc:672
virtual void actionInit(int length, const char *buffer)
Definition: JDataFilter.cc:332
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
Time slice with calibrated data.
Definition: JTimeslice.hh:26
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.
int main(int argc, char *argv[])
Definition: Main.cpp:15