Jpp
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
20 #include "JDAQ/JDAQ.hh"
21 #include "JDAQ/JDAQTags.hh"
22 #include "JDAQ/JDAQEvent.hh"
23 #include "JDAQ/JDAQTimeslice.hh"
25 #include "JDAQ/JDAQPreamble.hh"
26 #include "JDAQ/JDAQSuperFrame.hh"
27 #include "JDAQ/JDAQFrame.hh"
28 #include "JDAQ/JDAQSummaryslice.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JTools/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * JDataFilter::setSelect and
93  * JDataFilter::actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
96  * When a JDAQTimeslice is complete,
97  * it is processed in the same way as in applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
98  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
99  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
100  * <pre>
101  * numberOfFramesPerSlice = frames_per_slice = 1;
102  * </pre>
103  * Note that this parameter can change during operation (see below).
104  *
105  * A timeout may occur when the total amount of data or the number of incomplete time slices
106  * in the queue exceeds the corresponding pre-set limit:\n
107  * <pre>
108  * queueSize = <maximal queue size [B]>;
109  * queueDepth = <maximal queue depth>;
110  * </pre>
111  * Note that these values apply per JDataFilter.\n
112  *
113  * When a timeout occurs,
114  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
115  * When data are received with a frame index below the current frame index,
116  * the parameter <tt>frames_per_slice</tt> is incremented by one,
117  * up to the original value set at JDataFilter::actionConfigure.\n
118  *
119  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
120  * The following parameters can be used to configure the circular buffer.\n
121  * <pre>
122  * path = <write directory for circular buffer>;
123  * c_sizeL0 = <L0 buffer size>;
124  * c_sizeL1 = <L1 buffer size>;
125  * c_sizeL2 = <L2 buffer size>;
126  * c_sizeSN = <SN buffer size>;
127  * </pre>
128  *
129  * Note that when path is set to a valid directory:
130  * - There will always be a file created which is closed when the state machine is exited;
131  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
132  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
133  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
134  * - After closure of the file, a new file will be opened if path is set;
135  *
136  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
137  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
138  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
139  * (e.g. when there is more than one alert on a given day).
140  *
141  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
142  *
143  * The script JDOMandDataFilter.sh can be used to test this application.
144  */
145  class JDataFilter :
146  public JDAQClient
147  {
148  public:
149 
152 
153  typedef double hit_type;
159 
160  /**
161  * Circular buffer.
162  */
164  public JTreeRecorder<JDAQTimesliceTypes_t>
165  {
167 
168  /**
169  * Default constructor.
170  */
172  {
173  disable();
174  }
175 
176  /**
177  * Open file.
178  *
179  * \param path directory
180  * \param tag unique process tag
181  */
182  void open(const std::string& path, const JTag& tag)
183  {
184  using namespace std;
185 
186  const JDateAndTime cal;
187 
188  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
189 
190  ostringstream os;
191 
192  os << getFullPath(path)
193  << "KM3NeT"
194  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
195  << "_" << tag;
196 
197  if (i != 0) {
198  os << "_" << i;
199  }
200 
201  os << ".root";
202 
203  try {
204  this->open(os.str().c_str());
205  }
206  catch(const exception& error) {}
207  }
208  }
209 
210  /**
211  * Disable writing.
212  */
213  void disable()
214  {
215  sizeL0 = 0;
216  sizeL1 = 0;
217  sizeL2 = 0;
218  sizeSN = 0;
219  }
220 
221  /**
222  * Check whether writing of data is enabled.
223  *
224  * \return true if writing enabled; else false
225  */
226  bool is_enabled() const
227  {
228  return (sizeL0 > 0 ||
229  sizeL1 > 0 ||
230  sizeL2 > 0 ||
231  sizeSN > 0);
232  }
233 
234  /**
235  * Write circular buffer to output stream.
236  *
237  * \param out output stream
238  * \param object circular buffer
239  * \return output stream
240  */
241  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
242  {
243  if (object.is_open())
244  out << object.getFile()->GetName();
245  else
246  out << "void";
247 
248  out << ' ';
249 
250  return out << object.sizeL0 << '/'
251  << object.sizeL1 << '/'
252  << object.sizeL2 << '/'
253  << object.sizeSN << '/';
254  }
255 
256  Long64_t sizeL0; //!< Number of L0 time slices
257  Long64_t sizeL1; //!< Number of L1 time slices
258  Long64_t sizeL2; //!< Number of L2 time slices
259  Long64_t sizeSN; //!< Number of SN time slices
260  };
261 
262 
263  /**
264  * Sort DAQ process by index.
265  *
266  * \param first first DAQ process
267  * \param second second DAQ process
268  * \return true if index of first DAQ process less than that of second; else false
269  */
270  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
271  {
272  return first.index < second.index;
273  }
274 
275 
276  /**
277  * Constructor.
278  *
279  * \param name name of client
280  * \param server name of command message server
281  * \param hostname name of data server
282  * \param logger pointer to logger
283  * \param level debug level
284  * \param port server port
285  * \param backlog server backlog
286  * \param buffer_size server buffer
287  * \param path directory for writing circular buffer
288  */
289  JDataFilter(const std::string& name,
290  const std::string& server,
291  const std::string& hostname,
292  JLogger* logger,
293  const int level,
294  const int port,
295  const int backlog,
296  const int buffer_size,
297  const std::string& path) :
298  JDAQClient (name,server,logger,level),
299  hostname (hostname),
300  port (port),
301  backlog (backlog),
     // NOTE(review): listing line 302 is absent from this extract. The parameter
     // buffer_size is not used in any visible initialiser, so the missing line
     // presumably initialises a member from it -- confirm against the original file.
303  path (path)
304  {
     // The tag of the configure event is replaced so that this client reacts to RC_DFILTER messages.
305  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
306 
     // Additionally subscribe to messages carrying the RC_ALERT tag.
307  addSubscription(JSubscriptionAll(RC_ALERT));
308 
     // Value of getRAM() is cached; actionConfigure derives the maximal queue size from it.
309  totalCPURAM = getRAM();
     // -1: no time slice has been processed yet; updateFrameQueue drops frames
     // whose index is less than or equal to current_slice_index.
310  current_slice_index = -1;
     // Whether this process reports is decided in actionConfigure.
311  reporting = false;
312  }
313 
314 
315  virtual void actionEnter()
316  {}
317 
318 
319  virtual void actionExit()
320  {
321  if (c_buffer.is_open()) {
322 
323  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
324 
325  c_buffer.close();
326  }
327 
328  datawriter.reset();
329  }
330 
331 
332  virtual void actionInit(int length, const char* buffer)
333  {
334  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
335 
336  try {
337 
338  JDebugStream(logger) << "Start server.";
339 
340  serversocket.reset(new JServerSocket(port,backlog));
341  }
342  catch(const std::exception& error) {
343  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
344  ev_error();
345  }
346  }
347 
348 
349  virtual void actionConfigure(int length, const char* buffer)
350  {
     // Parse the configuration message, connect to the data writer, work out
     // whether this process reports, cap the queue size, (re)build the trigger
     // and builder objects, and open/close the circular buffer as configured.
351  using namespace std;
352 
353  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
354 
     // Defaults; both are bound to configuration keys below and may be overwritten by the message.
355  long long int update_s = 10;
356  long long int logger_s = 5;
357 
358  parameters .reset();
359  dataFilters.clear();
360  dataQueues .clear();
361 
362  reporting = false;
363 
364  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
365 
     // Bind configuration keys to variables. Where an assignment appears on the
     // right-hand side, the variable is given its default value in the same statement.
366  properties["dataWriter"] = hostname;
367  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
368  properties["detector"] = detector;
369  properties["triggerParameters"] = parameters;
     // NOTE(review): if totalCPURAM is unsigned and smaller than GIGABYTE this subtraction wraps -- confirm types.
370  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
371  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
372  properties["frameIndex"] = maximal_frame_index = 100000;
373  properties["logger_s"] = logger_s;
374  properties["update_s"] = update_s;
375  properties["JDataFilter"] = dataFilters;
376  properties["DataQueue"] = dataQueues;
377  properties["path"] = path;
378  properties["c_sizeL0"] = c_buffer.sizeL0;
379  properties["c_sizeL1"] = c_buffer.sizeL1;
380  properties["c_sizeL2"] = c_buffer.sizeL2;
381  properties["c_sizeSN"] = c_buffer.sizeSN;
382 
     // A parse error is only logged; configuration continues with the values set so far.
383  try {
384  properties.read(string(buffer, length));
385  }
386  catch(const exception& error) {
387  JErrorStream(logger) << error.what();
388  }
389 
     // Non-positive periods are replaced by 1.
390  if (update_s <= 0) { update_s = 1; }
391  if (logger_s <= 0) { logger_s = 1; }
392 
     // NOTE(review): factor 1e6 presumably converts seconds to microseconds -- confirm the unit expected by setClockInterval.
393  setClockInterval(update_s * 1000000LL);
394 
395  hostname = trim(hostname);
396 
     // An empty data writer host name is fatal here: the exception is not caught in this method.
397  if (hostname != "")
398  datawriter.reset(new JControlHost_t(hostname));
399  else
400  throw JException("Undefined data writer host name.");
401 
     // Upper limit for later adjustments of frames_per_slice (see actionSelect and updateFrameQueue).
402  maximum_frames_per_slice = frames_per_slice;
403 
404  // process processlist
405 
406  if (dataFilters.empty()) {
407  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
408  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
409  }
410 
411  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
412 
413  unsigned int numberOfDataFiltersOnThisMachine = 0;
414  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
415 
     // NOTE(review): listing line 416 is absent from this extract; it must declare IP,
     // which is iterated below as a container of std::string -- confirm against the original file.
417 
418  {
419  JNoticeStream notice(logger);
420 
421  notice << "My IP addresses:";
422 
423  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
424  notice << ' ' << *i;
425  }
426  }
427 
     // Count the data filters whose host name matches one of the entries in IP and
     // identify this process among them by its port.
428  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
429 
430  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
431 
432  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
433 
434  numberOfDataFiltersOnThisMachine++;
435 
436  if (i->port == this->port) {
437  thisProcess = i;
438  }
439  }
440  }
441 
     // Falling back to 1 also keeps the divisions below well defined.
442  if (numberOfDataFiltersOnThisMachine == 0) {
443  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
444  << "Assuming one datafilter on this machine.";
445  numberOfDataFiltersOnThisMachine = 1;
446  }
447 
448  if (thisProcess == dataFilters.end()) {
449  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
450  }
451 
452  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
453  JErrorStream(logger) << "Mismatch between given process names: "
454  << "I am called " << getName()
455  << ", but in the process list I am referred to as " << thisProcess->index;
456  }
457 
     // This process reports when it is the first entry of the sorted list or when the list is empty.
458  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
459  reporting = true;
460  }
461 
     // The queue size is capped at an equal share of (totalCPURAM - GIGABYTE) per data filter on this machine.
462  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
463 
464  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
465 
466  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
467  << "Queue size reduced to "
468  << maxQueueSize << " bytes." ;
469  }
470 
471  // trigger parameters
472 
473  parameters.set(getMaximalDistance(detector));
474 
475  triggerNB .reset(new JTriggerNB (parameters));
476  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
477  trigger3DShower.reset(new JTrigger3DShower(parameters));
478  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
479 
480  moduleRouter.reset(new JModuleRouter(detector));
481 
482  if (reporting) {
483  JNoticeStream(logger) << "This data filter process will report.";
484  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
485  JDebugStream (logger) << "Trigger parameters: " << parameters;
486  JDebugStream (logger) << "Detector description: " << endl << detector;
     // NOTE(review): the label says [s] while the interval was set as update_s * 1000000 -- confirm the unit returned by getClockInterval().
487  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
488  }
489 
490  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
491 
492  // set L1, L2 and SN builders
493 
494  buildL1.reset(new JBuildL1_t(parameters));
495  buildL2.reset(new JBuildL2_t(parameters.L2));
496  buildSN.reset(new JBuildL2_t(parameters.SN));
497 
     // NOTE(review): a plain new-expression throws on allocation failure instead of
     // returning a null pointer, so these checks presumably never fire.
498  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
499  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
500  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
501 
     // Rate-limited error loggers, all with the same period derived from logger_s.
502  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
503  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
504  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
505  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
506 
     // Circular buffer: open a file if writing is enabled and none is open yet (an
     // already open file is kept); close the file if writing is not enabled.
507  if (c_buffer.is_enabled()) {
508 
509  if (!c_buffer.is_open()) {
510 
511  c_buffer.open(path, getUniqueTag());
512 
513  if (c_buffer.is_open()) {
514 
515  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
516 
517  putObject(c_buffer.getFile(), meta);
518 
519  } else {
520 
521  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
522 
523  c_buffer.disable();
524  }
525 
526  } else {
527 
528  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
529  }
530 
     // Apply the configured size to the writer of each time slice type with a positive size.
531  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
532  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
533  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
534  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
535 
536  } else {
537 
538  if (c_buffer.is_open()) {
539  c_buffer.close();
540  }
541  }
542  }
543 
544 
545  virtual void actionStart(int length, const char* buffer)
546  {
     // Reset the frame queue, all counters, the rate-limited loggers and the timer,
     // then send the run number and trigger parameters to the data writer.
547  using namespace std;
548 
549  if (reporting) {
550  JNoticeStream(logger) << "Start run " << getRunNumber();
551  }
552 
553  timeslices.clear();
554 
     // -1: no time slice processed yet in this run.
555  current_slice_index = -1;
556  queueSize = 0;
557 
558  numberOfEvents = 0;
559  numberOfBytes = 0;
560  numberOfTimeslicesProcessed = 0;
561  numberOfIncompleteTimeslicesProcessed = 0;
562 
563  number_of_packets_received = 0;
564  number_of_packets_discarded = 0;
565  number_of_bytes_received = 0;
566  number_of_reads = 0;
567 
     // Sentinels for the running min/max of processed frame indices (updated in actionSelect).
     // Until a slice is processed, minFrameNumber > maxFrameNumber.
568  minFrameNumber = numeric_limits<int>::max();
569  maxFrameNumber = numeric_limits<int>::min();
570 
571  // Reset global trigger counter.
572 
     // NOTE(review): listing line 573 is absent from this extract; it presumably holds
     // the statement announced by the comment above -- confirm against the original file.
574 
575  logErrorRun .reset();
576  logErrorDetector .reset();
577  logErrorIndex .reset();
578  logErrorIncomplete.reset();
579 
580  timer.reset();
581  timer.start();
582 
583  Qt.reset();
584 
585  // send trigger parameters to the datawriter
586 
587  ostringstream os;
588 
589  os << getRunNumber() << ' ' << parameters;
590 
591  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
592  }
593 
594 
595  virtual void actionPause(int length, const char* buffer)
596  {
     // Discard all queued time slices without processing them, reset the queue
     // bookkeeping and stop the timer.
597  using namespace std;
598 
599  if (!timeslices.empty()) {
600 
601  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
602 
     // Subtract the size of every queued slice so that queueSize should end up at zero.
603  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
604  queueSize -= i->getSize();
605  }
606 
607  timeslices.clear();
608  }
609 
610  { // force clearance of memory
611 
     // NOTE(review): listing line 612 is absent from this extract; it presumably declares
     // a local, empty container named buffer (shadowing the parameter) that is swapped
     // with timeslices below -- confirm against the original file.
613 
614  timeslices.swap(buffer);
615  }
616 
     // A non-zero remainder indicates an inconsistency in the size bookkeeping; it is reported and then cleared.
617  if (queueSize != 0) {
618  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
619  }
620 
621  current_slice_index = -1;
622  queueSize = 0;
623 
624  timer.stop();
625  }
626 
627 
628  virtual void actionContinue(int length, const char* buffer)
629  {
630  timer.start();
631  }
632 
633 
634  virtual void actionStop(int length, const char* buffer)
635  {
636  typeout();
637 
638  datawriter.reset();
639  }
640 
641 
642  virtual void actionReset(int length, const char* buffer)
643  {
644  serversocket.reset();
645  }
646 
647 
648  virtual void actionQuit(int length, const char* buffer)
649  {
650  datawriter.reset();
651  }
652 
653 
654  virtual void setSelect(JFileDescriptorMask& mask) const
655  {
656  if (serversocket.is_valid()) {
657  mask.set(*serversocket);
658  }
659 
660  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
661  if (!channel->isReady()) {
662  mask.set(channel->getFileDescriptor());
663  }
664  }
665  }
666 
667 
668  virtual void actionSelect(const JFileDescriptorMask& mask)
669  {
     // Three steps per call:
     //  1. read from the data channels and queue every completed packet;
     //  2. accept a pending connection on the server socket;
     //  3. process (at most) the oldest queued time slice when it is complete or a limit is reached.
670  using namespace std;
671 
     // The iterator is advanced inside the body: either by ++channel on success
     // or by the return value of erase() when the channel is dropped.
672  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
673 
674  try {
675 
676  if (mask.has(channel->getFileDescriptor())) {
677  channel->read();
678  }
679 
680  if (channel->isReady()) {
681 
682  number_of_packets_received += 1;
683  number_of_reads += channel->getCounter();
684  number_of_bytes_received += channel->size();
685 
686  if (isRunning()) {
687 
688  updateFrameQueue(channel);
689 
690  } else {
691 
692  JErrorStream(logErrorRun) << "Receiving data while not running.";
693 
694  number_of_packets_discarded += 1;
695  }
696 
697  channel->reset();
698  }
699 
700  ++channel;
701  }
     // Any std::exception raised above leads to the channel being shut down and removed from the list.
702  catch(const exception& error) {
703 
704  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
705 
706  channel->shutdown();
707 
708  channel = channelList.erase(channel);
709  }
710  }
711 
712 
713  if (serversocket.is_valid()) {
714 
715  if (mask.has(*serversocket)) {
716 
717  JSocket socket;
718 
719  socket.accept(serversocket->getFileDescriptor());
720 
721  //socket.setSendBufferSize (buffer_size);
     // NOTE(review): listing line 722 is absent from this extract -- confirm its content against the original file.
723 
724  socket.setKeepAlive (true);
725  socket.setNonBlocking(true);
726 
727  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
728 
729  channelList.push_back(JSocketInputChannel_t(socket));
730  }
731  }
732 
733 
     // The oldest slice is taken when it holds the expected number of frames, or -- as a
     // timeout -- when the queue depth or the queue size reaches its limit.
734  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
735  timeslices.size() >= maxQueueDepth ||
736  queueSize >= maxQueueSize)) {
737 
     // Reference into the list; it is used until the pop_front() at the end of this block.
738  const JDAQTimesliceL0& pending_slice = timeslices.front();
739  queueSize -= pending_slice.getSize();
740 
     // From here on, updateFrameQueue drops frames with an index <= this one.
741  current_slice_index = pending_slice.getFrameIndex();
742  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
743  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
744 
     // More frames than expected: raise the expectation, but never above the configured maximum.
745  if (pending_slice.size() > frames_per_slice) {
746 
747  JErrorStream(logger) << "More frames in timeslice than expected "
748  << pending_slice.size() << " > " << frames_per_slice;
749 
750  if (pending_slice.size() <= maximum_frames_per_slice) {
751 
752  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
753 
754  frames_per_slice = pending_slice.size();
755  }
756  }
757 
758  if (!pending_slice.empty()) {
759 
     // The difference of the two local times is accumulated in Qt as the processing time of this slice.
760  const localtime_t t0 = getLocalTime();
761 
762  processTimeSlice(pending_slice);
763 
764  const localtime_t t1 = getLocalTime();
765 
766  numberOfTimeslicesProcessed += 1;
767 
768  Qt.put(t1 - t0);
769 
     // Fewer frames than expected: the slice was processed because of a timeout.
770  if (pending_slice.size() < frames_per_slice) {
771 
772  numberOfIncompleteTimeslicesProcessed += 1;
773 
774  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
775  << "Frame index = " << pending_slice.getFrameIndex() << ';'
776  << "Size of timeslice = " << pending_slice.size() << ';'
777  << "Queue depth = " << timeslices.size() << ';'
778  << "Queue size = " << queueSize;
779 
     // NOTE(review): the processed slice is still in the list at this point (pop_front() follows
     // below), so unless processTimeSlice modifies the queue this condition is always true -- confirm intent.
780  if (!timeslices.empty()) {
781 
782  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
783 
784  frames_per_slice = pending_slice.size();
785  }
786  }
787  }
788 
     // Remove the slice from the queue; pending_slice must not be used after this line.
789  timeslices.pop_front();
790  }
791  }
792 
793 
794  virtual void actionRunning()
795  {
796  if (reporting) {
797  typeout();
798  }
799  }
800 
801 
802  /**
803  * Update queue with data frames.
804  *
805  * Note that any discarded data will be reported.
806  *
807  * \param channel incoming data channel
808  */
809  void updateFrameQueue(const JChannelList_t::const_iterator channel)
810  {
811  using namespace std;
812 
813  JByteArrayReader in(channel->data(), channel->size());
814 
815  JDAQPreamble preamble;
816  JDAQSuperFrameHeader header;
817 
818  in >> preamble;
819  in >> header;
820 
821  if (preamble.getLength() != channel->size()) {
822 
823  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
824  << "preamble.getLength() = " << preamble.getLength() << ';'
825  << "channel->size(): " << channel->size() << ';';
826 
827  number_of_packets_discarded += 1;
828 
829  return;
830  }
831 
832  if (header.getRunNumber() != getRunNumber()) {
833 
834  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
835  << " unequal to current run " << getRunNumber()
836  << " -> Dropping frame.";
837 
838  number_of_packets_discarded += 1;
839 
840  return;
841  }
842 
843  if (header.getFrameIndex() <= current_slice_index) {
844 
845  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
846  << " -> Dropping frame.";
847 
848  number_of_packets_discarded += 1;
849 
850  if (frames_per_slice < maximum_frames_per_slice) {
851 
852  frames_per_slice++;
853 
854  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
855  }
856 
857  return;
858  }
859 
860  if (header.getFrameIndex() > current_slice_index + maximal_frame_index) {
861 
862  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
863  << " -> Dropping frame.";
864 
865  number_of_packets_discarded += 1;
866 
867  return;
868  }
869 
870  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
871 
872  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
873  ++timesliceIterator;
874  }
875 
876  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
877 
878  // The corresponding time slice already exists
879 
880  } else {
881 
882  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
883 
884  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
885 
886  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
887 
888  queueSize += timesliceIterator->getSize();
889  }
890 
891  timesliceIterator->push_back(JDAQSuperFrame(header));
892 
893  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
894 
895  queueSize += timesliceIterator->rbegin()->getSize();
896  }
897 
898 
899  /**
900  * Process time slice.
901  *
902  * \param timeslice time slice
903  */
904  void processTimeSlice(const JDAQTimesliceL0& timeslice)
905  {
     // Turn one L0 time slice into output: an optional summary slice, triggered events,
     // and optional L0/L1/L2/SN time slices (sent via put() and/or stored in the circular buffer).
     // Any std::exception is caught and logged at the end; it does not propagate.
906  using namespace std;
907 
908  try {
909 
910  if (parameters.writeSummary()) {
911  this->put(JDAQSummaryslice(timeslice));
912  }
913 
     // The remainder is skipped when no trigger is enabled, no time slice output is
     // prescaled and the circular buffer is disabled.
914  if (parameters.trigger3DMuon.enabled ||
915  parameters.trigger3DShower.enabled ||
916  parameters.triggerMXShower.enabled ||
917  parameters.triggerNB.enabled ||
918  parameters.writeL0.prescale ||
919  parameters.writeL1.prescale ||
920  parameters.writeL2.prescale ||
921  parameters.writeSN.prescale ||
922  c_buffer.is_enabled()) {
923 
924  timesliceRouter->configure(timeslice);
925 
     // One working time slice per data level; NB holds the input of the nano-beacon
     // trigger and TX collects frames that fail the checksum test.
926  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
927  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
928  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
929  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
930  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
931  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
932 
933  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
934 
     // Frames of modules unknown to the module router are reported (see else branch) and skipped.
935  if (moduleRouter->hasModule(frame->getModuleID())) {
936 
     // A frame failing the checksum test is kept aside in timesliceTX and excluded from all further building.
937  if (!checksum(*frame)) {
938 
939  JErrorStream(logger) << "Invalid data at "
940  << "run = " << timeslice.getRunNumber() << ";"
941  << "frame index = " << timeslice.getFrameIndex() << ";"
942  << "module = " << frame->getModuleID() << ";"
943  << "discard and dump";
944 
945  timesliceTX.push_back(*frame);
946 
947  continue;
948  }
949 
950  const JModule& module = moduleRouter->getModule(frame->getModuleID());
     // demultiplex() hands back a reference; the same object is used as input for the L0, NB, L2 and SN steps below.
951  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
952 
953  // Apply high-rate veto
954 
955  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
956 
957  // L0
958 
959  timesliceL0.push_back(JSuperFrame1D_t(buffer));
960 
961  // Nano-beacon trigger
962 
963  if (parameters.triggerNB.enabled) {
964 
     // NOTE(review): presumably std::partition with parameters.triggerNB.pmts as predicate, which
     // reorders buffer in place; the L2 and SN builders below then see the reordered buffer -- confirm.
965  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
966 
967  if (buffer.begin() != __end) {
968 
969  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
970  frame->getModuleIdentifier(),
971  module.getPosition()));
972 
973  (*buildL1)(buffer.begin(), __end , back_inserter(*timesliceNB.rbegin()));
974  }
975  }
976 
     // For each level below, an empty output frame is appended first and then filled
     // through rbegin(); the order of these two statements matters.
977  // L1
978 
979  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
980  frame->getModuleIdentifier(),
981  module.getPosition()));
982 
983  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
984 
985  // L2
986 
987  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
988  frame->getModuleIdentifier(),
989  module.getPosition()));
990 
991  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
992 
993  // SN
994 
995  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
996  frame->getModuleIdentifier(),
997  module.getPosition()));
998 
999  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1000 
1001  } else {
1002 
1003  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1004  }
1005  }
1006 
     // Frames with invalid data, if any, are forwarded as a separate time slice.
1007  if (!timesliceTX.empty()) {
1008  this->put(timesliceTX);
1009  }
1010 
1011  // Trigger
1012 
     // Nano-beacon trigger: one triggered event is built and sent for every hit in the NB trigger input.
1013  if (parameters.triggerNB.enabled) {
1014 
1015  const JTriggerInput trigger_input(timesliceNB);
1016 
1017  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1018 
1019  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1020  getTriggerMask(triggerNB->getTriggerBit()),
1021  *hit,
1022  *timesliceRouter,
1023  *moduleRouter,
1024  parameters.TMaxLocal_ns,
1025  parameters.triggerNB.DMax_m,
1026  getTimeRange(parameters.triggerNB));
1027 
1028  this->put(tev);
1029  }
1030  }
1031 
     // The three event triggers append to a common output, which is merged
     // using JEventOverlap(parameters.TMaxEvent_ns) before the events are built.
1032  JTriggerInput trigger_input(timesliceL2);
1033  JTriggerOutput trigger_output;
1034 
1035  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1036  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1037  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1038 
1039  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1040 
1041  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1042 
1043  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1044 
     // Only events for which put() returns true are counted.
1045  if (this->put(object)) {
1046  numberOfEvents += 1;
1047  }
1048  }
1049 
     // Time slice output. Note the two different uses of the write parameters below:
     // called as a function in the outer condition, converted to bool in the inner one.
     // NOTE(review): presumably the call tests whether writing is enabled and the bool
     // conversion applies the prescale -- confirm in the declaration of these parameters.
1050  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1051 
1052  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1053 
1054  if (parameters.writeL1) { this->put(object); }
1055  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1056  }
1057 
1058  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1059 
1060  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1061 
1062  if (parameters.writeL2) { this->put(object); }
1063  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1064  }
1065 
1066  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1067 
1068  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1069 
1070  if (parameters.writeSN) { this->put(object); }
1071  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1072  }
1073 
     // L0: the incoming time slice itself is written, not the working copy timesliceL0.
1074  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1075 
1076  if (parameters.writeL0) { this->put(timeslice); }
1077  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1078  }
1079  }
1080 
1081  } catch(const exception& error) {
1082  JErrorStream(logger) << "Error = " << error.what() << ";"
1083  << "run = " << timeslice.getRunNumber() << ";"
1084  << "frame index = " << timeslice.getFrameIndex() << ";"
1085  << "time slice not correctly processed!";
1086  }
1087  }
1088 
1089 
1090  /**
1091  * Report status to message logger.
1092  */
1093  void typeout()
1094  {
1095  timer.stop();
1096 
1097  const double T_us = (double) timer.usec_wall;
1098 
1099  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1100  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1101  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1102  try {
1103  JNoticeStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1104  }
1105  catch(const std::exception&) {}
1106  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1107  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1108 
1109  if (number_of_packets_received > 0) {
1110  JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1111  }
1112 
1113  JNoticeStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1114  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1115 
1116  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1117 
1118  if (numberOfTimeslicesProcessed > 0) {
1119  JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1120  JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1121  JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1122  }
1123 
1124  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1125  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1126 
1127  if (processedSlicesTime_us > 0) {
1128  JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1129  }
1130  if (processedDetectorTime_us > 0) {
1131  JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1132  }
1133 
1134  timer.start();
1135  }
1136 
1137 
1138  /**
1139  * Tagged action to handle alerts.
1140  *
1141  * \param tag tag
1142  * \param length number of characters
1143  * \param buffer message
1144  */
1145  virtual void actionTagged(const JTag& tag, int length, const char* buffer)
1146  {
1147  using namespace std;
1148 
1149  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1150 
1151  if (tag == RC_ALERT) {
1152 
1153  if (c_buffer.is_open()) {
1154 
1155  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1156 
1157  c_buffer.close();
1158  }
1159 
1160  if (c_buffer.is_enabled()) {
1161 
1162  c_buffer.open(path, getUniqueTag());
1163 
1164  if (c_buffer.is_open()) {
1165 
1166  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1167 
1168  putObject(c_buffer.getFile(), meta);
1169 
1170  } else {
1171 
1172  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1173 
1174  c_buffer.disable();
1175  }
1176  }
1177 
1178  } else {
1179 
1180  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1181  }
1182  }
1183 
1184  JMeta meta; //!< meta data
1185 
1186  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1187 
1188  private:
1189 
1191  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1192  std::string hostname; //!< host name of data server
1193 
1194  /**
1195  * Auxiliary method to send object to data server.
1196  *
1197  * \param object object to be sent
1198  * \return true if sent; else false
1199  */
1200  template<class T>
1201  bool put(const T& object)
1202  {
1203  try {
1204 
1205  datawriter->put(object);
1206 
1207  numberOfBytes += object.getSize();
1208 
1209  return true;
1210  }
1211  catch(const std::exception& error) {
1212  JErrorStream(logger) << error.what();
1213  }
1214 
1215  return false;
1216  }
1217 
1218 
1219  int port; //!< server socket port
1220  int backlog; //!< NOTE(review): presumably the connection backlog of the server socket - confirm
1222 
1223  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1224  JChannelList_t channelList; //!< connections to data queue
1225 
1227  JQuantile Qt; //!< statistics reported by typeout() as elapsed real time per time slice (scaled there by 1.0e-3 to [ms])
1228 
1229  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1231  unsigned int frames_per_slice; //!< number of frames per slice currently expected (as reported by typeout())
1234 
1235  // trigger
1236 
1239 
1245 
1250 
1255 
1256  // process management
1257 
1260 
1261  // memory management
1262 
1263  long long int totalCPURAM; //!< NOTE(review): presumably the total RAM of this CPU - confirm
1264  unsigned int maxQueueDepth; //!< NOTE(review): presumably the upper limit on the number of queued time slices - confirm
1265  long long int maxQueueSize; //!< NOTE(review): presumably the upper limit on the queue size - confirm
1266  long long int queueSize; //!< current queue size (reported by typeout() next to the queue depth timeslices.size())
1267 
1268  // statistics
1269 
1271 
1272  long long int numberOfEvents; //!< number of events sent (as reported by typeout())
1273  long long int numberOfBytes; //!< number of bytes sent; incremented by put() with the size of each object sent
1276 
1279 
1280  // temporary
1281 
1284  long long int number_of_reads; //!< number of reads; used by typeout() to report the number of reads per packet
1286 
1287  // circular buffer
1288 
1290  std::string path; //!< directory for writing circular buffer
1291  };
1292 }
1293 
1294 /**
1295  * \file
1296  *
1297  * Application for real-time filtering of data.
1298  * For more information, see KM3NETDAQ::JDataFilter.
1299  *
1300  * \author rbruijn and mdejong
1301  */
1302 int main(int argc, char* argv[])
1303 {
1304  using namespace std;
1305  using namespace JPP;
1306  using namespace KM3NETDAQ;
1307 
1308  string server;
1309  string logger;
1310  string hostname;
1311  string client_name;
1312  int port;
1313  int backlog;
1314  int buffer_size;
1315  bool use_cout;
1316  string path;
1317  int debug;
1318 
1319 
1320  try {
1321 
1322  JParser<> zap("Application for real-time filtering of data.");
1323 
1324  zap['H'] = make_field(server) = "localhost";
1325  zap['M'] = make_field(logger) = "localhost";
1326  zap['D'] = make_field(hostname) = "";
1327  zap['u'] = make_field(client_name) = "JDataFilter";
1328  zap['P'] = make_field(port);
1329  zap['q'] = make_field(backlog) = 1024;
1330  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1331  zap['c'] = make_field(use_cout);
1332  zap['p'] = make_field(path) = "/tmp/";
1333  zap['d'] = make_field(debug) = 0;
1334 
1335  zap(argc, argv);
1336  }
1337  catch(const exception& error) {
1338  FATAL(error.what() << endl);
1339  }
1340 
1341 
1342  JLogger* out = NULL;
1343 
1344  if (use_cout)
1345  out = new JStreamLogger(cout);
1346  else
1347  out = new JControlHostLogger(logger);
1348 
1349  JDataFilter dfilter(client_name,
1350  server,
1351  hostname,
1352  out,
1353  debug,
1354  port,
1355  backlog,
1356  buffer_size,
1357  path);
1358 
1359  dfilter.meta = JMeta(argc, argv);
1360 
1361  dfilter.enter();
1362  dfilter.run();
1363 }
JLOGGER::JDebugStream
Level specific message streamers.
Definition: JMessageStream.hh:113
JLOGGER::JLogger
Interface for logging messages.
Definition: JLogger.hh:22
KM3NETDAQ::JDataFilter::setSelect
virtual void setSelect(JFileDescriptorMask &mask) const
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:654
KM3NETDAQ::JDataFilter::hostname
std::string hostname
host name of data server
Definition: JDataFilter.cc:1192
JLANG::JAbstractFile::getFileDescriptor
int getFileDescriptor() const
Get file descriptor.
Definition: JAbstractFile.hh:75
JRuncontrolToolkit.hh
JMeta.hh
JLOGGER::JNoticeStream
Definition: JMessageStream.hh:116
JDAQ.hh
KM3NETDAQ::JDataFilter::maxQueueSize
long long int maxQueueSize
Definition: JDataFilter.cc:1265
JControlHostObjectOutput.hh
KM3NETDAQ::JDataFilter
Main class for real-time filtering of data.
Definition: JDataFilter.cc:145
KM3NETDAQ::JDataFilter::path
std::string path
directory for writing circular buffer
Definition: JDataFilter.cc:1290
JChecksum.hh
KM3NETDAQ::JDataFilter::actionEnter
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:315
KM3NETDAQ::JDataFilter::parameters
JTriggerParameters parameters
Definition: JDataFilter.cc:1237
KM3NETDAQ::JDataFilter::channelList
JChannelList_t channelList
connections to data queue
Definition: JDataFilter.cc:1224
KM3NETDAQ::JDataFilter::number_of_reads
long long int number_of_reads
Definition: JDataFilter.cc:1284
KM3NETDAQ::JDataFilter::JChannelList_t
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:151
JTriggerNB.hh
KM3NETDAQ::JDataFilter::maximum_frames_per_slice
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:1232
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL2
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:258
JTriggerParameters.hh
JLANG::JSinglePointer< JControlHost_t >
JDETECTOR::getMaximalDistance
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
Definition: JDetectorToolkit.hh:76
JSuperFrame2D.hh
KM3NETDAQ::JDataFilter::JSuperFrame1D_t
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:154
JEEP::JTimer
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
KM3NETDAQ::JDataFilter::JCircularBuffer_t::open
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:182
KM3NETDAQ::JDataFilter::numberOfBytes
long long int numberOfBytes
Definition: JDataFilter.cc:1273
JNET::JServerSocket
TCP Server socket.
Definition: JServerSocket.hh:27
JAANET::getTimeRange
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e.
Definition: JAAnetToolkit.hh:134
JTrigger3DMuon.hh
JTriggeredEvent.hh
KM3NETDAQ::JDataFilter::maximal_frame_index
int maximal_frame_index
Definition: JDataFilter.cc:1233
KM3NETDAQ::JDataFilter::actionReset
virtual void actionReset(int length, const char *buffer)
Definition: JDataFilter.cc:642
JMessage.hh
KM3NETDAQ::JDataFilter::timesliceRouter
JSinglePointer< JTimesliceRouter > timesliceRouter
Definition: JDataFilter.cc:1238
KM3NETDAQ::JDataFilter::actionTagged
virtual void actionTagged(const JTag &tag, int length, const char *buffer)
Tagged action to handle alerts.
Definition: JDataFilter.cc:1145
JPrint.hh
KM3NETDAQ::JDataFilter::c_buffer
JCircularBuffer_t c_buffer
Definition: JDataFilter.cc:1289
KM3NETDAQ::JDataFilter::meta
JMeta meta
meta data
Definition: JDataFilter.cc:1184
KM3NETDAQ::JDataFilter::trigger3DShower
JSinglePointer< JTrigger3DShower > trigger3DShower
Definition: JDataFilter.cc:1248
JTRIGGER::JTriggerParameters
Data structure for all trigger parameters.
Definition: JTriggerParameters.hh:116
JLOGGER::JErrorStream
Definition: JMessageStream.hh:115
JTriggerToolkit.hh
KM3NETDAQ::JDataFilter::triggerNB
JSinglePointer< JTriggerNB > triggerNB
Definition: JDataFilter.cc:1246
KM3NETDAQ::JDataFilter::numberOfEvents
long long int numberOfEvents
Definition: JDataFilter.cc:1272
JTimeslice.hh
JTRIGGER::JBuildL1
Template L1 hit builder.
Definition: JBuildL1.hh:85
JSinglePointer.hh
JSYSTEM::JDateAndTime::getDay
int getDay() const
day of the month [1-31]
Definition: JDate.hh:94
JNET::JSocketInputChannel_t
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
KM3NETDAQ::JDataFilter::logErrorRun
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:1251
KM3NETDAQ::JDataFilter::number_of_packets_discarded
long long int number_of_packets_discarded
Definition: JDataFilter.cc:1283
JTimekeeper.hh
JNetwork.hh
KM3NETDAQ::JDataFilter::minFrameNumber
int minFrameNumber
Definition: JDataFilter.cc:1277
KM3NETDAQ::JDataFilter::JBuildL1_t
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:157
KM3NETDAQ::getUniqueTag
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
Definition: JRuncontrolToolkit.hh:441
JTRIGGER::JSuperFrame1D
1-dimensional frame with time calibrated data from one optical module.
Definition: JSuperFrame1D.hh:35
KM3NETDAQ::JDAQClient::run
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
JTOOLS::GIGABYTE
static const long long int GIGABYTE
Number of bytes in a gigabyte.
Definition: JConstants.hh:81
std::vector< JSocketInputChannel_t >
KM3NETDAQ::JDataFilter::port
int port
server socket port
Definition: JDataFilter.cc:1219
KM3NETDAQ::JDAQChronometer::getFrameIndex
int getFrameIndex() const
Get frame index.
Definition: JDAQChronometer.hh:132
KM3NETDAQ::JDataFilter::timeslices
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
Definition: JDataFilter.cc:1229
KM3NETDAQ::JDataFilter::actionStart
virtual void actionStart(int length, const char *buffer)
Definition: JDataFilter.cc:545
JLANG::JFileDescriptorMask::has
bool has(const int file_descriptor) const
Has file descriptor.
Definition: JFileDescriptorMask.hh:216
KM3NETDAQ::JDataFilter::actionRunning
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:794
KM3NETDAQ::JDataFilter::actionContinue
virtual void actionContinue(int length, const char *buffer)
Definition: JDataFilter.cc:628
KM3NETDAQ::JDAQTimeslice
Data time slice.
Definition: JDAQTimeslice.hh:36
JServerSocket.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t::disable
void disable()
Disable writing.
Definition: JDataFilter.cc:213
KM3NETDAQ::JDataFilter::buildL1
JSinglePointer< JBuildL1_t > buildL1
Definition: JDataFilter.cc:1242
KM3NETDAQ::JDataFilter::timer
JTimer timer
Definition: JDataFilter.cc:1226
JPARSER::JParser
Utility class to parse command line options.
Definition: JParser.hh:1493
KM3NETDAQ::JDataFilter::trigger3DMuon
JSinglePointer< JTrigger3DMuon > trigger3DMuon
Definition: JDataFilter.cc:1247
JEEP::JProperties::read
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
KM3NETDAQ::JDAQTimeslice::getSize
virtual int getSize() const
Get size of object.
Definition: JDAQTimeslice.hh:260
JNET::JSocket
Socket class.
Definition: JSocket.hh:42
JDAQAbstractPreamble.hh
KM3NETDAQ::JDataFilter::reporting
bool reporting
Definition: JDataFilter.cc:1270
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL1
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:257
KM3NETDAQ::JDataFilter::actionStop
virtual void actionStop(int length, const char *buffer)
Definition: JDataFilter.cc:634
JDAQTimeslice.hh
JTRIGGER::JTriggerInput
Data structure for input to trigger algorithm.
Definition: JTriggerInput.hh:32
JNET
Interprocess communication.
Definition: JDataFilter.cc:67
JROOT::putObject
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.
Definition: JRootFileWriter.hh:38
KM3NETDAQ::JDAQAbstractPreamble::getLength
int getLength() const
Get length.
Definition: JDAQAbstractPreamble.hh:48
JQuantile.hh
KM3NETDAQ::JDataFilter::backlog
int backlog
Definition: JDataFilter.cc:1220
JTriggerBits.hh
KM3NETDAQ::JDataFilter::buildL2
JSinglePointer< JBuildL2_t > buildL2
Definition: JDataFilter.cc:1243
JDAQSummaryslice.hh
JNET::JSocket::setReceiveBufferSize
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
JNET::JControlHostObjectOutput
Implementation of object output through ControlHost.
Definition: JControlHostObjectOutput.hh:36
JIO::JByteArrayReader
Byte array binary input.
Definition: JByteArrayIO.hh:25
KM3NETDAQ::getFrameTime
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
JSupport.hh
JPP
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
Definition: JAAnetToolkit.hh:37
JBuildL1.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t::JCircularBuffer_t
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:171
JTriggerMXShower.hh
KM3NETDAQ::JDataFilter::buffer_size
int buffer_size
Definition: JDataFilter.cc:1221
KM3NETDAQ::JDataFilter::actionInit
virtual void actionInit(int length, const char *buffer)
Definition: JDataFilter.cc:332
JTRIGGER::JSuperFrame2D::applyHighRateVeto
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Definition: JSuperFrame2D.hh:122
KM3NETDAQ::JDataFilter::JTimeslice_t
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:156
JTRIGGER::JTriggerNB
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
KM3NETDAQ::JDataFilter::JCircularBuffer_t::is_enabled
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:226
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeSN
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:259
KM3NETDAQ::JDataFilter::datawriter
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
Definition: JDataFilter.cc:1191
debug
int debug
debug level
Definition: JSirene.cc:59
KM3NETDAQ::getTriggerMask
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
Definition: JDAQTriggerMask.hh:40
JNET::JTag
ControlHost tag.
Definition: JTag.hh:35
JTRIGGER::JTimesliceL1
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
KM3NETDAQ::JDataFilter::actionConfigure
virtual void actionConfigure(int length, const char *buffer)
Definition: JDataFilter.cc:349
FILL
Auxiliary data structure for sequence of same character.
Definition: JPrint.hh:361
JConstants.hh
KM3NETDAQ::JDataFilter::logErrorIncomplete
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:1254
KM3NETDAQ::JDataFilter::moduleRouter
JSinglePointer< JModuleRouter > moduleRouter
Definition: JDataFilter.cc:1241
JTRIGGER::JTriggerOutput::merge
void merge(const JMatch_t &match)
Merge events.
Definition: JTriggerOutput.hh:41
JHitToolkit.hh
KM3NETDAQ::JDataFilter::actionExit
virtual void actionExit()
Definition: JDataFilter.cc:319
KM3NETDAQ::JDataFilter::dataFilters
std::vector< JDAQProcess > dataFilters
Definition: JDataFilter.cc:1258
KM3NETDAQ::JDataFilter::serversocket
JSinglePointer< JServerSocket > serversocket
server for data queue connections
Definition: JDataFilter.cc:1223
JTimesliceL1.hh
KM3NETDAQ::JDataFilter::maxQueueDepth
unsigned int maxQueueDepth
Definition: JDataFilter.cc:1264
JEEP::getFullPath
std::string getFullPath(const std::string &path)
Get full path, i.e.
Definition: JeepToolkit.hh:126
JNET::JSocket::accept
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
JSYSTEM::getLocalTime
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
Definition: JSystem/JTime.hh:61
JLOGGER::JWarningStream
Definition: JMessageStream.hh:114
KM3NETDAQ::JDataFilter::JControlHost_t
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
Definition: JDataFilter.cc:1190
JDAQTags.hh
KM3NETDAQ::JDAQTimesliceL0
Timeslice data structure for L0 data.
Definition: JDAQTimeslice.hh:313
JSYSTEM::JDateAndTime::getYear
int getYear() const
year a.d.
Definition: JDate.hh:96
buffer_size
const static size_t buffer_size
Definition: clb_swiss_knife.cpp:169
JEventOverlap.hh
KM3NETDAQ::JDAQSummaryslice
Data summary slice.
Definition: JDAQSummaryslice.hh:31
JHit.hh
JTreeRecorder.hh
JTRIGGER::JTimeslice
Time slice with calibrated data.
Definition: JTimeslice.hh:26
JLANG::JFileDescriptorMask
Auxiliary class for method select.
Definition: JFileDescriptorMask.hh:24
JDETECTOR::JModule
Data structure for a composite optical module.
Definition: JModule.hh:49
JSUPPORT::JTreeRecorder
ROOT TTree object output.
Definition: JTreeRecorder.hh:28
KM3NETDAQ::JDataFilter::Qt
JQuantile Qt
Definition: JDataFilter.cc:1227
JMessageScheduler.hh
KM3NETDAQ::JDataFilter::maxFrameNumber
int maxFrameNumber
Definition: JDataFilter.cc:1278
JNET::JSocket::setKeepAlive
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
KM3NETDAQ::JDAQSuperFrameHeader
DAQ super frame header.
Definition: JDAQSuperFrameHeader.hh:25
JEEP::open
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:302
JTRIGGER::JTriggeredEvent
Auxiliary class to build JDAQEvent for a triggered event.
Definition: JTriggeredEvent.hh:41
KM3NETDAQ::JDataFilter::logErrorDetector
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:1252
KM3NETDAQ::RC_ALERT
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
JSYSTEM::localtime_t
long long int localtime_t
Type definition of local time.
Definition: JSystem/JTime.hh:21
KM3NETDAQ::JDataFilter::detector
JDetector detector
Definition: JDataFilter.cc:1240
JBuildL2.hh
JLANG::JFileDescriptorMask::set
void set(const int file_descriptor)
Set file descriptor.
Definition: JFileDescriptorMask.hh:154
JTRIGGER::checksum
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:161
KM3NETDAQ::JDAQClient
Control unit client base class.
Definition: JDAQClient.hh:272
JParser.hh
JSYSTEM::JDateAndTime::getMonth
int getMonth() const
month of the year [1-12]
Definition: JDate.hh:95
KM3NETDAQ::JDataFilter::JCircularBuffer_t::operator<<
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:241
KM3NETDAQ::JDataFilter::frames_per_slice
unsigned int frames_per_slice
Definition: JDataFilter.cc:1231
KM3NETDAQ::JDAQAbstractPreamble
Simple datastructure for the DAQ preamble without ROOT functionality.
Definition: JDAQAbstractPreamble.hh:20
JDETECTOR::JModuleRouter
Router for direct addressing of module data in detector data structure.
Definition: JModuleRouter.hh:34
JHitL1.hh
KM3NETDAQ::IO_TRIGGER_PARAMETERS
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
JSYSTEM::JDateAndTime
Auxiliary class to get date and time.
Definition: JDate.hh:59
KM3NETDAQ::JDataFilter::number_of_bytes_received
long long int number_of_bytes_received
Definition: JDataFilter.cc:1285
JTimesliceRouter.hh
KM3NETDAQ::JDAQChronometer::getRunNumber
int getRunNumber() const
Get run number.
Definition: JDAQChronometer.hh:121
JSocketChannel.hh
KM3NETDAQ::JDataFilter::number_of_packets_received
long long int number_of_packets_received
Definition: JDataFilter.cc:1282
JLANG::JEquationParameters
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Definition: JEquationParameters.hh:20
JROOT::JTreeWriterObjectOutput
JTreeWriter object output.
Definition: JTreeWriterObjectOutput.hh:30
KM3NETDAQ::JDataFilter::processIndexSorter
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:270
make_field
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1954
JControlHost.hh
KM3NETDAQ::JDataFilter::current_slice_index
int current_slice_index
Definition: JDataFilter.cc:1230
JTrigger3DShower.hh
KM3NETDAQ::JDataFilter::processTimeSlice
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:904
JTOOLS::MEGABYTE
static const long long int MEGABYTE
Number of bytes in a megabyte.
Definition: JConstants.hh:80
JDETECTOR::JDetector
Detector data structure.
Definition: JDetector.hh:80
JGEOMETRY3D::JPosition3D::getPosition
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:129
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL0
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:256
JNET::JSocket::setNonBlocking
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
KM3NETDAQ::JDataFilter::actionSelect
virtual void actionSelect(const JFileDescriptorMask &mask)
Action method following last select call.
Definition: JDataFilter.cc:668
JEEP::JProperties
Utility class to parse parameter values.
Definition: JProperties.hh:496
JTRIGGER::JTrigger3DShower
Shower trigger.
Definition: JTrigger3DShower.hh:22
JAANET::detector
Detector file.
Definition: JHead.hh:130
KM3NETDAQ::JDAQChronometer::getDAQChronometer
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
Definition: JDAQChronometer.hh:88
JROOT::getName
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:45
KM3NETDAQ::JDataFilter::actionQuit
virtual void actionQuit(int length, const char *buffer)
Definition: JDataFilter.cc:648
KM3NETDAQ::JDataFilter::typeout
void typeout()
Report status to message logger.
Definition: JDataFilter.cc:1093
JSuperFrame1D.hh
JSUPPORT::JMeta
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:71
JTRIGGER::JTriggerOutput
Set of triggered events.
Definition: JTriggerOutput.hh:23
std
Definition: jaanetDictionary.h:36
JEEP::JTimekeeper
Time keeper.
Definition: JTimekeeper.hh:34
KM3NETDAQ::JDataFilter::put
bool put(const T &object)
Auxiliary method to send object to data server.
Definition: JDataFilter.cc:1201
JDAQSuperFrame.hh
KM3NETDAQ::JDataFilter::numberOfIncompleteTimeslicesProcessed
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:1275
KM3NETDAQ
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
JSocket.hh
JeepToolkit.hh
KM3NETDAQ::JDAQSuperFrame
Data frame of one optical module.
Definition: JDAQSuperFrame.hh:27
KM3NETDAQ::RC_CMD
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
JDAQFrame.hh
JDAQEvent.hh
JTimer.hh
JProperties.hh
JTRIGGER::JSuperFrame2D::iterator
std::vector< value_type >::iterator iterator
Definition: JSuperFrame2D.hh:50
KM3NETDAQ::JDataFilter::JBuildL2_t
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:158
JSYSTEM::getRAM
unsigned long long int getRAM()
Get RAM of this CPU.
Definition: JSystemToolkit.hh:254
JTRIGGER::JTriggerInput::end
const_iterator end() const
get iterator to end of data (without end marker)
Definition: JTriggerInput.hh:74
JNET::JTag::toString
std::string toString() const
Convert tag to string.
Definition: JTag.hh:167
JTRIGGER::JTriggerMXShower
Shower trigger.
Definition: JTriggerMXShower.hh:76
JTime.hh
KM3NETDAQ::JDataFilter::buildSN
JSinglePointer< JBuildL2_t > buildSN
Definition: JDataFilter.cc:1244
KM3NETDAQ::JDataFilter::logErrorIndex
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:1253
KM3NETDAQ::JDataFilter::JDataFilter
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:289
KM3NETDAQ::JDataFilter::triggerMXShower
JSinglePointer< JTriggerMXShower > triggerMXShower
Definition: JDataFilter.cc:1249
KM3NETDAQ::JDataFilter::hit_type
double hit_type
Definition: JDataFilter.cc:153
std::list
Definition: JSTDTypes.hh:24
JHitL0.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t
Circular buffer.
Definition: JDataFilter.cc:163
JTRIGGER::JSuperFrame2D
2-dimensional frame with time calibrated data from one optical module.
Definition: JSuperFrame2D.hh:41
FATAL
#define FATAL(A)
Definition: JMessage.hh:67
KM3NETDAQ::JDataFilter::dataQueues
std::vector< JDAQProcess > dataQueues
Definition: JDataFilter.cc:1259
KM3NETDAQ::JDAQProcess
Auxiliary class for itemization of process list.
Definition: JRuncontrolToolkit.hh:464
JLANG::trim
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JLOGGER::JMessageScheduler
Message logger with time scheduler.
Definition: JMessageScheduler.hh:25
KM3NETDAQ::JDAQProcess::index
std::string index
index in process list
Definition: JRuncontrolToolkit.hh:580
main
int main(int argc, char *argv[])
Definition: JDataFilter.cc:1302
KM3NETDAQ::JDataFilter::updateFrameQueue
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:809
JTRIGGER::JTrigger3DMuon
Muon trigger.
Definition: JTrigger3DMuon.hh:24
JSYSTEM::getListOfIPaddresses
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
KM3NETDAQ::JDAQTriggerCounter::reset
static void reset()
Reset counter of unique instance of this class object.
Definition: JDAQTriggerCounter.hh:87
JTRIGGER::JEventOverlap
Match of two events considering overlap in time.
Definition: JEventOverlap.hh:20
JLANG::JException
General exception.
Definition: JException.hh:40
JDAQPreamble.hh
KM3NETDAQ::JDataFilter::JSuperFrame2D_t
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:155
JSystemToolkit.hh
KM3NETDAQ::JDAQClient::enter
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
KM3NETDAQ::RC_DFILTER
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
KM3NETDAQ::JDataFilter::JSocketInputChannel_t
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:150
KM3NETDAQ::JDataFilter::queueSize
long long int queueSize
Definition: JDataFilter.cc:1266
KM3NETDAQ::JDataFilter::numberOfTimeslicesProcessed
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:1274
JNET::getSizeOfPacket
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:76
KM3NETDAQ::JDAQPreamble
DAQ preamble.
Definition: JDAQPreamble.hh:39
JNET::JSubscriptionAll
Auxiliary class for all subscription.
Definition: JControlHost.hh:96
JNET::JSocketInputChannel
Socket input channel.
Definition: JSocketChannel.hh:86
JTRIGGER::JTimesliceRouter
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
Definition: JTimesliceRouter.hh:62
KM3NETDAQ::JDataFilter::actionPause
virtual void actionPause(int length, const char *buffer)
Definition: JDataFilter.cc:595
JTRIGGER::JBuildL2
Template L2 builder.
Definition: JBuildL2.hh:45
KM3NETDAQ::JDataFilter::totalCPURAM
long long int totalCPURAM
Definition: JDataFilter.cc:1263
JLangToolkit.hh
JDAQClient.hh