Jpp  15.0.3
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 #include <unistd.h>
9 
10 #include "JLang/JSinglePointer.hh"
11 #include "JLang/JLangToolkit.hh"
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 #include "Jeep/JPrint.hh"
18 #include "Jeep/JeepToolkit.hh"
25 #include "JDAQ/JDAQTags.hh"
26 #include "JDAQ/JDAQEventIO.hh"
27 #include "JDAQ/JDAQTimesliceIO.hh"
29 #include "JTrigger/JHit.hh"
30 #include "JTrigger/JHitToolkit.hh"
33 #include "JTrigger/JTimeslice.hh"
34 #include "JTrigger/JHitL0.hh"
35 #include "JTrigger/JHitL1.hh"
36 #include "JTrigger/JBuildL1.hh"
37 #include "JTrigger/JBuildL2.hh"
41 #include "JTrigger/JTriggerNB.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 #include "JTrigger/JChecksum.hh"
52 #include "JNet/JControlHost.hh"
54 #include "JNet/JSocket.hh"
55 #include "JNet/JSocketChannel.hh"
56 #include "JNet/JServerSocket.hh"
57 #include "JPhysics/JConstants.hh"
58 #include "JTools/JQuantile.hh"
59 #include "JSupport/JSupport.hh"
61 #include "JSupport/JMeta.hh"
62 #include "JSystem/JTime.hh"
64 #include "JSystem/JNetwork.hh"
65 
66 
67 namespace JNET {
68 
69  /**
70  * Get size of packet.
71  *
72  * \param preamble DAQ data preamble
73  * \return size [B]
74  */
75  template<>
77  {
78  return preamble.getLength();
79  }
80 }
81 
82 namespace KM3NETDAQ {
83 
84  using namespace JPP;
85 
86  /**
87  * Main class for real-time filtering of data.
88  *
89  * This class implements the action methods for each transition of the state machine.\n
90  * When the state machine is entered, data are continually collected using
91  * custom implementation of virtual methods
92  * setSelect and
93  * actionSelect.
94  *
95  * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
96  * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
97  * When a JDAQTimeslice is complete,
98  * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.\n
99  * The completeness of a time slice is defined by the parameter <tt>frames_per_slice</tt>
100  * which, together with other parameters, is parsed in method actionConfigure.\n
101  * <pre>
102  * numberOfFramesPerSlice = frames_per_slice = 1;
103  * </pre>
104  * Note that this parameter can change during operation (see below).
105  *
106  * A timeout may occur when the total amount of data or the number of incomplete time slices
107  * in the queue exceeds one of the corresponding pre-set limits:\n
108  * <pre>
109  * queueSize = <maximal queue size [B]>;
110  * queueDepth = <maximal queue depth>;
111  * </pre>
112  * Note that these values apply per JDataFilter.\n
113  *
114  * When a timeout occurs,
115  * the parameter <tt>frames_per_slice</tt> is set to the number of frames in the oldest time slice.\n
116  * When data are received with a frame index below the current frame index,
117  * the parameter <tt>frames_per_slice</tt> is incremented by one,
118  * up to the original value set at actionConfigure.\n
119  *
120  * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
121  * The following parameters can be used to configure the circular buffer.\n
122  * <pre>
123  * path = <write directory for circular buffer>;
124  * c_sizeL0 = <L0 buffer size>;
125  * c_sizeL1 = <L1 buffer size>;
126  * c_sizeL2 = <L2 buffer size>;
127  * c_sizeSN = <SN buffer size>;
128  * </pre>
129  *
130  * Note that when path is set to a valid directory:
131  * - There will always be a file created which is closed when the state machine is exited;
132  * - The file name is <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
133  * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
134  * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the closure of the file following an external alert;
135  * - After closure of the file, a new file will be opened if path is set;
136  *
137  * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> are the year, month and day at the time of
138  * the configuration or alert and <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
139  * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
140  * (e.g. when there is more than one alert on a given day).
141  *
142  * The application JConvert.cc can be used to readily convert multiple input files to a single output file.
143  *
144  * The script JDataFilter.sh can be used to test this application.
145  */
146  class JDataFilter :
147  public JDAQClient
148  {
149  public:
150 
153 
154  typedef double hit_type;
160 
161  /**
162  * Circular buffer.
163  */
165  public JTreeRecorder<JDAQTimesliceTypes_t>
166  {
168 
169  /**
170  * Default constructor.
171  */
173  {
174  disable();
175  }
176 
177  /**
178  * Open file.
179  *
180  * \param path directory
181  * \param tag unique process tag
182  */
183  void open(const std::string& path, const JTag& tag)
184  {
185  using namespace std;
186 
187  const JDateAndTime cal;
188 
189  for (int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
190 
191  ostringstream os;
192 
193  os << getFullPath(path)
194  << "KM3NeT"
195  << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
196  << "_" << tag;
197 
198  if (i != 0) {
199  os << "_" << i;
200  }
201 
202  os << ".root";
203 
204  try {
205  this->open(os.str().c_str());
206  }
207  catch(const exception& error) {}
208  }
209  }
210 
211  /**
212  * Disable writing.
213  */
214  void disable()
215  {
216  sizeL0 = 0;
217  sizeL1 = 0;
218  sizeL2 = 0;
219  sizeSN = 0;
220  }
221 
222  /**
223  * Check whether writing of data is enabled.
224  *
225  * \return true if writing enabled; else false
226  */
227  bool is_enabled() const
228  {
229  return (sizeL0 > 0 ||
230  sizeL1 > 0 ||
231  sizeL2 > 0 ||
232  sizeSN > 0);
233  }
234 
235  /**
236  * Write circular buffer to output stream.
237  *
238  * \param out output stream
239  * \param object circular buffer
240  * \return output stream
241  */
242  friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
243  {
244  if (object.is_open())
245  out << object.getFile()->GetName();
246  else
247  out << "void";
248 
249  out << ' ';
250 
251  return out << object.sizeL0 << '/'
252  << object.sizeL1 << '/'
253  << object.sizeL2 << '/'
254  << object.sizeSN << '/';
255  }
256 
257  Long64_t sizeL0; //!< Number of L0 time slices
258  Long64_t sizeL1; //!< Number of L1 time slices
259  Long64_t sizeL2; //!< Number of L2 time slices
260  Long64_t sizeSN; //!< Number of SN time slices
261  };
262 
263 
264  /**
265  * Sort DAQ process by index.
266  *
267  * \param first first DAQ process
268  * \param second second DAQ process
269  * \return true if index of first DAQ process less than that of second; else false
270  */
271  static inline bool processIndexSorter(const JDAQProcess& first, const JDAQProcess& second)
272  {
273  return first.index < second.index;
274  }
275 
276 
277  /**
278  * Constructor.
279  *
280  * \param name name of client
281  * \param server name of command message server
282  * \param hostname name of data server
283  * \param logger pointer to logger
284  * \param level debug level
285  * \param port server port
286  * \param backlog server backlog
287  * \param buffer_size server buffer
288  * \param path directory for writing circular buffer
289  */
290  JDataFilter(const std::string& name,
291  const std::string& server,
292  const std::string& hostname,
293  JLogger* logger,
294  const int level,
295  const int port,
296  const int backlog,
297  const int buffer_size,
298  const std::string& path) :
299  JDAQClient (name,server,logger,level),
300  hostname (hostname),
301  port (port),
302  backlog (backlog),
303  buffer_size(buffer_size),
304  path (path)
305  {
306  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
307 
308  addSubscription(JSubscriptionAll(RC_ALERT));
309 
310  totalCPURAM = getRAM();
311  current_slice_index = -1;
312  reporting = false;
313  }
314 
315 
316  virtual void actionEnter() override
317  {}
318 
319 
320  virtual void actionExit() override
321  {
322  if (c_buffer.is_open()) {
323 
324  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
325 
326  c_buffer.close();
327  }
328 
329  datawriter.reset();
330  }
331 
332 
333  virtual void actionInit(int length, const char* buffer) override
334  {
335  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
336 
337  try {
338 
339  JDebugStream(logger) << "Start server.";
340 
341  if (serversocket.is_valid()) {
342  serversocket->shutdown();
343  }
344 
345  serversocket.reset(new JServerSocket(port,backlog));
346  }
347  catch(const std::exception& error) {
348  JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
349  ev_error();
350  }
351  }
352 
353 
354  virtual void actionConfigure(int length, const char* buffer) override
355  {
356  using namespace std;
357 
358  JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
359 
360  long long int update_s = 10;
361  long long int logger_s = 5;
362 
363  parameters .reset();
364  dataFilters.clear();
365  dataQueues .clear();
366 
367  reporting = false;
368  dumpCount = 0;
369  dumpLimit = numeric_limits<int>::max();
370 
371  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
372 
373  properties["dataWriter"] = hostname;
374  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
375  properties["detector"] = detector;
376  properties["triggerParameters"] = parameters;
377  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
378  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
379  properties["frameIndex"] = maximal_frame_index = 100000;
380  properties["logger_s"] = logger_s;
381  properties["update_s"] = update_s;
382  properties["JDataFilter"] = dataFilters;
383  properties["DataQueue"] = dataQueues;
384  properties["path"] = path;
385  properties["c_sizeL0"] = c_buffer.sizeL0;
386  properties["c_sizeL1"] = c_buffer.sizeL1;
387  properties["c_sizeL2"] = c_buffer.sizeL2;
388  properties["c_sizeSN"] = c_buffer.sizeSN;
389  properties["dumpLimit"] = dumpLimit;
390 
391  try {
392  properties.read(string(buffer, length));
393  }
394  catch(const exception& error) {
395  JErrorStream(logger) << error.what();
396  }
397 
398  if (update_s <= 0) { update_s = 1; }
399  if (logger_s <= 0) { logger_s = 1; }
400 
401  setClockInterval(update_s * 1000000LL);
402 
403  hostname = trim(hostname);
404 
405  if (hostname != "")
406  datawriter.reset(new JControlHost_t(hostname));
407  else
408  throw JException("Undefined data writer host name.");
409 
410  maximum_frames_per_slice = frames_per_slice;
411 
412  // process processlist
413 
414  if (dataFilters.empty()) {
415  JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
416  << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
417  }
418 
419  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
420 
421  unsigned int numberOfDataFiltersOnThisMachine = 0;
422  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
423 
425 
426  {
427  JNoticeStream notice(logger);
428 
429  notice << "My IP addresses:";
430 
431  for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
432  notice << ' ' << *i;
433  }
434  }
435 
436  for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
437 
438  JDebugStream(logger) << "Test IP address \"" << i->hostname << "\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
439 
440  if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
441 
442  numberOfDataFiltersOnThisMachine++;
443 
444  if (i->port == this->port) {
445  thisProcess = i;
446  }
447  }
448  }
449 
450  if (numberOfDataFiltersOnThisMachine == 0) {
451  JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
452  << "Assuming one datafilter on this machine.";
453  numberOfDataFiltersOnThisMachine = 1;
454  }
455 
456  if (thisProcess == dataFilters.end()) {
457  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
458  }
459 
460  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
461  JErrorStream(logger) << "Mismatch between given process names: "
462  << "I am called " << getName()
463  << ", but in the process list I am referred to as " << thisProcess->index;
464  }
465 
466  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
467  reporting = true;
468  }
469 
470  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
471 
472  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
473 
474  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
475  << "Queue size reduced to "
476  << maxQueueSize << " bytes." ;
477  }
478 
479  // trigger parameters
480 
482 
483  triggerNB .reset(new JTriggerNB (parameters));
484  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
485  trigger3DShower.reset(new JTrigger3DShower(parameters));
486  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
487 
488  moduleRouter.reset(new JModuleRouter(detector));
489 
490  if (reporting) {
491  JNoticeStream(logger) << "This data filter process will report.";
492  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
493  JDebugStream (logger) << "Trigger parameters: " << parameters;
494  JDebugStream (logger) << "Detector description: " << endl << detector;
495  JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
496  }
497 
498  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
499 
500  // set L1, L2 and SN builders
501 
502  buildL1.reset(new JBuildL1_t(parameters));
503  buildL2.reset(new JBuildL2_t(parameters.L2));
504  buildSN.reset(new JBuildL2_t(parameters.SN));
505  buildNB.reset(new JBuildL2_t(parameters.NB));
506 
507  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
508  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
509  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
510  if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
511 
512  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
513  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
514  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
515  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
516 
517  if (c_buffer.is_enabled()) {
518 
519  if (!c_buffer.is_open()) {
520 
521  c_buffer.open(path, getUniqueTag());
522 
523  if (c_buffer.is_open()) {
524 
525  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
526 
527  putObject(c_buffer.getFile(), meta);
528 
529  } else {
530 
531  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
532 
533  c_buffer.disable();
534  }
535 
536  } else {
537 
538  JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
539  }
540 
541  if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
542  if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
543  if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
544  if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
545 
546  } else {
547 
548  if (c_buffer.is_open()) {
549  c_buffer.close();
550  }
551  }
552  }
553 
554 
/**
 * Reset the queue, counters, message schedulers and timer for a new run and
 * send the run number together with the trigger parameters to the data writer.
 *
 * \param  length    number of characters (not used in this method)
 * \param  buffer    message text (not used in this method)
 */
555  virtual void actionStart(int length, const char* buffer) override
556  {
557  using namespace std;
558 
559  if (reporting) {
560  JNoticeStream(logger) << "Start run " << getRunNumber();
561  }
562 
563  timeslices.clear();
564 
// a slice index of -1 means that no time slice has been processed yet
565  current_slice_index = -1;
566  queueSize = 0;
567 
568  numberOfEvents = 0;
569  numberOfBytes = 0;
570  numberOfTimeslicesProcessed = 0;
571  numberOfIncompleteTimeslicesProcessed = 0;
572 
573  number_of_packets_received = 0;
574  number_of_packets_discarded = 0;
575  number_of_bytes_received = 0;
576  number_of_reads = 0;
577 
// start values such that the first min/max comparison always updates them
578  minFrameNumber = numeric_limits<int>::max();
579  maxFrameNumber = numeric_limits<int>::min();
580 
581  // Reset global trigger counter.
582 
// NOTE(review): listing line 583 is absent from this view; the statement announced by the comment above is not visible here.
584 
585  logErrorRun .reset();
586  logErrorDetector .reset();
587  logErrorIndex .reset();
588  logErrorIncomplete.reset();
589 
590  timer.reset();
591  timer.start();
592 
593  Qt.reset();
594 
595  // send trigger parameters to the datawriter
596 
597  ostringstream os;
598 
599  os << getRunNumber() << ' ' << parameters;
600 
601  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
602  }
603 
604 
605  virtual void actionPause(int length, const char* buffer) override
606  {
607  using namespace std;
608 
609  if (!timeslices.empty()) {
610 
611  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
612 
613  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
614  queueSize -= getSizeof(*i);
615  }
616 
617  timeslices.clear();
618  }
619 
620  { // force clearance of memory
621 
623 
624  timeslices.swap(buffer);
625  }
626 
627  if (queueSize != 0) {
628  JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
629  }
630 
631  current_slice_index = -1;
632  queueSize = 0;
633 
634  timer.stop();
635  }
636 
637 
638  virtual void actionContinue(int length, const char* buffer) override
639  {
640  timer.start();
641  }
642 
643 
644  virtual void actionStop(int length, const char* buffer) override
645  {
646  typeout();
647 
648  datawriter.reset();
649  }
650 
651 
652  virtual void actionReset(int length, const char* buffer) override
653  {
654  if (serversocket.is_valid()) {
655  serversocket->shutdown();
656  }
657 
658  serversocket.reset();
659  }
660 
661 
662  virtual void actionQuit(int length, const char* buffer) override
663  {
664  datawriter.reset();
665  }
666 
667 
668  virtual void setSelect(JFileDescriptorMask& mask) const override
669  {
670  if (serversocket.is_valid()) {
671  mask.set(*serversocket);
672  }
673 
674  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
675  if (!channel->isReady()) {
676  mask.set(channel->getFileDescriptor());
677  }
678  }
679  }
680 
681 
/**
 * Handle the outcome of the select call.
 *
 * This method reads the channels, accepts a new connection on the server socket and
 * processes the oldest time slice in the queue when the condition below is met.
 *
 * \param  mask      file descriptor mask
 */
682  virtual void actionSelect(const JFileDescriptorMask& mask) override
683  {
684  using namespace std;
685 
// the iterator is advanced inside the loop body because a channel may be erased
686  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
687 
688  try {
689 
690  if (mask.has(channel->getFileDescriptor())) {
691  channel->read();
692  }
693 
694  if (channel->isReady()) {
695 
696  number_of_packets_received += 1;
697  number_of_reads += channel->getCounter();
698  number_of_bytes_received += channel->size();
699 
700  if (isRunning()) {
701 
702  updateFrameQueue(channel);
703 
704  } else {
705 
706  JErrorStream(logErrorRun) << "Receiving data while not running.";
707 
708  number_of_packets_discarded += 1;
709  }
710 
711  channel->reset();
712  }
713 
714  ++channel;
715  }
716  catch(const exception& error) {
717 
718  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
719 
720  channel->shutdown();
721 
// continue with the iterator returned by erase instead of incrementing
722  channel = channelList.erase(channel);
723  }
724  }
725 
726 
727  if (serversocket.is_valid()) {
728 
729  if (mask.has(*serversocket)) {
730 
731  JSocket socket;
732 
733  socket.accept(serversocket->getFileDescriptor());
734 
735  //socket.setSendBufferSize (buffer_size);
// NOTE(review): listing line 736 is absent from this view; one statement of the original file is not visible here.
737 
738  socket.setKeepAlive (true);
739  socket.setNonBlocking(true);
740 
741  JNoticeStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']';
742 
743  channelList.push_back(JSocketInputChannel_t(socket));
744  }
745  }
746 
747 
// the oldest time slice is handled when it has enough frames or when the queue depth or queue size limit is reached
748  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
749  timeslices.size() >= maxQueueDepth ||
750  queueSize >= maxQueueSize)) {
751 
752  const JDAQTimesliceL0& pending_slice = timeslices.front();
753  queueSize -= getSizeof(pending_slice);
754 
755  current_slice_index = pending_slice.getFrameIndex();
756  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
757  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
758 
759  if (pending_slice.size() > frames_per_slice) {
760 
761  JErrorStream(logger) << "More frames in timeslice than expected "
762  << pending_slice.size() << " > " << frames_per_slice;
763 
// the expected number of frames is only raised up to the configured maximum
764  if (pending_slice.size() <= maximum_frames_per_slice) {
765 
766  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
767 
768  frames_per_slice = pending_slice.size();
769  }
770  }
771 
772  if (!pending_slice.empty()) {
773 
// the processing time is the difference between the local times before and after the call
774  const localtime_t t0 = getLocalTime();
775 
776  processTimeSlice(pending_slice);
777 
778  const localtime_t t1 = getLocalTime();
779 
780  numberOfTimeslicesProcessed += 1;
781 
782  Qt.put(t1 - t0);
783 
784  if (pending_slice.size() < frames_per_slice) {
785 
786  numberOfIncompleteTimeslicesProcessed += 1;
787 
788  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice: "
789  << "Frame index = " << pending_slice.getFrameIndex() << ';'
790  << "Size of timeslice = " << pending_slice.size() << ';'
791  << "Queue depth = " << timeslices.size() << ';'
792  << "Queue size = " << queueSize;
793 
// NOTE(review): the pending slice is still in the list at this point, so this test looks always true - confirm intent
794  if (!timeslices.empty()) {
795 
796  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
797 
798  frames_per_slice = pending_slice.size();
799  }
800  }
801  }
802 
// pending_slice refers to the front element and must not be used after this call
803  timeslices.pop_front();
804  }
805  }
806 
807 
808  virtual void actionRunning() override
809  {
810  if (reporting) {
811  typeout();
812  }
813  }
814 
815 
816  /**
817  * Update queue with data frames.
818  *
819  * Note that any discarded data will be reported.
820  *
821  * \param channel incoming data channel
822  */
823  void updateFrameQueue(const JChannelList_t::const_iterator channel)
824  {
825  using namespace std;
826 
827  JByteArrayReader in(channel->data(), channel->size());
828 
829  JDAQPreamble preamble;
830  JDAQSuperFrameHeader header;
831 
832  in >> preamble;
833  in >> header;
834 
835  if (preamble.getLength() != channel->size()) {
836 
837  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
838  << "preamble.getLength() = " << preamble.getLength() << ';'
839  << "channel->size(): " << channel->size() << ';';
840 
841  number_of_packets_discarded += 1;
842 
843  return;
844  }
845 
846  if (header.getRunNumber() != getRunNumber()) {
847 
848  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
849  << " unequal to current run " << getRunNumber()
850  << " -> Dropping frame.";
851 
852  number_of_packets_discarded += 1;
853 
854  return;
855  }
856 
857  if (header.getFrameIndex() <= current_slice_index) {
858 
859  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
860  << " -> Dropping frame.";
861 
862  number_of_packets_discarded += 1;
863 
864  if (frames_per_slice < maximum_frames_per_slice) {
865 
866  frames_per_slice++;
867 
868  JErrorStream(logErrorIndex) << "Increase number of frames expected to: " << frames_per_slice;
869  }
870 
871  return;
872  }
873 
874  if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
875 
876  JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
877  << " -> Dropping frame.";
878 
879  number_of_packets_discarded += 1;
880 
881  return;
882  }
883 
884  list<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
885 
886  while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
887  ++timesliceIterator;
888  }
889 
890  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
891 
892  // The corresponding time slice already exists
893 
894  } else {
895 
896  // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
897 
898  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
899 
900  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
901 
902  queueSize += getSizeof(*timesliceIterator);
903  }
904 
905  timesliceIterator->push_back(JDAQSuperFrame(header));
906 
907  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
908 
909  queueSize += getSizeof(*timesliceIterator->rbegin());
910  }
911 
912 
913  /**
914  * Process time slice.
915  *
916  * \param timeslice time slice
917  */
918  void processTimeSlice(const JDAQTimesliceL0& timeslice)
919  {
920  using namespace std;
921 
// any exception raised during processing is caught at the end of this method and reported
922  try {
923 
924  timesliceRouter->configure(timeslice);
925 
926  if (parameters.writeSummary()) {
927  this->put(JDAQSummaryslice(timeslice));
928  }
929 
// the building of hits is skipped when no trigger, no time slice writing and no circular buffer is enabled
930  if (parameters.trigger3DMuon.enabled ||
931  parameters.trigger3DShower.enabled ||
932  parameters.triggerMXShower.enabled ||
933  parameters.triggerNB.enabled ||
934  parameters.writeL0.prescale ||
935  parameters.writeL1.prescale ||
936  parameters.writeL2.prescale ||
937  parameters.writeSN.prescale ||
938  c_buffer.is_enabled()) {
939 
940  JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
941  JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
942  JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
943  JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
944  JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
// timesliceTX collects the frames that fail the checksum test below
945  JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
946 
947  for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
948 
949  if (moduleRouter->hasModule(frame->getModuleID())) {
950 
951  if (!checksum(*frame)) {
952 
953  JErrorStream(logger) << "Invalid data at "
954  << "run = " << timeslice.getRunNumber() << ";"
955  << "frame index = " << timeslice.getFrameIndex() << ";"
956  << "module = " << frame->getModuleID() << ";"
957  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
958 
959  timesliceTX.push_back(*frame);
960 
// a frame with invalid data is excluded from all further processing
961  continue;
962  }
963 
964  const JModule& module = moduleRouter->getModule(frame->getModuleID());
965  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
966 
967  // Apply high-rate veto
968 
969  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
970 
971  // L0
972 
973  timesliceL0.push_back(JSuperFrame1D_t(buffer));
974 
975  // Nano-beacon trigger
976 
977  if (parameters.triggerNB.enabled) {
978 
// partition reorders the buffer such that the elements selected by parameters.triggerNB.pmts come first
979  JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
980 
981  if (buffer.begin() != __end) {
982 
983  timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
984  frame->getModuleIdentifier(),
985  module.getPosition()));
986 
987  JSuperFrame1D_t zbuf;
988 
989  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
990 
991  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
992  }
993  }
994 
995  // L1
996 
997  timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
998  frame->getModuleIdentifier(),
999  module.getPosition()));
1000 
1001  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1002 
1003  // L2
1004 
1005  timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1006  frame->getModuleIdentifier(),
1007  module.getPosition()));
1008 
1009  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1010 
1011  // SN
1012 
1013  timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1014  frame->getModuleIdentifier(),
1015  module.getPosition()));
1016 
1017  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1018 
1019  } else {
1020 
1021  JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1022  }
1023  }
1024 
// the frames with invalid data are sent out as long as the dump limit is not reached
1025  if (!timesliceTX.empty()) {
1026 
1027  if (dumpCount < dumpLimit) {
1028 
1029  this->put(timesliceTX);
1030 
1031  dumpCount += 1;
1032  }
1033  }
1034 
1035  // Trigger
1036 
1037  if (parameters.triggerNB.enabled) {
1038 
1039  const JTriggerInput trigger_input(timesliceNB);
1040 
1041  for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1042 
// NOTE(review): write() is presumably a prescale decision evaluated per hit - confirm
1043  if (parameters.triggerNB.write()) {
1044 
1045  JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1046  getTriggerMask(triggerNB->getTriggerBit()),
1047  *hit,
1048  *timesliceRouter,
1049  *moduleRouter,
1050  parameters.TMaxLocal_ns,
1051  parameters.triggerNB.DMax_m,
1052  getTimeRange(parameters.triggerNB));
1053 
1054  this->put(tev);
1055  }
1056  }
1057  }
1058 
1059  JTriggerInput trigger_input(timesliceL2);
1060  JTriggerOutput trigger_output;
1061 
1062  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1063  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1064  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1065 
1066  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1067 
1068  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1069 
1070  const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1071 
// an event is only counted when put() reports success
1072  if (this->put(object)) {
1073  numberOfEvents += 1;
1074  }
1075  }
1076 
// NOTE(review): writeL1() is called once per time slice here and tested without call below; the order of these statements presumably matters - confirm before changing
1077  if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1078 
1079  const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1080 
1081  if (parameters.writeL1) { this->put(object); }
1082  if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1083  }
1084 
1085  if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1086 
1087  const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1088 
1089  if (parameters.writeL2) { this->put(object); }
1090  if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1091  }
1092 
1093  if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1094 
1095  const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1096 
1097  if (parameters.writeSN) { this->put(object); }
1098  if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1099  }
1100 
// for L0, the input time slice itself is written
1101  if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1102 
1103  if (parameters.writeL0) { this->put(timeslice); }
1104  if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1105  }
1106  }
1107 
1108  } catch(const exception& error) {
1109 
1110  JErrorStream(logger) << "Error = " << error.what() << ";"
1111  << "run = " << timeslice.getRunNumber() << ";"
1112  << "frame index = " << timeslice.getFrameIndex() << ";"
1113  << "time slice not correctly processed;"
1114  << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1115 
// the complete time slice is sent out as long as the dump limit is not reached
1116  if (dumpCount < dumpLimit) {
1117 
1118  this->put(static_cast<const JDAQTimeslice&>(timeslice));
1119 
1120  dumpCount += 1;
1121  }
1122  }
1123 
// the router is reset regardless of the outcome of the processing
1124  timesliceRouter->reset();
1125  }
1126 
1127 
1128  /**
1129  * Report status to message logger.
      *
      * The timer is stopped at the start of the report and timer.start() is called again at the end.
      * Each item is written as a separate JStatusStream message:
      * elapsed wall/user/system time, timing per time slice taken from Qt,
      * packet, event and byte counters, queue occupancy, time slice counters
      * and two performance factors.
1130  */
1131  void typeout()
1132  {
1133  timer.stop();
1134 
      // wall time as double; it is divided by 1e6 below to be printed in [s]
1135  const double T_us = (double) timer.usec_wall;
1136 
1137  JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1138  JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1139  JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
      // a std::exception raised while evaluating the Qt statistics is caught and ignored
      // (presumably thrown when nothing has been accumulated yet - confirm in JQuantile)
1140  try {
1141  JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 << " +/- " << Qt.getDeviation() * 1.0e-3;
1142  }
1143  catch(const std::exception&) {}
1144  JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1145  JStatusStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
1146 
      // ratio is only printed when the denominator is non-zero
1147  if (number_of_packets_received > 0) {
1148  JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1149  }
1150 
1151  JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1152  JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
1153 
1154  JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1155 
      // NOTE(review): unlike the lines above there is no cast to double here;
      // if the timer counters are integral, these are integer divisions (truncated to whole ms) - confirm
1156  if (numberOfTimeslicesProcessed > 0) {
1157  JStatusStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1158  JStatusStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1159  JStatusStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1160  }
1161 
      // data taking time covered by the processed slices and by the span of frame numbers, respectively;
      // the division by 1000 presumably converts getFrameTime() from [ns] to [us] - confirm in JDAQClock.hh
1162  const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1163  const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1164 
      // performance factor = elapsed wall time / data taking time; printed only for a positive denominator
1165  if (processedSlicesTime_us > 0) {
1166  JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1167  }
1168  if (processedDetectorTime_us > 0) {
1169  JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1170  }
1171 
1172  timer.start();
1173  }
1174 
1175 
1176  /**
1177  * Tagged action to handle alerts.
      *
      * Only the tag RC_ALERT is handled: an open circular buffer is closed and,
      * if the circular buffer is enabled, a new one is opened in directory <tt>path</tt>
      * and the meta data are written to its file.
      * If the new circular buffer cannot be opened, an error is logged and the circular buffer is disabled.
      * For any other tag, a warning is logged.
1178  *
1179  * \param tag tag
1180  * \param length number of characters
1181  * \param buffer message
1182  */
1183  virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1184  {
1185  using namespace std;
1186 
      // the message is printed using exactly length characters of buffer
1187  JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1188 
1189  if (tag == RC_ALERT) {
1190 
      // close the circular buffer which is currently open, if any
1191  if (c_buffer.is_open()) {
1192 
1193  JNoticeStream(logger) << "Close circular buffer " << c_buffer;
1194 
1195  c_buffer.close();
1196  }
1197 
      // open a new circular buffer, unless the functionality is disabled
1198  if (c_buffer.is_enabled()) {
1199 
1200  c_buffer.open(path, getUniqueTag());
1201 
1202  if (c_buffer.is_open()) {
1203 
1204  JNoticeStream(logger) << "Created circular buffer " << c_buffer;
1205 
      // store the meta data of this application in the new file
1206  putObject(c_buffer.getFile(), meta);
1207 
1208  } else {
1209 
1210  JErrorStream (logger) << "Failed to create circular buffer in directory <" << path << ">; disable functionality.";
1211 
      // after a failure, the circular buffer is disabled so that this branch is skipped on following alerts
1212  c_buffer.disable();
1213  }
1214  }
1215 
1216  } else {
1217 
1218  JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1219  }
1220  }
1221 
1222  JMeta meta; //!< meta data
1223 
1224  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
1225 
1226  private:
1227 
1229  JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1230  std::string hostname; //!< host name of data server
1231 
1232  /**
1233  * Auxiliary method to send object to data server.
      *
      * The object is sent via <tt>datawriter</tt>; on success, the size of the object
      * (as given by getSizeof) is added to <tt>numberOfBytes</tt>.
      * A std::exception raised while sending is caught and reported to the error stream of the logger;
      * it is not propagated to the caller.
1234  *
1235  * \param object object to be sent
1236  * \return true if sent; else false
1237  */
1238  template<class T>
1239  bool put(const T& object)
1240  {
1241  try {
1242 
      // NOTE(review): datawriter is dereferenced without a visible check that it has been set - confirm behaviour of JSinglePointer
1243  datawriter->put(object);
1244 
      // only reached if the object has been sent without exception
1245  numberOfBytes += getSizeof(object);
1246 
1247  return true;
1248  }
1249  catch(const std::exception& error) {
1250  JErrorStream(logger) << error.what();
1251  }
1252 
      // sending failed; the error has been logged above
1253  return false;
1254  }
1255 
1256 
1257  int port; //!< server socket port
1258  int backlog; // NOTE(review): presumably the backlog of the server socket (option -q in main) - confirm in constructor
1260 
1261  JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1262  JChannelList_t channelList; //!< connections to data queue
1263 
1265  JQuantile Qt; //!< statistics of time per time slice; typeout() prints mean and deviation scaled by 1.0e-3 as [ms]
1266 
1267  std::list<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1269  unsigned int frames_per_slice; //!< reported by typeout() as number of frames per slice expected
1272 
1273  // trigger
1274 
1277 
1284 
1289 
1294 
1297 
1298  // process management
1299 
1302 
1303  // memory management
1304 
1305  long long int totalCPURAM;
1306  unsigned int maxQueueDepth;
1307  long long int maxQueueSize;
1308  long long int queueSize; //!< reported by typeout() as current queue size
1309 
1310  // statistics
1311 
1313 
1314  long long int numberOfEvents; //!< incremented for each triggered event that has been sent successfully
1315  long long int numberOfBytes; //!< incremented in put() by the size of each object sent
1318 
1321 
1322  // temporary
1323 
1326  long long int number_of_reads; //!< used by typeout() for the number of reads per packet
1328 
1329  // circular buffer
1330 
1332  std::string path; //!< directory for writing circular buffer
1333  };
1334 }
1335 
1336 /**
1337  * \file
1338  *
1339  * Application for real-time filtering of data.
1340  * For more information, see KM3NETDAQ::JDataFilter.
1341  *
1342  * \author rbruijn and mdejong
1343  */
1344 int main(int argc, char* argv[])
1345 {
      // Program flow: parse the command line options, create the message logger,
      // construct the data filter, attach the meta data of this application,
      // then enter the state machine and run.
1346  using namespace std;
1347  using namespace JPP;
1348  using namespace KM3NETDAQ;
1349 
1350  string server;
1351  string logger;
1352  string hostname;
1353  string client_name;
1354  int port;
1355  int backlog;
1356  int buffer_size;
1357  bool use_cout;
1358  string path;
1359  int debug;
1360 
1361 
1362  try {
1363 
1364  JParser<> zap("Application for real-time filtering of data.");
1365 
1366  zap['H'] = make_field(server) = "localhost"; // server passed to JDataFilter
1367  zap['M'] = make_field(logger) = "localhost"; // argument of JControlHostLogger (used unless -c is given)
1368  zap['D'] = make_field(hostname) = ""; // host name passed to JDataFilter
1369  zap['u'] = make_field(client_name) = "JDataFilter"; // name passed to JDataFilter
1370  zap['P'] = make_field(port); // server socket port; no default value is assigned here
1371  zap['q'] = make_field(backlog) = 1024;
1372  zap['s'] = make_field(buffer_size) = 32 * MEGABYTE; // TCP buffer of 32 MB
1373  zap['c'] = make_field(use_cout); // log messages to std::cout instead of ControlHost
1374  zap['p'] = make_field(path) = "/tmp/"; // directory passed to JDataFilter (circular buffer)
1375  zap['d'] = make_field(debug) = 0; // debug level passed to JDataFilter
1376 
1377  zap(argc, argv);
1378  }
1379  catch(const exception& error) {
      // NOTE(review): FATAL presumably terminates the program; otherwise the options
      // above would be used below without having been set - confirm in JMessage.hh
1380  FATAL(error.what() << endl);
1381  }
1382 
1383 
      // NOTE(review): the logger is allocated with new and not deleted in main;
      // presumably JDataFilter takes ownership of the pointer - confirm in JDAQClient
1384  JLogger* out = NULL;
1385 
1386  if (use_cout)
1387  out = new JStreamLogger(cout);
1388  else
1389  out = new JControlHostLogger(logger);
1390 
1391  JDataFilter dfilter(client_name,
1392  server,
1393  hostname,
1394  out,
1395  debug,
1396  port,
1397  backlog,
1398  buffer_size,
1399  path);
1400 
      // meta data built from the command line; written to the circular buffer file in actionTagged()
1401  dfilter.meta = JMeta(argc, argv);
1402 
1403  dfilter.enter();
1404  dfilter.run();
      // no explicit return statement: reaching the end of main is equivalent to return 0
1405 }
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:808
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1500
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
Definition: JDataFilter.cc:918
General exception.
Definition: JException.hh:23
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:76
void open(const std::string &path, const JTag &tag)
Open file.
Definition: JDataFilter.cc:183
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:677
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
JMeta meta
meta data
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
Definition: JModule.hh:68
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
static bool processIndexSorter(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Definition: JDataFilter.cc:271
std::list< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:159
Message logging based on std::ostream.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:156
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:151
Long64_t sizeL1
Number of L1 time slices.
Definition: JDataFilter.cc:258
Detector data structure.
Definition: JDetector.hh:89
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:51
Router for direct addressing of module data in detector data structure.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size, const std::string &path)
Constructor.
Definition: JDataFilter.cc:290
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Definition: JCookie.sh:42
Utility class to parse parameter values.
Definition: JProperties.hh:496
JSinglePointer< JServerSocket > serversocket
server for data queue connections
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
TCP Server socket.
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:68
Socket class.
Definition: JSocket.hh:43
JTriggerParameters parameters
Main class for real-time filtering of data.
Definition: JDataFilter.cc:146
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:128
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
T * open(const std::string &file_name)
Open file.
Definition: JeepToolkit.hh:306
Tools for handling different hit types.
Utility class to parse parameter values.
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition: JChecksum.hh:176
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:158
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t()
Default constructor.
Definition: JDataFilter.cc:172
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Definition: JDataFilter.cc:227
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:155
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:157
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:45
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:373
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
I/O formatting auxiliaries.
Detector file.
Definition: JHead.hh:224
virtual void actionContinue(int length, const char *buffer) override
Definition: JDataFilter.cc:638
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:152
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
Definition: JDataFilter.cc:242
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
Definition: JDAQCHSM.chsm:175
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1961
Byte array binary input.
Definition: JByteArrayIO.hh:25
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
Auxiliary methods for handling file names, type names and environment.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
static const size_t buffer_size
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
Data frame.
Definition: JDAQFrame.hh:64
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Data time slice.
Auxiliary class to build JDAQEvent for a triggered event.
Physics constants.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
Definition: JDataFilter.cc:823
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
Definition: JDataFilter.cc:316
Level specific message streamers.
int debug
debug level
Definition: JSirene.cc:63
long long int number_of_packets_received
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
static const long long int MEGABYTE
Number of bytes in a mega-byte.
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:130
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataFilter.cc:662
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DOM<$WORKDIR/ev_configure_domsimulator.txt > RC_DWRT path
Nano-beacon trigger.
Definition: JTriggerNB.hh:19
Long64_t sizeSN
Number of SN time slices.
Definition: JDataFilter.cc:260
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
Socket input channel.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:251
std::string hostname
host name of data server
unsigned int maxQueueDepth
Template L1 hit builder.
Definition: JBuildL1.hh:85
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
#define FATAL(A)
Definition: JMessage.hh:67
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
Definition: JDataFilter.cc:555
bool put(const T &object)
Auxiliary method to send object to data server.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
Definition: JDataFilter.cc:652
long long int queueSize
virtual void actionExit() override
Definition: JDataFilter.cc:320
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
JChannelList_t channelList
connections to data queue
Control unit client base class.
Definition: JDAQClient.hh:272
Auxiliary class to get date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Definition: JDAQTags.hh:57
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataFilter.cc:682
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition: JNetwork.hh:216
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
Definition: JDataFilter.cc:257
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
System auxiliaries.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:110
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
std::string path
directory for writing circular buffer
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
const char * getName()
Get ROOT name of given data type.
Definition: JRootToolkit.hh:57
Data structure for input to trigger algorithm.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:152
System time information.
Setting of trigger bits.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
Definition: JDataFilter.cc:644
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataFilter.cc:333
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY source JAcoustics sh $DETECTOR_ID CHECK_EXIT_CODE typeset A TRIPODS get_tripods $WORKDIR tripod txt TRIPODS for EMITTER in
Definition: JCanberra.sh:42
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
Definition: JDAQEventIO.hh:26
virtual void actionPause(int length, const char *buffer) override
Definition: JDataFilter.cc:605
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Definition: JDataFilter.cc:259
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataFilter.cc:668
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataFilter.cc:354
Time slice with calibrated data.
Definition: JTimeslice.hh:26