182 void open(
const std::string& path,
const JTag& tag)
188 for (
int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
204 this->
open(os.str().c_str());
206 catch(
const exception& error) {}
228 return (sizeL0 > 0 ||
243 if (
object.is_open())
244 out <<
object.getFile()->GetName();
250 return out <<
object.sizeL0 <<
'/'
251 <<
object.sizeL1 <<
'/'
252 <<
object.sizeL2 <<
'/'
253 <<
object.sizeSN <<
'/';
290 const std::string& server,
291 const std::string& hostname,
297 const std::string& path) :
310 current_slice_index = -1;
321 if (c_buffer.is_open()) {
323 JNoticeStream(logger) <<
"Close circular buffer " << c_buffer;
334 JDebugStream(logger) <<
"actionInit() " << std::string(buffer,length);
342 catch(
const std::exception& error) {
343 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
353 JDebugStream(logger) <<
"actionConfigure() " << endl << std::string(buffer,length);
355 long long int update_s = 10;
356 long long int logger_s = 5;
366 properties[
"dataWriter"] = hostname;
367 properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
369 properties[
"triggerParameters"] = parameters;
370 properties[
"queueSize"] = maxQueueSize = (totalCPURAM -
GIGABYTE);
371 properties[
"queueDepth"] = maxQueueDepth = 20;
372 properties[
"frameIndex"] = maximal_frame_index = 100000;
373 properties[
"logger_s"] = logger_s;
374 properties[
"update_s"] = update_s;
375 properties[
"JDataFilter"] = dataFilters;
376 properties[
"DataQueue"] = dataQueues;
377 properties[
"path"] = path;
378 properties[
"c_sizeL0"] = c_buffer.sizeL0;
379 properties[
"c_sizeL1"] = c_buffer.sizeL1;
380 properties[
"c_sizeL2"] = c_buffer.sizeL2;
381 properties[
"c_sizeSN"] = c_buffer.sizeSN;
384 properties.
read(
string(buffer, length));
386 catch(
const exception& error) {
390 if (update_s <= 0) { update_s = 1; }
391 if (logger_s <= 0) { logger_s = 1; }
393 setClockInterval(update_s * 1000000LL);
395 hostname =
trim(hostname);
400 throw JException(
"Undefined data writer host name.");
402 maximum_frames_per_slice = frames_per_slice;
406 if (dataFilters.empty()) {
407 JNoticeStream(logger) <<
"No DataFilters in process list, or no process list. "
408 <<
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
411 sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
413 unsigned int numberOfDataFiltersOnThisMachine = 0;
421 notice <<
"My IP addresses:";
430 JDebugStream(logger) <<
"Test IP address \"" << i->hostname <<
"\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
432 if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
434 numberOfDataFiltersOnThisMachine++;
436 if (i->port == this->port) {
442 if (numberOfDataFiltersOnThisMachine == 0) {
443 JNoticeStream(logger) <<
"Zero data filters on this machine according to process list (if it exists). "
444 <<
"Assuming one datafilter on this machine.";
445 numberOfDataFiltersOnThisMachine = 1;
448 if (thisProcess == dataFilters.end()) {
449 JErrorStream(logger) <<
"This process cannot be found in the process list. Why do I exist?";
452 if (thisProcess != dataFilters.end() && thisProcess->index !=
getName()) {
453 JErrorStream(logger) <<
"Mismatch between given process names: "
455 <<
", but in the process list I am referred to as " << thisProcess->index;
458 if (dataFilters.begin() == thisProcess || dataFilters.empty()) {
462 if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {
464 maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
466 JNoticeStream(logger) <<
"Maximum queue size is too large given the number of processes on this machine. "
467 <<
"Queue size reduced to "
468 << maxQueueSize <<
" bytes." ;
475 triggerNB .reset(
new JTriggerNB (parameters));
483 JNoticeStream(logger) <<
"This data filter process will report.";
484 JNoticeStream(logger) <<
"Number of modules: " << (*moduleRouter)->size();
485 JDebugStream (logger) <<
"Trigger parameters: " << parameters;
487 JNoticeStream(logger) <<
"Update period [s]: " << getClockInterval();
498 if (buildL1.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL1."; }
499 if (buildL2.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL2."; }
500 if (buildSN.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
507 if (c_buffer.is_enabled()) {
509 if (!c_buffer.is_open()) {
513 if (c_buffer.is_open()) {
515 JNoticeStream(logger) <<
"Created circular buffer " << c_buffer;
521 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << path <<
">; disable functionality.";
528 JNoticeStream(logger) <<
"Continue using circular buffer " << c_buffer;
538 if (c_buffer.is_open()) {
555 current_slice_index = -1;
560 numberOfTimeslicesProcessed = 0;
561 numberOfIncompleteTimeslicesProcessed = 0;
563 number_of_packets_received = 0;
564 number_of_packets_discarded = 0;
565 number_of_bytes_received = 0;
568 minFrameNumber = numeric_limits<int>::max();
569 maxFrameNumber = numeric_limits<int>::min();
575 logErrorRun .reset();
576 logErrorDetector .reset();
577 logErrorIndex .reset();
578 logErrorIncomplete.reset();
589 os << getRunNumber() <<
' ' << parameters;
599 if (!timeslices.empty()) {
601 JNoticeStream(logger) <<
"Flushing " << timeslices.size() <<
" slices.";
604 queueSize -= i->getSize();
614 timeslices.swap(buffer);
617 if (queueSize != 0) {
618 JWarningStream(logger) <<
"Pending data in queue " << queueSize <<
" [B]";
621 current_slice_index = -1;
644 serversocket.reset();
656 if (serversocket.is_valid()) {
657 mask.
set(*serversocket);
660 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
661 if (!channel->isReady()) {
662 mask.
set(channel->getFileDescriptor());
672 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
676 if (mask.
has(channel->getFileDescriptor())) {
680 if (channel->isReady()) {
682 number_of_packets_received += 1;
683 number_of_reads += channel->getCounter();
684 number_of_bytes_received += channel->size();
688 updateFrameQueue(channel);
692 JErrorStream(logErrorRun) <<
"Receiving data while not running.";
694 number_of_packets_discarded += 1;
702 catch(
const exception& error) {
704 JNoticeStream(logger) <<
"Disconnecting channel " << channel->getFileDescriptor() <<
' ' << error.what();
708 channel = channelList.erase(channel);
713 if (serversocket.is_valid()) {
715 if (mask.
has(*serversocket)) {
719 socket.
accept(serversocket->getFileDescriptor());
734 if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
735 timeslices.size() >= maxQueueDepth ||
736 queueSize >= maxQueueSize)) {
739 queueSize -= pending_slice.
getSize();
742 minFrameNumber = min(minFrameNumber, pending_slice.
getFrameIndex());
743 maxFrameNumber = max(maxFrameNumber, pending_slice.
getFrameIndex());
745 if (pending_slice.size() > frames_per_slice) {
747 JErrorStream(logger) <<
"More frames in timeslice than expected "
748 << pending_slice.size() <<
" > " << frames_per_slice;
750 if (pending_slice.size() <= maximum_frames_per_slice) {
752 JErrorStream(logger) <<
"Adjusting expected frames per timeslice to " << pending_slice.size();
754 frames_per_slice = pending_slice.size();
758 if (!pending_slice.empty()) {
762 processTimeSlice(pending_slice);
766 numberOfTimeslicesProcessed += 1;
770 if (pending_slice.size() < frames_per_slice) {
772 numberOfIncompleteTimeslicesProcessed += 1;
774 JErrorStream(logErrorIncomplete) <<
"Timeout -> processed incomplete timeslice: "
776 <<
"Size of timeslice = " << pending_slice.size() <<
';'
777 <<
"Queue depth = " << timeslices.size() <<
';'
778 <<
"Queue size = " << queueSize;
780 if (!timeslices.empty()) {
782 JErrorStream(logger) <<
"Adjusting expected frames per timeslice from " << frames_per_slice <<
" to " << pending_slice.size();
784 frames_per_slice = pending_slice.size();
789 timeslices.pop_front();
821 if (preamble.getLength() != channel->size()) {
823 JErrorStream(logErrorRun) <<
"Size of received data does not match size reported by preamble: "
824 <<
"preamble.getLength() = " << preamble.getLength() <<
';'
825 <<
"channel->size(): " << channel->size() <<
';';
827 number_of_packets_discarded += 1;
832 if (header.getRunNumber() != getRunNumber()) {
834 JErrorStream(logErrorRun) <<
"Run number " << header.getRunNumber()
835 <<
" unequal to current run " << getRunNumber()
836 <<
" -> Dropping frame.";
838 number_of_packets_discarded += 1;
843 if (header.getFrameIndex() <= current_slice_index) {
845 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" <= " << current_slice_index
846 <<
" -> Dropping frame.";
848 number_of_packets_discarded += 1;
850 if (frames_per_slice < maximum_frames_per_slice) {
854 JErrorStream(logErrorIndex) <<
"Increase number of frames expected to: " << frames_per_slice;
860 if (header.getFrameIndex() > current_slice_index + maximal_frame_index) {
862 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" > " << current_slice_index + maximal_frame_index
863 <<
" -> Dropping frame.";
865 number_of_packets_discarded += 1;
872 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
876 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
884 timesliceIterator = timeslices.insert(timesliceIterator,
JDAQTimesliceL0());
886 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
888 queueSize += timesliceIterator->getSize();
893 in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
895 queueSize += timesliceIterator->rbegin()->getSize();
910 if (parameters.writeSummary()) {
914 if (parameters.trigger3DMuon.enabled ||
915 parameters.trigger3DShower.enabled ||
916 parameters.triggerMXShower.enabled ||
917 parameters.triggerNB.enabled ||
918 parameters.writeL0.prescale ||
919 parameters.writeL1.prescale ||
920 parameters.writeL2.prescale ||
921 parameters.writeSN.prescale ||
922 c_buffer.is_enabled()) {
924 timesliceRouter->configure(timeslice);
933 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
935 if (moduleRouter->hasModule(frame->getModuleID())) {
942 <<
"module = " << frame->getModuleID() <<
";"
943 <<
"discard and dump";
945 timesliceTX.push_back(*frame);
950 const JModule& module = moduleRouter->getModule(frame->getModuleID());
951 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
963 if (parameters.triggerNB.enabled) {
967 if (buffer.begin() != __end) {
970 frame->getModuleIdentifier(),
973 (*buildL1)(buffer.begin(), __end , back_inserter(*timesliceNB.rbegin()));
980 frame->getModuleIdentifier(),
983 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
988 frame->getModuleIdentifier(),
991 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
996 frame->getModuleIdentifier(),
999 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1003 JErrorStream(logErrorDetector) <<
"No detector information for module " << frame->getModuleID();
1007 if (!timesliceTX.empty()) {
1008 this->put(timesliceTX);
1013 if (parameters.triggerNB.enabled) {
1017 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.
end(); ++hit) {
1024 parameters.TMaxLocal_ns,
1025 parameters.triggerNB.DMax_m,
1035 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1036 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1037 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1041 for (JTriggerOutput::const_iterator event = trigger_output.begin();
event != trigger_output.end(); ++event) {
1045 if (this->put(
object)) {
1046 numberOfEvents += 1;
1050 if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1054 if (parameters.writeL1) { this->put(
object); }
1055 if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
1058 if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1062 if (parameters.writeL2) { this->put(
object); }
1063 if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
1066 if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1070 if (parameters.writeSN) { this->put(
object); }
1071 if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
1074 if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1076 if (parameters.writeL0) { this->put(timeslice); }
1077 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1081 }
catch(
const exception& error) {
1082 JErrorStream(logger) <<
"Error = " << error.what() <<
";"
1085 <<
"time slice not correctly processed!";
1097 const double T_us = (double) timer.usec_wall;
1099 JNoticeStream(logger) <<
"Elapsed real (wall) time [s] " << T_us / 1e6;
1100 JNoticeStream(logger) <<
"Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1101 JNoticeStream(logger) <<
"Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1103 JNoticeStream(logger) <<
"Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 <<
" +/- " << Qt.getDeviation() * 1.0e-3;
1105 catch(
const std::exception&) {}
1106 JNoticeStream(logger) <<
"Number of packets received/discarded " << number_of_packets_received <<
"/" << number_of_packets_discarded;
1107 JNoticeStream(logger) <<
"Number of events/MB sent " << numberOfEvents <<
"/" << numberOfBytes/1e6;
1109 if (number_of_packets_received > 0) {
1110 JNoticeStream(logger) <<
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
1113 JNoticeStream(logger) <<
"Current queue depth/size " << timeslices.size() <<
"/" << queueSize;
1114 JNoticeStream(logger) <<
"Current number of frames per slice expected: " << frames_per_slice;
1116 JNoticeStream(logger) <<
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed <<
"/" << numberOfIncompleteTimeslicesProcessed;
1118 if (numberOfTimeslicesProcessed > 0) {
1119 JNoticeStream(logger) <<
"Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1120 JNoticeStream(logger) <<
"User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1121 JNoticeStream(logger) <<
"System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1124 const double processedSlicesTime_us = numberOfTimeslicesProcessed *
getFrameTime() / 1000;
1125 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) *
getFrameTime() / 1000;
1127 if (processedSlicesTime_us > 0) {
1128 JNoticeStream(logger) <<
"Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1130 if (processedDetectorTime_us > 0) {
1131 JNoticeStream(logger) <<
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1147 using namespace std;
1149 JDebugStream(logger) <<
"Received message <" << tag.
toString() <<
"> \"" << string(buffer, length) <<
"\"";
1153 if (c_buffer.is_open()) {
1155 JNoticeStream(logger) <<
"Close circular buffer " << c_buffer;
1160 if (c_buffer.is_enabled()) {
1164 if (c_buffer.is_open()) {
1166 JNoticeStream(logger) <<
"Created circular buffer " << c_buffer;
1172 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << path <<
">; disable functionality.";
1186 static const int MAXIMUM_FILE_NUMBER = 100;
1205 datawriter->put(
object);
1207 numberOfBytes +=
object.getSize();
1211 catch(
const std::exception& error) {
1304 using namespace std;
1305 using namespace JPP;
1322 JParser<> zap(
"Application for real-time filtering of data.");
1327 zap[
'u'] =
make_field(client_name) =
"JDataFilter";
1337 catch(
const exception& error) {
1338 FATAL(error.what() << endl);
1342 JLogger* out = NULL;
1345 out =
new JStreamLogger(cout);
1347 out =
new JControlHostLogger(logger);