Jpp
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
21 #include "JDAQ/JDAQTags.hh"
22 #include "JDAQ/JDAQEventIO.hh"
23 #include "JDAQ/JDAQTimesliceIO.hh"
25 #include "JDAQ/JDAQPreambleIO.hh"
26 #include "JDAQ/JDAQFrameIO.hh"
27 #include "JDAQ/JDAQSuperFrameIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerBits.hh"
45 #include "JTrigger/JTimesliceL1.hh"
48 #include "JTrigger/JChecksum.hh"
51 #include "JNet/JControlHost.hh"
53 #include "JNet/JSocket.hh"
54 #include "JNet/JSocketChannel.hh"
55 #include "JNet/JServerSocket.hh"
56 #include "JTools/JConstants.hh"
57 #include "JTools/JQuantile.hh"
58 #include "JSupport/JSupport.hh"
60 #include "JSupport/JMeta.hh"
61 #include "JSystem/JTime.hh"
63 #include "JSystem/JNetwork.hh"
64 
65 
66 namespace JNET {
67 
68  /**
69  * Get size of packet.
    *
    * The size is taken from the length stored in the given preamble.
    *
    * NOTE(review): the line carrying the function signature of this template
    * specialisation is absent from this listing; verify it against the repository.
70  *
71  * \param preamble DAQ data preamble
72  * \return size [B]
73  */
74  template<>
76  {
77  return preamble.getLength();
78  }
79 }
80 
81 namespace KM3NETDAQ {
82 
83  using namespace JPP;
84 
85  /**
86  * Main class for real-time filtering of data.
87  *
88  * This class implements the action methods for each transition of the state machine.\n
89  * When the state machine is entered, data are continually collected using
90  * custom implementation of virtual methods
91  * JDataFilter::setSelect and
92  * JDataFilter::actionSelect.
93  *
94  * In state <tt>Running</tt>, all incoming data are buffered, referred to as "queue".\n
95  * When a JDAQTimeslice is complete,
96  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
97  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
98  * which, together with other parameters, is parsed in method JDataFilter::actionConfigure.\n
99  * <pre>
100  * numberOfFramesPerSlice = frames_per_slice = 1;
101  * </pre>
102  * Note that this parameter can change during operation (see below).
103  *
104  * A timeout may occur when the total amount of data or the number of incomplete time slices
105  * in the queue exceeds the corresponding pre-set limit:\n
106  * <pre>
107  * queueSize = <maximal queue size [B]>;
108  * queueDepth = <maximal queue depth>;
109  * </pre>
110  * Note that these values apply per JDataFilter.\n
111  *
112  * When a timeout occurs,
113  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
114  * When data are received with a frame index below the current frame index,
115  * the parameter <tt>frames_per_slice</tt> is incremented by one,
116  * up to the original value set at JDataFilter::actionConfigure.\n
117  *
118  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
119  * The following parameters can be used to configure the circular buffer.\n
120  * <pre>
121  * path = <write directory for circular buffer>;
122  * c_sizeL0 = <L0 buffer size>;
123  * c_sizeL1 = <L1 buffer size>;
124  * c_sizeL2 = <L2 buffer size>;
125  * c_sizeSN = <SN buffer size>;
126  * </pre>
127  *
128  * Note that when path is set to a valid directory:
129  * - There will always be a file created which is closed when the state machine is exited;
130  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
131  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
132  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
133  * - After closure of the file, a new file will be opened if path is set;
134  *
135  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
136  * the configuration or alert and <tt><tag></tt> corresponds to the unique JTag of the JDataFilter process.\n
137  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
138  * (e.g. when there is more than one alert on a given day).
139  *
140  * The application/script JConvert.cc/JConvert.sh can be used to readily convert multiple input files to a single output file.
141  *
142  * The script JDOMandDataFilter.sh can be used to test this application.
143  */
144  class JDataFilter :
145  public JDAQClient
146  {
147  public:
148 
151 
152  typedef double hit_type;
158 
159  /**
160  * Circular buffer.
161  */
163  public JTreeRecorder<JDAQTimesliceTypes_t>
164  {
166 
167  /**
168  * Default constructor.
     *
     * Calls disable(), so that all buffer sizes start at zero and writing is disabled.
     *
     * NOTE(review): the line carrying the constructor signature is absent from this listing.
169  */
171  {
172  disable();
173  }
174 
175  /**
176  * Open file.
177  *
178  * \param path directory
179  * \param tag unique process tag
180  */
181  void open(const std::string& path, const JTag& tag)
182  {
183  using namespace std;
184 
185  const JDateAndTime cal;
186 
187  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
188 
189  ostringstream os;
190 
191  os << getFullPath(path)
192  << "KM3NeT"
193  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
194  << "_" << tag;
195 
196  if (i != 0) {
197  os << "_" << i;
198  }
199 
200  os << ".root";
201 
202  try {
203  this->open(os.str().c_str());
204  }
205  catch(JException& exception) {}
206  }
207  }
208 
209  /**
210  * Disable writing.
211  */
212  void disable()
213  {
214  sizeL0 = 0;
215  sizeL1 = 0;
216  sizeL2 = 0;
217  sizeSN = 0;
218  }
219 
220  /**
221  * Check whether writing of data is enabled.
222  *
223  * \return true if writing enabled; else false
224  */
225  bool is_enabled() const
226  {
227  return (sizeL0 > 0 ||
228  sizeL1 > 0 ||
229  sizeL2 > 0 ||
230  sizeSN > 0);
231  }
232 
233  /**
234  * Write circular buffer to output stream.
235  *
236  * \param out output stream
237  * \param object circular buffer
238  * \return output stream
239  */
240  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
241  {
242  if (object.is_open())
243  out << object.getFile()->GetName();
244  else
245  out << "void";
246 
247  out << ' ';
248 
249  return out << object.sizeL0 << '/'
250  << object.sizeL1 << '/'
251  << object.sizeL2 << '/'
252  << object.sizeSN << '/';
253  }
254 
255  Long64_t sizeL0; //!< Number of L0 time slices
256  Long64_t sizeL1; //!< Number of L1 time slices
257  Long64_t sizeL2; //!< Number of L2 time slices
258  Long64_t sizeSN; //!< Number of SN time slices
259  };
260 
261 
262  /**
263  * Sort DAQ process by index.
264  *
265  * \param first first DAQ process
266  * \param second second DAQ process
267  * \return true if index of first DAQ process less than that of second; else false
268  */
269  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
270  {
271  return first.index < second.index;
272  }
273 
274 
275  /**
276  * Constructor.
     *
     * Besides the initialisation of the data members, the constructor
     * replaces the event tag of the configure transition,
     * subscribes to alert messages (tag RC_ALERT) and
     * stores the value of getRAM() as total memory.
     *
     * NOTE(review): an initialiser line (presumably for buffer_size) is absent from this listing.
277  *
278  * \param name name of client
279  * \param server name of command message server
280  * \param hostname name of data server
281  * \param logger pointer to logger
282  * \param level debug level
283  * \param port server port
284  * \param backlog server backlog
285  * \param buffer_size server buffer
286  * \param path directory for writing circular buffer
287  */
288  JDataFilter(const std::string& name,
289  const std::string& server,
290  const std::string& hostname,
291  JLogger* logger,
292  const int level,
293  const int port,
294  const int backlog,
295  const int buffer_size,
296  const std::string& path) :
297  JDAQClient (name,server,logger,level),
298  hostname (hostname),
299  port (port),
300  backlog (backlog),
302  path (path)
303  {
304  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
305 
306  addSubscription(JSubscriptionAll(RC_ALERT)); // alert messages are handled in actionTagged()
307 
308  totalCPURAM = getRAM();
309  current_slice_index = -1; // frames with an index at or below this value are dropped in updateFrameQueue()
310  reporting = false; // may be set to true in actionConfigure()
311  }
312 
313 
314  virtual void actionEnter()
315  {}
316 
317 
318  virtual void actionExit()
319  {
320  if (c_buffer.is_open()) {
321 
322  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
323 
324  c_buffer.close();
325  }
326 
327  datawriter.reset();
328  }
329 
330 
331  virtual void actionInit(int length, const char* buffer)
332  {
333  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
334 
335  try {
336 
337  JDebugStream(logger) << "Start server.";
338 
339  serversocket.reset(new JServerSocket(port,backlog));
340  }
341  catch(const std::exception& error) {
342  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
343  ev_error();
344  }
345  }
346 
347 
348  virtual void actionConfigure(int length, const char* buffer)
349  {
     // Action for the configure transition.
     //
     // The message (buffer, length) is parsed as a list of "key = value;" settings.
     // Based on these, this method
     // - sets the clock interval and the period of the rate limited error loggers;
     // - creates the connection to the data server (throws JException if the host name is empty);
     // - determines from the process list whether this process reports and how large its queue may be;
     // - creates the trigger algorithms, routers and L1/L2/SN builders; and
     // - opens, continues or closes the circular buffer.
350  using namespace std;
351 
352  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
353 
     // default values, overwritten when present in the message
354  long long int update_s = 10;
355  long long int logger_s = 5;
356 
357  parameters .reset();
358  dataFilters.clear();
359  dataQueues .clear();
360 
361  reporting = false;
362 
363  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
364 
     // map the keys in the message to the corresponding variables (with default values where assigned)
365  properties["dataWriter"] = hostname;
366  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
367  properties["detector"] = detector;
368  properties["triggerParameters"] = parameters;
369  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
370  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
371  properties["logger_s"] = logger_s;
372  properties["update_s"] = update_s;
373  properties["JDataFilter"] = dataFilters;
374  properties["DataQueue"] = dataQueues;
375  properties["path"] = path;
376  properties["c_sizeL0"] = c_buffer.sizeL0;
377  properties["c_sizeL1"] = c_buffer.sizeL1;
378  properties["c_sizeL2"] = c_buffer.sizeL2;
379  properties["c_sizeSN"] = c_buffer.sizeSN;
380 
     // a parse error is only logged; configuration continues with the values obtained so far
381  try {
382  properties.read(string(buffer, length));
383  }
384  catch(const exception& error) {
385  JErrorStream(logger) << error.what();
386  }
387 
     // non-positive periods are replaced by 1
388  if (update_s <= 0) { update_s = 1; }
389  if (logger_s <= 0) { logger_s = 1; }
390 
391  setClockInterval(update_s * 1000000LL); // presumably seconds to microseconds -- TODO confirm unit of setClockInterval
392 
393  hostname = trim(hostname);
394 
395  if (hostname != "")
396  datawriter.reset(new JControlHost_t(hostname));
397  else
398  throw JException("Undefined data writer host name.");
399 
     // upper limit for later adjustments of frames_per_slice (see actionSelect() and updateFrameQueue())
400  maximum_frames_per_slice = frames_per_slice;
401 
402  // process processlist
403 
404  if (dataFilters.empty()) {
405  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
406  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
407  }
408 
409  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
410 
411  unsigned int numberOfDataFiltersOnThisMachine = 0;
412  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
413 
     // NOTE(review): the declaration of IP (used below as a list of strings) is absent from this listing.
415 
416  {
417  JNoticeStream notice(logger);
418 
419  notice << "My IP addresses:";
420 
421  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
422  notice << ' ' << *i;
423  }
424  }
425 
     // count the data filters whose host name is listed in IP and
     // identify this process among them by its port number
426  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
427 
428  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
429 
430  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
431 
432  numberOfDataFiltersOnThisMachine++;
433 
434  if (i->port == this->port) {
435  thisProcess = i;
436  }
437  }
438  }
439 
440  if (numberOfDataFiltersOnThisMachine == 0) {
441  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
442  << "Assuming one datafilter on this machine.";
443  numberOfDataFiltersOnThisMachine = 1;
444  }
445 
446  if (thisProcess == dataFilters.end()) {
447  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
448  }
449 
450  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
451  JErrorStream(logger) << "Mismatch between given process names: "
452  << "I am called " << getName()
453  << ", but in the process list I am referred to as " << thisProcess->index;
454  }
455 
     // only the first process in the sorted list (or a process without list) reports
456  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
457  reporting = true;
458  }
459 
     // the memory available for queues is shared between the data filters on this machine
460  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
461 
462  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
463 
464  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
465  << "Queue size reduced to "
466  << maxQueueSize << " bytes." ;
467  }
468 
469  // trigger parameters
470 
471  parameters.set(getMaximalDistance(detector));
472 
473  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
474  trigger3DShower.reset(new JTrigger3DShower(parameters));
475  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
476 
477  moduleRouter.reset(new JModuleRouter(detector));
478 
479  if (reporting) {
480  JNoticeStream(logger) << "This data filter process will report.";
481  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
482  JDebugStream (logger) << "Trigger parameters: " << parameters;
483  JDebugStream (logger) << "Detector description: " << endl << detector;
484  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
485  }
486 
487  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
488 
489  // set L1, L2 and SN builders
490 
491  buildL1.reset(new JBuildL1_t(parameters));
492  buildL2.reset(new JBuildL2_t(parameters.L2));
493  buildSN.reset(new JBuildL2_t(parameters.SN));
494 
495  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
496  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
497  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
498 
     // error loggers for recurring problems, all scheduled with the same period derived from logger_s
499  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
500  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
501  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
502  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
503 
     // circular buffer: open a new file if needed, else continue with the open file;
     // when writing is disabled, an open file is closed
504  if (c_buffer.is_enabled()) {
505 
506  if (!c_buffer.is_open()) {
507 
508  c_buffer.open(path, getUniqueTag());
509 
510  if (c_buffer.is_open()) {
511 
512  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
513 
514  putObject(c_buffer.getFile(), meta);
515 
516  } else {
517 
518  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
519 
520  c_buffer.disable();
521  }
522 
523  } else {
524 
525  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
526  }
527 
     // apply the configured sizes to the trees of the time slice types that are enabled
528  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
529  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
530  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
531  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
532 
533  } else {
534 
535  if (c_buffer.is_open()) {
536  c_buffer.close();
537  }
538  }
539  }
540 
541 
542  virtual void actionStart(int length, const char* buffer)
543  {
     // Action for the start transition.
     //
     // The queue, all counters, the rate limited error loggers and the timer are reset and
     // the run number together with the trigger parameters is sent to the data server.
     // The arguments length and buffer are not used in the visible code.
544  using namespace std;
545 
546  if (reporting) {
547  JNoticeStream(logger) << "Start run " << getRunNumber();
548  }
549 
550  timeslices.clear();
551 
552  current_slice_index = -1;
553  queueSize = 0;
554 
555  numberOfEvents = 0;
556  numberOfBytes = 0;
557  numberOfTimeslicesProcessed = 0;
558  numberOfIncompleteTimeslicesProcessed = 0;
559 
560  number_of_packets_received = 0;
561  number_of_packets_discarded = 0;
562  number_of_bytes_received = 0;
563  number_of_reads = 0;
564 
     // sentinel values; these are updated with the frame index of each time slice handled in actionSelect()
565  minFrameNumber = numeric_limits<int>::max();
566  maxFrameNumber = numeric_limits<int>::min();
567 
568  // Reset global trigger counter.
     // NOTE(review): the statement following this comment is absent from this listing.
569 
571 
572  logErrorRun .reset();
573  logErrorDetector .reset();
574  logErrorIndex .reset();
575  logErrorIncomplete.reset();
576 
577  timer.reset();
578  timer.start();
579 
580  Qt.reset();
581 
582  // send trigger parameters to the datawriter
583 
584  ostringstream os;
585 
586  os << getRunNumber() << ' ' << parameters;
587 
588  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
589  }
590 
591 
592  virtual void actionPause(int length, const char* buffer)
593  {
     // Action for the pause transition.
     //
     // All queued time slices are discarded without being processed,
     // the bookkeeping of the queue is reset and the timer is stopped.
594  using namespace std;
595 
596  if (!timeslices.empty()) {
597 
598  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
599 
     // subtract the size of each discarded time slice from the queue size
600  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
601  queueSize -= getSizeof(*i);
602  }
603 
604  timeslices.clear();
605  }
606 
607  { // force clearance of memory
608 
     // NOTE(review): the declaration of the local object named buffer is absent from this listing;
     // presumably an empty list of time slices which is swapped with the queue -- verify against the repository.
610 
611  timeslices.swap(buffer);
612  }
613 
     // any remainder indicates an inconsistency in the bookkeeping of the queue size
614  if (queueSize != 0) {
615  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
616  }
617 
618  current_slice_index = -1;
619  queueSize = 0;
620 
621  timer.stop();
622  }
623 
624 
625  virtual void actionContinue(int length, const char* buffer)
626  {
627  timer.start();
628  }
629 
630 
631  virtual void actionStop(int length, const char* buffer)
632  {
633  typeout();
634 
635  datawriter.reset();
636  }
637 
638 
639  virtual void actionReset(int length, const char* buffer)
640  {
641  serversocket.reset();
642  }
643 
644 
645  virtual void actionQuit(int length, const char* buffer)
646  {
647  datawriter.reset();
648  }
649 
650 
651  virtual void setSelect(JFileDescriptorMask& mask) const
652  {
653  if (serversocket.is_valid()) {
654  mask.set(*serversocket);
655  }
656 
657  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
658  if (!channel->isReady()) {
659  mask.set(channel->getFileDescriptor());
660  }
661  }
662  }
663 
664 
665  virtual void actionSelect(const JFileDescriptorMask& mask)
666  {
     // Action following the select call. This method
     // 1. reads data from the channels in the mask and moves each complete packet to the queue
     //    (or counts it as discarded when not running);
     // 2. accepts a new connection on the server socket, if any; and
     // 3. processes the oldest time slice in the queue when it is complete or
     //    when the depth or size of the queue has reached its limit (timeout).
667  using namespace std;
668 
     // the iterator is only advanced when no exception occurs; on a socket error, the channel is erased instead
669  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
670 
671  try {
672 
673  if (mask.has(channel->getFileDescriptor())) {
674  channel->read();
675  }
676 
677  if (channel->isReady()) {
678 
679  number_of_packets_received += 1;
680  number_of_reads += channel->getCounter();
681  number_of_bytes_received += channel->size();
682 
683  if (isRunning()) {
684 
685  updateFrameQueue(channel);
686 
687  } else {
688 
689  JErrorStream(logErrorRun) << "Receiving data while not running.";
690 
691  number_of_packets_discarded += 1;
692  }
693 
694  channel->reset();
695  }
696 
697  ++channel;
698  }
699  catch(const JSocketException& error) {
700 
701  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
702 
703  channel->shutdown();
704 
705  channel = channelList.erase(channel);
706  }
707  }
708 
709 
710  if (serversocket.is_valid()) {
711 
712  if (mask.has(*serversocket)) {
713 
714  JSocket socket;
715 
716  socket.accept(serversocket->getFileDescriptor());
717 
718  //socket.setSendBufferSize (buffer_size);
     // NOTE(review): one line following the commented-out statement above is absent from this listing.
720 
721  socket.setKeepAlive (true);
722  socket.setNonBlocking(true);
723 
724  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
725 
726  channelList.push_back(JSocketInputChannel_t(socket));
727  }
728  }
729 
730 
     // at most one time slice is taken from the queue per call
731  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
732  timeslices.size() >= maxQueueDepth ||
733  queueSize >= maxQueueSize)) {
734 
     // the time slice stays at the front of the queue until the call to pop_front() below
735  const JDAQTimesliceL0& pending_slice = timeslices.front();
736  queueSize -= getSizeof(pending_slice);
737 
     // from now on, frames with an index at or below this value are dropped in updateFrameQueue()
738  current_slice_index = pending_slice.getFrameIndex();
739  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
740  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
741 
742  if (pending_slice.size() > frames_per_slice) {
743 
744  JErrorStream(logger) << "More frames in timeslice than expected "
745  << pending_slice.size() << " > " << frames_per_slice;
746 
     // the expected number of frames is raised, but never beyond the configured maximum
747  if (pending_slice.size() <= maximum_frames_per_slice) {
748 
749  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
750 
751  frames_per_slice = pending_slice.size();
752  }
753  }
754 
755  if (!pending_slice.empty()) {
756 
757  const localtime_t t0 = getLocalTime();
758 
759  processTimeSlice(pending_slice);
760 
761  const localtime_t t1 = getLocalTime();
762 
763  numberOfTimeslicesProcessed += 1;
764 
     // accumulate the processing time for the report in typeout()
765  Qt.put(t1 - t0);
766 
     // incomplete time slice, i.e. processed because of a limit on the queue
767  if (pending_slice.size() < frames_per_slice) {
768 
769  numberOfIncompleteTimeslicesProcessed += 1;
770 
771  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
772  << "Frame index = " << pending_slice.getFrameIndex() << ';'
773  << "Size of timeslice = " << pending_slice.size() << ';'
774  << "Queue depth = " << timeslices.size() << ';'
775  << "Queue size = " << queueSize;
776 
     // NOTE(review): pending_slice is still at the front of the queue at this point,
     // so this check looks always true -- confirm the intent.
777  if (!timeslices.empty()) {
778 
779  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
780 
781  frames_per_slice = pending_slice.size();
782  }
783  }
784  }
785 
786  timeslices.pop_front();
787  }
788  }
789 
790 
791  virtual void actionRunning()
792  {
793  if (reporting) {
794  typeout();
795  }
796  }
797 
798 
799  /**
800  * Update queue with data frames.
801  *
802  * Note that any discarded data will be reported.
803  *
804  * \param channel incoming data channel
805  */
806  void updateFrameQueue(const JChannelList_t::const_iterator channel)
807  {
808  using namespace std;
809 
810  JByteArrayReader in(channel->data(), channel->size());
811 
812  JDAQPreamble preamble;
813  JDAQSuperFrameHeader header;
814 
815  in >> preamble;
816  in >> header;
817 
818  if (preamble.getLength() != channel->size()) {
819 
820  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
821  << "preamble.getLength() = " << preamble.getLength() << ';'
822  << "channel->size(): " << channel->size() << ';';
823 
824  number_of_packets_discarded += 1;
825 
826  return;
827  }
828 
829  if (header.getRunNumber() != getRunNumber()) {
830 
831  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
832  << " unequal to current run " << getRunNumber()
833  << " -> Dropping frame.";
834 
835  number_of_packets_discarded += 1;
836 
837  return;
838  }
839 
840  if (header.getFrameIndex() <= current_slice_index) {
841 
842  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
843  << " -> Dropping frame.";
844 
845  number_of_packets_discarded += 1;
846 
847  if (frames_per_slice < maximum_frames_per_slice) {
848 
849  frames_per_slice++;
850 
851  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
852  }
853 
854  return;
855  }
856 
857  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
858 
859  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
860  ++timesliceIterator;
861  }
862 
863  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
864 
865  // The corresponding time slice already exists
866 
867  } else {
868 
869  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
870 
871  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
872 
873  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
874 
875  queueSize += getSizeof(*timesliceIterator);
876  }
877 
878  timesliceIterator->push_back(JDAQSuperFrame(header));
879 
880  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
881 
882  queueSize += getSizeof(*timesliceIterator->rbegin());
883  }
884 
885 
886  /**
887  * Process time slice.
     *
     * Depending on the trigger parameters and the settings of the circular buffer,
     * a summary slice, triggered events and L0, L1, L2 and SN time slices
     * are sent to the data server and/or written to the circular buffer.\n
     * A JTriggerException thrown during processing is caught and reported;
     * the remainder of the processing of this time slice is then skipped.
888  *
889  * \param timeslice time slice
890  */
891  void processTimeSlice(const JDAQTimesliceL0& timeslice)
892  {
893  using namespace std;
894 
895  try {
896 
897  if (parameters.writeSummary()) {
898  this->put(JDAQSummaryslice(timeslice));
899  }
900 
     // the remainder is only needed when a trigger, a data stream or the circular buffer is enabled
901  if (parameters.trigger3DMuon.enabled ||
902  parameters.trigger3DShower.enabled ||
903  parameters.triggerMXShower.enabled ||
904  parameters.writeL0.prescale ||
905  parameters.writeL1.prescale ||
906  parameters.writeL2.prescale ||
907  parameters.writeSN.prescale ||
908  c_buffer.is_enabled()) {
909 
910  timesliceRouter->configure(timeslice);
911 
     // timesliceTX collects the frames that fail the checksum
912  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
913  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
914  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
915  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
916  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
917 
918  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
919 
920  if (moduleRouter->hasModule(frame->getModuleID())) {
921 
     // a frame failing the checksum is excluded from the L0, L1, L2 and SN data
922  if (!checksum(*frame)) {
923 
924  JErrorStream(logger) << "Invalid data at "
925  << "run = " << timeslice.getRunNumber() << ";"
926  << "frame index = " << timeslice.getFrameIndex() << ";"
927  << "module = " << frame->getModuleID() << ";"
928  << "discard and dump";
929 
930  timesliceTX.push_back(*frame);
931 
932  continue;
933  }
934 
935  const JModule& module = moduleRouter->getModule(frame->getModuleID());
936  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
937 
938  // Apply high-rate veto
939 
940  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
941 
942  // L0
943 
944  timesliceL0.push_back(JSuperFrame1D_t(buffer));
945 
946  // L1
947 
948  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
949  frame->getModuleIdentifier(),
950  module.getPosition()));
951 
     // the L1 data are built from the L0 data just added
952  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
953 
954  // L2
955 
956  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
957  frame->getModuleIdentifier(),
958  module.getPosition()));
959 
     // the L2 data are built from the demultiplexed frame and the L1 data just added
960  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
961 
962  // SN
963 
964  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
965  frame->getModuleIdentifier(),
966  module.getPosition()));
967 
     // same inputs as for L2, but using the builder configured with the SN parameters
968  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
969 
970  } else {
971 
972  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
973  }
974  }
975 
     // send the frames that failed the checksum to the data server
976  if (!timesliceTX.empty()) {
977  this->put(timesliceTX);
978  }
979 
980  // Trigger
981 
982  JTriggerInput trigger_input(timesliceL2);
983  JTriggerOutput trigger_output;
984 
985  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
986  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
987  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
988 
989  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
990 
     // build and send the triggered events; only events that have been sent are counted
991  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
992 
993  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
994 
995  if (this->put(object)) {
996  numberOfEvents += 1;
997  }
998  }
999 
      // NOTE(review): below, the outer conditions call parameters.writeXX() whereas the inner conditions
      // test parameters.writeXX without call; the exact semantics of both are defined outside this listing.
1000  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1001 
1002  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1003 
1004  if (parameters.writeL1) { this->put(object); }
1005  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1006  }
1007 
1008  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1009 
1010  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1011 
1012  if (parameters.writeL2) { this->put(object); }
1013  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1014  }
1015 
1016  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1017 
1018  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1019 
1020  if (parameters.writeSN) { this->put(object); }
1021  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1022  }
1023 
      // the L0 data correspond to the input time slice as is
1024  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1025 
1026  if (parameters.writeL0) { this->put(timeslice); }
1027  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1028  }
1029  }
1030 
1031  } catch(const JTriggerException& error) {
1032  JErrorStream(logger) << "Error = " << error.what() << ";"
1033  << "run = " << timeslice.getRunNumber() << ";"
1034  << "frame index = " << timeslice.getFrameIndex() << ";"
1035  << "time slicee not correctly processed!";
1036  }
1037  }
1038 
1039 
1040  /**
1041  * Report status to message logger.
1042  */
1043  void typeout()
1044  {
1045  timer.stop();
1046 
1047  const double T_us = (double) timer.usec_wall;
1048 
1049  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1050  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1051  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1052  try {
1053  JNoticeStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1054  }
1055  catch(const std::exception&) {}
1056  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1057  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1058 
1059  if (number_of_packets_received > 0) {
1060  JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1061  }
1062 
1063  JNoticeStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1064  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1065 
1066  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1067 
1068  if (numberOfTimeslicesProcessed > 0) {
1069  JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1070  JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1071  JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1072  }
1073 
1074  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1075  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1076 
1077  if (processedSlicesTime_us > 0) {
1078  JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1079  }
1080  if (processedDetectorTime_us > 0) {
1081  JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1082  }
1083 
1084  timer.start();
1085  }
1086 
1087 
1088  /**
1089  * Tagged action to handle alerts.
1090  *
1091  * \param tag tag
1092  * \param length number of characters
1093  * \param buffer message
1094  */
1095  virtual void actionTagged(const JTag& tag, int length, const char* buffer)
1096  {
1097  using namespace std;
1098 
1099  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1100 
1101  if (tag == RC_ALERT) {
1102 
1103  if (c_buffer.is_open()) {
1104 
1105  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1106 
1107  c_buffer.close();
1108  }
1109 
1110  if (c_buffer.is_enabled()) {
1111 
1112  c_buffer.open(path, getUniqueTag());
1113 
1114  if (c_buffer.is_open()) {
1115 
1116  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1117 
1118  putObject(c_buffer.getFile(), meta);
1119 
1120  } else {
1121 
1122  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1123 
1124  c_buffer.disable();
1125  }
1126  }
1127 
1128  } else {
1129 
1130  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1131  }
1132  }
1133 
1134  JMeta meta; //!< meta data
1135 
1136  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1137 
1138  private:
1139 
1141  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1142  std::string hostname; //!< host name of data server
1143 
1144  /**
1145  * Auxiliary method to send object to data server.
1146  *
1147  * \param object object to be sent
1148  * \return true if sent; else false
1149  */
1150  template<class T>
1151  bool put(const T& object)
1152  {
1153  try {
1154 
1155  datawriter->put(object);
1156 
1157  numberOfBytes += getSizeof(object);
1158 
1159  return true;
1160  }
1161  catch(const JControlHostException& error) {
1162  JErrorStream(logger) << error;
1163  }
1164 
1165  return false;
1166  }
1167 
1168 
1169  int port; //!< server socket port
1170  int backlog;
1172 
1173  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1174  JChannelList_t channelList; //!< connections to data queue
1175 
1177  JQuantile Qt;
1178 
1179  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1181  unsigned int frames_per_slice;
1183 
1184  // trigger
1185 
1188 
1194 
1198 
1203 
1204  // process management
1205 
1208 
1209  // memory management
1210 
1211  long long int totalCPURAM;
1212  unsigned int maxQueueDepth;
1213  long long int maxQueueSize;
1214  long long int queueSize;
1215 
1216  // statistics
1217 
1219 
1220  long long int numberOfEvents;
1221  long long int numberOfBytes;
1224 
1227 
1228  // temporary
1229 
1232  long long int number_of_reads;
1234 
1235  // circular buffer
1236 
1238  std::string path; //!< directory for writing circular buffer
1239  };
1240 }
1241 
1242 /**
1243  * \file
1244  *
1245  * Application for real-time filtering of data.
1246  * For more information, see KM3NETDAQ::JDataFilter.
1247  *
1248  * \author rbruijn and mdejong
1249  */
1250 int main(int argc, char* argv[])
1251 {
1252  using namespace std;
1253  using namespace JPP;
1254  using namespace KM3NETDAQ;
1255 
1256  string server;
1257  string logger;
1258  string hostname;
1259  string client_name;
1260  int port;
1261  int backlog;
1262  int buffer_size;
1263  bool use_cout;
1264  string path;
1265  int debug;
1266 
1267 
1268  try {
1269 
1270  JParser<> zap("Application for real-time filtering of data.");
1271 
1272  zap['H'] = make_field(server) = "localhost";
1273  zap['M'] = make_field(logger) = "localhost";
1274  zap['D'] = make_field(hostname) = "";
1275  zap['u'] = make_field(client_name) = "JDataFilter";
1276  zap['P'] = make_field(port);
1277  zap['q'] = make_field(backlog) = 1024;
1278  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1279  zap['c'] = make_field(use_cout);
1280  zap['p'] = make_field(path) = "/tmp/";
1281  zap['d'] = make_field(debug) = 0;
1282 
1283  zap(argc, argv);
1284  }
1285  catch(const exception& error) {
1286  FATAL(error.what() << endl);
1287  }
1288 
1289 
1290  JLogger* out = NULL;
1291 
1292  if (use_cout)
1293  out = new JStreamLogger(cout);
1294  else
1295  out = new JControlHostLogger(logger);
1296 
1297  JDataFilter dfilter(client_name,
1298  server,
1299  hostname,
1300  out,
1301  debug,
1302  port,
1303  backlog,
1304  buffer_size,
1305  path);
1306 
1307  dfilter.meta = JMeta(argc, argv);
1308 
1309  dfilter.enter();
1310  dfilter.run();
1311 }
JLOGGER::JDebugStream
Level specific message streamers.
Definition: JMessageStream.hh:113
JLOGGER::JLogger
Interface for logging messages.
Definition: JLogger.hh:22
KM3NETDAQ::JDataFilter::setSelect
virtual void setSelect(JFileDescriptorMask &mask) const
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:651
KM3NETDAQ::JDataFilter::hostname
std::string hostname
host name of data server
Definition: JDataFilter.cc:1142
JLANG::JAbstractFile::getFileDescriptor
int getFileDescriptor() const
Get file descriptor.
Definition: JAbstractFile.hh:75
JRuncontrolToolkit.hh
JMeta.hh
JLOGGER::JNoticeStream
Definition: JMessageStream.hh:116
JDAQ.hh
KM3NETDAQ::JDataFilter::maxQueueSize
long long int maxQueueSize
Definition: JDataFilter.cc:1213
JControlHostObjectOutput.hh
KM3NETDAQ::JDataFilter
Main class for real-time filtering of data.
Definition: JDataFilter.cc:144
KM3NETDAQ::JDataFilter::path
std::string path
directory for writing circular buffer
Definition: JDataFilter.cc:1238
JChecksum.hh
KM3NETDAQ::JDataFilter::actionEnter
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:314
KM3NETDAQ::JDataFilter::parameters
JTriggerParameters parameters
Definition: JDataFilter.cc:1186
KM3NETDAQ::JDataFilter::channelList
JChannelList_t channelList
connections to data queue
Definition: JDataFilter.cc:1174
KM3NETDAQ::JDataFilter::number_of_reads
long long int number_of_reads
Definition: JDataFilter.cc:1232
KM3NETDAQ::JDataFilter::JChannelList_t
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:150
KM3NETDAQ::JDataFilter::maximum_frames_per_slice
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:1182
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL2
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:257
JTriggerParameters.hh
JLANG::JSinglePointer< JControlHost_t >
JDETECTOR::getMaximalDistance
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
Definition: JDetectorToolkit.hh:78
JSuperFrame2D.hh
KM3NETDAQ::JDataFilter::JSuperFrame1D_t
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:153
JEEP::JTimer
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
KM3NETDAQ::JDataFilter::JCircularBuffer_t::open
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:181
KM3NETDAQ::JDataFilter::numberOfBytes
long long int numberOfBytes
Definition: JDataFilter.cc:1221
JNET::JServerSocket
TCP Server socket.
Definition: JServerSocket.hh:27
JAANET::getTimeRange
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e.
Definition: JAAnetToolkit.hh:134
JTrigger3DMuon.hh
JTriggeredEvent.hh
KM3NETDAQ::JDataFilter::actionReset
virtual void actionReset(int length, const char *buffer)
Definition: JDataFilter.cc:639
JMessage.hh
KM3NETDAQ::JDataFilter::timesliceRouter
JSinglePointer< JTimesliceRouter > timesliceRouter
Definition: JDataFilter.cc:1187
KM3NETDAQ::JDataFilter::actionTagged
virtual void actionTagged(const JTag &tag, int length, const char *buffer)
Tagged action to handle alerts.
Definition: JDataFilter.cc:1095
JPrint.hh
KM3NETDAQ::JDataFilter::c_buffer
JCircularBuffer_t c_buffer
Definition: JDataFilter.cc:1237
KM3NETDAQ::JDataFilter::meta
JMeta meta
meta data
Definition: JDataFilter.cc:1134
KM3NETDAQ::JDataFilter::trigger3DShower
JSinglePointer< JTrigger3DShower > trigger3DShower
Definition: JDataFilter.cc:1196
JLANG::JControlHostException
Exception for ControlHost.
Definition: JException.hh:450
JTRIGGER::JTriggerParameters
Data structure for all trigger parameters.
Definition: JTriggerParameters.hh:116
JLOGGER::JErrorStream
Definition: JMessageStream.hh:115
JTriggerToolkit.hh
KM3NETDAQ::JDataFilter::numberOfEvents
long long int numberOfEvents
Definition: JDataFilter.cc:1220
JTimeslice.hh
JTRIGGER::JBuildL1
Template L1 hit builder.
Definition: JBuildL1.hh:85
JSinglePointer.hh
JSYSTEM::JDateAndTime::getDay
int getDay() const
day of the month [1-31]
Definition: JDate.hh:94
JNET::JSocketInputChannel_t
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
KM3NETDAQ::JDataFilter::logErrorRun
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:1199
KM3NETDAQ::JDataFilter::number_of_packets_discarded
long long int number_of_packets_discarded
Definition: JDataFilter.cc:1231
JTimekeeper.hh
JNetwork.hh
KM3NETDAQ::JDataFilter::minFrameNumber
int minFrameNumber
Definition: JDataFilter.cc:1225
KM3NETDAQ::JDataFilter::JBuildL1_t
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:156
KM3NETDAQ::getUniqueTag
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
Definition: JRuncontrolToolkit.hh:441
JTRIGGER::JSuperFrame1D
1-dimensional frame with time calibrated data from one optical module.
Definition: JSuperFrame1D.hh:35
KM3NETDAQ::JDAQClient::run
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
JTOOLS::GIGABYTE
static const long long int GIGABYTE
Number of bytes in a gigabyte.
Definition: JConstants.hh:81
std::vector< JSocketInputChannel_t >
KM3NETDAQ::JDataFilter::port
int port
server socket port
Definition: JDataFilter.cc:1169
KM3NETDAQ::JDAQChronometer::getFrameIndex
int getFrameIndex() const
Get frame index.
Definition: JDAQChronometer.hh:132
KM3NETDAQ::JDataFilter::timeslices
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
Definition: JDataFilter.cc:1179
KM3NETDAQ::JDataFilter::actionStart
virtual void actionStart(int length, const char *buffer)
Definition: JDataFilter.cc:542
JLANG::JFileDescriptorMask::has
bool has(const int file_descriptor) const
Has file descriptor.
Definition: JFileDescriptorMask.hh:216
KM3NETDAQ::JDataFilter::actionRunning
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:791
JLANG::JSocketException
Exception for socket.
Definition: JException.hh:432
KM3NETDAQ::JDataFilter::actionContinue
virtual void actionContinue(int length, const char *buffer)
Definition: JDataFilter.cc:625
KM3NETDAQ::JDAQTimeslice
Data time slice.
Definition: JDAQTimeslice.hh:30
JServerSocket.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t::disable
void disable()
Disable writing.
Definition: JDataFilter.cc:212
KM3NETDAQ::JDataFilter::buildL1
JSinglePointer< JBuildL1_t > buildL1
Definition: JDataFilter.cc:1191
KM3NETDAQ::JDataFilter::timer
JTimer timer
Definition: JDataFilter.cc:1176
JPARSER::JParser
Utility class to parse command line options.
Definition: JParser.hh:1493
KM3NETDAQ::JDataFilter::trigger3DMuon
JSinglePointer< JTrigger3DMuon > trigger3DMuon
Definition: JDataFilter.cc:1195
JEEP::JProperties::read
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JNET::JSocket
Socket class.
Definition: JSocket.hh:42
JDAQAbstractPreamble.hh
KM3NETDAQ::JDataFilter::reporting
bool reporting
Definition: JDataFilter.cc:1218
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL1
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:256
KM3NETDAQ::JDataFilter::actionStop
virtual void actionStop(int length, const char *buffer)
Definition: JDataFilter.cc:631
JTRIGGER::JTriggerInput
Data structure for input to trigger algorithm.
Definition: JTriggerInput.hh:32
JNET
Interprocess communication.
Definition: JDataFilter.cc:66
JROOT::putObject
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.
Definition: JRootFileWriter.hh:38
KM3NETDAQ::JDAQAbstractPreamble::getLength
int getLength() const
Get length.
Definition: JDAQAbstractPreamble.hh:49
JQuantile.hh
KM3NETDAQ::JDataFilter::backlog
int backlog
Definition: JDataFilter.cc:1170
JTriggerBits.hh
JDAQPreambleIO.hh
KM3NETDAQ::JDataFilter::buildL2
JSinglePointer< JBuildL2_t > buildL2
Definition: JDataFilter.cc:1192
JNET::JSocket::setReceiveBufferSize
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
JNET::JControlHostObjectOutput
Implementation of object output through ControlHost.
Definition: JControlHostObjectOutput.hh:36
JIO::JByteArrayReader
Byte array binary input.
Definition: JByteArrayIO.hh:25
KM3NETDAQ::getFrameTime
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
JSupport.hh
JPP
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
Definition: JAAnetToolkit.hh:37
JBuildL1.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t::JCircularBuffer_t
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:170
KM3NETDAQ::getSizeof
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
JTriggerMXShower.hh
KM3NETDAQ::JDataFilter::buffer_size
int buffer_size
Definition: JDataFilter.cc:1171
KM3NETDAQ::JDataFilter::actionInit
virtual void actionInit(int length, const char *buffer)
Definition: JDataFilter.cc:331
JDAQEventIO.hh
JTRIGGER::JTriggerException
General exception.
Definition: JTriggerException.hh:23
JTRIGGER::JSuperFrame2D::applyHighRateVeto
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Definition: JSuperFrame2D.hh:122
KM3NETDAQ::JDataFilter::JTimeslice_t
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:155
JDAQTimesliceIO.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t::is_enabled
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:225
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeSN
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:258
KM3NETDAQ::JDataFilter::datawriter
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
Definition: JDataFilter.cc:1141
debug
int debug
debug level
Definition: JSirene.cc:59
JNET::JTag
ControlHost tag.
Definition: JTag.hh:35
JTRIGGER::JTimesliceL1
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
KM3NETDAQ::JDataFilter::actionConfigure
virtual void actionConfigure(int length, const char *buffer)
Definition: JDataFilter.cc:348
FILL
Auxiliary data structure for sequence of same character.
Definition: JPrint.hh:361
JConstants.hh
KM3NETDAQ::JDataFilter::logErrorIncomplete
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:1202
KM3NETDAQ::JDataFilter::moduleRouter
JSinglePointer< JModuleRouter > moduleRouter
Definition: JDataFilter.cc:1190
JTRIGGER::JTriggerOutput::merge
void merge(const JMatch_t &match)
Merge events.
Definition: JTriggerOutput.hh:41
JHitToolkit.hh
KM3NETDAQ::JDataFilter::actionExit
virtual void actionExit()
Definition: JDataFilter.cc:318
JDAQSuperFrameIO.hh
KM3NETDAQ::JDataFilter::dataFilters
std::vector< JDAQProcess > dataFilters
Definition: JDataFilter.cc:1206
KM3NETDAQ::JDataFilter::serversocket
JSinglePointer< JServerSocket > serversocket
server for data queue connections
Definition: JDataFilter.cc:1173
JTimesliceL1.hh
KM3NETDAQ::JDataFilter::maxQueueDepth
unsigned int maxQueueDepth
Definition: JDataFilter.cc:1212
JEEP::getFullPath
std::string getFullPath(const std::string &path)
Get full path, i.e.
Definition: JeepToolkit.hh:126
JNET::JSocket::accept
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
JSYSTEM::getLocalTime
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
Definition: JSystem/JTime.hh:61
JLOGGER::JWarningStream
Definition: JMessageStream.hh:114
KM3NETDAQ::JDataFilter::JControlHost_t
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
Definition: JDataFilter.cc:1140
JDAQTags.hh
KM3NETDAQ::JDAQTimesliceL0
Timeslice data structure for L0 data.
Definition: JDAQTimeslice.hh:255
JSYSTEM::JDateAndTime::getYear
int getYear() const
year a.d.
Definition: JDate.hh:96
buffer_size
const static size_t buffer_size
Definition: clb_swiss_knife.cpp:169
JEventOverlap.hh
JLOGGER::JStreamLogger
Message logging based on std::ostream.
Definition: JStreamLogger.hh:22
KM3NETDAQ::JDAQSummaryslice
Data summary slice.
Definition: JDAQSummaryslice.hh:25
JHit.hh
JTreeRecorder.hh
JTRIGGER::JTimeslice
Time slice with calibrated data.
Definition: JTimeslice.hh:26
JLANG::JFileDescriptorMask
Auxiliary class for method select.
Definition: JFileDescriptorMask.hh:24
JDETECTOR::JModule
Data structure for a composite optical module.
Definition: JModule.hh:49
JSUPPORT::JTreeRecorder
ROOT TTree object output.
Definition: JTreeRecorder.hh:28
KM3NETDAQ::JDataFilter::Qt
JQuantile Qt
Definition: JDataFilter.cc:1177
JMessageScheduler.hh
KM3NETDAQ::JDataFilter::maxFrameNumber
int maxFrameNumber
Definition: JDataFilter.cc:1226
JNET::JSocket::setKeepAlive
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
KM3NETDAQ::JDAQSuperFrameHeader
DAQ super frame header.
Definition: JDAQSuperFrameHeader.hh:19
JEEP::open
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:302
JTRIGGER::JTriggeredEvent
Auxiliary class to build JDAQEvent for a triggered event.
Definition: JTriggeredEvent.hh:41
KM3NETDAQ::JDataFilter::logErrorDetector
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:1200
KM3NETDAQ::RC_ALERT
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
JSYSTEM::localtime_t
long long int localtime_t
Type definition of local time.
Definition: JSystem/JTime.hh:21
KM3NETDAQ::JDataFilter::detector
JDetector detector
Definition: JDataFilter.cc:1189
JBuildL2.hh
JLANG::JFileDescriptorMask::set
void set(const int file_descriptor)
Set file descriptor.
Definition: JFileDescriptorMask.hh:154
JTRIGGER::checksum
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:161
KM3NETDAQ::JDAQClient
Control unit client base class.
Definition: JDAQClient.hh:272
JLOGGER::JControlHostLogger
Message logging based on ControlHost.
Definition: JControlHostLogger.hh:26
JParser.hh
JSYSTEM::JDateAndTime::getMonth
int getMonth() const
month of the year [1-12]
Definition: JDate.hh:95
KM3NETDAQ::JDataFilter::JCircularBuffer_t::operator<<
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:240
JDAQFrameIO.hh
KM3NETDAQ::JDataFilter::frames_per_slice
unsigned int frames_per_slice
Definition: JDataFilter.cc:1181
KM3NETDAQ::JDAQAbstractPreamble
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
Definition: JDAQAbstractPreamble.hh:21
JDETECTOR::JModuleRouter
Router for direct addressing of module data in detector data structure.
Definition: JModuleRouter.hh:34
JHitL1.hh
KM3NETDAQ::IO_TRIGGER_PARAMETERS
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
JSYSTEM::JDateAndTime
Auxiliary class to get date and time.
Definition: JDate.hh:59
KM3NETDAQ::JDataFilter::number_of_bytes_received
long long int number_of_bytes_received
Definition: JDataFilter.cc:1233
JTimesliceRouter.hh
KM3NETDAQ::JDAQChronometer::getRunNumber
int getRunNumber() const
Get run number.
Definition: JDAQChronometer.hh:121
JSocketChannel.hh
KM3NETDAQ::JDataFilter::number_of_packets_received
long long int number_of_packets_received
Definition: JDataFilter.cc:1230
JLANG::JEquationParameters
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Definition: JEquationParameters.hh:20
JROOT::JTreeWriterObjectOutput
JTreeWriter object output.
Definition: JTreeWriterObjectOutput.hh:30
KM3NETDAQ::JDataFilter::processIndexSorter
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:269
make_field
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1954
JControlHost.hh
KM3NETDAQ::JDataFilter::current_slice_index
int current_slice_index
Definition: JDataFilter.cc:1180
JTrigger3DShower.hh
KM3NETDAQ::JDataFilter::processTimeSlice
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:891
JTOOLS::MEGABYTE
static const long long int MEGABYTE
Number of bytes in a megabyte.
Definition: JConstants.hh:80
JDETECTOR::JDetector
Detector data structure.
Definition: JDetector.hh:80
JGEOMETRY3D::JPosition3D::getPosition
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:129
KM3NETDAQ::JDataFilter::JCircularBuffer_t::sizeL0
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:255
JNET::JSocket::setNonBlocking
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
KM3NETDAQ::JDataFilter::actionSelect
virtual void actionSelect(const JFileDescriptorMask &mask)
Action method following last select call.
Definition: JDataFilter.cc:665
JEEP::JProperties
Utility class to parse parameter values.
Definition: JProperties.hh:496
JTRIGGER::JTrigger3DShower
Shower trigger.
Definition: JTrigger3DShower.hh:22
JAANET::detector
Detector file.
Definition: JHead.hh:130
KM3NETDAQ::JDAQChronometer::getDAQChronometer
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
Definition: JDAQChronometer.hh:88
JROOT::getName
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:45
KM3NETDAQ::JDataFilter::actionQuit
virtual void actionQuit(int length, const char *buffer)
Definition: JDataFilter.cc:645
KM3NETDAQ::JDataFilter::typeout
void typeout()
Report status to message logger.
Definition: JDataFilter.cc:1043
JSuperFrame1D.hh
JSUPPORT::JMeta
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:71
JTRIGGER::JTriggerOutput
Set of triggered events.
Definition: JTriggerOutput.hh:23
std
Definition: jaanetDictionary.h:36
JEEP::JTimekeeper
Time keeper.
Definition: JTimekeeper.hh:34
KM3NETDAQ::JDataFilter::put
bool put(const T &object)
Auxiliary method to send object to data server.
Definition: JDataFilter.cc:1151
KM3NETDAQ::JDataFilter::numberOfIncompleteTimeslicesProcessed
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:1223
KM3NETDAQ
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
JSocket.hh
JeepToolkit.hh
KM3NETDAQ::JDAQSuperFrame
Data frame of one optical module.
Definition: JDAQSuperFrame.hh:22
KM3NETDAQ::RC_CMD
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
JDAQSummarysliceIO.hh
JTimer.hh
JProperties.hh
KM3NETDAQ::JDataFilter::JBuildL2_t
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:157
JSYSTEM::getRAM
unsigned long long int getRAM()
Get RAM of this CPU.
Definition: JSystemToolkit.hh:254
JLANG::JException::what
virtual const char * what() const
Get error message.
Definition: JException.hh:48
JNET::JTag::toString
std::string toString() const
Convert tag to string.
Definition: JTag.hh:167
JTRIGGER::JTriggerMXShower
Shower trigger.
Definition: JTriggerMXShower.hh:76
JTime.hh
KM3NETDAQ::JDataFilter::buildSN
JSinglePointer< JBuildL2_t > buildSN
Definition: JDataFilter.cc:1193
KM3NETDAQ::JDataFilter::logErrorIndex
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:1201
KM3NETDAQ::JDataFilter::JDataFilter
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:288
KM3NETDAQ::JDataFilter::triggerMXShower
JSinglePointer< JTriggerMXShower > triggerMXShower
Definition: JDataFilter.cc:1197
KM3NETDAQ::JDataFilter::hit_type
double hit_type
Definition: JDataFilter.cc:152
std::list
Definition: JSTDTypes.hh:24
JHitL0.hh
KM3NETDAQ::JDataFilter::JCircularBuffer_t
Circular buffer.
Definition: JDataFilter.cc:162
JTRIGGER::JSuperFrame2D
2-dimensional frame with time calibrated data from one optical module.
Definition: JSuperFrame2D.hh:41
FATAL
#define FATAL(A)
Definition: JMessage.hh:67
KM3NETDAQ::JDataFilter::dataQueues
std::vector< JDAQProcess > dataQueues
Definition: JDataFilter.cc:1207
KM3NETDAQ::JDAQProcess
Auxiliary class for itemization of process list.
Definition: JRuncontrolToolkit.hh:464
JLANG::trim
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JLOGGER::JMessageScheduler
Message logger with time scheduler.
Definition: JMessageScheduler.hh:25
KM3NETDAQ::JDAQProcess::index
std::string index
index in process list
Definition: JRuncontrolToolkit.hh:580
main
int main(int argc, char *argv[])
Definition: JDataFilter.cc:1250
KM3NETDAQ::JDataFilter::updateFrameQueue
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:806
JTRIGGER::JTrigger3DMuon
Muon trigger.
Definition: JTrigger3DMuon.hh:24
JSYSTEM::getListOfIPaddresses
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
KM3NETDAQ::JDAQTriggerCounter::reset
static void reset()
Reset counter of unique instance of this class object.
Definition: JDAQTriggerCounter.hh:88
JTRIGGER::JEventOverlap
Match of two events considering overlap in time.
Definition: JEventOverlap.hh:20
JLANG::JException
General exception.
Definition: JException.hh:23
KM3NETDAQ::JDataFilter::JSuperFrame2D_t
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:154
JSystemToolkit.hh
KM3NETDAQ::JDAQClient::enter
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
KM3NETDAQ::RC_DFILTER
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
KM3NETDAQ::JDataFilter::JSocketInputChannel_t
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:149
KM3NETDAQ::JDataFilter::queueSize
long long int queueSize
Definition: JDataFilter.cc:1214
KM3NETDAQ::JDataFilter::numberOfTimeslicesProcessed
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:1222
JNET::getSizeOfPacket
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:75
KM3NETDAQ::JDAQPreamble
DAQ preamble.
Definition: JDAQPreamble.hh:26
JNET::JSubscriptionAll
Auxiliary class for all subscription.
Definition: JControlHost.hh:96
JNET::JSocketInputChannel
Socket input channel.
Definition: JSocketChannel.hh:86
JTRIGGER::JTimesliceRouter
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
Definition: JTimesliceRouter.hh:62
KM3NETDAQ::JDataFilter::actionPause
virtual void actionPause(int length, const char *buffer)
Definition: JDataFilter.cc:592
JTRIGGER::JBuildL2
Template L2 builder.
Definition: JBuildL2.hh:45
KM3NETDAQ::JDataFilter::totalCPURAM
long long int totalCPURAM
Definition: JDataFilter.cc:1211
JLangToolkit.hh
JDAQClient.hh