182       void open(
const std::string& path, 
const JTag& tag)
 
  188         for (
int i = 0; !this->is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
 
  204             this->
open(os.str().c_str());
 
  206           catch(
const exception& error) {}
 
  228         return (sizeL0 > 0 ||
 
  243         if (
object.is_open())
 
  244           out << 
object.getFile()->GetName();
 
  250         return out << 
object.sizeL0 << 
'/' 
  251                    << 
object.sizeL1 << 
'/' 
  252                    << 
object.sizeL2 << 
'/' 
  253                    << 
object.sizeSN << 
'/';
 
  290                 const std::string& server,
 
  291                 const std::string& hostname,
 
  297                 const std::string& path) :
 
  310       current_slice_index  = -1;
 
  321       if (c_buffer.is_open()) {
 
  323         JNoticeStream(logger) << 
"Close circular buffer " << c_buffer;
 
  334       JDebugStream(logger) << 
"actionInit() " << std::string(buffer,length);
 
  342       catch(
const std::exception& error) {
 
  343         JErrorStream(logger) << 
"Error \"" << error.what() << 
"\"; trigger ev_error.";
 
  353       JDebugStream(logger) << 
"actionConfigure() " << endl << std::string(buffer,length);
 
  355       long long int update_s = 10;
 
  356       long long int logger_s =  5;
 
  366       properties[
"dataWriter"]             = hostname;
 
  367       properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
 
  369       properties[
"triggerParameters"]      = parameters;
 
  370       properties[
"queueSize"]              = maxQueueSize  = (totalCPURAM - 
GIGABYTE);  
 
  371       properties[
"queueDepth"]             = maxQueueDepth = 20;                        
 
  372       properties[
"frameIndex"]             = maximal_frame_index = 100000;
 
  373       properties[
"logger_s"]               = logger_s;
 
  374       properties[
"update_s"]               = update_s;
 
  375       properties[
"JDataFilter"]            = dataFilters;
 
  376       properties[
"DataQueue"]              = dataQueues;
 
  377       properties[
"path"]                   = path;
 
  378       properties[
"c_sizeL0"]               = c_buffer.sizeL0;
 
  379       properties[
"c_sizeL1"]               = c_buffer.sizeL1;
 
  380       properties[
"c_sizeL2"]               = c_buffer.sizeL2;
 
  381       properties[
"c_sizeSN"]               = c_buffer.sizeSN;
 
  384         properties.
read(
string(buffer, length));
 
  386       catch(
const exception& error) {
 
  390       if (update_s <=  0) { update_s = 1; }
 
  391       if (logger_s <=  0) { logger_s = 1; }
 
  393       setClockInterval(update_s * 1000000LL);
 
  395       hostname = 
trim(hostname);
 
  400         throw JException(
"Undefined data writer host name.");
 
  402       maximum_frames_per_slice = frames_per_slice;
 
  406       if (dataFilters.empty()) {
 
  407         JNoticeStream(logger) << 
"No DataFilters in process list, or no process list. " 
  408                               << 
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
 
  411       sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
 
  413       unsigned int numberOfDataFiltersOnThisMachine = 0;
 
  421         notice << 
"My IP addresses:";
 
  430         JDebugStream(logger) << 
"Test IP address \"" << i->hostname << 
"\" " << (find(IP.begin(), IP.end(), i->hostname) != IP.end());
 
  432         if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
 
  434           numberOfDataFiltersOnThisMachine++;
 
  436           if (i->port == this->port) {
 
  442       if (numberOfDataFiltersOnThisMachine == 0) {
 
  443         JNoticeStream(logger) << 
"Zero data filters on this machine according to process list (if it exists). " 
  444                               << 
"Assuming one datafilter on this machine.";
 
  445         numberOfDataFiltersOnThisMachine = 1;
 
  448       if (thisProcess == dataFilters.end()) {
 
  449         JErrorStream(logger) << 
"This process cannot be found in the process list. Why do I exist?";    
 
  452       if (thisProcess != dataFilters.end() && thisProcess->index != 
getName()) {
 
  453         JErrorStream(logger) << 
"Mismatch between given process names: " 
  455                              << 
", but in the process list I am referred to as " << thisProcess->index;
 
  458       if (dataFilters.begin() == thisProcess || dataFilters.empty()) {                    
 
  462       if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {     
 
  464         maxQueueSize  =  (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
 
  466         JNoticeStream(logger) << 
"Maximum queue size is too large given the number of processes on this machine. " 
  467                               << 
"Queue size reduced to "  
  468                               << maxQueueSize << 
" bytes." ;  
 
  475       triggerNB      .reset(
new JTriggerNB      (parameters));
 
  483         JNoticeStream(logger) << 
"This data filter process will report.";
 
  484         JNoticeStream(logger) << 
"Number of modules: "    << (*moduleRouter)->size();
 
  485         JDebugStream (logger) << 
"Trigger parameters: "   << parameters;
 
  487         JNoticeStream(logger) << 
"Update period [s]: "    << getClockInterval();
 
  498       if (buildL1.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL1."; }
 
  499       if (buildL2.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL2."; }
 
  500       if (buildSN.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildSN."; }
 
  507       if (c_buffer.is_enabled()) {
 
  509         if (!c_buffer.is_open()) {
 
  513           if (c_buffer.is_open()) {
 
  515             JNoticeStream(logger) << 
"Created circular buffer " << c_buffer;
 
  521             JErrorStream (logger) << 
"Failed to create circular buffer in directory <" << path << 
">; disable functionality.";
 
  528           JNoticeStream(logger) << 
"Continue using circular buffer " << c_buffer;
 
  538         if (c_buffer.is_open()) {
 
  555       current_slice_index = -1;
 
  560       numberOfTimeslicesProcessed           = 0;
 
  561       numberOfIncompleteTimeslicesProcessed = 0;
 
  563       number_of_packets_received  = 0;
 
  564       number_of_packets_discarded = 0;
 
  565       number_of_bytes_received    = 0;
 
  568       minFrameNumber = numeric_limits<int>::max();
 
  569       maxFrameNumber = numeric_limits<int>::min();
 
  575       logErrorRun       .reset();
 
  576       logErrorDetector  .reset();
 
  577       logErrorIndex     .reset();
 
  578       logErrorIncomplete.reset();
 
  589       os << getRunNumber() << 
' ' << parameters;
 
  599       if (!timeslices.empty()) {
 
  601         JNoticeStream(logger) << 
"Flushing " << timeslices.size() << 
" slices.";
 
  604           queueSize -= i->getSize();
 
  614         timeslices.swap(buffer);
 
  617       if (queueSize != 0) {
 
  618         JWarningStream(logger) << 
"Pending data in queue " << queueSize << 
" [B]";
 
  621       current_slice_index = -1;
 
  644       serversocket.reset();
 
  656       if (serversocket.is_valid()) {
 
  657         mask.
set(*serversocket);
 
  660       for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {      
 
  661         if (!channel->isReady()) {
 
  662           mask.
set(channel->getFileDescriptor());   
 
  672       for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
 
  676           if (mask.
has(channel->getFileDescriptor())) {
 
  680           if (channel->isReady()) {
 
  682             number_of_packets_received += 1;
 
  683             number_of_reads            += channel->getCounter();
 
  684             number_of_bytes_received   += channel->size();
 
  688               updateFrameQueue(channel);
 
  692               JErrorStream(logErrorRun) << 
"Receiving data while not running.";
 
  694               number_of_packets_discarded += 1;
 
  702         catch(
const exception& error) {
 
  704           JNoticeStream(logger) << 
"Disconnecting channel " << channel->getFileDescriptor() << 
' ' << error.what(); 
 
  708           channel = channelList.erase(channel);
 
  713       if (serversocket.is_valid()) {
 
  715         if (mask.
has(*serversocket)) {
 
  719           socket.
accept(serversocket->getFileDescriptor());
 
  734       if (!timeslices.empty() && (timeslices.front().size() >=  frames_per_slice ||
 
  735                                   timeslices.size()         >=  maxQueueDepth    ||
 
  736                                   queueSize                 >=  maxQueueSize)) {
 
  739         queueSize                            -= pending_slice.
getSize();
 
  742         minFrameNumber      = min(minFrameNumber, pending_slice.
getFrameIndex());
 
  743         maxFrameNumber      = max(maxFrameNumber, pending_slice.
getFrameIndex());
 
  745         if (pending_slice.size() > frames_per_slice) {
 
  747           JErrorStream(logger) << 
"More frames in timeslice than expected " 
  748                                << pending_slice.size() << 
" > " << frames_per_slice;
 
  750           if (pending_slice.size() <= maximum_frames_per_slice) { 
 
  752             JErrorStream(logger) << 
"Adjusting expected frames per timeslice to " << pending_slice.size();
 
  754             frames_per_slice = pending_slice.size();
 
  758         if (!pending_slice.empty()) {
 
  762           processTimeSlice(pending_slice);
 
  766           numberOfTimeslicesProcessed += 1;
 
  770           if (pending_slice.size() < frames_per_slice) {
 
  772             numberOfIncompleteTimeslicesProcessed += 1;
 
  774             JErrorStream(logErrorIncomplete) << 
"Timeout -> processed incomplete timeslice: "  
  776                                              << 
"Size of timeslice = " << pending_slice.size()          << 
';' 
  777                                              << 
"Queue depth = "       << timeslices.size()             << 
';' 
  778                                              << 
"Queue size = "        << queueSize;
 
  780             if (!timeslices.empty()) {
 
  782               JErrorStream(logger) << 
"Adjusting expected frames per timeslice from " << frames_per_slice << 
" to " << pending_slice.size();
 
  784               frames_per_slice = pending_slice.size();
 
  789         timeslices.pop_front();
 
  821       if (preamble.getLength() != channel->size()) {
 
  823         JErrorStream(logErrorRun) << 
"Size of received data does not match size reported by preamble: "  
  824                                   << 
"preamble.getLength() = " << preamble.getLength() << 
';' 
  825                                   << 
"channel->size(): "       << channel->size()      << 
';';
 
  827         number_of_packets_discarded += 1;
 
  832       if (header.getRunNumber() != getRunNumber()) {
 
  834         JErrorStream(logErrorRun) << 
"Run number "              << header.getRunNumber()
 
  835                                   << 
" unequal to current run " << getRunNumber()
 
  836                                   << 
" -> Dropping frame.";
 
  838         number_of_packets_discarded += 1;
 
  843       if (header.getFrameIndex() <= current_slice_index) {
 
  845         JErrorStream(logErrorIndex) << 
"Frame index " << header.getFrameIndex() << 
" <= " << current_slice_index
 
  846                                     << 
" -> Dropping frame.";
 
  848         number_of_packets_discarded += 1;
 
  850         if (frames_per_slice < maximum_frames_per_slice) {
 
  854           JErrorStream(logErrorIndex) << 
"Increase number of frames expected to: " << frames_per_slice;
 
  860       if (header.getFrameIndex() > current_slice_index + maximal_frame_index) {
 
  862         JErrorStream(logErrorIndex) << 
"Frame index " << header.getFrameIndex() << 
" > " << current_slice_index + maximal_frame_index 
 
  863                                     << 
" -> Dropping frame.";
 
  865         number_of_packets_discarded += 1;
 
  872       while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
 
  876       if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
 
  884         timesliceIterator = timeslices.insert(timesliceIterator, 
JDAQTimesliceL0());
 
  886         timesliceIterator->setDAQChronometer(header.getDAQChronometer());
 
  888         queueSize += timesliceIterator->getSize();
 
  893       in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
 
  895       queueSize += timesliceIterator->rbegin()->getSize();
 
  910         if (parameters.writeSummary()) {
 
  914         if (parameters.trigger3DMuon.enabled    || 
 
  915             parameters.trigger3DShower.enabled  || 
 
  916             parameters.triggerMXShower.enabled  || 
 
  917             parameters.triggerNB.enabled        || 
 
  918             parameters.writeL0.prescale         ||
 
  919             parameters.writeL1.prescale         ||
 
  920             parameters.writeL2.prescale         ||
 
  921             parameters.writeSN.prescale         ||
 
  922             c_buffer.is_enabled()) {
 
  924           timesliceRouter->configure(timeslice);
 
  933           for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
 
  935             if (moduleRouter->hasModule(frame->getModuleID())) {
 
  942                                      << 
"module = "      << frame->getModuleID()      << 
";" 
  943                                      << 
"discard and dump";
 
  945                 timesliceTX.push_back(*frame);
 
  950               const JModule&   module = moduleRouter->getModule(frame->getModuleID()); 
 
  951               JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
 
  963               if (parameters.triggerNB.enabled) {
 
  967                 if (buffer.begin() != __end) {
 
  970                                                         frame->getModuleIdentifier(),
 
  973                   (*buildL1)(buffer.begin(), __end , back_inserter(*timesliceNB.rbegin()));
 
  980                                                     frame->getModuleIdentifier(),
 
  983               (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
 
  988                                                     frame->getModuleIdentifier(),
 
  991               (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
 
  996                                                     frame->getModuleIdentifier(),
 
  999               (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
 
 1003               JErrorStream(logErrorDetector) << 
"No detector information for module " << frame->getModuleID(); 
 
 1007           if (!timesliceTX.empty()) {
 
 1008             this->put(timesliceTX);
 
 1013           if (parameters.triggerNB.enabled) {      
 
 1017             for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.
end(); ++hit) {
 
 1024                                   parameters.TMaxLocal_ns, 
 
 1025                                   parameters.triggerNB.DMax_m,
 
 1035           (*trigger3DMuon)  (trigger_input, back_inserter(trigger_output));
 
 1036           (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
 
 1037           (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
 
 1041           for (JTriggerOutput::const_iterator event = trigger_output.begin(); 
event != trigger_output.end(); ++event) {
 
 1045             if (this->put(
object)) {        
 
 1046               numberOfEvents += 1;
 
 1050           if (parameters.writeL1() || c_buffer.sizeL1 > 0) {
 
 1054             if (parameters.writeL1)  { this->put(
object);  }
 
 1055             if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
 
 1058           if (parameters.writeL2() || c_buffer.sizeL2 > 0) {
 
 1062             if (parameters.writeL2)  { this->put(
object); }
 
 1063             if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
 
 1066           if (parameters.writeSN() || c_buffer.sizeSN > 0) {
 
 1070             if (parameters.writeSN)  { this->put(
object); }
 
 1071             if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
 
 1074           if (parameters.writeL0() || c_buffer.sizeL0 > 0) {
 
 1076             if (parameters.writeL0)  { this->put(timeslice); }
 
 1077             if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
 
 1081       } 
catch(
const exception& error) {
 
 1082         JErrorStream(logger) << 
"Error = "       << error.what()              << 
";" 
 1085                              << 
"time slice not correctly processed!";
 
 1097       const double T_us = (double) timer.usec_wall;
 
 1099       JNoticeStream(logger) << 
"Elapsed real (wall) time [s] "          << T_us / 1e6;
 
 1100       JNoticeStream(logger) << 
"Elapsed user CPU time [s] "             << (double) timer.usec_ucpu/ 1e6;
 
 1101       JNoticeStream(logger) << 
"Elapsed system CPU time [s] "           << (double) timer.usec_scpu/ 1e6;
 
 1103         JNoticeStream(logger) << 
"Elapsed real time per time slice [ms] " << Qt.getMean() * 1.0e-3  << 
" +/- " << Qt.getDeviation() * 1.0e-3;
 
 1105       catch(
const std::exception&) {}
 
 1106       JNoticeStream(logger) << 
"Number of packets received/discarded "  << number_of_packets_received << 
"/" << number_of_packets_discarded;
 
 1107       JNoticeStream(logger) << 
"Number of events/MB sent "              << numberOfEvents             << 
"/" << numberOfBytes/1e6;
 
 1109       if (number_of_packets_received > 0) {
 
 1110         JNoticeStream(logger) << 
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
 
 1113       JNoticeStream(logger) << 
"Current queue depth/size " << timeslices.size() << 
"/" << queueSize;
 
 1114       JNoticeStream(logger) << 
"Current number of frames per slice expected: " << frames_per_slice;
 
 1116       JNoticeStream(logger) << 
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << 
"/" << numberOfIncompleteTimeslicesProcessed;
 
 1118       if (numberOfTimeslicesProcessed > 0) {
 
 1119         JNoticeStream(logger) << 
"Real time per timeslice [ms] "       << timer.usec_wall / 1000 / numberOfTimeslicesProcessed;
 
 1120         JNoticeStream(logger) << 
"User CPU  time per timeslice [ms] "  << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed;
 
 1121         JNoticeStream(logger) << 
"System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed;
 
 1124       const double processedSlicesTime_us   =  numberOfTimeslicesProcessed      * 
getFrameTime() / 1000;
 
 1125       const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) * 
getFrameTime() / 1000;
 
 1127       if (processedSlicesTime_us   > 0) {
 
 1128         JNoticeStream(logger) << 
"Performance factor (inaccurate estimate): "                << T_us / processedSlicesTime_us;
 
 1130       if (processedDetectorTime_us > 0) {
 
 1131         JNoticeStream(logger) << 
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
 
 1147       using namespace std;
 
 1149       JDebugStream(logger) << 
"Received message <" << tag.
toString() << 
"> \"" << string(buffer, length) << 
"\"";
 
 1153         if (c_buffer.is_open()) {
 
 1155           JNoticeStream(logger) << 
"Close circular buffer " << c_buffer;
 
 1160         if (c_buffer.is_enabled()) {
 
 1164           if (c_buffer.is_open()) {
 
 1166             JNoticeStream(logger) << 
"Created circular buffer " << c_buffer;
 
 1172             JErrorStream (logger) << 
"Failed to create circular buffer in directory <" << path << 
">; disable functionality.";
 
 1186     static const int MAXIMUM_FILE_NUMBER = 100; 
 
 1205         datawriter->put(
object);
 
 1207         numberOfBytes += 
object.getSize();
 
 1211       catch(
const std::exception& error) {
 
 1304   using namespace std;
 
 1305   using namespace JPP;
 
 1322     JParser<> zap(
"Application for real-time filtering of data.");
 
 1327     zap[
'u'] = 
make_field(client_name)     = 
"JDataFilter";
 
 1337   catch(
const exception& error) {
 
 1338     FATAL(error.what() << endl);
 
 1342   JLogger* out = NULL;
 
 1345     out = 
new JStreamLogger(cout);
 
 1347     out = 
new JControlHostLogger(logger);