Jpp 20.0.0-195-g190c9e876
the software that should make you happy
Loading...
Searching...
No Matches
JDataFilter.cc
Go to the documentation of this file.
1#include <string>
2#include <iostream>
3#include <iomanip>
4#include <deque>
5#include <limits>
6#include <algorithm>
7#include <unistd.h>
8#include <random>
9
11#include "JLang/JLangToolkit.hh"
12#include "JLang/JException.hh"
13#include "JLang/JVectorize.hh"
14#include "JLang/JStatus.hh"
15#include "Jeep/JParser.hh"
16#include "Jeep/JProperties.hh"
17#include "Jeep/JTimer.hh"
18#include "Jeep/JTimekeeper.hh"
19#include "Jeep/JMessage.hh"
20#include "Jeep/JPrint.hh"
21#include "Jeep/JeepToolkit.hh"
28#include "JDAQ/JDAQTags.hh"
29#include "JDAQ/JDAQEventIO.hh"
33#include "JTrigger/JHit.hh"
38#include "JTrigger/JHitL0.hh"
39#include "JTrigger/JHitL1.hh"
40#include "JTrigger/JBuildL1.hh"
41#include "JTrigger/JBuildL2.hh"
53#include "JTrigger/JChecksum.hh"
56#include "JNet/JControlHost.hh"
58#include "JNet/JTCPSocket.hh"
60#include "JNet/JServerSocket.hh"
62#include "JTools/JStats.hh"
63#include "JSupport/JSupport.hh"
65#include "JSupport/JMeta.hh"
66#include "JSystem/JStat.hh"
67#include "JSystem/JTime.hh"
69#include "JSystem/JNetwork.hh"
71
72
73namespace JNET {
74
75 /**
76 * Get size of packet.
77 *
78 * \param preamble DAQ data preamble
79 * \return size [B]
80 */
81 template<>
83 {
84 return preamble.getLength();
85 }
86}
87
88namespace KM3NETDAQ {
89
90 using namespace JPP;
91
92
/**
 * Get expected number of frames according to a given allowed fraction of active modules.
 *
 * For a fraction below one, the result is at least one.
 *
 * \param  number_of_frames    number of frames
 * \param  factor              factor; values <= 0 (and NaN) yield 1, values >= 1 yield number_of_frames
 * \return                     number of frames
 */
inline size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
{
  // Negated comparison so that NaN also ends up here;
  // converting NaN to an integer type below would be undefined behaviour.
  if (!(factor > 0.0)) {
    return 1;
  }

  if (factor >= 1.0) {
    return number_of_frames;
  }

  // 0 < factor < 1: the product is finite and not larger than number_of_frames.
  const size_t n = static_cast<size_t>(number_of_frames * factor);

  return (n == 0 ? 1 : n);
}
114
115
116 /**
117 * Main class for real-time filtering of data.
118 *
119 * This class implements the action methods for each transition of the state machine.\n
120 * When the state machine is entered, data are continually collected using
121 * custom implementation of virtual methods
122 * JDataFilter::setSelect and
123 * JDataFilter::actionSelect.
124 *
125 * In state <tt>Running</tt>, all incoming data from the DataQueue.cc are buffered, referred to as "queue".\n
126 * These data are formatted as a JDAQSuperFrame and aggregated into a JDAQTimeslice.\n
127 * When a JDAQTimeslice is complete,
128 * it is processed in the same way as applications JTriggerProcessor.cc and JTriggerEfficiency.cc.
129 *
130 * The completeness of a time slice is defined by two parameters, namely
131 * <tt>frames_per_slice</tt> and
132 * <tt>factor_per_slice</tt>.\n
133 * These are parsed in method actionConfigure as follows.
134 * <pre>
135 * numberOfFramesPerSlice = <frames per slice> = 1;
136 * factorOfFramesPerSlice = <factor per slice> = 1.0;
137 * </pre>
138 * The expected number of frames in a time slice is defined by their product.\n
139 * Note that the first parameter can change during operation (see below).
140 *
141 * A timeout may occur when the total amount of data or the number of incomplete time slices
142 * in the queue exceeds one of the limits <tt>queueSize</tt> or <tt>queueDepth</tt>.\n
143 * These are parsed in method actionConfigure as follows.
144 * <pre>
145 * queueSize = <maximal queue size [B]>;
146 * queueDepth = <maximal queue depth>;
147 * </pre>
148 * Note that these values apply per JDataFilter.\n
149 *
150 * The parameter <tt>frames_per_slice</tt> is subject to a servo mechanism.\n
151 * When a timeout occurs,
152 * it is set to the number of frames in the oldest time slice.\n
153 * When data are received with a frame index below the current frame index,
154 * it is incremented by one.
155 *
156 * In addition, a circular buffer based on a ROOT TTree for each JDAQTimeslice type can be maintained.\n
157 * The following parameters can be used to configure the circular buffer.\n
158 * <pre>
159 * path = <write directory for temporary circular buffer>;
160 * archive = <write directory for archival of circular buffer>;
161 * c_sizeL0 = <L0 buffer size>;
162 * c_sizeL1 = <L1 buffer size>;
163 * c_sizeL2 = <L2 buffer size>;
164 * c_sizeSN = <SN buffer size>;
165 * </pre>
166 *
167 * Note that when one (or more) of the buffer sizes is non-zero and the path corresponds to a valid directory:
168 * - There will always be a file created which is deleted when the state machine is exited;
169 * - The temporary file name is <tt><path>/KM3NeT_<tag>.root</tt>;
170 * - Following an alert, the temporary file is archived to a file with name <tt><archive>/KM3NeT_YYYY-MM-DD_<tag>[_<index>].root</tt>;
171 * - After archival of the temporary file, a new temporary file will be opened;
172 * - L0, L1, L2 and SN buffer sizes are per JDataFilter;
173 * - The tag KM3NETDAQ::RC_ALERT defined in JDAQTags.hh has tentatively been reserved to trigger the archival of the temporary file following an external alert;
174 *
175 * In this, <tt>YYYY</tt>, <tt>MM</tt> and <tt>DD</tt> correspond to the year, month and day at the time of the alert, respectively.\n
176 * The <tt><tag></tt> corresponds to the unique JNET::JTag of the JDataFilter process.\n
177 * The <tt><index></tt> is optional and will only be used to prevent overwriting of an existing file
178 * (e.g. when there is more than one alert on a given day).
179 *
180 * Data can be discarded for various reasons.\n
181 * The list of criteria includes validity ranges of the PMT number and TDC values as well as the sorted times per PMT.\n
182 * The discarded data can be written to disk according to the parameters:
183 * <pre>
184 * dumpLimit = <maximum number of time slices to be dumped>
185 * dumpMask = <mask for data to be dumped>
186 * </pre>
187 * The first limits the number of time slices with discarded data that will be dumped and \n
188 * the latter corresponds to bits according to the enumeration JChecksum::error_types.\n
189 * The discarded data will be stored as JDAQTimeslice.
190 *
191 * Note that the application JConvert.cc can be used to readily convert multiple input files to a single output file.
192 *
193 * The script <tt>$JPP_DIR/tests/JDataFilter/JDataFilter.sh</tt> can be used to test this application.
194 */
196 public JDAQClient
197 {
198 public:
199
202
203 typedef double hit_type;
206 typedef JTimeslice <hit_type> JTimeslice_t;
207 typedef JBuildL1 <hit_type> JBuildL1_t;
208 typedef JBuildL2 <hit_type> JBuildL2_t;
209
210
211
212 /**
213 * Circular buffer.
214 */
216 public JTreeRecorder<JDAQTimesliceTypes_t>
217 {
220
221
222 static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
223
224
225 /**
226 * Constructor.
227 *
228 * \param path directory for temporary storage
229 * \param archive directory for permanent archival
230 * \param tag tag
231 */
232 JCircularBuffer_t(const std::string& path,
233 const std::string& archive,
234 const JTag& tag) :
235 path (path),
237 tag (tag)
238 {
239 disable();
240 }
241
242
243 /**
244 * Open file.
245 *
246 * If file with same name exists, remove it beforehand.
247 */
248 void open()
249 {
250 using namespace std;
251 using namespace JPP;
252
253 gErrorIgnoreLevel = kFatal;
254
255 std::ostringstream os;
256
257 os << getFullPath(path) << "KM3NeT" << "_" << tag << ".root";
258
259 if (getFileStatus(os.str().c_str())) {
260 std::remove(os.str().c_str());
261 }
262
263 this->open(os.str().c_str());
264 }
265
266
267 /**
268 * Close file.
269 *
270 * If option is true, archive file; else delete file.
271 *
272 * \param option option
273 */
274 void close(const bool option)
275 {
276 using namespace std;
277 using namespace JPP;
278
279 const JDateAndTime cal;
280
281 if (this->is_open()) {
282
283 const string file_name = this->getFile()->GetName();
284
285 this->close();
286
287 if (option) {
288
289 for (int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
290
291 ostringstream os;
292
293 os << getFullPath(this->archive)
294 << "KM3NeT"
295 << "_" << cal.getYear() << '-' << FILL(2,'0') << cal.getMonth() << '-' << FILL(2,'0') << cal.getDay()
296 << "_" << this->tag;
297
298 if (i != 0) {
299 os << "_" << i;
300 }
301
302 os << ".root";
303
304 if (!getFileStatus(os.str().c_str())) {
305
306 if (JSYSTEM::rename(file_name, os.str()) == 0)
307 return;
308 else
309 THROW(JFileOpenException, "Failure in renaming file " << file_name << " to " << os.str());
310 }
311 }
312
313 } else {
314
315 std::remove(file_name.c_str());
316 }
317 }
318 }
319
320
321 /**
322 * Disable writing.
323 */
324 void disable()
325 {
326 sizeL0 = 0;
327 sizeL1 = 0;
328 sizeL2 = 0;
329 sizeSN = 0;
330 }
331
332
333 /**
334 * Check whether writing of data is enabled.
335 *
336 * \return true if writing enabled; else false
337 */
338 bool is_enabled() const
339 {
340 return (sizeL0 > 0 ||
341 sizeL1 > 0 ||
342 sizeL2 > 0 ||
343 sizeSN > 0);
344 }
345
346
347 /**
348 * Write circular buffer to output stream.
349 *
350 * \param out output stream
351 * \param object circular buffer
352 * \return output stream
353 */
354 friend inline std::ostream& operator<<(std::ostream& out, const JCircularBuffer_t& object)
355 {
356 if (object.is_open())
357 out << object.getFile()->GetName();
358 else
359 out << "void";
360
361 out << ' ';
362
363 out << object.sizeL0 << '/'
364 << object.sizeL1 << '/'
365 << object.sizeL2 << '/'
366 << object.sizeSN << '/';
367
368 return out;
369 }
370
371 Long64_t sizeL0; //!< Number of L0 time slices
372 Long64_t sizeL1; //!< Number of L1 time slices
373 Long64_t sizeL2; //!< Number of L2 time slices
374 Long64_t sizeSN; //!< Number of SN time slices
375
376 std::string path; //!< Directory for temporary storage
377 std::string archive; //!< Directory for permanent archival
378 JTag tag; //!< Unique tag of this process
379 };
380
381
382 /**
383 * Sort DAQ process by index.
384 *
385 * \param first first DAQ process
386 * \param second second DAQ process
387 * \return true if index of first DAQ process less than that of second; else false
388 */
389 static inline bool compare(const JDAQProcess& first, const JDAQProcess& second)
390 {
391 return first.index < second.index;
392 }
393
394
395 /**
396 * Constructor.
397 *
398 * \param name name of client
399 * \param server name of command message server
400 * \param hostname name of data server
401 * \param logger pointer to logger
402 * \param level debug level
403 * \param port server port
404 * \param backlog server backlog
405 * \param path directory for temporary storage
406 * \param archive directory for permanent archival
407 */
408 JDataFilter(const std::string& name,
409 const std::string& server,
410 const std::string& hostname,
412 const int level,
413 const int port,
414 const int backlog,
415 const std::string& path,
416 const std::string& archive) :
419 port (port),
421 c_buffer (path, archive, getUniqueTag())
422 {
423 replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
424
426
427 totalCPURAM = getRAM();
429 dataqueue_slice_index.clear();
430 reporting = false;
431
432 this->server->setReceiveBufferSize(DFILTER_RECEIVE_BUFFER_SIZE);
433 }
434
435
/**
 * Enter action; the body is empty.
 */
436 virtual void actionEnter() override
437 {}
438
439
440 virtual void actionExit() override
441 {
442 if (c_buffer.is_open()) {
443
444 JStatusStream(logger) << "Close and remove circular buffer " << c_buffer;
445
446 c_buffer.close(false);
447 }
448
449 datawriter.reset();
450 }
451
452
453 virtual void actionInit(int length, const char* buffer) override
454 {
455 JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
456
457 try {
458
459 JDebugStream(logger) << "Start server.";
460
461 if (serversocket.is_valid()) {
462 serversocket->shutdown();
463 }
464
466 }
467 catch(const std::exception& error) {
468 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
469 ev_error();
470 }
471 }
472
473
474 virtual void actionConfigure(int length, const char* buffer) override
475 {
476 using namespace std;
477
478 JDebugStream(logger) << "actionConfigure() " << endl << std::string(buffer,length);
479
480 string _hostname_ = "";
481
482 long long int update_s = 20;
483 long long int logger_s = 10;
484
485 parameters .reset();
486 dataFilters.clear();
487 dataQueues .clear();
488
489 reporting = false;
490 dumpCount = 0;
491 dumpLimit = numeric_limits<int>::max();
492 dumpMask = 0;
496 //dumpMask.set(JChecksum::EUDP_t);
498
499 detector.comment.clear();
500 detector.clear();
501
502 JProperties properties(JEquationParameters("=", ";", "", ""), 0);
503
504 properties["dataWriter"] = _hostname_;
505 properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
506 properties["factorOfFramesPerSlice"] = factor_per_slice = 1.0;
507 properties["detector"] = detector;
508 properties["triggerParameters"] = parameters;
509 properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
510 properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
511 properties["frameIndex"] = maximal_frame_index = 100000;
512 properties["logger_s"] = logger_s;
513 properties["update_s"] = update_s;
514 properties["JDataFilter"] = dataFilters;
515 properties["DataQueue"] = dataQueues;
516 properties["path"] = c_buffer.path;
517 properties["archive"] = c_buffer.archive;
518 properties["c_sizeL0"] = c_buffer.sizeL0;
519 properties["c_sizeL1"] = c_buffer.sizeL1;
520 properties["c_sizeL2"] = c_buffer.sizeL2;
521 properties["c_sizeSN"] = c_buffer.sizeSN;
522 properties["dumpLimit"] = dumpLimit;
523 properties["dumpMask"] = dumpMask;
524
525 try {
526 properties.read(string(buffer, length));
527 }
528 catch(const std::exception& error) {
529 JErrorStream(logger) << error.what();
530 }
531
532 if (update_s <= 0) { update_s = 20; }
533 if (logger_s <= 0) { logger_s = 10; }
534
535 setClockInterval(update_s * 1000000LL);
536
537 _hostname_ = trim(_hostname_);
538
539 if (_hostname_ != "" && _hostname_ != hostname) {
540
541 datawriter.reset();
542
543 hostname = _hostname_;
544 }
545
546 bool status = datawriter.is_valid();
547
548 if (status) {
549
550 try {
551 status = datawriter->Connected() == 0;
552 }
553 catch (const exception&) {
554 status = false;
555 }
556 }
557
558 if (!status) {
559
561
562 datawriter->MyId(getFullName());
563 }
564
565 datawriter->setSendBufferSize(DFILTER_SEND_BUFFER_SIZE);
566
567 // process processlist
568
569 if (dataFilters.empty()) {
570 JNoticeStream(logger) << "No DataFilters in process list, or no process list. "
571 << "Assuming that this process is the only process on this CPU and setting parameters accordingly.";
572 }
573
574 sort(dataFilters.begin(), dataFilters.end(), compare);
575
576 unsigned int numberOfDataFiltersOnThisMachine = 0;
577 vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
578
580
581 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
582
583 if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
584
585 numberOfDataFiltersOnThisMachine++;
586
587 if (i->port == this->port) {
588 thisProcess = i;
589 }
590 }
591 }
592
593 if (numberOfDataFiltersOnThisMachine == 0) {
594 JNoticeStream(logger) << "Zero data filters on this machine according to process list (if it exists). "
595 << "Assuming one datafilter on this machine.";
596 numberOfDataFiltersOnThisMachine = 1;
597 }
598
599 if (thisProcess == dataFilters.end()) {
600
601 JErrorStream error(logger);
602
603 error << "This process cannot be found in the process list. Why do I exist?";
604 error << " my IP addresses:";
605
606 for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
607 error << ' ' << *i;
608 }
609
610 error << " my port: " << this->port;
611 error << " process list";
612
613 for (vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
614 error << ' ' << i->hostname << ':' << i->port;
615 }
616 }
617
618 if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
619 JErrorStream(logger) << "Mismatch between given process names: "
620 << "I am called " << getName()
621 << ", but in the process list I am referred to as " << thisProcess->index;
622 }
623
624 if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
625 reporting = true;
626 }
627
628 if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
629
630 maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
631
632 JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. "
633 << "Queue size reduced to "
634 << maxQueueSize << " bytes." ;
635 }
636
637 // detector
638
639 if (parameters.disableHighRateVeto) {
640
641 JNoticeStream(logger) << "Disabling high-rate veto of all PMTs.";
642
643 detector.setPMTStatus(HIGH_RATE_VETO_DISABLE);
644 }
645
646 // trigger parameters
647
649
650 triggerNB .reset(new JTriggerNB (parameters));
654
656
657 if (reporting) {
658 JNoticeStream(logger) << "This data filter process will report.";
659 JNoticeStream(logger) << "Detector version/size " << detector.getVersion() << '/' << detector.size();
660 JDebugStream (logger) << "Trigger parameters: " << parameters;
661 JDebugStream (logger) << "Detector description: " << endl << detector;
662 JNoticeStream(logger) << "Update period [s]: " << getClockInterval();
663 }
664
665 timesliceRouter.reset(new JTimesliceRouter(parameters.numberOfBins));
666
667 // set L1, L2 and SN builders
668
669 buildL1.reset(new JBuildL1_t(parameters));
670 buildL2.reset(new JBuildL2_t(parameters.L2));
671 buildSN.reset(new JBuildL2_t(parameters.SN));
672 buildNB.reset(new JBuildL2_t(parameters.NB));
673
674 if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
675 if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
676 if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
677 if (buildNB.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
678
679 logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
680 logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
681 logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
682 logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
684
685 if (c_buffer.is_enabled()) {
686
687 if (!c_buffer.is_open()) {
688
689 c_buffer.open();
690
691 if (c_buffer.is_open()) {
692
694
695 JStatusStream(logger) << "Created circular buffer " << c_buffer;
696
697 } else {
698
699 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
700 }
701
702 } else {
703
704 JNoticeStream(logger) << "Continue using circular buffer " << c_buffer;
705 }
706 }
707
708 if (c_buffer.is_open()) {
709 if (c_buffer.sizeL0 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL0>&>(c_buffer).SetCircular(c_buffer.sizeL0); }
710 if (c_buffer.sizeL1 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL1>&>(c_buffer).SetCircular(c_buffer.sizeL1); }
711 if (c_buffer.sizeL2 > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceL2>&>(c_buffer).SetCircular(c_buffer.sizeL2); }
712 if (c_buffer.sizeSN > 0) { static_cast<JTreeWriterObjectOutput<JDAQTimesliceSN>&>(c_buffer).SetCircular(c_buffer.sizeSN); }
713 } else {
715 }
716 }
717
718
719 virtual void actionStart(int length, const char* buffer) override
720 {
721 using namespace std;
722
723 if (reporting) {
724 JNoticeStream(logger) << "Start run " << getRunNumber();
725 }
726
727 timeslices.clear();
728
730 dataqueue_slice_index.clear();
731 queueSize = 0;
732
733 numberOfEvents = 0;
734 numberOfBytes = 0;
737
741 number_of_reads = 0;
742
743 minFrameNumber = numeric_limits<int>::max();
744 maxFrameNumber = numeric_limits<int>::min();
745
746 // Reset global trigger counter.
747
749
755
756 timer.reset();
757 timer.start();
758
759 Qt.reset();
760 Qx.reset();
761
762 // send trigger parameters to the datawriter
763
764 ostringstream os;
765
766 os << getRunNumber() << ' ' << parameters;
767
768 datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
769 }
770
771
772 virtual void actionPause(int length, const char* buffer) override
773 {
774 using namespace std;
775
776 if (!timeslices.empty()) {
777
778 JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
779
780 for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
781 queueSize -= getSizeof(*i);
782 }
783
784 timeslices.clear();
785 }
786
787 { // force clearance of memory
788
789 deque<JDAQTimesliceL0> buffer;
790
791 timeslices.swap(buffer);
792 }
793
794 if (queueSize != 0) {
795 JWarningStream(logger) << "Pending data in queue " << queueSize << " [B]";
796 }
797
799 dataqueue_slice_index.clear();
800 queueSize = 0;
801
802 timer.stop();
803 }
804
805
/**
 * Continue action; starts the timer.
 *
 * \param length number of characters (not used)
 * \param buffer text (not used)
 */
806 virtual void actionContinue(int length, const char* buffer) override
807 {
808 timer.start();
809 }
810
811
/**
 * Stop action; calls typeout().
 *
 * \param length number of characters (not used)
 * \param buffer text (not used)
 */
812 virtual void actionStop(int length, const char* buffer) override
813 {
814 typeout();
815 }
816
817
818 virtual void actionReset(int length, const char* buffer) override
819 {
820 if (serversocket.is_valid()) {
821 serversocket->shutdown();
822 }
823
824 serversocket.reset();
825 }
826
827
/**
 * Quit action; resets the data writer connection object.
 *
 * \param length number of characters (not used)
 * \param buffer text (not used)
 */
828 virtual void actionQuit(int length, const char* buffer) override
829 {
830 datawriter.reset();
831 }
832
833
834 virtual void setSelect(JFileDescriptorMask& mask) const override
835 {
836 if (serversocket.is_valid()) {
837 mask.set(*serversocket);
838 }
839
840 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
841 if (!channel->isReady()) {
842 mask.set(channel->getFileDescriptor());
843 }
844 }
845 }
846
847
848 virtual void actionSelect(const JFileDescriptorMask& mask) override
849 {
850 using namespace std;
851
852 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
853
854 try {
855
856 if (mask.has(channel->getFileDescriptor())) {
857 channel->read();
858 }
859
860 if (channel->isReady()) {
861
863 number_of_reads += channel->getCounter();
864 number_of_bytes_received += channel->size();
865
866 if (isRunning()) {
867
868 try {
869 updateFrameQueue(*channel);
870 }
871 catch(const std::exception& error) {
872
873 JErrorStream(logErrorRun) << "Update frame queue " << channel->getFileDescriptor() << ' ' << channel->size() << ' ' << error.what();
874
876 }
877
878 } else {
879
880 JErrorStream(logErrorRun) << "Receiving data while not running.";
881
883 }
884
885 channel->reset();
886 }
887
888 ++channel;
889 }
890 catch(const std::exception& error) {
891
892 if (isRunning()) {
893 JErrorStream(logger) << "Disconnect channel " << channel->getFileDescriptor() << ' ' << error.what();
894 }
895
896 channel->shutdown();
897
898 channel = channelList.erase(channel);
899 }
900 }
901
902
903 if (serversocket.is_valid()) {
904
905 if (mask.has(*serversocket)) {
906
907 JTCPSocket socket(serversocket->getFileDescriptor());
908
910
911 socket.setKeepAlive (true);
912 socket.setNonBlocking(false);
913
914 JStatusStream(logger) << "New channel" << '[' << socket.getFileDescriptor() << ']' << ' ' << socket.getReceiveBufferSize();
915
916 channelList.push_back(JSocketInputChannel_t(socket));
917 }
918 }
919
920
921 if (!timeslices.empty()) {
922
923 const size_t number_of_frames = getNumberOfFrames(frames_per_slice, factor_per_slice);
924 const size_t maximum_in_queue = getMaximum(make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (size_t) 0);
925
926 if (((timeslices[0].size() >= number_of_frames && // normal
927 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
928
929 (maximum_in_queue >= number_of_frames && // intermittent problem
930 timeslices[0].getFrameIndex() < dataqueue_slice_index.min()) ||
931
932 (timeslices.size() >= maxQueueDepth) || // timeout
933 (queueSize >= maxQueueSize))) {
934
935
936 const JDAQTimesliceL0& pending_slice = timeslices.front();
937 queueSize -= getSizeof(pending_slice);
938
939 current_slice_index = pending_slice.getFrameIndex();
940 minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
941 maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
942
943 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
944 modules.insert(i->getModuleID());
945 }
946
947
948 if (isRunning()) {
949
950 const localtime_t t0 = getLocalTime();
951
952 if (!pending_slice.empty()) {
953 processTimeSlice(pending_slice);
954 }
955
956 const localtime_t t1 = getLocalTime();
957
959
960 Qt.put(t1 - t0);
961
962 } else {
963
964 JErrorStream(logErrorRun) << "Skip processing of data while not running.";
965 }
966
967
968 if (modules.size() > frames_per_slice) {
969
970 JErrorStream(logErrorOvercomplete) << "More active modules than expected "
971 << modules.size() << " > " << frames_per_slice
972 << " adjusting frames per slice to " << modules.size();
973
974 frames_per_slice = modules.size();
975 }
976
977
978 if (pending_slice.size() < number_of_frames) {
979
981
982 ostringstream error;
983
984 error << "Timeout -> processed incomplete timeslice: "
985 << "Frame index = " << pending_slice.getFrameIndex() << ';'
986 << "Size of timeslice = " << pending_slice.size() << ';'
987 << "Queue depth = " << timeslices.size() << ';'
988 << "Queue size = " << queueSize << ';'
989 << "DataQueue min = " << dataqueue_slice_index.min() << ';'
990 << "DataQueue max = " << dataqueue_slice_index.max() << ';';
991
992 if (maximum_in_queue >= number_of_frames) {
993
994 error << " intermittent problem -> continues as-is";
995
996 } else {
997
998 modules.clear(); // remove history
999
1000 for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
1001 modules.insert(i->getModuleID());
1002 }
1003
1004 error << " adjusting frames per timeslice from " << frames_per_slice << " to " << modules.size();
1005
1006 frames_per_slice = modules.size();
1007 }
1008
1009 JErrorStream(logErrorIncomplete) << error.str();
1010 }
1011
1012
1013 timeslices.pop_front();
1014 }
1015 }
1016 }
1017
1018
/**
 * Running action; calls typeout() if this process is the reporting one.
 */
1019 virtual void actionRunning() override
1020 {
1021 if (reporting) {
1022 typeout();
1023 }
1024 }
1025
1026
1027 /**
1028 * Update queue with data frames.
1029 *
1030 * Note that any discarded data will be reported.
1031 *
1032 * \param channel incoming data channel
1033 */
1035 {
1036 using namespace std;
1037
1038 JByteArrayReader in(channel.data(), channel.size());
1039
1040 JDAQPreamble preamble;
1041 JDAQSuperFrameHeader header;
1042
1043 in >> preamble;
1044 in >> header;
1045
1046 if (preamble.getLength() != channel.size()) {
1047
1048 JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble: "
1049 << "preamble.getLength() = " << preamble.getLength() << ';'
1050 << "channel.size(): " << channel.size() << ';';
1051
1053
1054 return;
1055 }
1056
1057 if (header.getRunNumber() != getRunNumber()) {
1058
1059 JErrorStream(logErrorRun) << "Run number " << header.getRunNumber()
1060 << " != " << getRunNumber()
1061 << " -> Dropping frame.";
1062
1064
1065 return;
1066 }
1067
1068 if (header.getFrameIndex() <= current_slice_index) {
1069
1071
1072 if (modules.insert(header.getModuleID()).second) {
1073
1074 frames_per_slice = modules.size();
1075
1076 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " <= " << current_slice_index
1077 << " module " << header.getModuleID()
1078 << " -> dropping frame;"
1079 << " increase number of frames expected to: " << frames_per_slice;
1080 }
1081
1082 return;
1083 }
1084
1086
1088
1089 JErrorStream(logErrorIndex) << "Frame index " << header.getFrameIndex() << " > " << current_slice_index + maximal_frame_index
1090 << " module " << header.getModuleID()
1091 << " -> Dropping frame.";
1092
1093 return;
1094 }
1095
1096 deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1097
1098 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1099 ++timesliceIterator;
1100 }
1101
1102 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1103
1104 // The corresponding time slice already exists
1105
1106 } else {
1107
1108 // This is the first frame of this time slice; insert a new time slice in the buffer at the right position in the list
1109
1110 timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
1111
1112 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1113
1114 queueSize += getSizeof(*timesliceIterator);
1115 }
1116
1117 timesliceIterator->push_back(JDAQSuperFrame(header));
1118
1119 in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
1120
1121 queueSize += getSizeof(*timesliceIterator->rbegin());
1122
1124 }
1125
1126
1127 /**
1128 * Process time slice.
1129 *
1130 * \param timeslice time slice
1131 */
1132 void processTimeSlice(const JDAQTimesliceL0& timeslice)
1133 {
1134 using namespace std;
1135
1136 try {
1137
1138 timesliceRouter->configure(timeslice);
1139
1140 if (parameters.writeSummary()) {
1141 this->put(JDAQSummaryslice(timeslice));
1142 }
1143
1144 if (parameters.trigger3DMuon.enabled ||
1145 parameters.trigger3DShower.enabled ||
1146 parameters.triggerMXShower.enabled ||
1147 parameters.triggerNB.enabled ||
1148 parameters.triggerRandom.enabled ||
1149 parameters.writeL0.prescale ||
1150 parameters.writeL1.prescale ||
1151 parameters.writeL2.prescale ||
1152 parameters.writeSN.prescale ||
1153 c_buffer.is_enabled()) {
1154
1155 JTimeslice_t timesliceL0(timeslice.getDAQChronometer());
1156 JTimeslice_t timesliceL1(timeslice.getDAQChronometer());
1157 JTimeslice_t timesliceL2(timeslice.getDAQChronometer());
1158 JTimeslice_t timesliceSN(timeslice.getDAQChronometer());
1159 JTimeslice_t timesliceNB(timeslice.getDAQChronometer());
1160 JDAQTimeslice timesliceTX(timeslice.getDAQChronometer());
1161
1162 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1163
1164 if (moduleRouter->hasModule(frame->getModuleID())) {
1165
1166 const JChecksum::result_type& result = checksum(*frame);
1167
1168 if (!result) {
1169
1170 JWarningStream(logger) << "Invalid data at "
1171 << "run = " << timeslice.getRunNumber() << ";"
1172 << "frame index = " << timeslice.getFrameIndex() << ";"
1173 << "module = " << frame->getModuleID() << ";"
1174 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1175
1176 if (dumpCount < dumpLimit && result.has(dumpMask)) {
1177 timesliceTX.push_back(*frame);
1178 }
1179
1180 continue;
1181 }
1182
1183 const JModule& module = moduleRouter->getModule(frame->getModuleID());
1184 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1185
1186 // Apply high-rate veto
1187
1188 buffer.applyHighRateVeto(parameters.highRateVeto_Hz);
1189
1190 // L0
1191
1192 timesliceL0.push_back(JSuperFrame1D_t(buffer));
1193
1194 // Nano-beacon trigger
1195
1196 if (parameters.triggerNB.enabled) {
1197
1198 JSuperFrame2D_t::iterator __end = partition(buffer.begin(), buffer.end(), parameters.triggerNB.pmts);
1199
1200 if (buffer.begin() != __end) {
1201
1202 timesliceNB.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1203 frame->getModuleIdentifier(),
1204 module.getPosition()));
1205
1206 JSuperFrame1D_t zbuf;
1207
1208 (*buildL1)(buffer.begin(), __end, back_inserter(zbuf));
1209
1210 (*buildNB)(buffer.begin(), __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1211 }
1212 }
1213
1214 // L1
1215
1216 timesliceL1.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1217 frame->getModuleIdentifier(),
1218 module.getPosition()));
1219
1220 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1221
1222 // L2
1223
1224 timesliceL2.push_back(JSuperFrame1D_t(frame->getDAQChronometer(),
1225 frame->getModuleIdentifier(),
1226 module.getPosition()));
1227
1228 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1229
1230 // SN
1231 {
1232 JTimeslice_t::value_type tv(frame->getDAQChronometer(),
1233 frame->getModuleIdentifier(),
1234 module.getPosition());
1235
1236 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(tv));
1237
1238 if (!tv.empty()) {
1239 timesliceSN.push_back(tv);
1240 }
1241 }
1242
1243 } else {
1244
1245 JErrorStream(logErrorDetector) << "No detector information for module " << frame->getModuleID();
1246 }
1247 }
1248
1249 if (!timesliceTX.empty()) {
1250
1251 if (dumpCount < dumpLimit) {
1252
1253 this->put(timesliceTX);
1254
1255 dumpCount += 1;
1256 }
1257 }
1258
1259 // Triggers
1260
1261 if (parameters.triggerNB.enabled) {
1262
1263 const JTriggerInput trigger_input(timesliceNB);
1264
1265 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1266
1267 if (parameters.triggerNB.write()) {
1268
1269 JTriggeredEvent tev(timesliceNB.getDAQChronometer(),
1270 getTriggerMask(triggerNB->getTriggerBit()),
1271 *hit,
1273 *moduleRouter,
1274 parameters.TMaxLocal_ns,
1275 parameters.triggerNB.DMax_m,
1276 getTimeRange(parameters.triggerNB));
1277
1278 this->put(tev);
1279 }
1280 }
1281 }
1282
1283 if (parameters.triggerRandom.enabled) {
1284
1285 if (parameters.triggerRandom.rate_Hz > 0.0) {
1286
1287 exponential_distribution<double> f1(parameters.triggerRandom.rate_Hz * 1.0e-9);
1288
1289 while (rabbit.t1_ns < getTimeOfFrame(timeslice.getFrameIndex())) {
1290 rabbit.t1_ns += f1(rabbit.gr);
1291 }
1292
1293 for (double t1_ns; (t1_ns = rabbit.t1_ns - getTimeOfFrame(timeslice.getFrameIndex())) <= getFrameTime(); ) {
1294
1295 JTriggeredEvent tev(t1_ns,
1296 parameters.triggerRandom.TMax_ns,
1299 *moduleRouter);
1300
1301 this->put(tev);
1302
1303 rabbit.t1_ns += f1(rabbit.gr);
1304 }
1305 }
1306 }
1307
1308 JTriggerInput trigger_input(timesliceL2);
1309 JTriggerOutput trigger_output;
1310
1311 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1312 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1313 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1314
1315 trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
1316
1317 for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
1318
1319 const JTriggeredEvent object(*event, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns, getTimeRange(parameters));
1320
1321 this->put(object);
1322
1323 numberOfEvents += 1;
1324 }
1325
1326 if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1327
1328 const JTimesliceL1<JDAQTimesliceL1> object(timesliceL1, *timesliceRouter, *moduleRouter, parameters.TMaxLocal_ns);
1329
1330 if (parameters.writeL1) { this->put(object); }
1331 if (c_buffer.sizeL1 > 0) { c_buffer.put(object); }
1332 }
1333
1334 if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1335
1336 const JTimesliceL1<JDAQTimesliceL2> object(timesliceL2, *timesliceRouter, *moduleRouter, parameters.L2.TMaxLocal_ns);
1337
1338 if (parameters.writeL2) { this->put(object); }
1339 if (c_buffer.sizeL2 > 0) { c_buffer.put(object); }
1340 }
1341
1342 if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1343
1344 const JTimesliceL1<JDAQTimesliceSN> object(timesliceSN, *timesliceRouter, *moduleRouter, parameters.SN.TMaxLocal_ns);
1345
1346 if (parameters.writeSN) { this->put(object); }
1347 if (c_buffer.sizeSN > 0) { c_buffer.put(object); }
1348 }
1349
1350 if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1351
1352 if (parameters.writeL0) { this->put(timeslice); }
1353 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1354 }
1355 }
1356
1357 } catch(const std::exception& error) {
1358
1359 JErrorStream(logger) << "Error = " << error.what() << ";"
1360 << "run = " << timeslice.getRunNumber() << ";"
1361 << "frame index = " << timeslice.getFrameIndex() << ";"
1362 << "time slice not correctly processed;"
1363 << "discard" << (dumpCount < dumpLimit ? " and dump" : "");
1364
1365 if (dumpCount < dumpLimit) {
1366
1367 this->put(static_cast<const JDAQTimeslice&>(timeslice));
1368
1369 dumpCount += 1;
1370 }
1371 }
1372
1373 timesliceRouter->reset();
1374 }
1375
1376
1377 /**
1378 * Report status to message logger.
 *
 * The timer is stopped while the report is composed and restarted afterwards.
 * All lines are sent at status level to the message logger.
 * The report covers elapsed wall and CPU times, packet, event and byte counters,
 * the state of the time slice queue and two rough performance factors.
1379 */
1380 void typeout()
1381 {
1382 timer.stop();
1383
// wall time from the timer; divided by 1e6 below for the report in [s]
1384 const double T_us = (double) timer.usec_wall;
1385
1386 JStatusStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
1387 JStatusStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1388 JStatusStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
// mean and maximum of Qt, scaled by 1.0e-3 for the report in [ms] (presumably Qt is filled in us -- TODO confirm)
1389 JStatusStream(logger) << "Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 << " <= " << Qt.getXmax() * 1.0e-3;
1390 JStatusStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
// Qx holds the time spent in sending objects to the data server, see method put()
1391 JStatusStream(logger) << "Number of events/MB/us " << numberOfEvents << "/" << numberOfBytes/1e6 << "/" << Qx.getMean(0.0);
1392
// NOTE(review): listing line 1393 is absent from this rendering;
// the closing brace at 1395 presumably matches a guard on number_of_packets_received which protects the division below -- confirm against the original file.
1394 JStatusStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received;
1395 }
1396
1397 JStatusStream(logger) << "Current queue depth/size " << timeslices.size() << "/" << queueSize;
1398 JStatusStream(logger) << "Current number of frames per slice expected: " << frames_per_slice << ' ' << FIXED(5,3) << factor_per_slice;
1399
1400 JStatusStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
1401
// amount of data time, once counted per processed time slice and once from the range of frame numbers;
// the division by 1000 presumably converts the frame time from ns to us -- TODO confirm unit of getFrameTime()
1402 const double processedSlicesTime_us = numberOfTimeslicesProcessed * getFrameTime() / 1000;
1403 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
1404
// performance factor = elapsed wall time / data time; only reported when the data time is positive
1405 if (processedSlicesTime_us > 0) {
1406 JStatusStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1407 }
1408 if (processedDetectorTime_us > 0) {
1409 JStatusStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1410 }
1411
1412 timer.start();
1413 }
1414
1415
1416 /**
1417 * Tagged action to handle alerts.
1418 *
 * Only the tag RC_ALERT is handled; for any other tag a warning is issued.
 * Following an alert, the circular buffer is closed if it is open and
 * a new circular buffer is opened if this functionality is enabled.
 * If the circular buffer cannot be opened, the functionality is disabled.
 *
1419 * \param tag tag
1420 * \param length number of characters
1421 * \param buffer message
1422 */
1423 virtual void actionTagged(const JTag& tag, int length, const char* buffer) override
1424 {
1425 using namespace std;
1426
1427 JDebugStream(logger) << "Received message <" << tag.toString() << "> \"" << string(buffer, length) << "\"";
1428
1429 if (tag == RC_ALERT) {
1430
1431 if (c_buffer.is_open()) {
1432
1433 JStatusStream(logger) << "Archive circular buffer in <" << c_buffer.archive << ">";
1434
// presumably the option <true> requests the archival announced above -- TODO confirm semantics of close()
1435 c_buffer.close(true);
1436 }
1437
1438 if (c_buffer.is_enabled()) {
1439
1440 c_buffer.open();
1441
1442 if (c_buffer.is_open()) {
1443
1444 JStatusStream(logger) << "Created circular buffer " << c_buffer;
1445
// NOTE(review): listing line 1446 is absent from this rendering; a statement may be missing here -- confirm against the original file.
1447
1448 } else {
1449
1450 JErrorStream (logger) << "Failed to create circular buffer in directory <" << c_buffer.path << ">; disable functionality.";
1451
1452 c_buffer.disable();
1453 }
1454 }
1455
1456 } else {
1457
1458 JWarningStream(logger) << "Tag <" << tag.toString() << "> not implemented";
1459 }
1460 }
1461
1462 JMeta meta; //!< meta data
1463
1464 private:
1465
1467 JSinglePointer<JControlHost_t> datawriter; //!< controlhost of data server (to which data writer should be connected)
1468 std::string hostname; //!< host name of data server
1469
1470 /**
1471 * Auxiliary method to send object to data server.
1472 *
1473 * \param object object to be sent
1474 */
1475 template<class T>
1476 void put(const T& object)
1477 {
1478 try {
1479
1480 const localtime_t t0 = getLocalTime();
1481
1482 datawriter->put(object);
1483
1484 const localtime_t t1 = getLocalTime();
1485
1486 numberOfBytes += getSizeof(object);
1487
1488 Qx.put(t1 - t0);
1489 }
1490 catch(const std::exception& error) {
1491 JErrorStream(logger) << "Error \"" << error.what() << "\"; trigger ev_error.";
1492 ev_error();
1493 }
1494 }
1495
1496
1497 int port; //!< server socket port
1499
1500 JSinglePointer<JServerSocket> serversocket; //!< server for data queue connections
1501 JChannelList_t channelList; //!< connections to data queue
1502
1505
1506 std::deque<JDAQTimesliceL0> timeslices; //!< buffer with pending time slice data
1512
1513 struct :
1514 public std::map<int, int>
1515 {
1516 /**
1517 * Get minimal frame index.
1518 *
1519 * \return frame index
1520 */
1521 int min() const
1522 {
1523 if (this->empty()) {
1524
1525 return -1;
1526
1527 } else {
1528
1529 int min = std::numeric_limits<int>::max();
1530
1531 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1532 if (i->second < min) {
1533 min = i->second;
1534 }
1535 }
1536
1537 return min;
1538 }
1539 }
1540
1541 /**
1542 * Get maximal frame index.
1543 *
1544 * \return frame index
1545 */
1546 int max() const
1547 {
1548 if (this->empty()) {
1549
1550 return -1;
1551
1552 } else {
1553
1554 int max = std::numeric_limits<int>::lowest();
1555
1556 for (const_iterator i = this->begin(); i != this->end(); ++i) {
1557 if (i->second > max) {
1558 max = i->second;
1559 }
1560 }
1561
1562 return max;
1563 }
1564 }
1565
1567
1568 // trigger
1569
1572
1579
1584
1590
1592 int dumpLimit; //!< maximum number of time slices to be dumped
1593 JStatus dumpMask; //!< mask for data to be dumped
1594
1595 // process management
1596
1599
1600 // memory management
1601
1602 long long int totalCPURAM;
1604 long long int maxQueueSize;
1605 long long int queueSize;
1606
1607 // statistics
1608
1610
1611 long long int numberOfEvents;
1612 long long int numberOfBytes;
1615
1618
1619 // temporary
1620
1623 long long int number_of_reads;
1625
1626 // circular buffer
1627
1629
1630 // random trigger
1631
1633
1635 gr(rd()),
1636 t1_ns(0.0)
1637 {}
1638
1639 std::random_device rd;
1640 std::mt19937_64 gr;
1641 double t1_ns;
1642
1644 };
1645}
1646
1647/**
1648 * \file
1649 *
1650 * Application for real-time filtering of data.
1651 * For more information, see KM3NETDAQ::JDataFilter.
1652 *
1653 * \author rbruijn and mdejong
1654 */
1655int main(int argc, char* argv[])
1656{
1657 using namespace std;
1658 using namespace JPP;
1659 using namespace KM3NETDAQ;
1660
1661 string server;
1662 string logger;
1663 string hostname;
1664 string client_name;
1665 int port;
1666 int backlog;
1667 bool use_cout;
1668 string path;
1669 string archive;
1670 int debug;
1671
1672
1673 try {
1674
1675 JParser<> zap("Application for real-time filtering of data.");
1676
1677 zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
1678 zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
1679 zap['D'] = make_field(hostname, "host name of server of data writer") = "";
1680 zap['u'] = make_field(client_name, "client name") = "%";
1681 zap['P'] = make_field(port, "port to listen for incoming data from data queue");
1682 zap['q'] = make_field(backlog, "back log") = 1024;
1683 zap['c'] = make_field(use_cout, "print to terminal");
1684 zap['p'] = make_field(path, "directory for temporary storage of circular buffer") = "/tmp/";
1685 zap['A'] = make_field(archive, "directory for permanent archival of circular buffer") = "/tmp/";
1686 zap['d'] = make_field(debug, "debug level") = 0;
1687
1688 zap(argc, argv);
1689 }
1690 catch(const std::exception& error) {
1691 FATAL(error.what() << endl);
1692 }
1693
1694
1695 JLogger* out = NULL;
1696
1697 if (use_cout)
1698 out = new JStreamLogger(cout);
1699 else
1700 out = new JControlHostLogger(logger);
1701
1702 JDataFilter dfilter(client_name,
1703 server,
1704 hostname,
1705 out,
1706 debug,
1707 port,
1708 backlog,
1709 path,
1710 archive);
1711
1712 dfilter.meta = JMeta(argc, argv);
1713
1714 dfilter.enter();
1715 dfilter.run();
1716}
Fixed parameters and ControlHost tags for KM3NeT DAQ.
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Data structure for detector geometry and calibration.
Exceptions.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Basic data structure for L0 hit.
Basic data structure for L1 hit.
Tools for handling different hit types.
General purpose messaging.
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:74
ROOT I/O of application specific meta data.
Hostname and IP address functions.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
Physics constants.
I/O formatting auxiliaries.
Utility class to parse parameter values.
File status.
ROOT TTree parameter settings of various packages.
System auxiliaries.
System time information.
Scheduling of actions via fixed latency intervals.
Setting of trigger bits.
Basic data structure for time and time over threshold information of hit.
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
Auxiliary methods for handling file names, type names and environment.
void merge(const JMatch_t &match)
Merge events.
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_error_event ev_error
JDAQStateMachine::ev_configure_event ev_configure
Detector data structure.
Definition JDetector.hh:96
Router for direct addressing of module data in detector data structure.
Data structure for a composite optical module.
Definition JModule.hh:76
Utility class to parse parameter values.
bool read(const JEquation &equation)
Read equation.
Time keeper.
Auxiliary class for CPU timing and usage.
Definition JTimer.hh:33
unsigned long long usec_ucpu
Definition JTimer.hh:239
unsigned long long usec_wall
Definition JTimer.hh:238
void stop()
Stop timer.
Definition JTimer.hh:127
void reset()
Reset timer.
Definition JTimer.hh:93
unsigned long long usec_scpu
Definition JTimer.hh:240
void start()
Start timer.
Definition JTimer.hh:106
const JPosition3D & getPosition() const
Get position.
Byte array binary input.
const char * data() const
Get data.
int size() const
Get size.
int getFileDescriptor() const
Get file descriptor.
virtual bool is_open() const =0
Check if device is open.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Auxiliary class for method select.
void set(const int file_descriptor)
Set file descriptor.
bool has(const int file_descriptor) const
Has file descriptor.
Exception for opening of file.
The template JSinglePointer class can be used to hold a pointer to an object.
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
Message logger with time scheduler.
Message logging based on std::ostream.
Implementation of object output through ControlHost.
TCP server socket.
Socket input channel.
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition JSocket.hh:148
int getReceiveBufferSize() const
Get receive buffer size.
Definition JSocket.hh:159
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition JSocket.hh:104
TCP socket.
Definition JTCPSocket.hh:30
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition JTCPSocket.hh:56
ControlHost tag.
Definition JTag.hh:38
std::string toString() const
Convert tag to string.
Definition JTag.hh:171
Utility class to parse command line options.
Definition JParser.hh:1698
TFile * getFile() const
Get file.
Definition JRootFile.hh:66
virtual bool is_open() const override
Check if file is open.
Definition JRootFile.hh:77
JTreeWriter< T, JRootCreateFlatTree< T >::value > * out
virtual bool put(const T &object) override
Object output.
ROOT TTree object output.
1-dimensional frame with time calibrated data from one optical module.
2-dimensional frame with time calibrated data from one optical module.
static JSuperFrame2D< JElement_t, JAllocator_t > demultiplex
Demultiplexer.
std::vector< value_type >::iterator iterator
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Time slice with calibrated data.
Definition JTimeslice.hh:29
Data structure for input to trigger algorithm.
Nano-beacon trigger.
Definition JTriggerNB.hh:21
Auxiliary class to build KM3NETDAQ::JDAQEvent for a triggered event.
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
int getRunNumber() const
Get run number.
int getFrameIndex() const
Get frame index.
Control unit client base class.
JSharedPointer< JControlHost > server
message server
bool isRunning() const
Check if this client is in running state.
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
void addSubscription(const JSubscription &subscription)
Add custom subscription.
int getModuleID() const
Get module identifier.
Auxiliary class for itemization of process list.
std::string index
index in process list
Data frame of one optical module.
static void reset()
Reset counter of unique instance of this class object.
Main class for real-time filtering of data.
long long int number_of_packets_received
JMessageScheduler logErrorRun
int dumpLimit
maximum number of time slices to be dumped
JSinglePointer< JServerSocket > serversocket
server for data queue connections
JMessageScheduler logErrorDetector
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
JCircularBuffer_t c_buffer
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
long long int numberOfBytes
long long int number_of_packets_discarded
JMessageScheduler logErrorIndex
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
virtual void actionInit(int length, const char *buffer) override
void put(const T &object)
Auxiliary method to send object to data server.
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
long long int number_of_bytes_received
virtual void actionContinue(int length, const char *buffer) override
long long int maxQueueSize
int port
server socket port
long long int totalCPURAM
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
std::vector< JDAQProcess > dataQueues
JMessageScheduler logErrorOvercomplete
long long int number_of_reads
std::vector< JSocketInputChannel_t > JChannelList_t
virtual void actionStop(int length, const char *buffer) override
JSinglePointer< JBuildL2_t > buildNB
JSinglePointer< JBuildL2_t > buildL2
JBuildL1< hit_type > JBuildL1_t
JSinglePointer< JBuildL1_t > buildL1
virtual void actionExit() override
virtual void actionQuit(int length, const char *buffer) override
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
std::string hostname
host name of data server
virtual void actionConfigure(int length, const char *buffer) override
long long int numberOfTimeslicesProcessed
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JChannelList_t channelList
connections to data queue
long long int numberOfIncompleteTimeslicesProcessed
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
JSuperFrame1D< hit_type > JSuperFrame1D_t
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
JMessageScheduler logErrorIncomplete
JSuperFrame2D< hit_type > JSuperFrame2D_t
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
JTimeslice< hit_type > JTimeslice_t
JStatus dumpMask
mask for data to be dumped
struct KM3NETDAQ::JDataFilter::rabbit_type rabbit
JSinglePointer< JModuleRouter > moduleRouter
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JSinglePointer< JTimesliceRouter > timesliceRouter
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
JSinglePointer< JTriggerNB > triggerNB
JBuildL2< hit_type > JBuildL2_t
long long int numberOfEvents
std::vector< JDAQProcess > dataFilters
JSinglePointer< JTrigger3DShower > trigger3DShower
virtual void actionStart(int length, const char *buffer) override
virtual void actionPause(int length, const char *buffer) override
JSinglePointer< JTriggerMXShower > triggerMXShower
virtual void actionReset(int length, const char *buffer) override
std::set< int > modules
JSinglePointer< JBuildL2_t > buildSN
JTriggerParameters parameters
void typeout()
Report status to message logger.
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
Definition JVectorize.hh:54
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
std::string trim(const std::string &buffer)
Trim string.
static const long long int GIGABYTE
Number of bytes in a giga-byte.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const char * getName()
Get ROOT name of given data type.
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
long long int localtime_t
Type definition of local time.
unsigned long long int getRAM()
Get RAM of this CPU.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
Definition JNetwork.hh:216
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
static JStat getFileStatus
Function object for file status.
Definition JStat.hh:173
return result
Definition JPolint.hh:862
const int n
Definition JPolint.hh:791
static const JChecksum checksum
Function object to perform check-sum of raw data.
Definition JChecksum.hh:200
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according to a given allowed fraction of active modules.
static const JNET::JTag RC_DFILTER
Definition JDAQTags.hh:73
double getFrameTime()
Get frame time duration.
Definition JDAQClock.hh:162
double getTimeOfFrame(const int frame_index)
Get start time of frame in ns since start of run for a given frame index.
Definition JDAQClock.hh:185
static const JNET::JTag RC_ALERT
Definition JDAQTags.hh:79
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
Definition JDAQTags.hh:36
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
Definition JDAQTags.hh:37
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Definition JDAQTags.hh:38
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition JDAQTags.hh:90
size_t getSizeof()
Definition of method to get size of data type.
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
Definition pmt_status.hh:14
Auxiliary data structure for sequence of same character.
Definition JManip.hh:330
Auxiliary data structure for floating point format specification.
Definition JManip.hh:448
Detector file.
Definition JHead.hh:227
Match of two events considering overlap in time and position.
Auxiliary class for handling status.
Definition JStatus.hh:31
void set(const int bit)
Set PMT status.
Definition JStatus.hh:209
Type list.
Definition JTypeList.hh:23
Level specific message streamers.
Auxiliary class for all subscription.
Auxiliary class for ROOT I/O of application specific meta data.
Definition JMeta.hh:72
Auxiliary class for date and time.
int getYear() const
year a.d.
int getDay() const
day of the month [1-31]
int getMonth() const
month of the year [1-12]
Auxiliary data structure for running average and standard deviation.
Definition JStats.hh:44
double getXmax() const
Get maximum value.
Definition JStats.hh:201
void put(const double x, const double w=1.0)
Put value.
Definition JStats.hh:119
double getMean() const
Get mean value.
Definition JStats.hh:234
void reset()
Reset.
Definition JStats.hh:79
Auxiliary data structure for result of checksum.
Definition JChecksum.hh:90
@ ETDC_t
TDC value error.
Definition JChecksum.hh:47
@ SIZE_t
size error
Definition JChecksum.hh:50
@ EPMT_t
PMT number error.
Definition JChecksum.hh:46
@ TIME_t
Time order error.
Definition JChecksum.hh:48
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
const JTag & getUniqueTag() const
Get unique tag of this run control client.
const std::string & getFullName() const
Get full name of this run control client.
void setClockInterval(const long long int interval_us)
Set interval time.
long long int getClockInterval() const
Get interval time.
Timeslice data structure for L0 data.
std::string archive
Directory for permanent archival.
std::string path
Directory for temporary storage.
Long64_t sizeL1
Number of L1 time slices.
Long64_t sizeSN
Number of SN time slices.
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
JTag tag
Unique tag of this process.
Long64_t sizeL0
Number of L0 time slices.
void close(const bool option)
Close file.
Long64_t sizeL2
Number of L2 time slices.
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
bool is_enabled() const
Check whether writing of data is enabled.
static const int JTRIGGERRANDOM
Random trigger.
Definition trigger.hh:17