Jpp test-rotations-new
the software that should make you happy
Loading...
Searching...
No Matches
JDataFilter.cc
Go to the documentation of this file.
1
2#include <string>
3#include <iostream>
4#include <iomanip>
5#include <deque>
6#include <limits>
7#include <algorithm>
8#include <unistd.h>
9
11#include "JLang/JLangToolkit.hh"
12#include "JLang/JException.hh"
13#include "JLang/JVectorize.hh"
14#include "Jeep/JParser.hh"
15#include "Jeep/JProperties.hh"
16#include "Jeep/JTimer.hh"
17#include "Jeep/JTimekeeper.hh"
18#include "Jeep/JMessage.hh"
19#include "Jeep/JPrint.hh"
20#include "Jeep/JeepToolkit.hh"
21#include "Jeep/JStatus.hh"
28#include "JDAQ/JDAQTags.hh"
29#include "JDAQ/JDAQEventIO.hh"
33#include "JTrigger/JHit.hh"
38#include "JTrigger/JHitL0.hh"
39#include "JTrigger/JHitL1.hh"
40#include "JTrigger/JBuildL1.hh"
41#include "JTrigger/JBuildL2.hh"
53#include "JTrigger/JChecksum.hh"
56#include "JNet/JControlHost.hh"
58#include "JNet/JTCPSocket.hh"
60#include "JNet/JServerSocket.hh"
62#include "JTools/JQuantile.hh"
63#include "JSupport/JSupport.hh"
65#include "JSupport/JMeta.hh"
66#include "JSystem/JStat.hh"
67#include "JSystem/JTime.hh"
69#include "JSystem/JNetwork.hh"
71
72
73namespace JNET {
74
75 /**
76 * Get size of packeet.
77 *
78 * \param preamble DAQ data preamble
79 * \return size [B]
80 */
81 template<>
83 {
84 return preamble.getLength();
85 }
86}
87
88namespace KM3NETDAQ {
89
90 using namespace JPP;
91
92
93 /**
94 * Get expected number of frames according a given allowed fraction of active modules.
95 *
96 * \param number_of_frames number of frames
97 * \param factor factor
98 * \return number of frames
99 */
100 inline size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
101 {
102 if (factor <= 0.0) {
103 return 1;
104 }
105
106 if (factor >= 1.0) {
107 return number_of_frames;
108 }
109
110 const size_t n = (size_t) (number_of_frames * factor);
111
112 return (n == 0 ? 1 : n);
113 }
114
115
116 /**
117 * Main class for real-time filtering of data.
118 *
119 * This class implements the action methods for each transition of the state machine.\n
120 * When the state machine is entered, data are continually collected using
121 * custom implementation of virtual methods
122 * setSelect and
123 * actionSelect.
124 *
125 * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
126 * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
127 * When a JDAQTimeslice is complete,
128 * it processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.
129 *
130 * The completeness of a time slice is defined by two parameters, namely
131 * <tt>frames_per_slice</tt> and
132 * <tt>factor_per_slice</tt>.\n
133 * These are parsed in method actionConfigure as follows.
134 * <pre>
135 * numberOfFramesPerSlice = <frames per slice> = 1;
136 * factorOfFramesPerSlice = <factor per slice> = 1.0;
137 * </pre>
138 * The expected number of frames in a time slices is defined by their product.\n
139 * Note that the first parameter can change during operation (see below).
140 *
141 * A timeout may occur when the total amount of data or the number of incomplete time slices
142 * in the queue exceeds one of the limits <tt>queueSize</tt> or <tt>queueDepth</tt>.\n
143 * These are parsed in method actionConfigure as follows.
144 * <pre>
145 * queueSize = <maximal queue size [B]>;
146 * queueDepth = <maximal queue depth>;
147 * </pre>
148 * Note that these values apply per JDataFilter.\n
149 *
150 * The parameter <tt>frames_per_slice</tt> is subject to a servo mechanism.\n
151 * When a timeout occurs,
152 * it is set to the number of frames in the oldest time slice.\n
153 * When data are received with a frame index below the current frame index,
154 * it is incremented by one.
155 *
156 * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
157 * The following parameters can be used to configure the circular buffer.\n
158 * <pre>
159 * path = <write directory for temporary circular buffer>;
160 * archive = <write directory for archival of circular buffer>;
161 * c_sizeL0 = <L0 buffer size>;
162 * c_sizeL1 = <L1 buffer size>;
163 * c_sizeL2 = <L2 buffer size>;
164 * c_sizeSN = <SN buffer size>;
165 * </pre>
166 *
167 * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
168 * - There will always be a file created which is deleted when the state machine is exited;
169 * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
170 * - Following an alert, the temporary file is archived to a file with name <tt><path>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
171 * - After archival of the temporary file, a new temporary file will be opened;
172 * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
173 * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
174 *
175 * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
176 * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
177 * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
178 * (e.g. when there is more than one alert on a given day).
179 *
180 * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
181 *
182 * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
183 */
185 public JDAQClient
186 {
187 public:
188
191
192 typedef double hit_type;
195 typedef JTimeslice <hit_type> JTimeslice_t;
196 typedef JBuildL1 <hit_type> JBuildL1_t;
197 typedef JBuildL2 <hit_type> JBuildL2_t;
198
199
200
201 /**
202 * Circular buffer.
203 */
205 public JTreeRecorder<JDAQTimesliceTypes_t>
206 {
209
210
211 static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
212
213
214 /**
215 * Constructor.
216 *
217 * \param path directory for temporary storage
218 * \param archive directory for permanent archival
219 * \param tag tag
220 */
221 JCircularBuffer_t(const std::string& path,
222 const std::string& archive,
223 const JTag& tag) :
224 path (path),
226 tag (tag)
227 {
228 disable();
229 }
230
231
232 /**
233 * Open file.
234 *
235 * If file with same name exists, remove it beforehand.
236 */
237 void open()
238 {
239 using namespace std;
240 using namespace JPP;
241
242 gErrorIgnoreLevel = kFatal;
243
244 std::ostringstream os;
245
246 os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
247
248 if (getFileStatus(os.str().c_str())) {
249 std::remove(os.str().c_str());
250 }
251
252 this->open(os.str().c_str());
253 }
254
255
256 /**
257 * Close file.
258 *
259 * If option is true, archive file; else delete file.
260 *
261 * \param option option
262 */
263 void close(const bool option)
264 {
265 using namespace std;
266 using namespace JPP;
267
268 const JDateAndTime cal;
269
270 if (this->is_open()) {
271
272 const string file_name = this->getFile()->GetName();
273
274 this->close();
275
276 if (option) {
277
278 for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
279
280 ostringstream os;
281
282 os << getFullPath(this->archive)
283 << "KM3NeT"
284 << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
285 << "_" << this->tag;
286
287 if (i != 0) {
288 os << "_" << i;
289 }
290
291 os << ".root";
292
293 if (!getFileStatus(os.str().c_str())) {
294
295 if (JSYSTEM::rename(file_name, os.str()) == 0)
296 return;
297 else
298 THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
299 }
300 }
301
302 } else {
303
304 std::remove(file_name.c_str());
305 }
306 }
307 }
308
309
310 /**
311 * Disable writing.
312 */
313 void disable()
314 {
315 sizeL0 = 0;
316 sizeL1 = 0;
317 sizeL2 = 0;
318 sizeSN = 0;
319 }
320
321
322 /**
323 * Check whether writing of data is enabled.
324 *
325 * \return true if writing enabled; else false
326 */
327 bool is_enabled() const
328 {
329 return (sizeL0 > 0 ||
330 sizeL1 > 0 ||
331 sizeL2 > 0 ||
332 sizeSN > 0);
333 }
334
335
336 /**
337 * Write circular buffer to output stream.
338 *
339 * \param out output stream
340 * \param object circular buffer
341 * \return output stream
342 */
343 friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
344 {
345 if (object.is_open())
346 out << object.getFile()->GetName();
347 else
348 out << "void";
349
350 out << ' ';
351
352 out << object.sizeL0 << '/'
353 << object.sizeL1 << '/'
354 << object.sizeL2 << '/'
355 << object.sizeSN << '/';
356
357 return out;
358 }
359
360 Long64_t sizeL0; //!< Number of L0 time slices
361 Long64_t sizeL1; //!< Number of L1 time slices
362 Long64_t sizeL2; //!< Number of L2 time slices
363 Long64_t sizeSN; //!< Number of SN time slices
364
365 std::string path; //!< Directory for temporary storage
366 std::string archive; //!< Directory for permanent archival
367 JTag tag; //!< Unique tag of this process
368 };
369
370
371 /**
372 * Sort DAQ process by index.
373 *
374 * \param first first DAQ process
375 * \param second second DAQ process
376 * \return true if index of first DAQ process less than that of second; else false
377 */
378 static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
379 {
380 return first.index < second.index;
381 }
382
383
384 /**
385 * Constructor.
386 *
387 * \param name name of client
388 * \param server name of command message server
389 * \param hostname name of data server
390 * \param logger pointer to logger
391 * \param level debug level
392 * \param port server port
393 * \param backlog server backlog
394 * \param path directory for temporary storage
395 * \param archive directory for parmanent archival
396 */
397 JDataFilter(const std::string& name,
398 const std::string& server,
399 const std::string& hostname,
401 const int level,
402 const int port,
403 const int backlog,
404 const std::string& path,
405 const std::string& archive) :
408 port (port),
410 c_buffer (path, archive, getUniqueTag())
411 {
412 replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
413
415
416 totalCPURAM = getRAM();
418 dataqueue_slice_index.clear();
419 reporting = false;
420
421 this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
422 }
423
424
425 virtual void actionEnter() override
426 {}
427
428
429 virtual void actionExit() override
430 {
431 if (c_buffer.is_open()) {
432
433 JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
434
435 c_buffer.close(false);
436 }
437
438 datawriter.reset();
439 }
440
441
442 virtual void actionInit(int length, const char* buffer) override
443 {
444 JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
445
446 try {
447
448 JDebugStream(logger) << "Start server.";
449
450 if (serversocket.is_valid()) {
451 serversocket->shutdown();
452 }
453
455 }
456 catch(const std::exception& error) {
457 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
458 ev_error();
459 }
460 }
461
462
463 virtual void actionConfigure(int length, const char* buffer) override
464 {
465 using namespace std;
466
467 JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
468
469 string _hostname_ = "";
470
471 long long int update_s = 20;
472 long long int logger_s = 10;
473
474 parameters .reset();
475 dataFilters.clear();
476 dataQueues .clear();
477
478 reporting = false;
479 dumpCount = 0;
480 dumpLimit = numeric_limits<int>::max();
481 dumpMask = 0;
485 //dumpMask.set(JChecksum::EUDP_t);
487
488 detector.comment.clear();
489 detector.clear();
490
491 JProperties properties(JEquationParameters("=", ";", "", ""), 0);
492
493 properties["dataWriter"] = _hostname_;
494 properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
495 properties["factorOfFramesPerSlice"] = factor_per_slice = 1.0;
496 properties["detector"] = detector;
497 properties["triggerParameters"] = parameters;
498 properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
499 properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
500 properties["frameIndex"] = maximal_frame_index = 100000;
501 properties["logger_s"] = logger_s;
502 properties["update_s"] = update_s;
503 properties["JDataFilter"] = dataFilters;
504 properties["DataQueue"] = dataQueues;
505 properties["path"] = c_buffer.path;
506 properties["archive"] = c_buffer.archive;
507 properties["c_sizeL0"] = c_buffer.sizeL0;
508 properties["c_sizeL1"] = c_buffer.sizeL1;
509 properties["c_sizeL2"] = c_buffer.sizeL2;
510 properties["c_sizeSN"] = c_buffer.sizeSN;
511 properties["dumpLimit"] = dumpLimit;
512 properties["dumpMask"] = dumpMask;
513
514 try {
515 properties.read(string(buffer, length));
516 }
517 catch(const std::exception& error) {
518 JErrorStream(logger) << error.what();
519 }
520
521 if (update_s <= 0) { update_s = 20; }
522 if (logger_s <= 0) { logger_s = 10; }
523
524 setClockInterval(update_s * 1000000LL);
525
526 _hostname_ = trim(_hostname_);
527
528 if (_hostname_ != "" && _hostname_ != hostname) {
529
530 datawriter.reset();
531
532 hostname = _hostname_;
533 }
534
535 bool status = datawriter.is_valid();
536
537 if (status) {
538
539 try {
540 status = datawriter->Connected() == 0;
541 }
542 catch (const exception&) {
543 status = false;
544 }
545 }
546
547 if (!status) {
548
550
551 datawriter->MyId(getFullName());
552 }
553
554 datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
555
556 // process processlist
557
558 if (dataFilters.empty()) {
559 JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
560 << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
561 }
562
563 sort(dataFilters.begin(), dataFilters.end(), compare);
564
565 unsigned int numberOfDataFiltersOnThisMachine = 0;
566 vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
567
569
570 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
571
572 if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
573
574 numberOfDataFiltersOnThisMachine++;
575
576 if (i->port == this->port) {
577 thisProcess = i;
578 }
579 }
580 }
581
582 if (numberOfDataFiltersOnThisMachine == 0) {
583 JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
584 << "Assuming one datafilter on this machine.";
585 numberOfDataFiltersOnThisMachine = 1;
586 }
587
588 if (thisProcess == dataFilters.end()) {
589
590 JErrorStream error(logger);
591
592 error << "This process cannot be found in the process list. Why do I exist?";
593 error << " my IP addresses:";
594
595 for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
596 error << ' ' << *i;
597 }
598
599 error << " my port: " << this->port;
600 error << " process list";
601
602 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
603 error << ' ' << i->hostname << ':' << i->port;
604 }
605 }
606
607 if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
608 JErrorStream(logger) << "Mismatch between given process names: "
609 << "I am called " << getName()
610 << ", but in the process list I am referred to as " << thisProcess->index;
611 }
612
613 if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
614 reporting = true;
615 }
616
617 if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
618
619 maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
620
621 JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
622 << "Queue size reduced to "
623 << maxQueueSize << " bytes." ;
624 }
625
626 // detector
627
628 if (parameters.disableHighRateVeto) {
629
630 JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
631
632 detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
633 }
634
635 // trigger parameters
636
638
639 triggerNB .reset(new JTriggerNB (parameters));
643
645
646 if (reporting) {
647 JNoticeStream(logger) << "This data filter process will report.";
648 JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
649 JDebugStream (logger) << "Trigger parameters: " << parameters;
650 JDebugStream (logger) << "Detector description: " << endl << detector;
651 JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
652 }
653
654 timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
655
656 // set L1, L2 and SN builders
657
658 buildL1.reset(new JBuildL1_t(parameters));
659 buildL2.reset(new JBuildL2_t(parameters.L2));
660 buildSN.reset(new JBuildL2_t(parameters.SN));
661 buildNB.reset(new JBuildL2_t(parameters.NB));
662
663 if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
664 if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
665 if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
666 if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
667
668 logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
669 logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
670 logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
671 logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
673
674 if (c_buffer.is_enabled()) {
675
676 if (!c_buffer.is_open()) {
677
678 c_buffer.open();
679
680 if (c_buffer.is_open()) {
681
683
684 JStatusStream(logger) << "Created circular buffer " << c_buffer;
685
686 } else {
687
688 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
689 }
690
691 } else {
692
693 JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
694 }
695 }
696
697 if (c_buffer.is_open()) {
698 if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
699 if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
700 if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
701 if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
702 } else {
704 }
705 }
706
707
708 virtual void actionStart(int length, const char* buffer) override
709 {
710 using namespace std;
711
712 if (reporting) {
713 JNoticeStream(logger) << "Start run " << getRunNumber();
714 }
715
716 timeslices.clear();
717
719 dataqueue_slice_index.clear();
720 queueSize = 0;
721
722 numberOfEvents = 0;
723 numberOfBytes = 0;
726
730 number_of_reads = 0;
731
732 minFrameNumber = numeric_limits<int>::max();
733 maxFrameNumber = numeric_limits<int>::min();
734
735 // Reset global trigger counter.
736
738
744
745 timer.reset();
746 timer.start();
747
748 Qt.reset();
749 Qx.reset();
750
751 // send trigger parameters to the datawriter
752
753 ostringstream os;
754
755 os << getRunNumber() << ' ' << parameters;
756
757 datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
758 }
759
760
761 virtual void actionPause(int length, const char* buffer) override
762 {
763 using namespace std;
764
765 if (!timeslices.empty()) {
766
767 JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
768
769 for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
770 queueSize -= getSizeof(*i);
771 }
772
773 timeslices.clear();
774 }
775
776 { // force clearance of memory
777
778 deque<JDAQTimesliceL0> buffer;
779
780 timeslices.swap(buffer);
781 }
782
783 if (queueSize != 0) {
784 JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
785 }
786
788 dataqueue_slice_index.clear();
789 queueSize = 0;
790
791 timer.stop();
792 }
793
794
795 virtual void actionContinue(int length, const char* buffer) override
796 {
797 timer.start();
798 }
799
800
801 virtual void actionStop(int length, const char* buffer) override
802 {
803 typeout();
804 }
805
806
807 virtual void actionReset(int length, const char* buffer) override
808 {
809 if (serversocket.is_valid()) {
810 serversocket->shutdown();
811 }
812
813 serversocket.reset();
814 }
815
816
817 virtual void actionQuit(int length, const char* buffer) override
818 {
819 datawriter.reset();
820 }
821
822
823 virtual void setSelect(JFileDescriptorMask& mask) const override
824 {
825 if (serversocket.is_valid()) {
826 mask.set(*serversocket);
827 }
828
829 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
830 if (!channel->isReady()) {
831 mask.set(channel->getFileDescriptor());
832 }
833 }
834 }
835
836
837 virtual void actionSelect(const JFileDescriptorMask& mask) override
838 {
839 using namespace std;
840
841 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
842
843 try {
844
845 if (mask.has(channel->getFileDescriptor())) {
846 channel->read();
847 }
848
849 if (channel->isReady()) {
850
852 number_of_reads += channel->getCounter();
853 number_of_bytes_received += channel->size();
854
855 if (isRunning()) {
856
857 try {
858 updateFrameQueue(*channel);
859 }
860 catch(const std::exception& error) {
861
862 JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
863
865 }
866
867 } else {
868
869 JErrorStream(logErrorRun) << "Receiving data while not running.";
870
872 }
873
874 channel->reset();
875 }
876
877 ++channel;
878 }
879 catch(const std::exception& error) {
880
881 if (isRunning()) {
882 JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
883 }
884
885 channel->shutdown();
886
887 channel = channelList.erase(channel);
888 }
889 }
890
891
892 if (serversocket.is_valid()) {
893
894 if (mask.has(*serversocket)) {
895
896 JTCPSocket socket(serversocket->getFileDescriptor());
897
899
900 socket.setKeepAlive (true);
901 socket.setNonBlocking(false);
902
903 JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']' << ' ' << socket.getReceiveBufferSize();
904
905 channelList.push_back(JSocketInputChannel_t(socket));
906 }
907 }
908
909
910 if (!timeslices.empty()) {
911
912 const size_t number_of_frames = getNumberOfFrames(frames_per_slice, factor_per_slice);
913 const size_t maximum_in_queue = getMaximum(make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (size_t) 0);
914
915 if (((timeslices[0].size() >= number_of_frames && // normal
916 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
917
918 (maximum_in_queue >= number_of_frames && // intermittent problem
919 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
920
921 (timeslices.size() >= maxQueueDepth) || // timeout
922 (queueSize >= maxQueueSize))) {
923
924
925 const JDAQTimesliceL0& pending_slice = timeslices.front();
926 queueSize -= getSizeof(pending_slice);
927
928 current_slice_index = pending_slice.getFrameIndex();
929 minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
930 maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
931
932 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
933 modules.insert(i->getModuleID());
934 }
935
936
937 if (isRunning()) {
938
939 const localtime_t t0 = getLocalTime();
940
941 if (!pending_slice.empty()) {
942 processTimeSlice(pending_slice);
943 }
944
945 const localtime_t t1 = getLocalTime();
946
948
949 Qt.put(t1 - t0);
950
951 } else {
952
953 JErrorStream(logErrorRun) << "Skip processing of data while not running.";
954 }
955
956
957 if (modules.size() > frames_per_slice) {
958
959 JErrorStream(logErrorOvercomplete) << "More active modules than expected "
960 << modules.size() << " > " << frames_per_slice
961 << " adjusting frames per slice to " << modules.size();
962
963 frames_per_slice = modules.size();
964 }
965
966
967 if (pending_slice.size() < number_of_frames) {
968
970
971 ostringstream error;
972
973 error << "Timeout -> processed incomplete timeslice: "
974 << "Frame index = " << pending_slice.getFrameIndex() << ';'
975 << "Size of timeslice = " << pending_slice.size() << ';'
976 << "Queue depth = " << timeslices.size() << ';'
977 << "Queue size = " << queueSize << ';'
978 << "DataQueue min = " << dataqueue_slice_index.min() << ';'
979 << "DataQueue max = " << dataqueue_slice_index.max() << ';';
980
981 if (maximum_in_queue >= number_of_frames) {
982
983 error << " intermittent problem -> continues as-is";
984
985 } else {
986
987 modules.clear(); // remove history
988
989 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
990 modules.insert(i->getModuleID());
991 }
992
993 error << " adjusting frames per timeslice from " << frames_per_slice << " to " << modules.size();
994
995 frames_per_slice = modules.size();
996 }
997
998 JErrorStream(logErrorIncomplete) << error.str();
999 }
1000
1001
1002 timeslices.pop_front();
1003 }
1004 }
1005 }
1006
1007
1008 virtual void actionRunning() override
1009 {
1010 if (reporting) {
1011 typeout();
1012 }
1013 }
1014
1015
1016 /**
1017 * Update queue with data frames.
1018 *
1019 * Note that any discarded data will be reported.
1020 *
1021 * \param channel incoming data channel
1022 */
1024 {
1025 using namespace std;
1026
1027 JByteArrayReader in(channel.data(), channel.size());
1028
1029 JDAQPreamble preamble;
1030 JDAQSuperFrameHeader header;
1031
1032 in >> preamble;
1033 in >> header;
1034
1035 if (preamble.getLength() != channel.size()) {
1036
1037 JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
1038 << "preamble.getLength() = " << preamble.getLength() << ';'
1039 << "channel.size(): " << channel.size() << ';';
1040
1042
1043 return;
1044 }
1045
1046 if (header.getRunNumber() != getRunNumber()) {
1047
1048 JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
1049 << " != " << getRunNumber()
1050 << " -> Dropping frame.";
1051
1053
1054 return;
1055 }
1056
1057 if (header.getFrameIndex() <= current_slice_index) {
1058
1060
1061 if (modules.insert(header.getModuleID()).second) {
1062
1063 frames_per_slice = modules.size();
1064
1065 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
1066 << " module " << header.getModuleID()
1067 << " -> dropping frame;"
1068 << " increase number of frames expected to: " << frames_per_slice;
1069 }
1070
1071 return;
1072 }
1073
1075
1077
1078 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1079 << " module " << header.getModuleID()
1080 << " -> Dropping frame.";
1081
1082 return;
1083 }
1084
1085 deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1086
1087 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1088 ++timesliceIterator;
1089 }
1090
1091 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1092
1093 // The corresponding time slice already exists
1094
1095 } else {
1096
1097 // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1098
1099 timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1100
1101 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1102
1103 queueSize += getSizeof(*timesliceIterator);
1104 }
1105
1106 timesliceIterator->push_back(JDAQSuperFrame(header));
1107
1108 in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1109
1110 queueSize += getSizeof(*timesliceIterator->rbegin());
1111
1113 }
1114
1115
1116 /**
1117 * Process time slice.
1118 *
1119 * \param timeslice time slice
1120 */
1121 void processTimeSlice(const JDAQTimesliceL0& timeslice)
1122 {
1123 using namespace std;
1124
1125 try {
1126
1127 timesliceRouter->configure(timeslice);
1128
1129 if (parameters.writeSummary()) {
1130 this->put(JDAQSummaryslice(timeslice));
1131 }
1132
1133 if (parameters.trigger3DMuon.enabled ||
1134 parameters.trigger3DShower.enabled ||
1135 parameters.triggerMXShower.enabled ||
1136 parameters.triggerNB.enabled ||
1137 parameters.writeL0.prescale ||
1138 parameters.writeL1.prescale ||
1139 parameters.writeL2.prescale ||
1140 parameters.writeSN.prescale ||
1141 c_buffer.is_enabled()) {
1142
1143 JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1144 JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1145 JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1146 JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1147 JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1148 JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1149
1150 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1151
1152 if (moduleRouter->hasModule(frame->getModuleID())) {
1153
1154 const JChecksum::result_type& result = checksum(*frame);
1155
1156 if (!result) {
1157
1158 JWarningStream(logger) << "Invalid data at "
1159 << "run = " << timeslice.getRunNumber() << ";"
1160 << "frame index = " << timeslice.getFrameIndex() << ";"
1161 << "module = " << frame->getModuleID() << ";"
1162 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1163
1164 if (dumpCount < dumpLimit && result.has(dumpMask)) {
1165 timesliceTX.push_back(*frame);
1166 }
1167
1168 continue;
1169 }
1170
1171 const JModule& module = moduleRouter->getModule(frame->getModuleID());
1172 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1173
1174 // Apply high-rate veto
1175
1176 buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1177
1178 // L0
1179
1180 timesliceL0.push_back(JSuperFrame1D_t(buffer));
1181
1182 // Nano-beacon trigger
1183
1184 if (parameters.triggerNB.enabled) {
1185
1186 JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1187
1188 if (buffer.begin() != __end) {
1189
1190 timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1191 frame->getModuleIdentifier(),
1192 module.getPosition()));
1193
1194 JSuperFrame1D_t zbuf;
1195
1196 (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1197
1198 (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1199 }
1200 }
1201
1202 // L1
1203
1204 timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1205 frame->getModuleIdentifier(),
1206 module.getPosition()));
1207
1208 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1209
1210 // L2
1211
1212 timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1213 frame->getModuleIdentifier(),
1214 module.getPosition()));
1215
1216 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1217
1218 // SN
1219
1220 timesliceSN.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1221 frame->getModuleIdentifier(),
1222 module.getPosition()));
1223
1224 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1225
1226 } else {
1227
1228 JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1229 }
1230 }
1231
1232 if (!timesliceTX.empty()) {
1233
1234 if (dumpCount < dumpLimit) {
1235
1236 this->put(timesliceTX);
1237
1238 dumpCount += 1;
1239 }
1240 }
1241
1242 // Trigger
1243
1244 if (parameters.triggerNB.enabled) {
1245
1246 const JTriggerInput trigger_input(timesliceNB);
1247
1248 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1249
1250 if (parameters.triggerNB.write()) {
1251
1252 JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1253 getTriggerMask(triggerNB->getTriggerBit()),
1254 *hit,
1256 *moduleRouter,
1257 parameters.TMaxLocal_ns,
1258 parameters.triggerNB.DMax_m,
1259 getTimeRange(parameters.triggerNB));
1260
1261 this->put(tev);
1262 }
1263 }
1264 }
1265
1266 JTriggerInput trigger_input(timesliceL2);
1267 JTriggerOutput trigger_output;
1268
1269 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1270 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1271 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1272
1273 trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1274
1275 for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1276
1277 const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1278
1279 this->put(object);
1280
1281 numberOfEvents += 1;
1282 }
1283
1284 if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1285
1286 const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1287
1288 if (parameters.writeL1) { this->put(object); }
1289 if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1290 }
1291
1292 if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1293
1294 const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1295
1296 if (parameters.writeL2) { this->put(object); }
1297 if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1298 }
1299
1300 if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1301
1302 const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1303
1304 if (parameters.writeSN) { this->put(object); }
1305 if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1306 }
1307
1308 if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1309
1310 if (parameters.writeL0) { this->put(timeslice); }
1311 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1312 }
1313 }
1314
1315 } catch(const std::exception& error) {
1316
1317 JErrorStream(logger) << "Error = " << error.what() << ";"
1318 << "run = " << timeslice.getRunNumber() << ";"
1319 << "frame index = " << timeslice.getFrameIndex() << ";"
1320 << "time slice not correctly processed;"
1321 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1322
1323 if (dumpCount < dumpLimit) {
1324
1325 this->put(static_cast<const JDAQTimeslice&>(timeslice));
1326
1327 dumpCount += 1;
1328 }
1329 }
1330
1331 timesliceRouter->reset();
1332 }
1333
1334
1335 /**
1336 * Report status to message logger.
1337 */
1338 void typeout()
1339 {
1340 timer.stop();
1341
1342 const double T_us = (double) timer.usec_wall;
1343
1344 JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1345 JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1346 JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1347 JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1348 JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1349 JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0);
1350
1352 JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1353 }
1354
1355 JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1356 JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice << ' ' << FIXED(5,3) << factor_per_slice;
1357
1358 JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1359
1360 const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1361 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1362
1363 if (processedSlicesTime_us > 0) {
1364 JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1365 }
1366 if (processedDetectorTime_us > 0) {
1367 JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1368 }
1369
1370 timer.start();
1371 }
1372
1373
1374 /**
1375 * Tagged action to handle alerts.
1376 *
1377 * \param tag tag
1378 * \param length number of characters
1379 * \param buffer message
1380 */
1381 virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1382 {
1383 using namespace std;
1384
1385 JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1386
1387 if (tag == RC_ALERT) {
1388
1389 if (c_buffer.is_open()) {
1390
1391 JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1392
1393 c_buffer.close(true);
1394 }
1395
1396 if (c_buffer.is_enabled()) {
1397
1398 c_buffer.open();
1399
1400 if (c_buffer.is_open()) {
1401
1402 JStatusStream(logger) << "Created circular buffer " << c_buffer;
1403
1405
1406 } else {
1407
1408 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1409
1410 c_buffer.disable();
1411 }
1412 }
1413
1414 } else {
1415
1416 JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1417 }
1418 }
1419
1420 JMeta meta; //!< meta data
1421
1422 private:
1423
1425 JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1426 std::string hostname; //!< host name of data server
1427
1428 /**
1429 * Auxiliary method to send object to data server.
1430 *
1431 * \param object object to be sent
1432 */
1433 template<class T>
1434 void put(const T& object)
1435 {
1436 try {
1437
1438 const localtime_t t0 = getLocalTime();
1439
1440 datawriter->put(object);
1441
1442 const localtime_t t1 = getLocalTime();
1443
1444 numberOfBytes += getSizeof(object);
1445
1446 Qx.put(t1 - t0);
1447 }
1448 catch(const std::exception& error) {
1449 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1450 ev_error();
1451 }
1452 }
1453
1454
1455 int port; //!< server socket port
1457
1458 JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1459 JChannelList_t channelList; //!< connections to data queue
1460
1463
1464 std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1470
1471 struct :
1472 public std::map<int, int>
1473 {
1474 /**
1475 * Get minimal frame index.
1476 *
1477 * \return frame index
1478 */
1479 int min() const
1480 {
1481 if (this->empty()) {
1482
1483 return -1;
1484
1485 } else {
1486
1487 int min = std::numeric_limits<int>::max();
1488
1489 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1490 if (i->second < min) {
1491 min = i->second;
1492 }
1493 }
1494
1495 return min;
1496 }
1497 }
1498
1499 /**
1500 * Get maximal frame index.
1501 *
1502 * \return frame index
1503 */
1504 int max() const
1505 {
1506 if (this->empty()) {
1507
1508 return -1;
1509
1510 } else {
1511
1512 int max = std::numeric_limits<int>::lowest();
1513
1514 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1515 if (i->second > max) {
1516 max = i->second;
1517 }
1518 }
1519
1520 return max;
1521 }
1522 }
1523
1525
1526 // trigger
1527
1530
1537
1542
1548
1552
1553 // process management
1554
1557
1558 // memory management
1559
1560 long long int totalCPURAM;
1562 long long int maxQueueSize;
1563 long long int queueSize;
1564
1565 // statistics
1566
1568
1569 long long int numberOfEvents;
1570 long long int numberOfBytes;
1573
1576
1577 // temporary
1578
1581 long long int number_of_reads;
1583
1584 // circular buffer
1585
1587 };
1588}
1589
1590/**
1591 * \file
1592 *
1593 * Application for real-time filtering of data.
1594 * For more information, see KM3NETDAQ::JDataFilter.
1595 *
1596 * \author rbruijn and mdejong
1597 */
1598int main(int argc, char* argv[])
1599{
1600 using namespace std;
1601 using namespace JPP;
1602 using namespace KM3NETDAQ;
1603
1604 string server;
1605 string logger;
1606 string hostname;
1607 string client_name;
1608 int port;
1609 int backlog;
1610 bool use_cout;
1611 string path;
1612 string archive;
1613 int debug;
1614
1615
1616 try {
1617
1618 JParser<> zap("Application for real-time filtering of data.");
1619
1620 zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1621 zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1622 zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1623 zap['u'] = make_field(client_name, "client name") = "%";
1624 zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1625 zap['q'] = make_field(backlog, "back log") = 1024;
1626 zap['c'] = make_field(use_cout, "print to terminal");
1627 zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1628 zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1629 zap['d'] = make_field(debug, "debug level") = 0;
1630
1631 zap(argc, argv);
1632 }
1633 catch(const std::exception& error) {
1634 FATAL(error.what() << endl);
1635 }
1636
1637
1638 JLogger* out = NULL;
1639
1640 if (use_cout)
1641 out = new JStreamLogger(cout);
1642 else
1643 out = new JControlHostLogger(logger);
1644
1645 JDataFilter dfilter(client_name,
1646 server,
1647 hostname,
1648 out,
1649 debug,
1650 port,
1651 backlog,
1652 path,
1653 archive);
1654
1655 dfilter.meta = JMeta(argc, argv);
1656
1657 dfilter.enter();
1658 dfilter.run();
1659}
Fixed parameters and ControlHost tags for KM3NeT DAQ.
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Data structure for detector geometry and calibration.
Exceptions.
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Basic data structure for L0 hit.
Basic data structure for L1 hit.
Tools for handling different hit types.
General purpose messaging.
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:72
ROOT I/O of application specific meta data.
Hostname and IP address functions.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
Physics constants.
I/O formatting auxiliaries.
Utility class to parse parameter values.
File status.
ROOT TTree parameter settings of various packages.
System auxiliaries.
System time information.
Scheduling of actions via fixed latency intervals.
Setting of trigger bits.
Basic data structure for time and time over threshold information of hit.
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
Auxiliary methods for handling file names, type names and environment.
void merge(const JMatch_t &match)
Merge events.
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_error_event ev_error
JDAQStateMachine::ev_configure_event ev_configure
Detector data structure.
Definition JDetector.hh:96
Router for direct addressing of module data in detector data structure.
Data structure for a composite optical module.
Definition JModule.hh:75
Utility class to parse parameter values.
bool read(const JEquation &equation)
Read equation.
Time keeper.
Auxiliary class for CPU timing and usage.
Definition JTimer.hh:33
unsigned long long usec_ucpu
Definition JTimer.hh:239
unsigned long long usec_wall
Definition JTimer.hh:238
void stop()
Stop timer.
Definition JTimer.hh:127
void reset()
Reset timer.
Definition JTimer.hh:93
unsigned long long usec_scpu
Definition JTimer.hh:240
void start()
Start timer.
Definition JTimer.hh:106
const JPosition3D & getPosition() const
Get position.
Byte array binary input.
const char * data() const
Get data.
int size() const
Get size.
int getFileDescriptor() const
Get file descriptor.
virtual bool is_open() const =0
Check is device is open.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Auxiliary class for method select.
void set(const int file_descriptor)
Set file descriptor.
bool has(const int file_descriptor) const
Has file descriptor.
Exception for opening of file.
The template JSinglePointer class can be used to hold a pointer to an object.
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
Message logger with time scheduler.
Message logging based on std::ostream.
Implemenation of object output through ControlHost.
TCP server socket.
Socket input channel.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition JSocket.hh:148
int getReceiveBufferSize() const
Set receive buffer size.
Definition JSocket.hh:159
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition JSocket.hh:104
TCP socket.
Definition JTCPSocket.hh:30
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition JTCPSocket.hh:56
ControlHost tag.
Definition JTag.hh:38
std::string toString() const
Convert tag to string.
Definition JTag.hh:171
Utility class to parse command line options.
Definition JParser.hh:1698
TFile * getFile() const
Get file.
Definition JRootFile.hh:66
virtual bool is_open() const override
Check is file is open.
Definition JRootFile.hh:77
JTreeWriter< T, JRootCreateFlatTree< T >::value > * out
virtual bool put(const T &object) override
Object output.
ROOT TTree object output.
1-dimensional frame with time calibrated data from one optical module.
2-dimensional frame with time calibrated data from one optical module.
static JSuperFrame2D< JElement_t, JAllocator_t > demultiplex
Demultiplexer.
std::vector< value_type >::iterator iterator
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Time slice with calibrated data.
Definition JTimeslice.hh:29
Data structure for input to trigger algorithm.
Nano-beacon trigger.
Definition JTriggerNB.hh:21
Auxiliary class to build JDAQEvent for a triggered event.
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
int getRunNumber() const
Get run number.
int getFrameIndex() const
Get frame index.
Control unit client base class.
JSharedPointer< JControlHost > server
message server
bool isRunning() const
Check if this client is in runnig state.
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
void addSubscription(const JSubscription &subscription)
Add custom subscription.
int getModuleID() const
Get module identifier.
Auxiliary class for itemization of process list.
std::string index
index in process list
Data frame of one optical module.
static void reset()
Reset counter of unique instance of this class object.
Main class for real-time filtering of data.
long long int number_of_packets_received
JMessageScheduler logErrorRun
JSinglePointer< JServerSocket > serversocket
server for data queue connections
JMessageScheduler logErrorDetector
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
JCircularBuffer_t c_buffer
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
long long int numberOfBytes
long long int number_of_packets_discarded
JMessageScheduler logErrorIndex
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
virtual void actionInit(int length, const char *buffer) override
void put(const T &object)
Auxiliary method to send object to data server.
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
long long int number_of_bytes_received
virtual void actionContinue(int length, const char *buffer) override
long long int maxQueueSize
int port
server socket port
long long int totalCPURAM
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
std::vector< JDAQProcess > dataQueues
JMessageScheduler logErrorOvercomplete
long long int number_of_reads
std::vector< JSocketInputChannel_t > JChannelList_t
virtual void actionStop(int length, const char *buffer) override
JSinglePointer< JBuildL2_t > buildNB
JSinglePointer< JBuildL2_t > buildL2
JBuildL1< hit_type > JBuildL1_t
JSinglePointer< JBuildL1_t > buildL1
virtual void actionExit() override
virtual void actionQuit(int length, const char *buffer) override
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
std::string hostname
host name of data server
virtual void actionConfigure(int length, const char *buffer) override
long long int numberOfTimeslicesProcessed
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JChannelList_t channelList
connections to data queue
long long int numberOfIncompleteTimeslicesProcessed
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
JSuperFrame1D< hit_type > JSuperFrame1D_t
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
JMessageScheduler logErrorIncomplete
JSuperFrame2D< hit_type > JSuperFrame2D_t
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
JTimeslice< hit_type > JTimeslice_t
JSinglePointer< JModuleRouter > moduleRouter
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JSinglePointer< JTimesliceRouter > timesliceRouter
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
JSinglePointer< JTriggerNB > triggerNB
JBuildL2< hit_type > JBuildL2_t
long long int numberOfEvents
std::vector< JDAQProcess > dataFilters
JSinglePointer< JTrigger3DShower > trigger3DShower
virtual void actionStart(int length, const char *buffer) override
virtual void actionPause(int length, const char *buffer) override
JSinglePointer< JTriggerMXShower > triggerMXShower
virtual void actionReset(int length, const char *buffer) override
std::set< int > modules
JSinglePointer< JBuildL2_t > buildSN
JTriggerParameters parameters
void typeout()
Report status to message logger.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
Definition JVectorize.hh:54
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
std::string trim(const std::string &buffer)
Trim string.
static const long long int GIGABYTE
Number of bytes in a mega-byte.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const char * getName()
Get ROOT name of given data type.
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
long long int localtime_t
Type definition of local time.
unsigned long long int getRAM()
Get RAM of this CPU.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition JNetwork.hh:216
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
static JStat getFileStatus
Function object for file status.
Definition JStat.hh:173
return result
Definition JPolint.hh:862
const int n
Definition JPolint.hh:791
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition JChecksum.hh:200
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according a given allowed fraction of active modules.
static const JNET::JTag RC_DFILTER
Definition JDAQTags.hh:73
double getFrameTime()
Get frame time duration.
Definition JDAQClock.hh:162
static const JNET::JTag RC_ALERT
Definition JDAQTags.hh:79
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
Definition JDAQTags.hh:36
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
Definition JDAQTags.hh:37
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Definition JDAQTags.hh:38
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition JDAQTags.hh:90
size_t getSizeof()
Definition of method to get size of data type.
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition pmt_status.hh:13
Auxiliary data structure for sequence of same character.
Definition JManip.hh:330
Auxiliary data structure for floating point format specification.
Definition JManip.hh:448
Detector file.
Definition JHead.hh:227
Match of two events considering overlap in time and position.
Auxiliary class for handling status.
Definition JStatus.hh:39
void set(const int bit)
Set PMT status.
Definition JStatus.hh:131
Type list.
Definition JTypeList.hh:23
Level specific message streamers.
Auxiliary class for all subscription.
Auxiliary class for ROOT I/O of application specific meta data.
Definition JMeta.hh:72
Auxiliary class for date and time.
int getYear() const
year a.d.
int getDay() const
day of the month [1-31]
int getMonth() const
month of the year [1-12]
Auxiliary data structure for running average, standard deviation and quantiles.
Definition JQuantile.hh:46
void reset()
Reset.
Definition JQuantile.hh:87
void put(const double x, const double w=1.0)
Put value.
Definition JQuantile.hh:133
double getMean() const
Get mean value.
Definition JQuantile.hh:252
double getXmax() const
Get maximum value.
Definition JQuantile.hh:219
Auxiliary data structure for result of checksum.
Definition JChecksum.hh:90
@ ETDC_t
TDC value error.
Definition JChecksum.hh:47
@ SIZE_t
size error
Definition JChecksum.hh:50
@ EPMT_t
PMT number error.
Definition JChecksum.hh:46
@ TIME_t
Time order error.
Definition JChecksum.hh:48
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
const JTag & getUniqueTag() const
Get unique tag of this run control client.
const std::string & getFullName() const
Get full name of this run control client.
void setClockInterval(const long long int interval_us)
Set interval time.
long long int getClockInterval() const
Get interval time.
Timeslice data structure for L0 data.
std::string archive
Directory for permanent archival.
std::string path
Directory for temporary storage.
Long64_t sizeL1
Number of L1 time slices.
Long64_t sizeSN
Number of SN time slices.
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
JTag tag
Unique tag of this process.
Long64_t sizeL0
Number of L0 time slices.
void close(const bool option)
Close file.
Long64_t sizeL2
Number of L2 time slices.
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
bool is_enabled() const
Check whether writing of data is enabled.