Jpp test-rotations-old-533-g2bdbdb559
the software that should make you happy
Loading...
Searching...
No Matches
JDataFilter.cc
Go to the documentation of this file.
1
2#include <string>
3#include <iostream>
4#include <iomanip>
5#include <deque>
6#include <limits>
7#include <algorithm>
8#include <unistd.h>
9
11#include "JLang/JLangToolkit.hh"
12#include "JLang/JException.hh"
13#include "JLang/JVectorize.hh"
14#include "Jeep/JParser.hh"
15#include "Jeep/JProperties.hh"
16#include "Jeep/JTimer.hh"
17#include "Jeep/JTimekeeper.hh"
18#include "Jeep/JMessage.hh"
19#include "Jeep/JPrint.hh"
20#include "Jeep/JeepToolkit.hh"
21#include "Jeep/JStatus.hh"
28#include "JDAQ/JDAQTags.hh"
29#include "JDAQ/JDAQEventIO.hh"
33#include "JTrigger/JHit.hh"
38#include "JTrigger/JHitL0.hh"
39#include "JTrigger/JHitL1.hh"
40#include "JTrigger/JBuildL1.hh"
41#include "JTrigger/JBuildL2.hh"
53#include "JTrigger/JChecksum.hh"
56#include "JNet/JControlHost.hh"
58#include "JNet/JTCPSocket.hh"
60#include "JNet/JServerSocket.hh"
62#include "JTools/JStats.hh"
63#include "JSupport/JSupport.hh"
65#include "JSupport/JMeta.hh"
66#include "JSystem/JStat.hh"
67#include "JSystem/JTime.hh"
69#include "JSystem/JNetwork.hh"
71
72
73namespace JNET {
74
75 /**
76 * Get size of packet.
77 *
78 * \param preamble DAQ data preamble
79 * \return size [B]
80 */
81 template<>
83 {
84 return preamble.getLength();
85 }
86}
87
88namespace KM3NETDAQ {
89
90 using namespace JPP;
91
92
/**
 * Get expected number of frames according to a given allowed fraction of active modules.
 *
 * For a factor between zero and one (exclusive), the result is the product of
 * the number of frames and the factor, truncated towards zero, with a minimum of one.\n
 * For a factor of at least one, the number of frames is returned unchanged.\n
 * For a factor which is zero, negative or not-a-number, one is returned.
 *
 * \param  number_of_frames     number of frames
 * \param  factor               factor
 * \return                      number of frames
 */
inline size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
{
  // The negated comparison also rejects NaN, which would otherwise pass both
  // range checks and make the conversion to size_t below undefined behaviour.
  if (!(factor > 0.0)) {
    return 1;
  }

  if (factor >= 1.0) {
    return number_of_frames;
  }

  const size_t n = static_cast<size_t>(number_of_frames * factor);

  // always expect at least one frame
  return (n == 0 ? 1 : n);
}
114
115
116 /**
117 * Main class for real-time filtering of data.
118 *
119 * This class implements the action methods for each transition of the state machine.\n
120 * When the state machine is entered, data are continually collected using
121 * custom implementation of virtual methods
122 * JDataFilter::setSelect and
123 * JDataFilter::actionSelect.
124 *
125 * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
126 * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
127 * When a JDAQTimeslice is complete,
128 * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.
129 *
130 * The completeness of a time slice is defined by two parameters, namely
131 * <tt>frames_per_slice</tt> and
132 * <tt>factor_per_slice</tt>.\n
133 * These are parsed in method actionConfigure as follows.
134 * <pre>
135 * numberOfFramesPerSlice = <frames per slice> = 1;
136 * factorOfFramesPerSlice = <factor per slice> = 1.0;
137 * </pre>
138 * The expected number of frames in a time slice is defined by their product.\n
139 * Note that the first parameter can change during operation (see below).
140 *
141 * A timeout may occur when the total amount of data or the number of incomplete time slices
142 * in the queue exceeds one of the limits <tt>queueSize</tt> or <tt>queueDepth</tt>.\n
143 * These are parsed in method actionConfigure as follows.
144 * <pre>
145 * queueSize = <maximal queue size [B]>;
146 * queueDepth = <maximal queue depth>;
147 * </pre>
148 * Note that these values apply per JDataFilter.\n
149 *
150 * The parameter <tt>frames_per_slice</tt> is subject to a servo mechanism.\n
151 * When a timeout occurs,
152 * it is set to the number of frames in the oldest time slice.\n
153 * When data are received with a frame index below the current frame index,
154 * it is incremented by one.
155 *
156 * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
157 * The following parameters can be used to configure the circular buffer.\n
158 * <pre>
159 * path = <write directory for temporary circular buffer>;
160 * archive = <write directory for archival of circular buffer>;
161 * c_sizeL0 = <L0 buffer size>;
162 * c_sizeL1 = <L1 buffer size>;
163 * c_sizeL2 = <L2 buffer size>;
164 * c_sizeSN = <SN buffer size>;
165 * </pre>
166 *
167 * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
168 * - There will always be a file created which is deleted when the state machine is exited;
169 * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
170 * - Following an alert, the temporary file is archived to a file with name <tt><archive>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
171 * - After archival of the temporary file, a new temporary file will be opened;
172 * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
173 * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
174 *
175 * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
176 * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
177 * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
178 * (e.g. when there is more than one alert on a given day).
179 *
180 * Data can be discarded for various reasons.\n
181 * The list of criteria includes validity ranges of the PMT number and TDC values as well as the sorted times per PMT.\n
182 * The discarded data can be written to disk according to the parameters:
183 * <pre>
184 * dumpLimit = <maximum number of time slices to be dumped>
185 * dumpMask = <mask for data to be dumped>
186 * </pre>
187 * The first limits the number of time slices with discarded data that will be dumped and \n
188 * the latter corresponds to bits according to the enumeration JChecksum::error_types.\n
189 * The discarded data will be stored as JDAQTimeslice.
190 *
191 * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
192 *
193 * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
194 */
196 public JDAQClient
197 {
198 public:
199
202
203 typedef double hit_type;
206 typedef JTimeslice <hit_type> JTimeslice_t;
207 typedef JBuildL1 <hit_type> JBuildL1_t;
208 typedef JBuildL2 <hit_type> JBuildL2_t;
209
210
211
212 /**
213 * Circular buffer.
214 */
216 public JTreeRecorder<JDAQTimesliceTypes_t>
217 {
220
221
222 static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
223
224
225 /**
226 * Constructor.
227 *
228 * \param path directory for temporary storage
229 * \param archive directory for permanent archival
230 * \param tag tag
231 */
232 JCircularBuffer_t(const std::string& path,
233 const std::string& archive,
234 const JTag& tag) :
235 path (path),
237 tag (tag)
238 {
239 disable();
240 }
241
242
243 /**
244 * Open file.
245 *
246 * If file with same name exists, remove it beforehand.
247 */
248 void open()
249 {
250 using namespace std;
251 using namespace JPP;
252
253 gErrorIgnoreLevel = kFatal;
254
255 std::ostringstream os;
256
257 os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
258
259 if (getFileStatus(os.str().c_str())) {
260 std::remove(os.str().c_str());
261 }
262
263 this->open(os.str().c_str());
264 }
265
266
267 /**
268 * Close file.
269 *
270 * If option is true, archive file; else delete file.
271 *
272 * \param option option
273 */
274 void close(const bool option)
275 {
276 using namespace std;
277 using namespace JPP;
278
279 const JDateAndTime cal;
280
281 if (this->is_open()) {
282
283 const string file_name = this->getFile()->GetName();
284
285 this->close();
286
287 if (option) {
288
289 for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
290
291 ostringstream os;
292
293 os << getFullPath(this->archive)
294 << "KM3NeT"
295 << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
296 << "_" << this->tag;
297
298 if (i != 0) {
299 os << "_" << i;
300 }
301
302 os << ".root";
303
304 if (!getFileStatus(os.str().c_str())) {
305
306 if (JSYSTEM::rename(file_name, os.str()) == 0)
307 return;
308 else
309 THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
310 }
311 }
312
313 } else {
314
315 std::remove(file_name.c_str());
316 }
317 }
318 }
319
320
321 /**
322 * Disable writing.
323 */
324 void disable()
325 {
326 sizeL0 = 0;
327 sizeL1 = 0;
328 sizeL2 = 0;
329 sizeSN = 0;
330 }
331
332
333 /**
334 * Check whether writing of data is enabled.
335 *
336 * \return true if writing enabled; else false
337 */
338 bool is_enabled() const
339 {
340 return (sizeL0 > 0 ||
341 sizeL1 > 0 ||
342 sizeL2 > 0 ||
343 sizeSN > 0);
344 }
345
346
347 /**
348 * Write circular buffer to output stream.
349 *
350 * \param out output stream
351 * \param object circular buffer
352 * \return output stream
353 */
354 friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
355 {
356 if (object.is_open())
357 out << object.getFile()->GetName();
358 else
359 out << "void";
360
361 out << ' ';
362
363 out << object.sizeL0 << '/'
364 << object.sizeL1 << '/'
365 << object.sizeL2 << '/'
366 << object.sizeSN << '/';
367
368 return out;
369 }
370
371 Long64_t sizeL0; //!< Number of L0 time slices
372 Long64_t sizeL1; //!< Number of L1 time slices
373 Long64_t sizeL2; //!< Number of L2 time slices
374 Long64_t sizeSN; //!< Number of SN time slices
375
376 std::string path; //!< Directory for temporary storage
377 std::string archive; //!< Directory for permanent archival
378 JTag tag; //!< Unique tag of this process
379 };
380
381
382 /**
383 * Sort DAQ process by index.
384 *
385 * \param first first DAQ process
386 * \param second second DAQ process
387 * \return true if index of first DAQ process less than that of second; else false
388 */
389 static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
390 {
391 return first.index < second.index;
392 }
393
394
395 /**
396 * Constructor.
397 *
398 * \param name name of client
399 * \param server name of command message server
400 * \param hostname name of data server
401 * \param logger pointer to logger
402 * \param level debug level
403 * \param port server port
404 * \param backlog server backlog
405 * \param path directory for temporary storage
406 * \param archive directory for permanent archival
407 */
408 JDataFilter(const std::string& name,
409 const std::string& server,
410 const std::string& hostname,
412 const int level,
413 const int port,
414 const int backlog,
415 const std::string& path,
416 const std::string& archive) :
419 port (port),
421 c_buffer (path, archive, getUniqueTag())
422 {
423 replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
424
426
427 totalCPURAM = getRAM();
429 dataqueue_slice_index.clear();
430 reporting = false;
431
432 this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
433 }
434
435
436 virtual void actionEnter() override
437 {}
438
439
440 virtual void actionExit() override
441 {
442 if (c_buffer.is_open()) {
443
444 JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
445
446 c_buffer.close(false);
447 }
448
449 datawriter.reset();
450 }
451
452
453 virtual void actionInit(int length, const char* buffer) override
454 {
455 JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
456
457 try {
458
459 JDebugStream(logger) << "Start server.";
460
461 if (serversocket.is_valid()) {
462 serversocket->shutdown();
463 }
464
466 }
467 catch(const std::exception& error) {
468 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
469 ev_error();
470 }
471 }
472
473
474 virtual void actionConfigure(int length, const char* buffer) override
475 {
476 using namespace std;
477
478 JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
479
480 string _hostname_ = "";
481
482 long long int update_s = 20;
483 long long int logger_s = 10;
484
485 parameters .reset();
486 dataFilters.clear();
487 dataQueues .clear();
488
489 reporting = false;
490 dumpCount = 0;
491 dumpLimit = numeric_limits<int>::max();
492 dumpMask = 0;
496 //dumpMask.set(JChecksum::EUDP_t);
498
499 detector.comment.clear();
500 detector.clear();
501
502 JProperties properties(JEquationParameters("=", ";", "", ""), 0);
503
504 properties["dataWriter"] = _hostname_;
505 properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
506 properties["factorOfFramesPerSlice"] = factor_per_slice = 1.0;
507 properties["detector"] = detector;
508 properties["triggerParameters"] = parameters;
509 properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
510 properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
511 properties["frameIndex"] = maximal_frame_index = 100000;
512 properties["logger_s"] = logger_s;
513 properties["update_s"] = update_s;
514 properties["JDataFilter"] = dataFilters;
515 properties["DataQueue"] = dataQueues;
516 properties["path"] = c_buffer.path;
517 properties["archive"] = c_buffer.archive;
518 properties["c_sizeL0"] = c_buffer.sizeL0;
519 properties["c_sizeL1"] = c_buffer.sizeL1;
520 properties["c_sizeL2"] = c_buffer.sizeL2;
521 properties["c_sizeSN"] = c_buffer.sizeSN;
522 properties["dumpLimit"] = dumpLimit;
523 properties["dumpMask"] = dumpMask;
524
525 try {
526 properties.read(string(buffer, length));
527 }
528 catch(const std::exception& error) {
529 JErrorStream(logger) << error.what();
530 }
531
532 if (update_s <= 0) { update_s = 20; }
533 if (logger_s <= 0) { logger_s = 10; }
534
535 setClockInterval(update_s * 1000000LL);
536
537 _hostname_ = trim(_hostname_);
538
539 if (_hostname_ != "" && _hostname_ != hostname) {
540
541 datawriter.reset();
542
543 hostname = _hostname_;
544 }
545
546 bool status = datawriter.is_valid();
547
548 if (status) {
549
550 try {
551 status = datawriter->Connected() == 0;
552 }
553 catch (const exception&) {
554 status = false;
555 }
556 }
557
558 if (!status) {
559
561
562 datawriter->MyId(getFullName());
563 }
564
565 datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
566
567 // process processlist
568
569 if (dataFilters.empty()) {
570 JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
571 << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
572 }
573
574 sort(dataFilters.begin(), dataFilters.end(), compare);
575
576 unsigned int numberOfDataFiltersOnThisMachine = 0;
577 vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
578
580
581 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
582
583 if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
584
585 numberOfDataFiltersOnThisMachine++;
586
587 if (i->port == this->port) {
588 thisProcess = i;
589 }
590 }
591 }
592
593 if (numberOfDataFiltersOnThisMachine == 0) {
594 JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
595 << "Assuming one datafilter on this machine.";
596 numberOfDataFiltersOnThisMachine = 1;
597 }
598
599 if (thisProcess == dataFilters.end()) {
600
601 JErrorStream error(logger);
602
603 error << "This process cannot be found in the process list. Why do I exist?";
604 error << " my IP addresses:";
605
606 for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
607 error << ' ' << *i;
608 }
609
610 error << " my port: " << this->port;
611 error << " process list";
612
613 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
614 error << ' ' << i->hostname << ':' << i->port;
615 }
616 }
617
618 if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
619 JErrorStream(logger) << "Mismatch between given process names: "
620 << "I am called " << getName()
621 << ", but in the process list I am referred to as " << thisProcess->index;
622 }
623
624 if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
625 reporting = true;
626 }
627
628 if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
629
630 maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
631
632 JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
633 << "Queue size reduced to "
634 << maxQueueSize << " bytes." ;
635 }
636
637 // detector
638
639 if (parameters.disableHighRateVeto) {
640
641 JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
642
643 detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
644 }
645
646 // trigger parameters
647
649
650 triggerNB .reset(new JTriggerNB (parameters));
654
656
657 if (reporting) {
658 JNoticeStream(logger) << "This data filter process will report.";
659 JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
660 JDebugStream (logger) << "Trigger parameters: " << parameters;
661 JDebugStream (logger) << "Detector description: " << endl << detector;
662 JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
663 }
664
665 timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
666
667 // set L1, L2 and SN builders
668
669 buildL1.reset(new JBuildL1_t(parameters));
670 buildL2.reset(new JBuildL2_t(parameters.L2));
671 buildSN.reset(new JBuildL2_t(parameters.SN));
672 buildNB.reset(new JBuildL2_t(parameters.NB));
673
674 if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
675 if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
676 if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
677 if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildNB."; }
678
679 logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
680 logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
681 logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
682 logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
684
685 if (c_buffer.is_enabled()) {
686
687 if (!c_buffer.is_open()) {
688
689 c_buffer.open();
690
691 if (c_buffer.is_open()) {
692
694
695 JStatusStream(logger) << "Created circular buffer " << c_buffer;
696
697 } else {
698
699 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
700 }
701
702 } else {
703
704 JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
705 }
706 }
707
708 if (c_buffer.is_open()) {
709 if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
710 if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
711 if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
712 if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
713 } else {
715 }
716 }
717
718
719 virtual void actionStart(int length, const char* buffer) override
720 {
721 using namespace std;
722
723 if (reporting) {
724 JNoticeStream(logger) << "Start run " << getRunNumber();
725 }
726
727 timeslices.clear();
728
730 dataqueue_slice_index.clear();
731 queueSize = 0;
732
733 numberOfEvents = 0;
734 numberOfBytes = 0;
737
741 number_of_reads = 0;
742
743 minFrameNumber = numeric_limits<int>::max();
744 maxFrameNumber = numeric_limits<int>::min();
745
746 // Reset global trigger counter.
747
749
755
756 timer.reset();
757 timer.start();
758
759 Qt.reset();
760 Qx.reset();
761
762 // send trigger parameters to the datawriter
763
764 ostringstream os;
765
766 os << getRunNumber() << ' ' << parameters;
767
768 datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
769 }
770
771
772 virtual void actionPause(int length, const char* buffer) override
773 {
774 using namespace std;
775
776 if (!timeslices.empty()) {
777
778 JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
779
780 for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
781 queueSize -= getSizeof(*i);
782 }
783
784 timeslices.clear();
785 }
786
787 { // force clearance of memory
788
789 deque<JDAQTimesliceL0> buffer;
790
791 timeslices.swap(buffer);
792 }
793
794 if (queueSize != 0) {
795 JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
796 }
797
799 dataqueue_slice_index.clear();
800 queueSize = 0;
801
802 timer.stop();
803 }
804
805
806 virtual void actionContinue(int length, const char* buffer) override
807 {
808 timer.start();
809 }
810
811
812 virtual void actionStop(int length, const char* buffer) override
813 {
814 typeout();
815 }
816
817
818 virtual void actionReset(int length, const char* buffer) override
819 {
820 if (serversocket.is_valid()) {
821 serversocket->shutdown();
822 }
823
824 serversocket.reset();
825 }
826
827
828 virtual void actionQuit(int length, const char* buffer) override
829 {
830 datawriter.reset();
831 }
832
833
834 virtual void setSelect(JFileDescriptorMask& mask) const override
835 {
836 if (serversocket.is_valid()) {
837 mask.set(*serversocket);
838 }
839
840 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
841 if (!channel->isReady()) {
842 mask.set(channel->getFileDescriptor());
843 }
844 }
845 }
846
847
848 virtual void actionSelect(const JFileDescriptorMask& mask) override
849 {
850 using namespace std;
851
852 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
853
854 try {
855
856 if (mask.has(channel->getFileDescriptor())) {
857 channel->read();
858 }
859
860 if (channel->isReady()) {
861
863 number_of_reads += channel->getCounter();
864 number_of_bytes_received += channel->size();
865
866 if (isRunning()) {
867
868 try {
869 updateFrameQueue(*channel);
870 }
871 catch(const std::exception& error) {
872
873 JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
874
876 }
877
878 } else {
879
880 JErrorStream(logErrorRun) << "Receiving data while not running.";
881
883 }
884
885 channel->reset();
886 }
887
888 ++channel;
889 }
890 catch(const std::exception& error) {
891
892 if (isRunning()) {
893 JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
894 }
895
896 channel->shutdown();
897
898 channel = channelList.erase(channel);
899 }
900 }
901
902
903 if (serversocket.is_valid()) {
904
905 if (mask.has(*serversocket)) {
906
907 JTCPSocket socket(serversocket->getFileDescriptor());
908
910
911 socket.setKeepAlive (true);
912 socket.setNonBlocking(false);
913
914 JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']' << ' ' << socket.getReceiveBufferSize();
915
916 channelList.push_back(JSocketInputChannel_t(socket));
917 }
918 }
919
920
921 if (!timeslices.empty()) {
922
923 const size_t number_of_frames = getNumberOfFrames(frames_per_slice, factor_per_slice);
924 const size_t maximum_in_queue = getMaximum(make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (size_t) 0);
925
926 if (((timeslices[0].size() >= number_of_frames && // normal
927 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
928
929 (maximum_in_queue >= number_of_frames && // intermittent problem
930 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
931
932 (timeslices.size() >= maxQueueDepth) || // timeout
933 (queueSize >= maxQueueSize))) {
934
935
936 const JDAQTimesliceL0& pending_slice = timeslices.front();
937 queueSize -= getSizeof(pending_slice);
938
939 current_slice_index = pending_slice.getFrameIndex();
940 minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
941 maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
942
943 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
944 modules.insert(i->getModuleID());
945 }
946
947
948 if (isRunning()) {
949
950 const localtime_t t0 = getLocalTime();
951
952 if (!pending_slice.empty()) {
953 processTimeSlice(pending_slice);
954 }
955
956 const localtime_t t1 = getLocalTime();
957
959
960 Qt.put(t1 - t0);
961
962 } else {
963
964 JErrorStream(logErrorRun) << "Skip processing of data while not running.";
965 }
966
967
968 if (modules.size() > frames_per_slice) {
969
970 JErrorStream(logErrorOvercomplete) << "More active modules than expected "
971 << modules.size() << " > " << frames_per_slice
972 << " adjusting frames per slice to " << modules.size();
973
974 frames_per_slice = modules.size();
975 }
976
977
978 if (pending_slice.size() < number_of_frames) {
979
981
982 ostringstream error;
983
984 error << "Timeout -> processed incomplete timeslice: "
985 << "Frame index = " << pending_slice.getFrameIndex() << ';'
986 << "Size of timeslice = " << pending_slice.size() << ';'
987 << "Queue depth = " << timeslices.size() << ';'
988 << "Queue size = " << queueSize << ';'
989 << "DataQueue min = " << dataqueue_slice_index.min() << ';'
990 << "DataQueue max = " << dataqueue_slice_index.max() << ';';
991
992 if (maximum_in_queue >= number_of_frames) {
993
994 error << " intermittent problem -> continues as-is";
995
996 } else {
997
998 modules.clear(); // remove history
999
1000 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
1001 modules.insert(i->getModuleID());
1002 }
1003
1004 error << " adjusting frames per timeslice from " << frames_per_slice << " to " << modules.size();
1005
1006 frames_per_slice = modules.size();
1007 }
1008
1009 JErrorStream(logErrorIncomplete) << error.str();
1010 }
1011
1012
1013 timeslices.pop_front();
1014 }
1015 }
1016 }
1017
1018
1019 virtual void actionRunning() override
1020 {
1021 if (reporting) {
1022 typeout();
1023 }
1024 }
1025
1026
1027 /**
1028 * Update queue with data frames.
1029 *
1030 * Note that any discarded data will be reported.
1031 *
1032 * \param channel incoming data channel
1033 */
1035 {
1036 using namespace std;
1037
1038 JByteArrayReader in(channel.data(), channel.size());
1039
1040 JDAQPreamble preamble;
1041 JDAQSuperFrameHeader header;
1042
1043 in >> preamble;
1044 in >> header;
1045
1046 if (preamble.getLength() != channel.size()) {
1047
1048 JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
1049 << "preamble.getLength() = " << preamble.getLength() << ';'
1050 << "channel.size(): " << channel.size() << ';';
1051
1053
1054 return;
1055 }
1056
1057 if (header.getRunNumber() != getRunNumber()) {
1058
1059 JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
1060 << " != " << getRunNumber()
1061 << " -> Dropping frame.";
1062
1064
1065 return;
1066 }
1067
1068 if (header.getFrameIndex() <= current_slice_index) {
1069
1071
1072 if (modules.insert(header.getModuleID()).second) {
1073
1074 frames_per_slice = modules.size();
1075
1076 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
1077 << " module " << header.getModuleID()
1078 << " -> dropping frame;"
1079 << " increase number of frames expected to: " << frames_per_slice;
1080 }
1081
1082 return;
1083 }
1084
1086
1088
1089 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1090 << " module " << header.getModuleID()
1091 << " -> Dropping frame.";
1092
1093 return;
1094 }
1095
1096 deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1097
1098 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1099 ++timesliceIterator;
1100 }
1101
1102 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1103
1104 // The corresponding time slice already exists
1105
1106 } else {
1107
1108 // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1109
1110 timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1111
1112 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1113
1114 queueSize += getSizeof(*timesliceIterator);
1115 }
1116
1117 timesliceIterator->push_back(JDAQSuperFrame(header));
1118
1119 in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1120
1121 queueSize += getSizeof(*timesliceIterator->rbegin());
1122
1124 }
1125
1126
1127 /**
1128 * Process time slice.
1129 *
1130 * \param timeslice time slice
1131 */
1132 void processTimeSlice(const JDAQTimesliceL0& timeslice)
1133 {
1134 using namespace std;
1135
1136 try {
1137
1138 timesliceRouter->configure(timeslice);
1139
1140 if (parameters.writeSummary()) {
1141 this->put(JDAQSummaryslice(timeslice));
1142 }
1143
1144 if (parameters.trigger3DMuon.enabled ||
1145 parameters.trigger3DShower.enabled ||
1146 parameters.triggerMXShower.enabled ||
1147 parameters.triggerNB.enabled ||
1148 parameters.writeL0.prescale ||
1149 parameters.writeL1.prescale ||
1150 parameters.writeL2.prescale ||
1151 parameters.writeSN.prescale ||
1152 c_buffer.is_enabled()) {
1153
1154 JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1155 JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1156 JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1157 JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1158 JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1159 JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1160
1161 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1162
1163 if (moduleRouter->hasModule(frame->getModuleID())) {
1164
1165 const JChecksum::result_type& result = checksum(*frame);
1166
1167 if (!result) {
1168
1169 JWarningStream(logger) << "Invalid data at "
1170 << "run = " << timeslice.getRunNumber() << ";"
1171 << "frame index = " << timeslice.getFrameIndex() << ";"
1172 << "module = " << frame->getModuleID() << ";"
1173 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1174
1175 if (dumpCount < dumpLimit && result.has(dumpMask)) {
1176 timesliceTX.push_back(*frame);
1177 }
1178
1179 continue;
1180 }
1181
1182 const JModule& module = moduleRouter->getModule(frame->getModuleID());
1183 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1184
1185 // Apply high-rate veto
1186
1187 buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1188
1189 // L0
1190
1191 timesliceL0.push_back(JSuperFrame1D_t(buffer));
1192
1193 // Nano-beacon trigger
1194
1195 if (parameters.triggerNB.enabled) {
1196
1197 JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1198
1199 if (buffer.begin() != __end) {
1200
1201 timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1202 frame->getModuleIdentifier(),
1203 module.getPosition()));
1204
1205 JSuperFrame1D_t zbuf;
1206
1207 (*buildL1)(buffer.begin(), __end, back_inserter(zbuf));
1208
1209 (*buildNB)(buffer.begin(), __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1210 }
1211 }
1212
1213 // L1
1214
1215 timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1216 frame->getModuleIdentifier(),
1217 module.getPosition()));
1218
1219 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1220
1221 // L2
1222
1223 timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1224 frame->getModuleIdentifier(),
1225 module.getPosition()));
1226
1227 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1228
1229 // SN
1230 {
1231 JTimeslice_t::value_type tv(frame->getDAQChronometer(),
1232 frame->getModuleIdentifier(),
1233 module.getPosition());
1234
1235 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(tv));
1236
1237 if (!tv.empty()) {
1238 timesliceSN.push_back(tv);
1239 }
1240 }
1241
1242 } else {
1243
1244 JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1245 }
1246 }
1247
1248 if (!timesliceTX.empty()) {
1249
1250 if (dumpCount < dumpLimit) {
1251
1252 this->put(timesliceTX);
1253
1254 dumpCount += 1;
1255 }
1256 }
1257
1258 // Trigger
1259
1260 if (parameters.triggerNB.enabled) {
1261
1262 const JTriggerInput trigger_input(timesliceNB);
1263
1264 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1265
1266 if (parameters.triggerNB.write()) {
1267
1268 JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1269 getTriggerMask(triggerNB->getTriggerBit()),
1270 *hit,
1272 *moduleRouter,
1273 parameters.TMaxLocal_ns,
1274 parameters.triggerNB.DMax_m,
1275 getTimeRange(parameters.triggerNB));
1276
1277 this->put(tev);
1278 }
1279 }
1280 }
1281
1282 JTriggerInput trigger_input(timesliceL2);
1283 JTriggerOutput trigger_output;
1284
1285 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1286 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1287 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1288
1289 trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1290
1291 for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1292
1293 const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1294
1295 this->put(object);
1296
1297 numberOfEvents += 1;
1298 }
1299
1300 if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1301
1302 const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1303
1304 if (parameters.writeL1) { this->put(object); }
1305 if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1306 }
1307
1308 if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1309
1310 const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1311
1312 if (parameters.writeL2) { this->put(object); }
1313 if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1314 }
1315
1316 if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1317
1318 const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1319
1320 if (parameters.writeSN) { this->put(object); }
1321 if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1322 }
1323
1324 if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1325
1326 if (parameters.writeL0) { this->put(timeslice); }
1327 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1328 }
1329 }
1330
1331 } catch(const std::exception& error) {
1332
1333 JErrorStream(logger) << "Error = " << error.what() << ";"
1334 << "run = " << timeslice.getRunNumber() << ";"
1335 << "frame index = " << timeslice.getFrameIndex() << ";"
1336 << "time slice not correctly processed;"
1337 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1338
1339 if (dumpCount < dumpLimit) {
1340
1341 this->put(static_cast<const JDAQTimeslice&>(timeslice));
1342
1343 dumpCount += 1;
1344 }
1345 }
1346
1347 timesliceRouter->reset();
1348 }
1349
1350
1351 /**
1352 * Report status to message logger.
1353 */
1354 void typeout()
1355 {
1356 timer.stop();
1357
1358 const double T_us = (double) timer.usec_wall;
1359
1360 JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1361 JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1362 JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1363 JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1364 JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
1365 JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0);
1366
1368 JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1369 }
1370
1371 JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1372 JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice << ' ' << FIXED(5,3) << factor_per_slice;
1373
1374 JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1375
1376 const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1377 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1378
1379 if (processedSlicesTime_us > 0) {
1380 JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1381 }
1382 if (processedDetectorTime_us > 0) {
1383 JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1384 }
1385
1386 timer.start();
1387 }
1388
1389
1390 /**
1391 * Tagged action to handle alerts.
1392 *
1393 * \param tag tag
1394 * \param length number of characters
1395 * \param buffer message
1396 */
1397 virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1398 {
1399 using namespace std;
1400
1401 JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1402
1403 if (tag == RC_ALERT) {
1404
1405 if (c_buffer.is_open()) {
1406
1407 JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1408
1409 c_buffer.close(true);
1410 }
1411
1412 if (c_buffer.is_enabled()) {
1413
1414 c_buffer.open();
1415
1416 if (c_buffer.is_open()) {
1417
1418 JStatusStream(logger) << "Created circular buffer " << c_buffer;
1419
1421
1422 } else {
1423
1424 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1425
1426 c_buffer.disable();
1427 }
1428 }
1429
1430 } else {
1431
1432 JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1433 }
1434 }
1435
1436 JMeta meta; //!< meta data
1437
1438 private:
1439
1441 JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1442 std::string hostname; //!< host name of data server
1443
1444 /**
1445 * Auxiliary method to send object to data server.
1446 *
1447 * \param object object to be sent
1448 */
1449 template<class T>
1450 void put(const T& object)
1451 {
1452 try {
1453
1454 const localtime_t t0 = getLocalTime();
1455
1456 datawriter->put(object);
1457
1458 const localtime_t t1 = getLocalTime();
1459
1460 numberOfBytes += getSizeof(object);
1461
1462 Qx.put(t1 - t0);
1463 }
1464 catch(const std::exception& error) {
1465 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1466 ev_error();
1467 }
1468 }
1469
1470
1471 int port; //!< server socket port
1473
1474 JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1475 JChannelList_t channelList; //!< connections to data queue
1476
1479
1480 std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1486
1487 struct :
1488 public std::map<int, int>
1489 {
1490 /**
1491 * Get minimal frame index.
1492 *
1493 * \return frame index
1494 */
1495 int min() const
1496 {
1497 if (this->empty()) {
1498
1499 return -1;
1500
1501 } else {
1502
1503 int min = std::numeric_limits<int>::max();
1504
1505 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1506 if (i->second < min) {
1507 min = i->second;
1508 }
1509 }
1510
1511 return min;
1512 }
1513 }
1514
1515 /**
1516 * Get maximal frame index.
1517 *
1518 * \return frame index
1519 */
1520 int max() const
1521 {
1522 if (this->empty()) {
1523
1524 return -1;
1525
1526 } else {
1527
1528 int max = std::numeric_limits<int>::lowest();
1529
1530 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1531 if (i->second > max) {
1532 max = i->second;
1533 }
1534 }
1535
1536 return max;
1537 }
1538 }
1539
1541
1542 // trigger
1543
1546
1553
1558
1564
1566 int dumpLimit; //!< maximum number of time slices to be dumped
1567 JStatus dumpMask; //!< mask for data to be dumped
1568
1569 // process management
1570
1573
1574 // memory management
1575
1576 long long int totalCPURAM;
1578 long long int maxQueueSize;
1579 long long int queueSize;
1580
1581 // statistics
1582
1584
1585 long long int numberOfEvents;
1586 long long int numberOfBytes;
1589
1592
1593 // temporary
1594
1597 long long int number_of_reads;
1599
1600 // circular buffer
1601
1603 };
1604}
1605
1606/**
1607 * \file
1608 *
1609 * Application for real-time filtering of data.
1610 * For more information, see KM3NETDAQ::JDataFilter.
1611 *
1612 * \author rbruijn and mdejong
1613 */
1614int main(int argc, char* argv[])
1615{
1616 using namespace std;
1617 using namespace JPP;
1618 using namespace KM3NETDAQ;
1619
1620 string server;
1621 string logger;
1622 string hostname;
1623 string client_name;
1624 int port;
1625 int backlog;
1626 bool use_cout;
1627 string path;
1628 string archive;
1629 int debug;
1630
1631
1632 try {
1633
1634 JParser<> zap("Application for real-time filtering of data.");
1635
1636 zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1637 zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1638 zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1639 zap['u'] = make_field(client_name, "client name") = "%";
1640 zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1641 zap['q'] = make_field(backlog, "back log") = 1024;
1642 zap['c'] = make_field(use_cout, "print to terminal");
1643 zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1644 zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1645 zap['d'] = make_field(debug, "debug level") = 0;
1646
1647 zap(argc, argv);
1648 }
1649 catch(const std::exception& error) {
1650 FATAL(error.what() << endl);
1651 }
1652
1653
1654 JLogger* out = NULL;
1655
1656 if (use_cout)
1657 out = new JStreamLogger(cout);
1658 else
1659 out = new JControlHostLogger(logger);
1660
1661 JDataFilter dfilter(client_name,
1662 server,
1663 hostname,
1664 out,
1665 debug,
1666 port,
1667 backlog,
1668 path,
1669 archive);
1670
1671 dfilter.meta = JMeta(argc, argv);
1672
1673 dfilter.enter();
1674 dfilter.run();
1675}
Fixed parameters and ControlHost tags for KM3NeT DAQ.
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Data structure for detector geometry and calibration.
Exceptions.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Basic data structure for L0 hit.
Basic data structure for L1 hit.
Tools for handling different hit types.
General purpose messaging.
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:72
ROOT I/O of application specific meta data.
Hostname and IP address functions.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
Physics constants.
I/O formatting auxiliaries.
Utility class to parse parameter values.
File status.
ROOT TTree parameter settings of various packages.
System auxiliaries.
System time information.
Scheduling of actions via fixed latency intervals.
Setting of trigger bits.
Basic data structure for time and time over threshold information of hit.
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
Auxiliary methods for handling file names, type names and environment.
void merge(const JMatch_t &match)
Merge events.
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_error_event ev_error
JDAQStateMachine::ev_configure_event ev_configure
Detector data structure.
Definition JDetector.hh:96
Router for direct addressing of module data in detector data structure.
Data structure for a composite optical module.
Definition JModule.hh:75
Utility class to parse parameter values.
bool read(const JEquation &equation)
Read equation.
Time keeper.
Auxiliary class for CPU timing and usage.
Definition JTimer.hh:33
unsigned long long usec_ucpu
Definition JTimer.hh:239
unsigned long long usec_wall
Definition JTimer.hh:238
void stop()
Stop timer.
Definition JTimer.hh:127
void reset()
Reset timer.
Definition JTimer.hh:93
unsigned long long usec_scpu
Definition JTimer.hh:240
void start()
Start timer.
Definition JTimer.hh:106
const JPosition3D & getPosition() const
Get position.
Byte array binary input.
const char * data() const
Get data.
int size() const
Get size.
int getFileDescriptor() const
Get file descriptor.
virtual bool is_open() const =0
Check if device is open.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Auxiliary class for method select.
void set(const int file_descriptor)
Set file descriptor.
bool has(const int file_descriptor) const
Has file descriptor.
Exception for opening of file.
The template JSinglePointer class can be used to hold a pointer to an object.
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
Message logger with time scheduler.
Message logging based on std::ostream.
Implementation of object output through ControlHost.
TCP server socket.
Socket input channel.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition JSocket.hh:148
int getReceiveBufferSize() const
Get receive buffer size.
Definition JSocket.hh:159
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition JSocket.hh:104
TCP socket.
Definition JTCPSocket.hh:30
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition JTCPSocket.hh:56
ControlHost tag.
Definition JTag.hh:38
std::string toString() const
Convert tag to string.
Definition JTag.hh:171
Utility class to parse command line options.
Definition JParser.hh:1698
TFile * getFile() const
Get file.
Definition JRootFile.hh:66
virtual bool is_open() const override
Check if file is open.
Definition JRootFile.hh:77
JTreeWriter< T, JRootCreateFlatTree< T >::value > * out
virtual bool put(const T &object) override
Object output.
ROOT TTree object output.
1-dimensional frame with time calibrated data from one optical module.
2-dimensional frame with time calibrated data from one optical module.
static JSuperFrame2D< JElement_t, JAllocator_t > demultiplex
Demultiplexer.
std::vector< value_type >::iterator iterator
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Time slice with calibrated data.
Definition JTimeslice.hh:29
Data structure for input to trigger algorithm.
Nano-beacon trigger.
Definition JTriggerNB.hh:21
Auxiliary class to build KM3NETDAQ::JDAQEvent for a triggered event.
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
int getRunNumber() const
Get run number.
int getFrameIndex() const
Get frame index.
Control unit client base class.
JSharedPointer< JControlHost > server
message server
bool isRunning() const
Check if this client is in running state.
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
void addSubscription(const JSubscription &subscription)
Add custom subscription.
int getModuleID() const
Get module identifier.
Auxiliary class for itemization of process list.
std::string index
index in process list
Data frame of one optical module.
static void reset()
Reset counter of unique instance of this class object.
Main class for real-time filtering of data.
long long int number_of_packets_received
JMessageScheduler logErrorRun
int dumpLimit
maximum number of time slices to be dumped
JSinglePointer< JServerSocket > serversocket
server for data queue connections
JMessageScheduler logErrorDetector
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
JCircularBuffer_t c_buffer
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
long long int numberOfBytes
long long int number_of_packets_discarded
JMessageScheduler logErrorIndex
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
virtual void actionInit(int length, const char *buffer) override
void put(const T &object)
Auxiliary method to send object to data server.
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
long long int number_of_bytes_received
virtual void actionContinue(int length, const char *buffer) override
long long int maxQueueSize
int port
server socket port
long long int totalCPURAM
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
std::vector< JDAQProcess > dataQueues
JMessageScheduler logErrorOvercomplete
long long int number_of_reads
std::vector< JSocketInputChannel_t > JChannelList_t
virtual void actionStop(int length, const char *buffer) override
JSinglePointer< JBuildL2_t > buildNB
JSinglePointer< JBuildL2_t > buildL2
JBuildL1< hit_type > JBuildL1_t
JSinglePointer< JBuildL1_t > buildL1
virtual void actionExit() override
virtual void actionQuit(int length, const char *buffer) override
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
std::string hostname
host name of data server
virtual void actionConfigure(int length, const char *buffer) override
long long int numberOfTimeslicesProcessed
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JChannelList_t channelList
connections to data queue
long long int numberOfIncompleteTimeslicesProcessed
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
JSuperFrame1D< hit_type > JSuperFrame1D_t
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
JMessageScheduler logErrorIncomplete
JSuperFrame2D< hit_type > JSuperFrame2D_t
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
JTimeslice< hit_type > JTimeslice_t
JStatus dumpMask
mask for data to be dumped
JSinglePointer< JModuleRouter > moduleRouter
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JSinglePointer< JTimesliceRouter > timesliceRouter
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
JSinglePointer< JTriggerNB > triggerNB
JBuildL2< hit_type > JBuildL2_t
long long int numberOfEvents
std::vector< JDAQProcess > dataFilters
JSinglePointer< JTrigger3DShower > trigger3DShower
virtual void actionStart(int length, const char *buffer) override
virtual void actionPause(int length, const char *buffer) override
JSinglePointer< JTriggerMXShower > triggerMXShower
virtual void actionReset(int length, const char *buffer) override
std::set< int > modules
JSinglePointer< JBuildL2_t > buildSN
JTriggerParameters parameters
void typeout()
Report status to message logger.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
Definition JVectorize.hh:54
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
std::string trim(const std::string &buffer)
Trim string.
static const long long int GIGABYTE
Number of bytes in a giga-byte.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const char * getName()
Get ROOT name of given data type.
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
long long int localtime_t
Type definition of local time.
unsigned long long int getRAM()
Get RAM of this CPU.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition JNetwork.hh:216
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
static JStat getFileStatus
Function object for file status.
Definition JStat.hh:173
return result
Definition JPolint.hh:862
const int n
Definition JPolint.hh:791
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition JChecksum.hh:200
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according to a given allowed fraction of active modules.
static const JNET::JTag RC_DFILTER
Definition JDAQTags.hh:73
double getFrameTime()
Get frame time duration.
Definition JDAQClock.hh:162
static const JNET::JTag RC_ALERT
Definition JDAQTags.hh:79
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
Definition JDAQTags.hh:36
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
Definition JDAQTags.hh:37
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Definition JDAQTags.hh:38
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition JDAQTags.hh:90
size_t getSizeof()
Definition of method to get size of data type.
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
Definition pmt_status.hh:13
Auxiliary data structure for sequence of same character.
Definition JManip.hh:330
Auxiliary data structure for floating point format specification.
Definition JManip.hh:448
Detector file.
Definition JHead.hh:227
Match of two events considering overlap in time and position.
Auxiliary class for handling status.
Definition JStatus.hh:39
void set(const int bit)
Set PMT status.
Definition JStatus.hh:131
Type list.
Definition JTypeList.hh:23
Level specific message streamers.
Auxiliary class for all subscription.
Auxiliary class for ROOT I/O of application specific meta data.
Definition JMeta.hh:72
Auxiliary class for date and time.
int getYear() const
year a.d.
int getDay() const
day of the month [1-31]
int getMonth() const
month of the year [1-12]
Auxiliary data structure for running average and standard deviation.
Definition JStats.hh:44
double getXmax() const
Get maximum value.
Definition JStats.hh:201
void put(const double x, const double w=1.0)
Put value.
Definition JStats.hh:119
double getMean() const
Get mean value.
Definition JStats.hh:234
void reset()
Reset.
Definition JStats.hh:79
Auxiliary data structure for result of checksum.
Definition JChecksum.hh:90
@ ETDC_t
TDC value error.
Definition JChecksum.hh:47
@ SIZE_t
size error
Definition JChecksum.hh:50
@ EPMT_t
PMT number error.
Definition JChecksum.hh:46
@ TIME_t
Time order error.
Definition JChecksum.hh:48
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
const JTag & getUniqueTag() const
Get unique tag of this run control client.
const std::string & getFullName() const
Get full name of this run control client.
void setClockInterval(const long long int interval_us)
Set interval time.
long long int getClockInterval() const
Get interval time.
Timeslice data structure for L0 data.
std::string archive
Directory for permanent archival.
std::string path
Directory for temporary storage.
Long64_t sizeL1
Number of L1 time slices.
Long64_t sizeSN
Number of SN time slices.
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
JTag tag
Unique tag of this process.
Long64_t sizeL0
Number of L0 time slices.
void close(const bool option)
Close file.
Long64_t sizeL2
Number of L2 time slices.
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
bool is_enabled() const
Check whether writing of data is enabled.