181 void open(
const std::string& path,
const JTag& tag)
187 for (
int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
203 this->
open(os.str().c_str());
227 return (sizeL0 > 0 ||
242 if (
object.is_open())
243 out <<
object.getFile()->GetName();
249 return out <<
object.sizeL0 <<
'/'
250 <<
object.sizeL1 <<
'/'
251 <<
object.sizeL2 <<
'/'
252 <<
object.sizeSN <<
'/';
289 const std::string& server,
290 const std::string& hostname,
296 const std::string& path) :
309 current_slice_index = -1;
320 if (c_buffer.is_open()) {
322 JNoticeStream(logger) <<
"Close circular buffer " << c_buffer;
333 JDebugStream(logger) <<
"actionInit() " << std::string(buffer,length);
341 catch(
const std::exception& error) {
342 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
352 JDebugStream(logger) <<
"actionConfigure() " << endl << std::string(buffer,length);
354 long long int update_s = 10;
355 long long int logger_s = 5;
365 properties[
"dataWriter"] = hostname;
366 properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
368 properties[
"triggerParameters"] = parameters;
369 properties[
"queueSize"] = maxQueueSize = (totalCPURAM -
GIGABYTE);
370 properties[
"queueDepth"] = maxQueueDepth = 20;
371 properties[
"logger_s"] = logger_s;
372 properties[
"update_s"] = update_s;
373 properties[
"JDataFilter"] = dataFilters;
374 properties[
"DataQueue"] = dataQueues;
375 properties[
"path"] = path;
376 properties[
"c_sizeL0"] = c_buffer.sizeL0;
377 properties[
"c_sizeL1"] = c_buffer.sizeL1;
378 properties[
"c_sizeL2"] = c_buffer.sizeL2;
379 properties[
"c_sizeSN"] = c_buffer.sizeSN;
382 properties.
read(
string(buffer, length));
384 catch(
const exception& error) {
388 if (update_s <= 0) { update_s = 1; }
389 if (logger_s <= 0) { logger_s = 1; }
391 setClockInterval(update_s * 1000000LL);
393 hostname =
trim(hostname);
398 throw JException(
"Undefined data writer host name.");
400 maximum_frames_per_slice = frames_per_slice;
404 if (dataFilters.empty()) {
405 JNoticeStream(logger) <<
"No DataFilters in process list, or no process list. "
406 <<
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
409 sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
411 unsigned int numberOfDataFiltersOnThisMachine = 0;
419 notice <<
"My IP addresses:";
428 JDebugStream(logger) <<
"Test IP address \"" << i->hostname <<
"\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
430 if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
432 numberOfDataFiltersOnThisMachine++;
434 if (i->port == this->port) {
440 if (numberOfDataFiltersOnThisMachine == 0) {
441 JNoticeStream(logger) <<
"Zero data filters on this machine according to process list (if it exists). "
442 <<
"Assuming one datafilter on this machine.";
443 numberOfDataFiltersOnThisMachine = 1;
446 if (thisProcess == dataFilters.end()) {
447 JErrorStream(logger) <<
"This process cannot be found in the process list. Why do I exist?";
450 if (thisProcess != dataFilters.end() && thisProcess->index !=
getName()) {
451 JErrorStream(logger) <<
"Mismatch between given process names: "
453 <<
", but in the process list I am referred to as " << thisProcess->index;
456 if (dataFilters.begin() == thisProcess || dataFilters.empty()) {
460 if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {
462 maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
464 JNoticeStream(logger) <<
"Maximum queue size is too large given the number of processes on this machine. "
465 <<
"Queue size reduced to "
466 << maxQueueSize <<
" bytes." ;
480 JNoticeStream(logger) <<
"This data filter process will report.";
481 JNoticeStream(logger) <<
"Number of modules: " << (*moduleRouter)->size();
482 JDebugStream (logger) <<
"Trigger parameters: " << parameters;
484 JNoticeStream(logger) <<
"Update period [s]: " << getClockInterval();
495 if (buildL1.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL1."; }
496 if (buildL2.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL2."; }
497 if (buildSN.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
504 if (c_buffer.is_enabled()) {
506 if (!c_buffer.is_open()) {
510 if (c_buffer.is_open()) {
512 JNoticeStream(logger) <<
"Created circular buffer " << c_buffer;
518 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << path <<
">; disable functionality.";
525 JNoticeStream(logger) <<
"Continue using circular buffer " << c_buffer;
535 if (c_buffer.is_open()) {
552 current_slice_index = -1;
557 numberOfTimeslicesProcessed = 0;
558 numberOfIncompleteTimeslicesProcessed = 0;
560 number_of_packets_received = 0;
561 number_of_packets_discarded = 0;
562 number_of_bytes_received = 0;
565 minFrameNumber = numeric_limits<int>::max();
566 maxFrameNumber = numeric_limits<int>::min();
572 logErrorRun .reset();
573 logErrorDetector .reset();
574 logErrorIndex .reset();
575 logErrorIncomplete.reset();
586 os << getRunNumber() <<
' ' << parameters;
596 if (!timeslices.empty()) {
598 JNoticeStream(logger) <<
"Flushing " << timeslices.size() <<
" slices.";
611 timeslices.swap(buffer);
614 if (queueSize != 0) {
615 JWarningStream(logger) <<
"Pending data in queue " << queueSize <<
" [B]";
618 current_slice_index = -1;
641 serversocket.reset();
653 if (serversocket.is_valid()) {
654 mask.
set(*serversocket);
657 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
658 if (!channel->isReady()) {
659 mask.
set(channel->getFileDescriptor());
669 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
673 if (mask.
has(channel->getFileDescriptor())) {
677 if (channel->isReady()) {
679 number_of_packets_received += 1;
680 number_of_reads += channel->getCounter();
681 number_of_bytes_received += channel->size();
685 updateFrameQueue(channel);
689 JErrorStream(logErrorRun) <<
"Receiving data while not running.";
691 number_of_packets_discarded += 1;
701 JNoticeStream(logger) <<
"Disconnecting channel " << channel->getFileDescriptor() <<
' ' << error.
what();
705 channel = channelList.erase(channel);
710 if (serversocket.is_valid()) {
712 if (mask.
has(*serversocket)) {
716 socket.
accept(serversocket->getFileDescriptor());
731 if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
732 timeslices.size() >= maxQueueDepth ||
733 queueSize >= maxQueueSize)) {
739 minFrameNumber = min(minFrameNumber, pending_slice.
getFrameIndex());
740 maxFrameNumber = max(maxFrameNumber, pending_slice.
getFrameIndex());
742 if (pending_slice.size() > frames_per_slice) {
744 JErrorStream(logger) <<
"More frames in timeslice than expected "
745 << pending_slice.size() <<
" > " << frames_per_slice;
747 if (pending_slice.size() <= maximum_frames_per_slice) {
749 JErrorStream(logger) <<
"Adjusting expected frames per timeslice to " << pending_slice.size();
751 frames_per_slice = pending_slice.size();
755 if (!pending_slice.empty()) {
759 processTimeSlice(pending_slice);
763 numberOfTimeslicesProcessed += 1;
767 if (pending_slice.size() < frames_per_slice) {
769 numberOfIncompleteTimeslicesProcessed += 1;
771 JErrorStream(logErrorIncomplete) <<
"Timeout -> processed incomplete timeslice: "
773 <<
"Size of timeslice = " << pending_slice.size() <<
';'
774 <<
"Queue depth = " << timeslices.size() <<
';'
775 <<
"Queue size = " << queueSize;
777 if (!timeslices.empty()) {
779 JErrorStream(logger) <<
"Adjusting expected frames per timeslice from " << frames_per_slice <<
" to " << pending_slice.size();
781 frames_per_slice = pending_slice.size();
786 timeslices.pop_front();
818 if (preamble.getLength() != channel->size()) {
820 JErrorStream(logErrorRun) <<
"Size of received data does not match size reported by preamble: "
821 <<
"preamble.getLength() = " << preamble.getLength() <<
';'
822 <<
"channel->size(): " << channel->size() <<
';';
824 number_of_packets_discarded += 1;
829 if (header.getRunNumber() != getRunNumber()) {
831 JErrorStream(logErrorRun) <<
"Run number " << header.getRunNumber()
832 <<
" unequal to current run " << getRunNumber()
833 <<
" -> Dropping frame.";
835 number_of_packets_discarded += 1;
840 if (header.getFrameIndex() <= current_slice_index) {
842 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" <= " << current_slice_index
843 <<
" -> Dropping frame.";
845 number_of_packets_discarded += 1;
847 if (frames_per_slice < maximum_frames_per_slice) {
851 JErrorStream(logErrorIndex) <<
"Increase number of frames expected to: " << frames_per_slice;
859 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
863 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
871 timesliceIterator = timeslices.insert(timesliceIterator,
JDAQTimesliceL0());
873 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
875 queueSize +=
getSizeof(*timesliceIterator);
880 in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
882 queueSize +=
getSizeof(*timesliceIterator->rbegin());
897 if (parameters.writeSummary()) {
901 if (parameters.trigger3DMuon.enabled ||
902 parameters.trigger3DShower.enabled ||
903 parameters.triggerMXShower.enabled ||
904 parameters.writeL0.prescale ||
905 parameters.writeL1.prescale ||
906 parameters.writeL2.prescale ||
907 parameters.writeSN.prescale ||
908 c_buffer.is_enabled()) {
910 timesliceRouter->configure(timeslice);
918 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
920 if (moduleRouter->hasModule(frame->getModuleID())) {
927 <<
"module = " << frame->getModuleID() <<
";"
928 <<
"discard and dump";
930 timesliceTX.push_back(*frame);
935 const JModule& module = moduleRouter->getModule(frame->getModuleID());
936 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
949 frame->getModuleIdentifier(),
952 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
957 frame->getModuleIdentifier(),
960 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
965 frame->getModuleIdentifier(),
968 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
972 JErrorStream(logErrorDetector) <<
"No detector information for module " << frame->getModuleID();
976 if (!timesliceTX.empty()) {
977 this->put(timesliceTX);
985 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
986 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
987 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
991 for (JTriggerOutput::const_iterator event = trigger_output.begin();
event != trigger_output.end(); ++event) {
995 if (this->put(
object)) {
1000 if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
1004 if (parameters.writeL1) { this->put(
object); }
1005 if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
1008 if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
1012 if (parameters.writeL2) { this->put(
object); }
1013 if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
1016 if (parameters.writeSN() || c_buffer.sizeSN > 0) {
1020 if (parameters.writeSN) { this->put(
object); }
1021 if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
1024 if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
1026 if (parameters.writeL0) { this->put(timeslice); }
1027 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1035 <<
"time slicee not correctly processed!";
1047 const double T_us = (double) timer.usec_wall;
1049 JNoticeStream(logger) <<
"Elapsed real (wall) time [s] " << T_us / 1e6;
1050 JNoticeStream(logger) <<
"Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1051 JNoticeStream(logger) <<
"Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1053 JNoticeStream(logger) <<
"Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3 <<
" +/- " << Qt.getDeviation() * 1.0e-3;
1055 catch(
const std::exception&) {}
1056 JNoticeStream(logger) <<
"Number of packets received/discarded " << number_of_packets_received <<
"/" << number_of_packets_discarded;
1057 JNoticeStream(logger) <<
"Number of events/MB sent " << numberOfEvents <<
"/" << numberOfBytes/1e6;
1059 if (number_of_packets_received > 0) {
1060 JNoticeStream(logger) <<
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
1063 JNoticeStream(logger) <<
"Current queue depth/size " << timeslices.size() <<
"/" << queueSize;
1064 JNoticeStream(logger) <<
"Current number of frames per slice expected: " << frames_per_slice;
1066 JNoticeStream(logger) <<
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed <<
"/" << numberOfIncompleteTimeslicesProcessed;
1068 if (numberOfTimeslicesProcessed > 0) {
1069 JNoticeStream(logger) <<
"Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
1070 JNoticeStream(logger) <<
"User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
1071 JNoticeStream(logger) <<
"System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
1074 const double processedSlicesTime_us = numberOfTimeslicesProcessed *
getFrameTime() / 1000;
1075 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) *
getFrameTime() / 1000;
1077 if (processedSlicesTime_us > 0) {
1078 JNoticeStream(logger) <<
"Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1080 if (processedDetectorTime_us > 0) {
1081 JNoticeStream(logger) <<
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1097 using namespace std;
1099 JDebugStream(logger) <<
"Received message <" << tag.
toString() <<
"> \"" << string(buffer, length) <<
"\"";
1103 if (c_buffer.is_open()) {
1105 JNoticeStream(logger) <<
"Close circular buffer " << c_buffer;
1110 if (c_buffer.is_enabled()) {
1114 if (c_buffer.is_open()) {
1116 JNoticeStream(logger) <<
"Created circular buffer " << c_buffer;
1122 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << path <<
">; disable functionality.";
1136 static const int MAXIMUM_FILE_NUMBER = 100;
1155 datawriter->put(
object);
1252 using namespace std;
1253 using namespace JPP;
1270 JParser<> zap(
"Application for real-time filtering of data.");
1275 zap[
'u'] =
make_field(client_name) =
"JDataFilter";
1285 catch(
const exception& error) {
1286 FATAL(error.what() << endl);