67 using namespace KM3NETDAQ;
 
  116               const std::string& server,
 
  117               const std::string& hostname,
 
  128     this->backlog     = backlog;
 
  135     current_slice_index  = -1;
 
  149     catch(
const std::exception& error) {
 
  159     serversocket.reset();
 
  166     JDebugStream(logger) << 
"actionInit() " << std::string(buffer,length);
 
  181     JDebugStream(logger) << 
"actionConfigure() " << std::string(buffer,length);
 
  183     long long int logger_s =  5;
 
  194     properties[
"dataWriter"]             = hostname;
 
  195     properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
 
  197     properties[
"triggerParameters"]      = parameters;
 
  198     properties[
"queueSize"]              = maxQueueSize  = (totalCPURAM - 
GIGABYTE);  
 
  199     properties[
"queueDepth"]             = maxQueueDepth = 20;                        
 
  200     properties[
"logger_s"]               = logger_s;
 
  201     properties[
"JDataFilter"]            = dataFilters;
 
  202     properties[
"DataQueue"]              = dataQueues;
 
  203     properties[
"pmtIdCheck"]             = checkForInvalidPMTs = 
false;
 
  204     properties[
"maxHitsFrame"]           = maxHitsFrame  = 31*2000;                    
 
  207       properties.
read(
string(buffer, length));
 
  209     catch(
const exception& error) {
 
  213     hostname = 
trim(hostname);
 
  218       throw JException(
"Undefined data writer host name.");
 
  220     maximum_frames_per_slice = frames_per_slice;
 
  224     if(dataFilters.empty()) {
 
  225       JNoticeStream(logger) << 
"No DataFilters in process list, or no process list. Assuming that this process is the only process on this CPU and setting parameters accordingly.";
 
  231     unsigned int numberOfDataFiltersOnThisMachine = 0;
 
  239         numberOfDataFiltersOnThisMachine++;
 
  241         if (i->port == this->port) {
 
  247     if (numberOfDataFiltersOnThisMachine == 0) {
 
  248       JNoticeStream(logger) << 
"Zero datafilters on this machine according to process list (if it exists). Assuming one datafilter on this machine.";
 
  249       numberOfDataFiltersOnThisMachine = 1;
 
  252     if (thisProcess == dataFilters.end()) {
 
  253       JErrorStream(logger) << 
"This process cannot be found in the process list. Why do I exist?";      
 
  256     if (thisProcess != dataFilters.end() && thisProcess->index != 
getName()) {
 
  257       JErrorStream(logger) << 
"Mismatch between given process names: " 
  258                            << 
"I am called " << 
getName() << 
", but in the process list I am referred to as " << thisProcess->index;
 
  261     if (dataFilters.begin() == thisProcess || dataFilters.empty()) {                    
 
  265     if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {     
 
  267       maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
 
  269       JNoticeStream(logger) << 
"Maximum queue size is too large given the number of processes on this machine. Queue size reduced to "  
  270                             << maxQueueSize << 
" bytes." ;  
 
  284       JNoticeStream(logger) << 
"This JDataFilter process will report.";
 
  285       JNoticeStream(logger) << 
"Number of modules: "    << (*moduleRouter)->size();
 
  286       JNoticeStream(logger) << 
"Trigger parameters: "   << parameters;
 
  288       JNoticeStream(logger) << 
"Clock interval: "       << getClockInterval();
 
  299     if (buildL1.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL1."; }
 
  300     if (buildL2.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildL2."; }
 
  301     if (buildSN.get() == NULL) { 
JErrorStream(logger) << 
"Failed to allocate buildSN."; }
 
  319     current_slice_index = -1;
 
  324     numberOfTimeslicesProcessed           = 0;
 
  325     numberOfIncompleteTimeslicesProcessed = 0;
 
  327     numberOfFramesWithInvalidPMTs         = 0;
 
  329     number_of_packets_received  = 0;
 
  330     number_of_packets_discarded = 0;
 
  331     number_of_bytes_received    = 0;
 
  334     minFrameNumber = numeric_limits<int>::max();
 
  335     maxFrameNumber = numeric_limits<int>::min();
 
  341     logErrorRun        .reset();
 
  342     logErrorDetector   .reset();
 
  343     logErrorIndex      .reset();
 
  344     logErrorIncomplete .reset();
 
  345     logErrorInvalidPMTs.reset();
 
  354     os << getRunNumber() << 
' ' << parameters;
 
  362     if (!timeslices.empty()) {
 
  364       JNoticeStream(logger) << 
"Flushing " << timeslices.size() << 
" slices.";
 
  367         queueSize -= i->getSize();
 
  373     if (queueSize != 0) {
 
  374       JWarningStream(logger) << 
"Pending data in queue [B] " << queueSize;
 
  377     current_slice_index = -1;
 
  408     if (serversocket.is_valid()) {
 
  409       mask.set(*serversocket);
 
  412     for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {      
 
  413       if (!channel->isReady()) {
 
  414         mask.set(channel->getFileDescriptor());   
 
  422     for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
 
  426         if (mask.has(channel->getFileDescriptor())) {
 
  430         if (channel->isReady()) {
 
  432           number_of_packets_received += 1;
 
  433           number_of_reads            += channel->getCounter();
 
  434           number_of_bytes_received   += channel->size();
 
  438             updateFrameQueue(channel);
 
  442             JErrorStream(logErrorRun) << 
"Receiving data while not running.";
 
  444             number_of_packets_discarded += 1;
 
  454         JNoticeStream(logger) << 
"Disconnecting channel " << channel->getFileDescriptor() << 
' ' << error.
what(); 
 
  458         channel = channelList.erase(channel);
 
  463     if (serversocket.is_valid()) {
 
  465       if (mask.has(*serversocket)) {
 
  469         socket.
accept(serversocket->getFileDescriptor());
 
  484     if (!timeslices.empty() && (timeslices.front().size() >=  frames_per_slice ||
 
  485                                 timeslices.size()         >=  maxQueueDepth    ||
 
  486                                 queueSize                 >=  maxQueueSize)) {
 
  489       queueSize                     -= pending_slice.
getSize();
 
  490       timeslices.pop_front();
 
  493       minFrameNumber      = min(minFrameNumber, pending_slice.
getFrameIndex());
 
  494       maxFrameNumber      = max(maxFrameNumber, pending_slice.
getFrameIndex());
 
  496       if(pending_slice.size()>frames_per_slice) {
 
  498         JErrorStream(logger) << 
"More frames in timeslice than expected : " << pending_slice.size();
 
  500         if(pending_slice.size()<=maximum_frames_per_slice) { 
 
  502           JErrorStream(logger) << 
"Adjusting expected frames per timeslice to " << pending_slice.size();
 
  504           frames_per_slice = pending_slice.size();
 
  508       if (!pending_slice.empty()) {
 
  510         processTimeSlice(pending_slice);
 
  512         numberOfTimeslicesProcessed += 1;
 
  514         if (pending_slice.size() < frames_per_slice) {
 
  515           numberOfIncompleteTimeslicesProcessed += 1;
 
  516           JErrorStream(logErrorIncomplete) << 
"Timeout -> processed incomplete timeslice "  
  517                                            << 
"Timeslice frameindex " << pending_slice.
getFrameIndex() << 
' ' 
  518                                            << 
"Size of timeslice " << pending_slice.size() << 
' ' 
  519                                            << 
"Queue depth " << timeslices.size()             << 
' ' 
  520                                            << 
"Queue size "  << queueSize;
 
  522           if(!timeslices.empty()) {
 
  523             JErrorStream(logger) << 
"Adjusting expected frames per timeslice from " << frames_per_slice << 
" to " << pending_slice.size();
 
  524             frames_per_slice = pending_slice.size();
 
  550     if(preamble.getLength()!=channel->size()) {
 
  552       JErrorStream(logErrorRun) << 
"Size of received data does not match size reported by preamble."  
  553                                 << 
" preamble.getLength(): " << preamble.getLength() 
 
  554                                 << 
" channel->size(): " << channel->size() ;
 
  555       number_of_packets_discarded += 1;
 
  560     if (header.getRunNumber() != getRunNumber()) {
 
  562       JErrorStream(logErrorRun) << 
"Run number " << header.getRunNumber() << 
" unequal to current run " << getRunNumber() << 
" -> Dropping frame.";
 
  564       number_of_packets_discarded += 1;
 
  569     if (header.getFrameIndex() <= current_slice_index) {
 
  571       JErrorStream(logErrorIndex) << 
"FrameIndex " << header.getFrameIndex()  <<
" already processed, dropping frame.";
 
  573       number_of_packets_discarded += 1;
 
  575       if(frames_per_slice<maximum_frames_per_slice) {
 
  579         JErrorStream(logErrorIndex) << 
"Increased number of frames expected to: " << frames_per_slice;
 
  587     for (timesliceIterator  = timeslices.begin();
 
  588          timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex();
 
  589          ++timesliceIterator) ;
 
  591     if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
 
  597       timesliceIterator  = timeslices.insert(timesliceIterator, 
JDAQTimesliceL0());
 
  598       timesliceIterator->setDAQChronometer(header.getDAQChronometer());
 
  600       queueSize += timesliceIterator->getSize();
 
  604     if (checkForInvalidPMTs) {
 
  610       if(frame.
size()>maxHitsFrame) {
 
  611         JErrorStream(logger) << 
"Frame size exceeds limit. Size: " << frame.
size()<< 
" Frame discarded.";
 
  618       for (; hit!=frame.
end(); ++hit) {
 
  629       if(hit!=frame.
end()) {
 
  630         numberOfFramesWithInvalidPMTs +=1;
 
  631         JErrorStream(logErrorInvalidPMTs)<< 
"Frame with invalid PMT id or wrong time ordering discarded. Total discarded: "<< numberOfFramesWithInvalidPMTs;
 
  637           timesliceIterator->push_back(frame);
 
  638           queueSize += timesliceIterator->rbegin()->getSize();
 
  646       in >> 
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
 
  647       queueSize += timesliceIterator->rbegin()->getSize();
 
  662       if (parameters.writeSummary()) {
 
  668           datawriter->put(
object);
 
  670           numberOfBytes  += 
object.getSize();
 
  677       timesliceRouter->configure(pending_slice);
 
  679       if (parameters.trigger3DMuon.enabled    || 
 
  680           parameters.trigger3DShower.enabled  || 
 
  681           parameters.triggerMXShower.enabled  || 
 
  682           parameters.writeL1.prescale         ||
 
  683           parameters.writeL2.prescale         ||
 
  684           parameters.writeSN.prescale) {
 
  692         for (JDAQTimesliceL0::const_iterator super_frame = pending_slice.begin(); super_frame != pending_slice.end(); ++super_frame) {
 
  694           if (moduleRouter->hasModule(super_frame->getModuleID())) {
 
  696             const JModule&   module = moduleRouter->getModule(super_frame->getModuleID()); 
 
  697             JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*super_frame, module);
 
  710             timesliceL1.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
 
  711                                                   super_frame->getModuleIdentifier(),
 
  714             (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
 
  718             timesliceL2.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
 
  719                                                   super_frame->getModuleIdentifier(),
 
  722             (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
 
  726             timesliceSN.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
 
  727                                                   super_frame->getModuleIdentifier(),
 
  730             (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
 
  734             JErrorStream(logErrorDetector) << 
"No detector information for module " << super_frame->getModuleID(); 
 
  743         (*trigger3DMuon)  (trigger_input, back_inserter(trigger_output));
 
  744         (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
 
  745         (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
 
  749         for (JTriggerOutput::const_iterator 
event = trigger_output.begin(); 
event != trigger_output.end(); ++
event) {
 
  755             datawriter->put(
object);
 
  758             numberOfBytes  += 
object.getSize();
 
  765         if (parameters.writeL1()) {
 
  771             datawriter->put(
object);
 
  773             numberOfBytes  += 
object.getSize();
 
  780         if (parameters.writeL2()) {
 
  786             datawriter->put(
object);
 
  788             numberOfBytes  += 
object.getSize();
 
  795         if (parameters.writeSN()) {
 
  801             datawriter->put(
object);
 
  803             numberOfBytes  += 
object.getSize();
 
  811       if (parameters.writeTimeslices() || parameters.writeL0()) { 
 
  815           datawriter->put(pending_slice);         
 
  817           numberOfBytes  += pending_slice.
getSize();
 
  835     const double T_us = (double) timer.usec_wall;
 
  837     JNoticeStream(logger) << 
"Reporting statistics for this datafilter.";
 
  838     JNoticeStream(logger) << 
"Elapsed real (wall) time [s] "          << T_us / 1e6;
 
  839     JNoticeStream(logger) << 
"Elapsed user CPU time [s] "             << (double) timer.usec_ucpu/ 1e6;
 
  840     JNoticeStream(logger) << 
"Elapsed system CPU  time [s] "          << (double) timer.usec_scpu/ 1e6;
 
  841     JNoticeStream(logger) << 
"Number of packets received/discarded "  << number_of_packets_received << 
"/" << number_of_packets_discarded;
 
  842     JNoticeStream(logger) << 
"Number of events/MB sent "              << numberOfEvents             << 
"/" << numberOfBytes/1e6;
 
  844     if (number_of_packets_received > 0) { 
JNoticeStream(logger) << 
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received; }
 
  846     JNoticeStream(logger) << 
"Current queue depth " << timeslices.size();
 
  848     JNoticeStream(logger) << 
"Current number of frames per slice expected: " << frames_per_slice;
 
  855     JNoticeStream(logger) << 
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << 
"/" << numberOfIncompleteTimeslicesProcessed;
 
  856     JNoticeStream(logger) << 
"Number of frames with invalid PMTs dropped "      << numberOfFramesWithInvalidPMTs;
 
  858     if (numberOfTimeslicesProcessed > 0) { 
JNoticeStream(logger) << 
"Real time per timeslice [ms] "       << timer.usec_wall / 1000 / numberOfTimeslicesProcessed; }
 
  859     if (numberOfTimeslicesProcessed > 0) { 
JNoticeStream(logger) << 
"User CPU  time per timeslice [ms] "  << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed; }
 
  860     if (numberOfTimeslicesProcessed > 0) { 
JNoticeStream(logger) << 
"System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed; }
 
  862     const double processedSlicesTimeMus   =  numberOfTimeslicesProcessed      * 
getFrameTime() / 1000;
 
  863     const double processedDetectorTimeMus = (maxFrameNumber - minFrameNumber) * 
getFrameTime() / 1000;
 
  865     if (processedSlicesTimeMus   > 0) { 
JNoticeStream(logger) << 
"Performance factor (inaccurate estimate): "                << T_us / processedSlicesTimeMus;   }
 
  866     if (processedDetectorTimeMus > 0) { 
JNoticeStream(logger) << 
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTimeMus; }
 
  957 int main(
int argc, 
char* argv[])
 
  974     JParser<> zap(
"Application for real-time filtering of data.");
 
  979     zap[
'u'] = 
make_field(client_name)     = 
"JDataFilter";
 
  988   catch(
const exception& error) {
 
  989     FATAL(error.what() << endl);
 
Message logger with time scheduler. 
 
Utility class to parse command line options. 
 
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
 
virtual const char * what() const 
Get error message. 
 
bool read(const JEquation &equation)
Read equation. 
 
Data structure for all trigger parameters. 
 
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
 
virtual void actionContinue(int length, const char *buffer)
 
Data structure for a composite optical module. 
 
JBuildL2< hit_type > JBuildL2_t
 
std::string hostname
controlhost to send data to the datawriter 
 
long long int numberOfFramesWithInvalidPMTs
 
Message logging based on std::ostream. 
 
static const JNET::JTag RC_DFILTER
 
virtual void actionExit()
 
long long int numberOfEvents
 
Router for direct addressing of module data in detector data structure. 
 
Interface for logging messages. 
 
Message logging based on ControlHost. 
 
long long int number_of_packets_received
 
Utility class to parse parameter values. 
 
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
 
Simple data structure to support I/O of equations (see class JLANG::JEquation). 
 
std::string getIPaddress(const int ip)
Get IP address (decimal-dot notation). 
 
JChannelList_t channelList
 
std::vector< JSocketInputChannel_t > JChannelList_t
 
JMessageScheduler logErrorDetector
 
static const JNET::JTag IO_TRIGGER_PARAMETERS
 
JSinglePointer< JBuildL1_t > buildL1
 
JSuperFrame2D< hit_type > JSuperFrame2D_t
 
void run()
Run as run control client following command messages via JNET::JControlHost. 
 
virtual void actionStop(int length, const char *buffer)
 
JPMT_t getPMT() const 
Get PMT. 
 
vector< JDAQProcess > dataFilters
 
int getRunNumber() const 
Get run number. 
 
list< JDAQTimesliceL0 > timeslices
 
JSinglePointer< JBuildL2_t > buildL2
 
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
 
Utility class to parse parameter values. 
 
vector< JDAQProcess > dataQueues
 
unsigned int maxQueueDepth
 
int getFrameIndex() const 
Get frame index. 
 
virtual void actionQuit(int length, const char *buffer)
 
1-dimensional frame with time calibrated data from one optical module. 
 
long long int numberOfTimeslicesProcessed
 
JTimeRange getTimeRange(const Evt &event)
Get time range of the given event.
 
Basic data structure for L0 hit. 
 
std::string index
index in process list 
 
JBuildL1< hit_type > JBuildL1_t
 
std::string trim(const std::string &buffer)
Trim string. 
 
Basic data structure for time and time over threshold information of hit. 
 
The template JSinglePointer class can be used to hold a pointer to an object. 
 
int getLength() const 
Get length. 
 
virtual void actionSelect(const JFileDescriptorMask &mask)
 
int getFileDescriptor() const 
Get file descriptor. 
 
JNET::JSocketInputChannel< KM3NETDAQ::JDAQAbstractPreamble > JSocketInputChannel_t
 
Scheduling of actions via fixed latency intervals. 
 
void accept(const int server)
Accept connection from a server. 
 
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector. 
 
virtual void actionInit(int length, const char *buffer)
 
JMessageScheduler logErrorInvalidPMTs
 
Auxiliary class for itemization of process list. 
 
JSinglePointer< JTriggerMXShower > triggerMXShower
 
JSinglePointer< JControlHost_t > datawriter
 
int backlog
serversocket backlog
 
JSinglePointer< JBuildL2_t > buildSN
 
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object 
 
JMessageScheduler logErrorIndex
 
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
 
const_iterator begin() const 
 
double getFrameTime()
Get frame time duration. 
 
static const size_t buffer_size
 
virtual void actionConfigure(int length, const char *buffer)
 
JTriggerParameters parameters
 
const JDAQChronometer & getDAQChronometer() const 
Get DAQ chronometer. 
 
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto. 
 
Auxiliary class to build JDAQEvent for a triggered event. 
 
void updateFrameQueue(const JChannelList_t::const_iterator channel)
 
unsigned int frames_per_slice
 
Level specific message streamers. 
 
Exception for ControlHost. 
 
const JPosition3D & getPosition() const 
Get position. 
 
Simple datastructure for the DAQ preamble without ROOT functionality. 
 
JTimeslice< hit_type > JTimeslice_t
 
virtual void actionReset(int length, const char *buffer)
 
static void reset()
Reset counter of unique instance of this class object. 
 
General purpose messaging. 
 
Match of two events considering overlap in time. 
 
void setReceiveBufferSize(const int size)
Set receive buffer size. 
 
std::string getHostname()
Get host name. 
 
JMessageScheduler logErrorRun
 
virtual void actionPause(int length, const char *buffer)
 
virtual void setSelect(JFileDescriptorMask &mask) const 
 
vector< hit_type > JFrameL1_t
 
JSinglePointer< JTimesliceRouter > timesliceRouter
 
JSinglePointer< JTrigger3DShower > trigger3DShower
 
long long int totalCPURAM
 
virtual void actionStart(int length, const char *buffer)
 
Auxiliary class to build JDAQTimeslice for L1 timeslice. 
 
Run control client base class. 
 
virtual int getSize() const 
Get size of object. 
 
void merge(const JMatch_t &match)
Merge events. 
 
Utility class to parse command line options. 
 
Implementation of object output through ControlHost.
 
bool processIndexSorter(const JDAQProcess &a, const JDAQProcess &b)
 
JSinglePointer< JModuleRouter > moduleRouter
 
unsigned long long int getRAM()
Get RAM of this CPU. 
 
long long int maxQueueSize
 
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size)
Constructor. 
 
ROOT TTree parameter settings. 
 
long long int numberOfIncompleteTimeslicesProcessed
 
long long int number_of_reads
 
void setNonBlocking(const bool on)
Set non-blocking of I/O. 
 
virtual bool enter(const JArgs &args)
Enter the state machine. 
 
2-dimensional frame with time calibrated data from one optical module. 
 
JMessageScheduler logErrorIncomplete
 
const char * getName()
Get ROOT name of given data type. 
 
static const JNET::JTag RC_CMD
 
virtual void actionEnter()
 
void setKeepAlive(const bool on)
Set keep alive of socket. 
 
void processTimeSlice(const JDAQTimesliceL0 &pending_slice)
 
long long int number_of_packets_discarded
 
virtual const char * what() const 
Get error message. 
 
KM3NeT DAQ constants, bit handling, etc. 
 
JSinglePointer< JServerSocket > serversocket
 
Hostname and IP address functions. 
 
Data frame of one optical module. 
 
JSinglePointer< JTrigger3DMuon > trigger3DMuon
 
Timeslice data structure for L0 data. 
 
Basic data structure for L1 hit. 
 
unsigned int maximum_frames_per_slice
 
long long int number_of_bytes_received
 
Time slice with calibrated data. 
 
JSuperFrame1D< hit_type > JSuperFrame1D_t
 
const_iterator end() const 
 
long long int numberOfBytes
 
int main(int argc, char *argv[])