107       return number_of_frames;
 
  110     const size_t n = (size_t) (number_of_frames * factor);
 
  112     return (
n == 0 ? 1 : 
n);
 
  211       static const int MAXIMUM_FILE_NUMBER = 100; 
 
  222                         const std::string& archive,
 
  242         gErrorIgnoreLevel = kFatal;
 
  244         std::ostringstream os;
 
  246         os << 
getFullPath(path) << 
"KM3NeT" << 
"_"  << tag << 
".root";
 
  249           std::remove(os.str().c_str());
 
  252         this->
open(os.str().c_str());
 
  270         if (this->is_open()) {
 
  272           const string file_name = this->
getFile()->GetName();
 
  278             for (
int i = 0; i != MAXIMUM_FILE_NUMBER; ++i) {
 
  304             std::remove(file_name.c_str());
 
  329         return (sizeL0 > 0 ||
 
  345         if (
object.is_open())
 
  346           out << 
object.getFile()->GetName();
 
  352         out << 
object.sizeL0 << 
'/' 
  353             << 
object.sizeL1 << 
'/' 
  354             << 
object.sizeL2 << 
'/' 
  355             << 
object.sizeSN << 
'/';
 
  398                 const std::string& server,
 
  399                 const std::string& hostname,
 
  404                 const std::string& path,
 
  405                 const std::string& archive) :
 
  417       current_slice_index  = -1;
 
  418       dataqueue_slice_index.clear();
 
  431       if (c_buffer.is_open()) {
 
  433         JStatusStream(logger) << 
"Close and remove circular buffer " << c_buffer;
 
  435         c_buffer.close(
false);
 
  442     virtual void actionInit(
int length, 
const char* buffer)
 override 
  444       JDebugStream(logger) << 
"actionInit() " << std::string(buffer,length);
 
  450         if (serversocket.is_valid()) {
 
  451           serversocket->shutdown();
 
  456       catch(
const std::exception& error) {
 
  457         JErrorStream(logger) << 
"Error \"" << error.what() << 
"\"; trigger ev_error.";
 
  467       JDebugStream(logger) << 
"actionConfigure() " << endl << std::string(buffer,length);
 
  469       string _hostname_ = 
"";
 
  471       long long int update_s = 20;
 
  472       long long int logger_s = 10;
 
  480       dumpLimit = numeric_limits<int>::max();
 
  493       properties[
"dataWriter"]             = _hostname_;
 
  494       properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
 
  495       properties[
"factorOfFramesPerSlice"] = factor_per_slice = 1.0;
 
  497       properties[
"triggerParameters"]      = parameters;
 
  498       properties[
"queueSize"]              = maxQueueSize  = (totalCPURAM - 
GIGABYTE);  
 
  499       properties[
"queueDepth"]             = maxQueueDepth = 20;                        
 
  500       properties[
"frameIndex"]             = maximal_frame_index = 100000;
 
  501       properties[
"logger_s"]               = logger_s;
 
  502       properties[
"update_s"]               = update_s;
 
  503       properties[
"JDataFilter"]            = dataFilters;
 
  504       properties[
"DataQueue"]              = dataQueues;
 
  505       properties[
"path"]                   = c_buffer.path;
 
  506       properties[
"archive"]                = c_buffer.archive;
 
  507       properties[
"c_sizeL0"]               = c_buffer.sizeL0;
 
  508       properties[
"c_sizeL1"]               = c_buffer.sizeL1;
 
  509       properties[
"c_sizeL2"]               = c_buffer.sizeL2;
 
  510       properties[
"c_sizeSN"]               = c_buffer.sizeSN;
 
  511       properties[
"dumpLimit"]              = dumpLimit;
 
  512       properties[
"dumpMask"]               = dumpMask;
 
  515         properties.
read(
string(buffer, length));
 
  517       catch(
const std::exception& error) {
 
  521       if (update_s <=  0) { update_s = 20; }
 
  522       if (logger_s <=  0) { logger_s = 10; }
 
  524       setClockInterval(update_s * 1000000LL);
 
  526       _hostname_ = 
trim(_hostname_);
 
  528       if (_hostname_ != 
"" && _hostname_ != hostname) {
 
  532         hostname = _hostname_;
 
  535       bool status = datawriter.is_valid();
 
  540           status = datawriter->Connected() == 0;
 
  542         catch (
const exception&) {
 
  558       if (dataFilters.empty()) {
 
  559         JNoticeStream(logger) << 
"No DataFilters in process list, or no process list. " 
  560                               << 
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
 
  563       sort(dataFilters.begin(), dataFilters.end(), compare);
 
  565       unsigned int numberOfDataFiltersOnThisMachine = 0;
 
  572         if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
 
  574           numberOfDataFiltersOnThisMachine++;
 
  576           if (i->port == this->port) {
 
  582       if (numberOfDataFiltersOnThisMachine == 0) {
 
  583         JNoticeStream(logger) << 
"Zero data filters on this machine according to process list (if it exists). " 
  584                               << 
"Assuming one datafilter on this machine.";
 
  585         numberOfDataFiltersOnThisMachine = 1;
 
  588       if (thisProcess == dataFilters.end()) {
 
  592         error << 
"This process cannot be found in the process list. Why do I exist?";
 
  593         error << 
" my IP addresses:";
 
  599         error << 
" my port: " << this->port;
 
  600         error << 
" process list";
 
  603           error << 
' ' << i->hostname << 
':' << i->port;
 
  607       if (thisProcess != dataFilters.end() && thisProcess->index != 
getName()) {
 
  608         JErrorStream(logger) << 
"Mismatch between given process names: " 
  610                              << 
", but in the process list I am referred to as " << thisProcess->index;
 
  613       if (dataFilters.begin() == thisProcess || dataFilters.empty()) {                    
 
  617       if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {     
 
  619         maxQueueSize  =  (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
 
  621         JNoticeStream(logger) << 
"Maximum queue size is too large given the number of processes on this machine. " 
  622                               << 
"Queue size reduced to "  
  623                               << maxQueueSize << 
" bytes." ;  
 
  628       if (parameters.disableHighRateVeto) {
 
  630         JNoticeStream(logger) << 
"Disabling high-rate veto of all PMTs.";
 
  639       triggerNB      .reset(
new JTriggerNB      (parameters));
 
  647         JNoticeStream(logger) << 
"This data filter process will report.";
 
  649         JDebugStream (logger) << 
"Trigger parameters: "   << parameters;
 
  651         JNoticeStream(logger) << 
"Update period [s]: "    << getClockInterval();
 
  663       if (buildL1.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL1."; }
 
  664       if (buildL2.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL2."; }
 
  665       if (buildSN.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildSN."; }
 
  666       if (buildNB.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildNB."; }  // report the builder that was actually checked
 
  674       if (c_buffer.is_enabled()) {
 
  676         if (!c_buffer.is_open()) {
 
  680           if (c_buffer.is_open()) {
 
  684             JStatusStream(logger) << 
"Created circular buffer " << c_buffer;
 
  688             JErrorStream (logger) << 
"Failed to create circular buffer in directory <" << c_buffer.path << 
">; disable functionality.";
 
  693           JNoticeStream(logger) << 
"Continue using circular buffer " << c_buffer;
 
  697       if (c_buffer.is_open()) {
 
  718       current_slice_index = -1;
 
  719       dataqueue_slice_index.clear();
 
  724       numberOfTimeslicesProcessed           = 0;
 
  725       numberOfIncompleteTimeslicesProcessed = 0;
 
  727       number_of_packets_received  = 0;
 
  728       number_of_packets_discarded = 0;
 
  729       number_of_bytes_received    = 0;
 
  732       minFrameNumber = numeric_limits<int>::max();
 
  733       maxFrameNumber = numeric_limits<int>::min();
 
  739       logErrorRun         .reset();
 
  740       logErrorDetector    .reset();
 
  741       logErrorIndex       .reset();
 
  742       logErrorIncomplete  .reset();
 
  743       logErrorOvercomplete.reset();
 
  765       if (!timeslices.empty()) {
 
  767         JNoticeStream(logger) << 
"Flushing " << timeslices.size() << 
" slices.";
 
  769         for (deque<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.end(); ++i) {
 
  778         deque<JDAQTimesliceL0> buffer;
 
  780         timeslices.swap(buffer);
 
  783       if (queueSize != 0) {
 
  784         JWarningStream(logger) << 
"Pending data in queue " << queueSize << 
" [B]";
 
  787       current_slice_index = -1;
 
  788       dataqueue_slice_index.clear();
 
  801     virtual void actionStop(
int length, 
const char* buffer)
 override 
  809       if (serversocket.is_valid()) {
 
  810         serversocket->shutdown();
 
  813       serversocket.reset();
 
  817     virtual void actionQuit(
int length, 
const char* buffer)
 override 
  825       if (serversocket.is_valid()) {
 
  826         mask.
set(*serversocket);
 
  829       for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {      
 
  830         if (!channel->isReady()) {
 
  831           mask.
set(channel->getFileDescriptor());   
 
  841       for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
 
  845           if (mask.
has(channel->getFileDescriptor())) {
 
  849           if (channel->isReady()) {
 
  851             number_of_packets_received += 1;
 
  852             number_of_reads            += channel->getCounter();
 
  853             number_of_bytes_received   += channel->size();
 
  858                 updateFrameQueue(*channel);
 
  860               catch(
const std::exception& error) {
 
  862                 JErrorStream(logErrorRun) << 
"Update frame queue " << channel->getFileDescriptor() << 
' ' << channel->size() << 
' ' << error.what(); 
 
  864                 number_of_packets_discarded += 1;
 
  869               JErrorStream(logErrorRun) << 
"Receiving data while not running.";
 
  871               number_of_packets_discarded += 1;
 
  879         catch(
const std::exception& error) {
 
  882             JErrorStream(logger) << 
"Disconnect channel " << channel->getFileDescriptor() << 
' ' << error.what(); 
 
  887           channel = channelList.erase(channel);
 
  892       if (serversocket.is_valid()) {
 
  894         if (mask.
has(*serversocket)) {
 
  896           JTCPSocket socket(serversocket->getFileDescriptor());
 
  910       if (!timeslices.empty()) {
 
  912         const size_t number_of_frames  =  
getNumberOfFrames(frames_per_slice, factor_per_slice);
 
  913         const size_t maximum_in_queue  =  
getMaximum(
make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (
size_t) 0);
 
  915         if (((timeslices[0].size()          >=  number_of_frames &&                
 
  916               timeslices[0].
getFrameIndex() <   dataqueue_slice_index.min())   ||
 
  918              (maximum_in_queue              >=  number_of_frames &&                
 
  919               timeslices[0].getFrameIndex() <   dataqueue_slice_index.min())   ||
 
  921              (timeslices.size()             >=  maxQueueDepth)                 ||  
 
  922              (queueSize                     >=  maxQueueSize))) {
 
  929           minFrameNumber      = min(minFrameNumber, pending_slice.
getFrameIndex());
 
  930           maxFrameNumber      = max(maxFrameNumber, pending_slice.
getFrameIndex());
 
  932           for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
 
  933             modules.insert(i->getModuleID());
 
  941             if (!pending_slice.empty()) {
 
  942               processTimeSlice(pending_slice);
 
  947             numberOfTimeslicesProcessed += 1;
 
  953             JErrorStream(logErrorRun) << 
"Skip processing of data while not running.";
 
  957           if (modules.size() > frames_per_slice) {
 
  959             JErrorStream(logErrorOvercomplete) << 
"More active modules than expected " 
  960                                                << modules.size() << 
" > " << frames_per_slice
 
  961                                                << 
" adjusting frames per slice to " << modules.size();
 
  963             frames_per_slice = modules.size();
 
  967           if (pending_slice.size() < number_of_frames) {
 
  969             numberOfIncompleteTimeslicesProcessed += 1;
 
  973             error << 
"Timeout -> processed incomplete timeslice: "  
  975                   << 
"Size of timeslice = " << pending_slice.size()          << 
';' 
  976                   << 
"Queue depth = "       << timeslices.size()             << 
';' 
  977                   << 
"Queue size = "        << queueSize                     << 
';' 
  978                   << 
"DataQueue min = "     << dataqueue_slice_index.min()   << 
';' 
  979                   << 
"DataQueue max = "     << dataqueue_slice_index.max()   << 
';';
 
  981             if (maximum_in_queue >= number_of_frames) {
 
  983               error << 
" intermittent problem -> continues as-is";
 
  989               for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
 
  990                 modules.insert(i->getModuleID());
 
  993               error << 
" adjusting frames per timeslice from " << frames_per_slice << 
" to " << modules.size();
 
  995               frames_per_slice = modules.size();
 
 1002           timeslices.pop_front();
 
 1025       using namespace std;
 
 1037         JErrorStream(logErrorRun) << 
"Size of received data does not match size reported by preamble: "  
 1038                                   << 
"preamble.getLength() = " << preamble.
getLength() << 
';' 
 1039                                   << 
"channel.size(): "        << channel.
size()       << 
';';
 
 1041         number_of_packets_discarded += 1;
 
 1050                                   << 
" -> Dropping frame.";
 
 1052         number_of_packets_discarded += 1;
 
 1059         number_of_packets_discarded += 1;
 
 1061         if (modules.insert(header.
getModuleID()).second) {
 
 1063           frames_per_slice = modules.size();
 
 1067                                       << 
" -> dropping frame;"  
 1068                                       << 
" increase number of frames expected to: " << frames_per_slice;
 
 1074       if (current_slice_index != -1 && header.
getFrameIndex() > current_slice_index + maximal_frame_index) {
 
 1076         number_of_packets_discarded += 1;
 
 1078         JErrorStream(logErrorIndex) << 
"Frame index " << header.
getFrameIndex() << 
" > " << current_slice_index + maximal_frame_index 
 
 1080                                     << 
" -> Dropping frame.";
 
 1085       deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
 
 1087       while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.
getFrameIndex()) {
 
 1088         ++timesliceIterator;
 
 1091       if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.
getFrameIndex()) {
 
 1099         timesliceIterator = timeslices.insert(timesliceIterator, 
JDAQTimesliceL0());
 
 1103         queueSize += 
getSizeof(*timesliceIterator);
 
 1108       in >> 
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
 
 1110       queueSize += 
getSizeof(*timesliceIterator->rbegin());
 
 1123       using namespace std;
 
 1127         timesliceRouter->configure(timeslice);
 
 1129         if (parameters.writeSummary()) {
 
 1133         if (parameters.trigger3DMuon.enabled    || 
 
 1134             parameters.trigger3DShower.enabled  || 
 
 1135             parameters.triggerMXShower.enabled  || 
 
 1136             parameters.triggerNB.enabled        || 
 
 1137             parameters.writeL0.prescale         ||
 
 1138             parameters.writeL1.prescale         ||
 
 1139             parameters.writeL2.prescale         ||
 
 1140             parameters.writeSN.prescale         ||
 
 1141             c_buffer.is_enabled()) {
 
 1150           for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
 
 1152             if (moduleRouter->hasModule(frame->getModuleID())) {
 
 1161                                        << 
"module = "      << frame->getModuleID()      << 
";" 
 1162                                        << 
"discard" << (dumpCount < dumpLimit ? 
" and dump" : 
"");
 
 1164                 if (dumpCount < dumpLimit && 
result.has(dumpMask)) {
 
 1165                   timesliceTX.push_back(*frame);
 
 1171               const JModule&   module = moduleRouter->getModule(frame->getModuleID()); 
 
 1172               JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
 
 1184               if (parameters.triggerNB.enabled) {
 
 1188                 if (buffer.begin() != __end) {
 
 1191                                                         frame->getModuleIdentifier(),
 
 1196                   (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
 
 1198                   (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
 
 1205                                                     frame->getModuleIdentifier(),
 
 1208               (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
 
 1213                                                     frame->getModuleIdentifier(),
 
 1216               (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
 
 1221                                                     frame->getModuleIdentifier(),
 
 1224               (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
 
 1228               JErrorStream(logErrorDetector) << 
"No detector information for module " << frame->getModuleID(); 
 
 1232           if (!timesliceTX.empty()) {
 
 1234             if (dumpCount < dumpLimit) {
 
 1236               this->put(timesliceTX);
 
 1244           if (parameters.triggerNB.enabled) {      
 
 1248             for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
 
 1250               if (parameters.triggerNB.write()) {
 
 1257                                     parameters.TMaxLocal_ns, 
 
 1258                                     parameters.triggerNB.DMax_m,
 
 1269           (*trigger3DMuon)  (trigger_input, back_inserter(trigger_output));
 
 1270           (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
 
 1271           (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
 
 1275           for (JTriggerOutput::const_iterator event = trigger_output.begin(); 
event != trigger_output.end(); ++event) {
 
 1281             numberOfEvents += 1;
 
 1284           if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
 
 1288             if (parameters.writeL1)  { this->put(
object);  }
 
 1289             if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
 
 1292           if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
 
 1296             if (parameters.writeL2)  { this->put(
object); }
 
 1297             if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
 
 1300           if (parameters.writeSN() || c_buffer.sizeSN > 0) {
 
 1304             if (parameters.writeSN)  { this->put(
object); }
 
 1305             if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
 
 1308           if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
 
 1310             if (parameters.writeL0)  { this->put(timeslice); }
 
 1311             if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
 
 1315       } 
catch(
const std::exception& error) {
 
 1317         JErrorStream(logger) << 
"Error = "       << error.what()              << 
";" 
 1320                              << 
"time slice not correctly processed;" 
 1321                              << 
"discard" << (dumpCount < dumpLimit ? 
" and dump" : 
"");
 
 1323         if (dumpCount < dumpLimit) {
 
 1331       timesliceRouter->reset();
 
 1342       const double T_us = (double) timer.usec_wall;
 
 1344       JStatusStream(logger) << 
"Elapsed real (wall) time [s] "          << T_us / 1e6;
 
 1345       JStatusStream(logger) << 
"Elapsed user CPU time [s] "             << (double) timer.usec_ucpu/ 1e6;
 
 1346       JStatusStream(logger) << 
"Elapsed system CPU time [s] "           << (double) timer.usec_scpu/ 1e6;
 
 1347       JStatusStream(logger) << 
"Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3  << 
" <= " << Qt.getXmax() * 1.0e-3;
 
 1348       JStatusStream(logger) << 
"Number of packets received/discarded "  << number_of_packets_received << 
"/" << number_of_packets_discarded;
 
 1349       JStatusStream(logger) << 
"Number of events/MB/us "                << numberOfEvents             << 
"/" << numberOfBytes/1e6 << 
"/" << Qx.getMean(0.0);
 
 1351       if (number_of_packets_received > 0) {
 
 1352         JStatusStream(logger) << 
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
 
 1355       JStatusStream(logger) << 
"Current queue depth/size " << timeslices.size() << 
"/" << queueSize;
 
 1356       JStatusStream(logger) << 
"Current number of frames per slice expected: " << frames_per_slice << 
' ' << 
FIXED(5,3) << factor_per_slice;
 
 1358       JStatusStream(logger) << 
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << 
"/" << numberOfIncompleteTimeslicesProcessed;
 
 1360       const double processedSlicesTime_us   =  numberOfTimeslicesProcessed      * 
getFrameTime() / 1000;
 
 1361       const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * 
getFrameTime() / 1000;
 
 1363       if (processedSlicesTime_us   > 0) {
 
 1364         JStatusStream(logger) << 
"Performance factor (inaccurate estimate): "                << T_us / processedSlicesTime_us;
 
 1366       if (processedDetectorTime_us > 0) {
 
 1367         JStatusStream(logger) << 
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
 
 1383       using namespace std;
 
 1385       JDebugStream(logger) << 
"Received message <" << tag.
toString() << 
"> \"" << string(buffer, length) << 
"\"";
 
 1389         if (c_buffer.is_open()) {
 
 1391           JStatusStream(logger) << 
"Archive circular buffer in <" << c_buffer.archive << 
">";
 
 1393           c_buffer.close(
true);
 
 1396         if (c_buffer.is_enabled()) {
 
 1400           if (c_buffer.is_open()) {
 
 1402             JStatusStream(logger) << 
"Created circular buffer " << c_buffer;
 
 1408             JErrorStream (logger) << 
"Failed to create circular buffer in directory <" << c_buffer.path << 
">; disable functionality.";
 
 1440         datawriter->put(
object);
 
 1448       catch(
const std::exception& error) {
 
 1449         JErrorStream(logger) << 
"Error \"" << error.what() << 
"\"; trigger ev_error.";
 
 1481         if (this->empty()) {
 
 1487           int min = std::numeric_limits<int>::max();
 
 1489           for (const_iterator i = this->begin(); i != this->end(); ++i) {
 
 1490             if (i->second < min) {
 
 1506         if (this->empty()) {
 
 1512           int max = std::numeric_limits<int>::lowest();
 
 1514           for (const_iterator i = this->begin(); i != this->end(); ++i) {
 
 1515             if (i->second > max) {
 
 1524     } dataqueue_slice_index;
 
 1600   using namespace std;
 
 1601   using namespace JPP;
 
 1618     JParser<> zap(
"Application for real-time filtering of data.");
 
 1620     zap[
'H'] = 
make_field(server,      
"host name of server for command messages")               = 
"localhost";
 
 1621     zap[
'M'] = 
make_field(logger,      
"host name of server for logger messages")                = 
"localhost";
 
 1622     zap[
'D'] = 
make_field(hostname,    
"host name of server of data writer")                     = 
"";
 
 1623     zap[
'u'] = 
make_field(client_name, 
"client name")                                            = 
"%";
 
 1624     zap[
'P'] = 
make_field(port,        
"port to listen for incoming data from data queue");
 
 1625     zap[
'q'] = 
make_field(backlog,     
"back log")                                               =   1024;
 
 1626     zap[
'c'] = 
make_field(use_cout,    
"print to terminal");
 
 1627     zap[
'p'] = 
make_field(path,        
"directory for temporary storage of circular buffer")     = 
"/tmp/";
 
 1628     zap[
'A'] = 
make_field(archive,     
"directory for permanent archival of circular buffer")    = 
"/tmp/";
 
 1633   catch(
const std::exception& error) {
 
 1634     FATAL(error.what() << endl);
 
KM3NeT DAQ constants, bit handling, etc.
 
int main(int argc, char *argv[])
 
Data structure for detector geometry and calibration.
 
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
 
Basic data structure for L0 hit.
 
Basic data structure for L1 hit.
 
General purpose messaging.
 
Hostname and IP address functions.
 
Utility class to parse command line options.
 
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
 
I/O formatting auxiliaries.
 
Utility class to parse parameter values.
 
ROOT TTree parameter settings of various packages.
 
Scheduling of actions via fixed latency intervals.
 
Basic data structure for time and time over threshold information of hit.
 
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
 
void merge(const JMatch_t &match)
Merge events.
 
Router for direct addressing of module data in detector data structure.
 
Data structure for a composite optical module.
 
Utility class to parse parameter values.
 
bool read(const JEquation &equation)
Read equation.
 
Auxiliary class for CPU timing and usage.
 
const JPosition3D & getPosition() const
Get position.
 
const char * data() const
Get data.
 
int size() const
Get size.
 
int getFileDescriptor() const
Get file descriptor.
 
Simple data structure to support I/O of equations (see class JLANG::JEquation).
 
Auxiliary class for method select.
 
void set(const int file_descriptor)
Set file descriptor.
 
bool has(const int file_descriptor) const
Has file descriptor.
 
Exception for opening of file.
 
The template JSinglePointer class can be used to hold a pointer to an object.
 
Message logging based on ControlHost.
 
Interface for logging messages.
 
Message logger with time scheduler.
 
Message logging based on std::ostream.
 
Implementation of object output through ControlHost.
 
void setReceiveBufferSize(const int size)
Set receive buffer size.
 
int getReceiveBufferSize() const
Get receive buffer size.
 
void setKeepAlive(const bool on)
Set keep alive of socket.
 
void setNonBlocking(const bool on)
Set non-blocking of I/O.
 
std::string toString() const
Convert tag to string.
 
Utility class to parse command line options.
 
JTreeWriter object output.
 
ROOT TTree object output.
 
1-dimensional frame with time calibrated data from one optical module.
 
2-dimensional frame with time calibrated data from one optical module.
 
std::vector< value_type >::iterator iterator
 
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
 
Auxiliary class to build JDAQTimeslice for L1 timeslice.
 
Time slice with calibrated data.
 
Auxiliary class to build JDAQEvent for a triggered event.
 
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
 
int getLength() const
Get length.
 
int getRunNumber() const
Get run number.
 
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
 
int getFrameIndex() const
Get frame index.
 
Control unit client base class.
 
void run()
Run as run control client following command messages via JNET::JControlHost.
 
virtual bool enter(const JArgs &args)
Enter the state machine.
 
int getModuleID() const
Get module identifier.
 
Auxiliary class for itemization of process list.
 
std::string index
index in process list
 
Data frame of one optical module.
 
static void reset()
Reset counter of unique instance of this class object.
 
Main class for real-time filtering of data.
 
long long int number_of_packets_received
 
JMessageScheduler logErrorRun
 
JSinglePointer< JServerSocket > serversocket
server for data queue connections
 
JMessageScheduler logErrorDetector
 
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
 
JCircularBuffer_t c_buffer
 
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
 
long long int numberOfBytes
 
long long int number_of_packets_discarded
 
JMessageScheduler logErrorIndex
 
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
 
virtual void actionInit(int length, const char *buffer) override
 
void put(const T &object)
Auxiliary method to send object to data server.
 
long long int number_of_bytes_received
 
virtual void actionContinue(int length, const char *buffer) override
 
std::vector< JSocketInputChannel_t > JChannelList_t
 
long long int maxQueueSize
 
int port
server socket port
 
long long int totalCPURAM
 
JTimeslice< hit_type > JTimeslice_t
 
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
 
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
 
std::vector< JDAQProcess > dataQueues
 
JMessageScheduler logErrorOvercomplete
 
long long int number_of_reads
 
JSuperFrame2D< hit_type > JSuperFrame2D_t
 
virtual void actionStop(int length, const char *buffer) override
 
JSinglePointer< JBuildL2_t > buildNB
 
JSinglePointer< JBuildL2_t > buildL2
 
JSinglePointer< JBuildL1_t > buildL1
 
JBuildL1< hit_type > JBuildL1_t
 
virtual void actionExit() override
 
virtual void actionQuit(int length, const char *buffer) override
 
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
 
std::string hostname
host name of data server
 
virtual void actionConfigure(int length, const char *buffer) override
 
JSuperFrame1D< hit_type > JSuperFrame1D_t
 
long long int numberOfTimeslicesProcessed
 
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
 
JChannelList_t channelList
connections to data queue
 
long long int numberOfIncompleteTimeslicesProcessed
 
JSinglePointer< JTrigger3DMuon > trigger3DMuon
 
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
 
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
 
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
 
JMessageScheduler logErrorIncomplete
 
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
 
JSinglePointer< JModuleRouter > moduleRouter
 
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
 
JSinglePointer< JTimesliceRouter > timesliceRouter
 
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
 
JSinglePointer< JTriggerNB > triggerNB
 
long long int numberOfEvents
 
std::vector< JDAQProcess > dataFilters
 
JBuildL2< hit_type > JBuildL2_t
 
JSinglePointer< JTrigger3DShower > trigger3DShower
 
virtual void actionStart(int length, const char *buffer) override
 
virtual void actionPause(int length, const char *buffer) override
 
JSinglePointer< JTriggerMXShower > triggerMXShower
 
virtual void actionReset(int length, const char *buffer) override
 
JSinglePointer< JBuildL2_t > buildSN
 
JTriggerParameters parameters
 
void typeout()
Report status to message logger.
 
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
 
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
 
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
 
void close(std::istream *pf)
Close file.
 
T * open(const std::string &file_name)
Open file.
 
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
 
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
 
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
 
std::string trim(const std::string &buffer)
Trim string.
 
static const long long int GIGABYTE
Number of bytes in a giga-byte.
 
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
 
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
 
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
 
const char * getName()
Get ROOT name of given data type.
 
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
 
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
 
static const JLocalTime getLocalTime
Function object to get local time in microseconds.
 
long long int localtime_t
Type definition of local time.
 
unsigned long long int getRAM()
Get RAM of this CPU.
 
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
 
static JStat getFileStatus
Function object for file status.
 
std::vector< std::string > getListOfIPaddresses()
Get list of IP addresses (decimal-dot notation).
 
static const JChecksum checksum
Function object to perform check-sum of raw data.
 
KM3NeT DAQ data structures and auxiliaries.
 
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according to a given allowed fraction of active modules.
 
int getFrameIndex(const double t_ns)
Get frame index for a given time in ns.
 
static const JNET::JTag RC_DFILTER
 
double getFrameTime()
Get frame time duration.
 
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
 
static const JNET::JTag RC_ALERT
 
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
 
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
 
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
 
static const JNET::JTag RC_CMD
 
size_t getSizeof(const JDAQEvent &object)
Get size of object.
 
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
 
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
 
static const JNET::JTag IO_TRIGGER_PARAMETERS
 
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
 
Auxiliary data structure for sequence of same character.
 
Auxiliary data structure for floating point format specification.
 
Match of two events considering overlap in time.
 
Auxiliary class for handling status.
 
Level specific message streamers.
 
Auxiliary class for all subscription.
 
Auxiliary class for date and time.
 
int getYear() const
year a.d.
 
int getDay() const
day of the month [1-31]
 
int getMonth() const
month of the year [1-12]
 
Auxiliary data structure for result of checksum.
 
@ EPMT_t
PMT number error.
 
@ TIME_t
Time order error.
 
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
 
Timeslice data structure for L0 data.
 
std::string archive
Directory for permanent archival.
 
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
 
std::string path
Directory for temporary storage.
 
Long64_t sizeL1
Number of L1 time slices.
 
Long64_t sizeSN
Number of SN time slices.
 
JTag tag
Unique tag of this process.
 
Long64_t sizeL0
Number of L0 time slices.
 
void close(const bool option)
Close file.
 
Long64_t sizeL2
Number of L2 time slices.
 
void disable()
Disable writing.
 
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
 
bool is_enabled() const
Check whether writing of data is enabled.