Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataFilter.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <iostream>
4 #include <iomanip>
5 #include <list>
6 #include <limits>
7 #include <algorithm>
8 
10 #include "JLang/JLangToolkit.hh"
11 
12 #include "Jeep/JParser.hh"
13 #include "Jeep/JProperties.hh"
14 #include "Jeep/JTimer.hh"
15 #include "Jeep/JTimekeeper.hh"
16 #include "Jeep/JMessage.hh"
17 
19 
20 #include "JDAQ/JDAQ.hh"
21 #include "JDAQ/JDAQTags.hh"
22 #include "JDAQ/JDAQEvent.hh"
23 #include "JDAQ/JDAQTimeslice.hh"
25 #include "JDAQ/JDAQPreamble.hh"
26 #include "JDAQ/JDAQSuperFrame.hh"
27 #include "JDAQ/JDAQFrame.hh"
28 #include "JDAQ/JDAQSummaryslice.hh"
29 
30 #include "JTrigger/JHit.hh"
31 #include "JTrigger/JHitToolkit.hh"
34 #include "JTrigger/JTimeslice.hh"
35 #include "JTrigger/JHitL0.hh"
36 #include "JTrigger/JHitL1.hh"
37 #include "JTrigger/JBuildL1.hh"
38 #include "JTrigger/JBuildL2.hh"
42 #include "JTrigger/JTriggerBits.hh"
46 #include "JTrigger/JTimesliceL1.hh"
49 
52 
53 #include "JNet/JControlHost.hh"
55 #include "JNet/JSocket.hh"
56 #include "JNet/JSocketChannel.hh"
57 #include "JNet/JServerSocket.hh"
58 
59 #include "JTools/JConstants.hh"
60 
61 #include "JSupport/JSupport.hh"
62 
64 #include "JSystem/JNetwork.hh"
65 
66 
67 using namespace KM3NETDAQ;
68 using namespace JPP;
69 using namespace std;
70 //using JTOOLS::make_range;
71 
72 
73 bool processIndexSorter(const JDAQProcess &a, const JDAQProcess& b)
74 {
75  return a.index<b.index;
76 }
77 
78 namespace JNET {
79 
80  template<>
81  inline int getSizeOfPacket (const KM3NETDAQ::JDAQAbstractPreamble& preamble)
82  {
83  return preamble.getLength();
84  }
85 }
86 
89 typedef double hit_type;
90 
97 
98 
99 class JDataFilter :
100  public JDAQClient
101 {
102 public:
103  /**
104  * Constructor.
105  *
106  * \param name name of client
107  * \param server name of command message server
108  * \param hostname name of data server
109  * \param logger pointer to logger
110  * \param level debug level
111  * \param port server port
112  * \param backlog server backlog
113  * \param buffer_size server buffer
114  */
115  JDataFilter(const std::string& name,
116  const std::string& server,
117  const std::string& hostname,
118  JLogger* logger,
119  const int level,
120  const int port,
121  const int backlog,
122  const int buffer_size) :
123  JDAQClient(name,server,logger,level),
124  hostname (hostname),
125  parameters()
126  {
127  this->port = port;
128  this->backlog = backlog;
129  this->buffer_size = buffer_size;
130 
131  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
132 
133  totalCPURAM = getRAM();
134 
135  current_slice_index = -1;
136 
137  reporting = false;
138  }
139 
140 
141  virtual void actionEnter()
142  {
143  try {
144 
145  JDebugStream(logger) << "Start server.";
146 
147  serversocket.reset(new JServerSocket(port,backlog));
148  }
149  catch(const std::exception& error) {
150  JErrorStream(logger) << error.what();
151  }
152  }
153 
154 
155  virtual void actionExit()
156  {
157  JDebugStream(logger) << "Stop server.";
158 
159  serversocket.reset();
160  datawriter .reset();
161  }
162 
163 
164  virtual void actionInit(int length, const char* buffer)
165  {
166  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
167  }
168 
169 
170  virtual void actionConfigure(int length, const char* buffer)
171  {
172 
173  // todo:
174 
175  // 3) receive and set site id
176 
177 
178  using namespace std;
179  using namespace JPP;
180 
181  JDebugStream(logger) << "actionConfigure() " << std::string(buffer,length);
182 
183  long long int logger_s = 5;
184 
185  parameters .reset();
186 
187  dataFilters.clear();
188  dataQueues .clear();
189 
190  reporting = false;
191 
192  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
193 
194  properties["dataWriter"] = hostname;
195  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
196  properties["detector"] = detector;
197  properties["triggerParameters"] = parameters;
198  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
199  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
200  properties["logger_s"] = logger_s;
201  properties["JDataFilter"] = dataFilters;
202  properties["DataQueue"] = dataQueues;
203  properties["pmtIdCheck"] = checkForInvalidPMTs = false;
204  properties["maxHitsFrame"] = maxHitsFrame = 31*2000; // default is 20 kHz
205 
206  try {
207  properties.read(string(buffer, length));
208  }
209  catch(const exception& error) {
210  JErrorStream(logger) << error.what();
211  }
212 
213  hostname = trim(hostname);
214 
215  if (hostname != "")
216  datawriter.reset(new JControlHost_t(hostname)); // may throw an error
217  else
218  throw JException("Undefined data writer host name.");
219 
220  maximum_frames_per_slice = frames_per_slice;
221 
222  // process processlist
223 
224  if(dataFilters.empty()) {
225  JNoticeStream(logger) << "No DataFilters in process list, or no process list. Assuming that this process is the only process on this CPU and setting parameters accordingly.";
226  }
227 
228 
229  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
230 
231  unsigned int numberOfDataFiltersOnThisMachine = 0;
232  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
233 
234  for(vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
235 
236  if(i->hostname == getHostname() ||
237  i->hostname == getIPaddress()) {
238 
239  numberOfDataFiltersOnThisMachine++;
240 
241  if (i->port == this->port) {
242  thisProcess = i;
243  }
244  }
245  }
246 
247  if (numberOfDataFiltersOnThisMachine == 0) {
248  JNoticeStream(logger) << "Zero datafilters on this machine according to process list (if it exists). Assuming one datafilter on this machine.";
249  numberOfDataFiltersOnThisMachine = 1;
250  }
251 
252  if (thisProcess == dataFilters.end()) {
253  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
254  }
255 
256  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
257  JErrorStream(logger) << "Mismatch between given process names: "
258  << "I am called " << getName() << ", but in the process list I am referred to as " << thisProcess->index;
259  }
260 
261  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
262  reporting = true;
263  }
264 
265  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
266 
267  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
268 
269  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. Queue size reduced to "
270  << maxQueueSize << " bytes." ;
271  }
272 
273  // trigger parameters
274 
275  parameters.set(getMaximalDistance(detector));
276 
277  trigger3DMuon .reset(new JTrigger3DMuon (parameters));
278  trigger3DShower.reset(new JTrigger3DShower(parameters));
279  triggerMXShower.reset(new JTriggerMXShower(parameters, detector));
280 
281  moduleRouter.reset(new JModuleRouter(detector));
282 
283  if (reporting) {
284  JNoticeStream(logger) << "This JDataFilter process will report.";
285  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
286  JNoticeStream(logger) << "Trigger parameters: " << parameters;
287  JNoticeStream(logger) << "Detector description: " << endl << detector;
288  JNoticeStream(logger) << "Clock interval: " << getClockInterval();
289  }
290 
291  timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
292 
293  // set L1, L2 and SN builders
294 
295  buildL1.reset(new JBuildL1_t(parameters));
296  buildL2.reset(new JBuildL2_t(parameters.L2));
297  buildSN.reset(new JBuildL2_t(parameters.SN));
298 
299  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
300  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
301  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
302 
303  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
304  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
305  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
306  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
307  logErrorInvalidPMTs = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
308  }
309 
310 
311  virtual void actionStart(int length, const char* buffer)
312  {
313  if (reporting) {
314  JNoticeStream(logger) << "Start run " << getRunNumber();
315  }
316 
317  timeslices.clear();
318 
319  current_slice_index = -1;
320  queueSize = 0;
321 
322  numberOfEvents = 0;
323  numberOfBytes = 0;
324  numberOfTimeslicesProcessed = 0;
325  numberOfIncompleteTimeslicesProcessed = 0;
326 
327  numberOfFramesWithInvalidPMTs = 0;
328 
329  number_of_packets_received = 0;
330  number_of_packets_discarded = 0;
331  number_of_bytes_received = 0;
332  number_of_reads = 0;
333 
334  minFrameNumber = numeric_limits<int>::max();
335  maxFrameNumber = numeric_limits<int>::min();
336 
337  // Reset global trigger counter.
338 
340 
341  logErrorRun .reset();
342  logErrorDetector .reset();
343  logErrorIndex .reset();
344  logErrorIncomplete .reset();
345  logErrorInvalidPMTs.reset();
346 
347  timer.reset();
348  timer.start();
349 
350  // send trigger parameters to the datawriter
351 
352  ostringstream os;
353 
354  os << getRunNumber() << ' ' << parameters;
355 
356  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
357  }
358 
359 
360  virtual void actionPause(int length, const char* buffer)
361  {
362  if (!timeslices.empty()) {
363 
364  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
365 
366  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.begin(); ++i) {
367  queueSize -= i->getSize();
368  }
369 
370  timeslices.clear();
371  }
372 
373  if (queueSize != 0) {
374  JWarningStream(logger) << "Pending data in queue [B] " << queueSize;
375  }
376 
377  current_slice_index = -1;
378  queueSize = 0;
379 
380  timer.stop();
381  }
382 
383 
384  virtual void actionContinue(int length, const char* buffer)
385  {
386  timer.start();
387  }
388 
389 
390  virtual void actionStop(int length, const char* buffer)
391  {
392  typeout();
393  }
394 
395 
396  virtual void actionReset(int length, const char* buffer)
397  {}
398 
399 
400  virtual void actionQuit(int length, const char* buffer)
401  {
402  datawriter.reset();
403  }
404 
405 
406  virtual void setSelect(JFileDescriptorMask& mask) const
407  {
408  if (serversocket.is_valid()) {
409  mask.set(*serversocket);
410  }
411 
412  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
413  if (!channel->isReady()) {
414  mask.set(channel->getFileDescriptor());
415  }
416  }
417  }
418 
419 
420  virtual void actionSelect(const JFileDescriptorMask& mask)
421  {
422  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
423 
424  try {
425 
426  if (mask.has(channel->getFileDescriptor())) {
427  channel->read();
428  }
429 
430  if (channel->isReady()) {
431 
432  number_of_packets_received += 1;
433  number_of_reads += channel->getCounter();
434  number_of_bytes_received += channel->size();
435 
436  if (isRunning()) {
437 
438  updateFrameQueue(channel);
439 
440  } else {
441 
442  JErrorStream(logErrorRun) << "Receiving data while not running.";
443 
444  number_of_packets_discarded += 1;
445  }
446 
447  channel->reset();
448  }
449 
450  ++channel;
451  }
452  catch(const JSocketException& error) {
453 
454  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
455 
456  channel->shutdown();
457 
458  channel = channelList.erase(channel);
459  }
460  }
461 
462 
463  if (serversocket.is_valid()) {
464 
465  if (mask.has(*serversocket)) {
466 
467  JSocket socket;
468 
469  socket.accept(serversocket->getFileDescriptor());
470 
471  //socket.setSendBufferSize (buffer_size);
473 
474  socket.setKeepAlive (true);
475  socket.setNonBlocking(true);
476 
477  JNoticeStream(logger) <<"New channel" << '[' << socket.getFileDescriptor() << ']';
478 
479  channelList.push_back(JSocketInputChannel_t(socket));
480  }
481  }
482 
483 
484  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
485  timeslices.size() >= maxQueueDepth ||
486  queueSize >= maxQueueSize)) {
487 
488  JDAQTimesliceL0 pending_slice = timeslices.front();
489  queueSize -= pending_slice.getSize();
490  timeslices.pop_front();
491 
492  current_slice_index = pending_slice.getFrameIndex();
493  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
494  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
495 
496  if(pending_slice.size()>frames_per_slice) {
497 
498  JErrorStream(logger) << "More frames in timeslice than expected : " << pending_slice.size();
499 
500  if(pending_slice.size()<=maximum_frames_per_slice) {
501 
502  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
503 
504  frames_per_slice = pending_slice.size();
505  }
506  }
507 
508  if (!pending_slice.empty()) {
509 
510  processTimeSlice(pending_slice);
511 
512  numberOfTimeslicesProcessed += 1;
513 
514  if (pending_slice.size() < frames_per_slice) {
515  numberOfIncompleteTimeslicesProcessed += 1;
516  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice "
517  << "Timeslice frameindex " << pending_slice.getFrameIndex() << ' '
518  << "Size of timeslice " << pending_slice.size() << ' '
519  << "Queue depth " << timeslices.size() << ' '
520  << "Queue size " << queueSize;
521 
522  if(!timeslices.empty()) {
523  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
524  frames_per_slice = pending_slice.size();
525  }
526  }
527  }
528  }
529  }
530 
531 
532  virtual void actionRunning()
533  {
534  if (reporting) {
535  typeout();
536  }
537  }
538 
539 
540  void updateFrameQueue(const JChannelList_t::const_iterator channel)
541  {
542  JIO::JByteArrayReader in(channel->data(), channel->size());
543 
544  JDAQPreamble preamble;
545  JDAQSuperFrameHeader header;
546 
547  in >> preamble;
548  in >> header;
549 
550  if(preamble.getLength()!=channel->size()) {
551 
552  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble."
553  << " preamble.getLength(): " << preamble.getLength()
554  << " channel->size(): " << channel->size() ;
555  number_of_packets_discarded += 1;
556 
557  return;
558  }
559 
560  if (header.getRunNumber() != getRunNumber()) {
561 
562  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber() << " unequal to current run " << getRunNumber() << " -> Dropping frame.";
563 
564  number_of_packets_discarded += 1;
565 
566  return;
567  }
568 
569  if (header.getFrameIndex() <= current_slice_index) {
570 
571  JErrorStream(logErrorIndex) << "FrameIndex " << header.getFrameIndex() <<" already processed, dropping frame.";
572 
573  number_of_packets_discarded += 1;
574 
575  if(frames_per_slice<maximum_frames_per_slice) {
576 
577  frames_per_slice++;
578 
579  JErrorStream(logErrorIndex) << "Increased number of frames expected to: " << frames_per_slice;
580  }
581 
582  return;
583  }
584 
585  list<JDAQTimesliceL0>::iterator timesliceIterator;
586 
587  for (timesliceIterator = timeslices.begin();
588  timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex();
589  ++timesliceIterator) ;
590 
591  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
592 
593  } else {
594 
595  // This is the first frame of its slice; insert a new slice at the right position in the list
596 
597  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
598  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
599 
600  queueSize += timesliceIterator->getSize();
601  }
602 
603 
604  if (checkForInvalidPMTs) {
605 
606  JDAQSuperFrame frame(header);
607 
608  in >> static_cast<JDAQFrame&> (frame);
609 
610  if(frame.size()>maxHitsFrame) {
611  JErrorStream(logger) << "Frame size exceeds limit. Size: " << frame.size()<< " Frame discarded.";
612  return;
613  }
614  JDAQSuperFrame::const_iterator hit = frame.begin() ;
615 
616  // std::vector<unsigned int> prevHitT(30,0);
617 
618  for (; hit!=frame.end(); ++hit) {
619 
620  if(hit->getPMT()>30) //||
621  // hit->getT()<prevHitT[hit->getPMT()])
622  {
623  break;
624  }
625 
626  // prevHitT[hit->getPMT()]=hit->getT();
627  }
628 
629  if(hit!=frame.end()) {
630  numberOfFramesWithInvalidPMTs +=1;
631  JErrorStream(logErrorInvalidPMTs)<< "Frame with invalid PMT id or wrong time ordering discarded. Total discarded: "<< numberOfFramesWithInvalidPMTs;
632  }
633  else
634  {
635 
636 
637  timesliceIterator->push_back(frame);
638  queueSize += timesliceIterator->rbegin()->getSize();
639  }
640  }
641  else {
642 
643 
644  timesliceIterator->push_back(JDAQSuperFrame(header));
645 
646  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
647  queueSize += timesliceIterator->rbegin()->getSize();
648  }
649 
650  }
651 
652 
653  void processTimeSlice(const JDAQTimesliceL0& pending_slice)
654  {
655  //JDebugStream(logger) << "Processing slice: "
656  // << pending_slice.getRunNumber() << " "
657  // << pending_slice.getFrameIndex() << " "
658  // << pending_slice.size();
659 
660  try {
661 
662  if (parameters.writeSummary()) {
663 
664  try {
665 
666  JDAQSummaryslice object(pending_slice);
667 
668  datawriter->put(object);
669 
670  numberOfBytes += object.getSize();
671  }
672  catch(const JControlHostException& error) {
673  JErrorStream(logger) << error;
674  }
675  }
676 
677  timesliceRouter->configure(pending_slice);
678 
679  if (parameters.trigger3DMuon.enabled ||
680  parameters.trigger3DShower.enabled ||
681  parameters.triggerMXShower.enabled ||
682  parameters.writeL1.prescale ||
683  parameters.writeL2.prescale ||
684  parameters.writeSN.prescale) {
685 
686 
687  JTimeslice_t timesliceL0(pending_slice.getDAQChronometer());
688  JTimeslice_t timesliceL1(pending_slice.getDAQChronometer());
689  JTimeslice_t timesliceL2(pending_slice.getDAQChronometer());
690  JTimeslice_t timesliceSN(pending_slice.getDAQChronometer());
691 
692  for (JDAQTimesliceL0::const_iterator super_frame = pending_slice.begin(); super_frame != pending_slice.end(); ++super_frame) {
693 
694  if (moduleRouter->hasModule(super_frame->getModuleID())) {
695 
696  const JModule& module = moduleRouter->getModule(super_frame->getModuleID());
697  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*super_frame, module);
698 
699  // Apply high-rate veto
700 
701  buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
702 
703  // L0
704 
705  timesliceL0.push_back(JSuperFrame1D_t(buffer));
706 
707  // L1
708 
709 
710  timesliceL1.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
711  super_frame->getModuleIdentifier(),
712  module.getPosition()));
713 
714  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
715 
716  // L2
717 
718  timesliceL2.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
719  super_frame->getModuleIdentifier(),
720  module.getPosition()));
721 
722  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
723 
724  // SN
725 
726  timesliceSN.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
727  super_frame->getModuleIdentifier(),
728  module.getPosition()));
729 
730  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
731 
732  } else {
733 
734  JErrorStream(logErrorDetector) << "No detector information for module " << super_frame->getModuleID();
735  }
736  }
737 
738  // Trigger
739 
740  JTriggerInput trigger_input(timesliceL2);
741  JTriggerOutput trigger_output;
742 
743  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
744  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
745  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
746 
747  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
748 
749  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
750 
751  try {
752 
753  JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
754 
755  datawriter->put(object);
756 
757  numberOfEvents += 1;
758  numberOfBytes += object.getSize();
759  }
760  catch(const JControlHostException& error) {
761  JErrorStream(logger) << error;
762  }
763  }
764 
765  if (parameters.writeL1()) {
766 
767  try {
768 
769  JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
770 
771  datawriter->put(object);
772 
773  numberOfBytes += object.getSize();
774  }
775  catch(const JControlHostException& error) {
776  JErrorStream(logger) << error;
777  }
778  }
779 
780  if (parameters.writeL2()) {
781 
782  try {
783 
784  JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
785 
786  datawriter->put(object);
787 
788  numberOfBytes += object.getSize();
789  }
790  catch(const JControlHostException& error) {
791  JErrorStream(logger) << error;
792  }
793  }
794 
795  if (parameters.writeSN()) {
796 
797  try {
798 
799  JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
800 
801  datawriter->put(object);
802 
803  numberOfBytes += object.getSize();
804  }
805  catch(const JControlHostException& error) {
806  JErrorStream(logger) << error;
807  }
808  }
809  }
810 
811  if (parameters.writeTimeslices() || parameters.writeL0()) { // backward compatible, remove later
812 
813  try {
814 
815  datawriter->put(pending_slice);
816 
817  numberOfBytes += pending_slice.getSize();
818  }
819  catch(const JControlHostException& error) {
820  JErrorStream(logger)<<error;
821  }
822  }
823 
824  } catch(const JTriggerException& error) {
825  JErrorStream(logger) << error.what();
826  JErrorStream(logger) << "Run: " << pending_slice.getRunNumber() << " frame index: " << pending_slice.getFrameIndex() << " not processed!";
827  }
828  }
829 
830 
831  void typeout()
832  {
833  timer.stop();
834 
835  const double T_us = (double) timer.usec_wall;
836 
837  JNoticeStream(logger) << "Reporting statistics for this datafilter.";
838  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
839  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
840  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
841  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
842  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
843 
844  if (number_of_packets_received > 0) { JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received; }
845 
846  JNoticeStream(logger) << "Current queue depth " << timeslices.size();
847  JNoticeStream(logger) << "Current queue size " << queueSize;
848  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
849 
850  //if (T_us > 0) { JNoticeStream(logger) << "Packet rate [kHz] " << number_of_packets_received * 1000 / T_us; }
851  //if (T_us > 0) { JNoticeStream(logger) << "Data rate [MB/s] " << number_of_bytes_received / T_us; }
852  //if (T_us > 0) { JNoticeStream(logger) << "Event rate [Hz] " << numberOfEvents * 1e6 / T_us; }
853  //if (T_us > 0) { JNoticeStream(logger) << "Output data rate [MB/s] " << numberOfBytes / T_us; }
854 
855  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
856  JNoticeStream(logger) << "Number of frames with invalid PMTs dropped " << numberOfFramesWithInvalidPMTs;
857 
858  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed; }
859  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed; }
860  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed; }
861 
862  const double processedSlicesTimeMus = numberOfTimeslicesProcessed * getFrameTime() / 1000;
863  const double processedDetectorTimeMus = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
864 
865  if (processedSlicesTimeMus > 0) { JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTimeMus; }
866  if (processedDetectorTimeMus > 0) { JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTimeMus; }
867 
868  timer.start();
869  }
870 
871 
872 private:
873 
875  JSinglePointer<JControlHost_t> datawriter; /// controlhost to send data to the datawriter
876 
877  std::string hostname; //!< host name of data server
878  int port; /// serversocket port
879  int backlog;
881 
884 
885  JTimer timer;
886 
889 
890  unsigned int frames_per_slice;
892 
893  // trigger
894 
898 
904 
908 
914 
915  // process management
918 
919  // memory management
920 
921  long long int totalCPURAM;
922  unsigned int maxQueueDepth;
923  long long int maxQueueSize;
924  long long int queueSize;
925 
926  // statistics
927 
928  bool reporting;
929 
930  long long int numberOfEvents;
931  long long int numberOfBytes;
935 
938 
939  // temporary:
940 
943  long long int number_of_reads;
945 
948 };
949 
950 
951 /**
952  * \file
953  *
954  * Application for real-time filtering of data.
955  * \author rbruijn
956  */
957 int main(int argc, char* argv[])
958 {
959  using namespace std;
960 
961  string server;
962  string logger;
963  string hostname;
964  string client_name;
965  int port;
966  int backlog;
967  int buffer_size;
968  bool use_cout;
969  int debug;
970 
971 
972  try {
973 
974  JParser<> zap("Application for real-time filtering of data.");
975 
976  zap['H'] = make_field(server) = "localhost";
977  zap['M'] = make_field(logger) = "localhost";
978  zap['D'] = make_field(hostname) = "";
979  zap['u'] = make_field(client_name) = "JDataFilter";
980  zap['P'] = make_field(port);
981  zap['q'] = make_field(backlog) = 1024;
982  zap['s'] = make_field(buffer_size) = 8*1024*1024; // TCP buffer of 8 MB
983  zap['c'] = make_field(use_cout);
984  zap['d'] = make_field(debug) = 0;
985 
986  zap(argc, argv);
987  }
988  catch(const exception& error) {
989  FATAL(error.what() << endl);
990  }
991 
992 
993  JLogger* out = NULL;
994 
995  if (use_cout)
996  out = new JStreamLogger(cout);
997  else
998  out = new JControlHostLogger(logger);
999 
1000  JDataFilter dfilter(client_name,
1001  server,
1002  hostname,
1003  out,
1004  debug,
1005  port,
1006  backlog,
1007  buffer_size);
1008 
1009  dfilter.enter();
1010  dfilter.run();
1011 }
Message logger with time scheduler.
int current_slice_index
Definition: JDataFilter.cc:888
Utility class to parse command line options.
Definition: JParser.hh:1410
General exception.
Definition: JException.hh:40
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Definition: JDataFilter.cc:81
virtual const char * what() const
Get error message.
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:665
Data structure for all trigger parameters.
int maxFrameNumber
Definition: JDataFilter.cc:937
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataFilter.cc:532
virtual void actionContinue(int length, const char *buffer)
Definition: JDataFilter.cc:384
Data structure for a composite optical module.
Definition: JModule.hh:47
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:96
std::string hostname
controlhost to send data to the datawriter
Definition: JDataFilter.cc:877
long long int numberOfFramesWithInvalidPMTs
Definition: JDataFilter.cc:934
Message logging based on std::ostream.
int minFrameNumber
Definition: JDataFilter.cc:936
Set of triggered events.
Detector data structure.
Definition: JDetector.hh:77
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:50
virtual void actionExit()
Definition: JDataFilter.cc:155
long long int numberOfEvents
Definition: JDataFilter.cc:930
Router for direct addressing of module data in detector data structure.
JDetector detector
Definition: JDataFilter.cc:899
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
long long int number_of_packets_received
Definition: JDataFilter.cc:941
Utility class to parse parameter values.
Definition: JProperties.hh:484
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
std::string getIPaddress(const int ip)
Get IP address (decimal-dot notation).
Definition: JNetwork.hh:150
JChannelList_t channelList
Definition: JDataFilter.cc:883
TCP Server socket.
std::vector< JSocketInputChannel_t > JChannelList_t
Definition: JDataFilter.cc:88
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:910
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:66
JSinglePointer< JBuildL1_t > buildL1
Definition: JDataFilter.cc:901
Socket class.
Definition: JSocket.hh:42
JSuperFrame2D< hit_type > JSuperFrame2D_t
Definition: JDataFilter.cc:93
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:493
virtual void actionStop(int length, const char *buffer)
Definition: JDataFilter.cc:390
JPMT_t getPMT() const
Get PMT.
Definition: JDAQHit.hh:74
vector< JDAQProcess > dataFilters
Definition: JDataFilter.cc:916
int getRunNumber() const
Get run number.
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
static const long long int GIGABYTE
Number of bytes in a gigabyte.
Definition: JConstants.hh:81
JSinglePointer< JBuildL2_t > buildL2
Definition: JDataFilter.cc:902
Tools for handling different hit types.
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
Definition: JDataFilter.cc:874
Utility class to parse parameter values.
vector< JDAQProcess > dataQueues
Definition: JDataFilter.cc:917
unsigned int maxQueueDepth
Definition: JDataFilter.cc:922
int getFrameIndex() const
Get frame index.
virtual void actionQuit(int length, const char *buffer)
Definition: JDataFilter.cc:400
1-dimensional frame with time calibrated data from one optical module.
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:932
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e.
Basic data structure for L0 hit.
std::string index
index in process list
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:95
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
Basic data structure for time and time over threshold information of hit.
The template JSinglePointer class can be used to hold a pointer to an object.
JTimer timer
Definition: JDataFilter.cc:885
int getLength() const
Get length.
virtual void actionSelect(const JFileDescriptorMask &mask)
Definition: JDataFilter.cc:420
int getFileDescriptor() const
Get file descriptor.
JNET::JSocketInputChannel< KM3NETDAQ::JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:87
Constants.
Scheduling of actions via fixed latency intervals.
Template L2 builder.
Definition: JBuildL2.hh:47
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
virtual void actionInit(int length, const char *buffer)
Definition: JDataFilter.cc:164
Detector file.
Definition: JHead.hh:126
JMessageScheduler logErrorInvalidPMTs
Definition: JDataFilter.cc:913
Auxiliary class for itemization of process list.
JSinglePointer< JTriggerMXShower > triggerMXShower
Definition: JDataFilter.cc:907
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
Hit data structure.
Definition: JDAQHit.hh:36
int backlog
server socket backlog
Definition: JDataFilter.cc:879
JSinglePointer< JBuildL2_t > buildSN
Definition: JDataFilter.cc:903
#define make_field(A,...)
Macro to convert parameter to JParserTemplateElement object.
Definition: JParser.hh:1836
Byte array binary input.
Definition: JByteArrayIO.hh:25
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:911
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the optical module.
const_iterator begin() const
Definition: JDAQFrame.hh:139
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
static const size_t buffer_size
virtual void actionConfigure(int length, const char *buffer)
Definition: JDataFilter.cc:170
JTriggerParameters parameters
Definition: JDataFilter.cc:895
Data frame.
Definition: JDAQFrame.hh:70
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQEvent for a triggered event.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Definition: JDataFilter.cc:540
unsigned int frames_per_slice
Definition: JDataFilter.cc:890
void typeout()
Definition: JDataFilter.cc:831
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:432
int debug
debug level
Definition: JSirene.cc:59
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:129
Simple data structure for the DAQ preamble without ROOT functionality.
JTimeslice< hit_type > JTimeslice_t
Definition: JDataFilter.cc:94
virtual void actionReset(int length, const char *buffer)
Definition: JDataFilter.cc:396
Socket input channel.
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Match of two events considering overlap in time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
std::string getHostname()
Get host name.
Definition: JNetwork.hh:75
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:909
Template L1 hit builder.
Definition: JBuildL1.hh:76
#define FATAL(A)
Definition: JMessage.hh:65
virtual void actionPause(int length, const char *buffer)
Definition: JDataFilter.cc:360
virtual void setSelect(JFileDescriptorMask &mask) const
Definition: JDataFilter.cc:406
vector< hit_type > JFrameL1_t
Definition: JDataFilter.cc:91
JSinglePointer< JTimesliceRouter > timesliceRouter
Definition: JDataFilter.cc:897
JSinglePointer< JTrigger3DShower > trigger3DShower
Definition: JDataFilter.cc:906
long long int totalCPURAM
Definition: JDataFilter.cc:921
virtual void actionStart(int length, const char *buffer)
Definition: JDataFilter.cc:311
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
Run control client base class.
Definition: JDAQClient.hh:88
virtual int getSize() const
Get size of object.
void merge(const JMatch_t &match)
Merge events.
Utility class to parse command line options.
Implementation of object output through ControlHost.
long long int queueSize
Definition: JDataFilter.cc:924
bool processIndexSorter(const JDAQProcess &a, const JDAQProcess &b)
Definition: JDataFilter.cc:73
JSinglePointer< JModuleRouter > moduleRouter
Definition: JDataFilter.cc:900
unsigned long long int getRAM()
Get RAM of this CPU.
long long int maxQueueSize
Definition: JDataFilter.cc:923
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size)
Constructor.
Definition: JDataFilter.cc:115
ROOT TTree parameter settings.
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:933
Exception for socket.
Definition: JException.hh:414
long long int number_of_reads
Definition: JDataFilter.cc:943
System calls via shell interactions.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:143
2-dimensional frame with time calibrated data from one optical module.
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:912
const char * getName()
Get ROOT name of given data type.
Data structure for input to trigger algorithm.
Fixed parameters for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
bool checkForInvalidPMTs
Definition: JDataFilter.cc:946
string interfaceInput
Definition: JDataFilter.cc:896
virtual void actionEnter()
Definition: JDataFilter.cc:141
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
void processTimeSlice(const JDAQTimesliceL0 &pending_slice)
Definition: JDataFilter.cc:653
Setting of trigger bits.
long long int number_of_packets_discarded
Definition: JDataFilter.cc:942
virtual const char * what() const
Get error message.
Definition: JException.hh:65
KM3NeT DAQ constants, bit handling, etc.
JSinglePointer< JServerSocket > serversocket
Definition: JDataFilter.cc:882
int size() const
Definition: JDAQFrame.hh:157
Hostname and IP address functions.
Data frame of one optical module.
JSinglePointer< JTrigger3DMuon > trigger3DMuon
Definition: JDataFilter.cc:905
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:891
long long int number_of_bytes_received
Definition: JDataFilter.cc:944
double hit_type
Definition: JDataFilter.cc:89
Time slice with calibrated data.
Definition: JTimeslice.hh:26
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:92
const_iterator end() const
Definition: JDAQFrame.hh:140
long long int numberOfBytes
Definition: JDataFilter.cc:931
int main(int argc, char *argv[])
Definition: Main.cpp:15