67 using namespace KM3NETDAQ;
116 const std::string& server,
117 const std::string& hostname,
128 this->backlog = backlog;
135 current_slice_index = -1;
149 catch(
const std::exception& error) {
159 serversocket.reset();
166 JDebugStream(logger) <<
"actionInit() " << std::string(buffer,length);
181 JDebugStream(logger) <<
"actionConfigure() " << std::string(buffer,length);
183 long long int logger_s = 5;
194 properties[
"dataWriter"] = hostname;
195 properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
197 properties[
"triggerParameters"] = parameters;
198 properties[
"queueSize"] = maxQueueSize = (totalCPURAM -
GIGABYTE);
199 properties[
"queueDepth"] = maxQueueDepth = 20;
200 properties[
"logger_s"] = logger_s;
201 properties[
"JDataFilter"] = dataFilters;
202 properties[
"DataQueue"] = dataQueues;
203 properties[
"pmtIdCheck"] = checkForInvalidPMTs =
false;
204 properties[
"maxHitsFrame"] = maxHitsFrame = 31*2000;
207 properties.
read(
string(buffer, length));
209 catch(
const exception& error) {
213 hostname =
trim(hostname);
218 throw JException(
"Undefined data writer host name.");
220 maximum_frames_per_slice = frames_per_slice;
224 if(dataFilters.empty()) {
225 JNoticeStream(logger) <<
"No DataFilters in process list, or no process list. Assuming that this process is the only process on this CPU and setting parameters accordingly.";
231 unsigned int numberOfDataFiltersOnThisMachine = 0;
239 numberOfDataFiltersOnThisMachine++;
241 if (i->port == this->port) {
247 if (numberOfDataFiltersOnThisMachine == 0) {
248 JNoticeStream(logger) <<
"Zero datafilters on this machine according to process list (if it exists). Assuming one datafilter on this machine.";
249 numberOfDataFiltersOnThisMachine = 1;
252 if (thisProcess == dataFilters.end()) {
253 JErrorStream(logger) <<
"This process cannot be found in the process list. Why do I exist?";
256 if (thisProcess != dataFilters.end() && thisProcess->index !=
getName()) {
257 JErrorStream(logger) <<
"Mismatch between given process names: "
258 <<
"I am called " <<
getName() <<
", but in the process list I am referred to as " << thisProcess->index;
261 if (dataFilters.begin() == thisProcess || dataFilters.empty()) {
265 if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {
267 maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
269 JNoticeStream(logger) <<
"Maximum queue size is too large given the number of processes on this machine. Queue size reduced to "
270 << maxQueueSize <<
" bytes." ;
284 JNoticeStream(logger) <<
"This JDataFilter process will report.";
285 JNoticeStream(logger) <<
"Number of modules: " << (*moduleRouter)->size();
286 JNoticeStream(logger) <<
"Trigger parameters: " << parameters;
288 JNoticeStream(logger) <<
"Clock interval: " << getClockInterval();
299 if (buildL1.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL1."; }
300 if (buildL2.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL2."; }
301 if (buildSN.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
319 current_slice_index = -1;
324 numberOfTimeslicesProcessed = 0;
325 numberOfIncompleteTimeslicesProcessed = 0;
327 numberOfFramesWithInvalidPMTs = 0;
329 number_of_packets_received = 0;
330 number_of_packets_discarded = 0;
331 number_of_bytes_received = 0;
334 minFrameNumber = numeric_limits<int>::max();
335 maxFrameNumber = numeric_limits<int>::min();
341 logErrorRun .reset();
342 logErrorDetector .reset();
343 logErrorIndex .reset();
344 logErrorIncomplete .reset();
345 logErrorInvalidPMTs.reset();
354 os << getRunNumber() <<
' ' << parameters;
362 if (!timeslices.empty()) {
364 JNoticeStream(logger) <<
"Flushing " << timeslices.size() <<
" slices.";
367 queueSize -= i->getSize();
373 if (queueSize != 0) {
374 JWarningStream(logger) <<
"Pending data in queue [B] " << queueSize;
377 current_slice_index = -1;
408 if (serversocket.is_valid()) {
409 mask.set(*serversocket);
412 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
413 if (!channel->isReady()) {
414 mask.set(channel->getFileDescriptor());
422 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
426 if (mask.has(channel->getFileDescriptor())) {
430 if (channel->isReady()) {
432 number_of_packets_received += 1;
433 number_of_reads += channel->getCounter();
434 number_of_bytes_received += channel->size();
438 updateFrameQueue(channel);
442 JErrorStream(logErrorRun) <<
"Receiving data while not running.";
444 number_of_packets_discarded += 1;
454 JNoticeStream(logger) <<
"Disconnecting channel " << channel->getFileDescriptor() <<
' ' << error.
what();
458 channel = channelList.erase(channel);
463 if (serversocket.is_valid()) {
465 if (mask.has(*serversocket)) {
469 socket.
accept(serversocket->getFileDescriptor());
484 if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
485 timeslices.size() >= maxQueueDepth ||
486 queueSize >= maxQueueSize)) {
489 queueSize -= pending_slice.
getSize();
490 timeslices.pop_front();
493 minFrameNumber = min(minFrameNumber, pending_slice.
getFrameIndex());
494 maxFrameNumber = max(maxFrameNumber, pending_slice.
getFrameIndex());
496 if(pending_slice.size()>frames_per_slice) {
498 JErrorStream(logger) <<
"More frames in timeslice than expected : " << pending_slice.size();
500 if(pending_slice.size()<=maximum_frames_per_slice) {
502 JErrorStream(logger) <<
"Adjusting expected frames per timeslice to " << pending_slice.size();
504 frames_per_slice = pending_slice.size();
508 if (!pending_slice.empty()) {
510 processTimeSlice(pending_slice);
512 numberOfTimeslicesProcessed += 1;
514 if (pending_slice.size() < frames_per_slice) {
515 numberOfIncompleteTimeslicesProcessed += 1;
516 JErrorStream(logErrorIncomplete) <<
"Timeout -> processed incomplete timeslice "
517 <<
"Timeslice frameindex " << pending_slice.
getFrameIndex() <<
' '
518 <<
"Size of timeslice " << pending_slice.size() <<
' '
519 <<
"Queue depth " << timeslices.size() <<
' '
520 <<
"Queue size " << queueSize;
522 if(!timeslices.empty()) {
523 JErrorStream(logger) <<
"Adjusting expected frames per timeslice from " << frames_per_slice <<
" to " << pending_slice.size();
524 frames_per_slice = pending_slice.size();
550 if(preamble.getLength()!=channel->size()) {
552 JErrorStream(logErrorRun) <<
"Size of received data does not match size reported by preamble."
553 <<
" preamble.getLength(): " << preamble.getLength()
554 <<
" channel->size(): " << channel->size() ;
555 number_of_packets_discarded += 1;
560 if (header.getRunNumber() != getRunNumber()) {
562 JErrorStream(logErrorRun) <<
"Run number " << header.getRunNumber() <<
" unequal to current run " << getRunNumber() <<
" -> Dropping frame.";
564 number_of_packets_discarded += 1;
569 if (header.getFrameIndex() <= current_slice_index) {
571 JErrorStream(logErrorIndex) <<
"FrameIndex " << header.getFrameIndex() <<
" already processed, dropping frame.";
573 number_of_packets_discarded += 1;
575 if(frames_per_slice<maximum_frames_per_slice) {
579 JErrorStream(logErrorIndex) <<
"Increased number of frames expected to: " << frames_per_slice;
587 for (timesliceIterator = timeslices.begin();
588 timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex();
589 ++timesliceIterator) ;
591 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
597 timesliceIterator = timeslices.insert(timesliceIterator,
JDAQTimesliceL0());
598 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
600 queueSize += timesliceIterator->getSize();
604 if (checkForInvalidPMTs) {
610 if(frame.
size()>maxHitsFrame) {
611 JErrorStream(logger) <<
"Frame size exceeds limit. Size: " << frame.
size()<<
" Frame discarded.";
618 for (; hit!=frame.
end(); ++hit) {
629 if(hit!=frame.
end()) {
630 numberOfFramesWithInvalidPMTs +=1;
631 JErrorStream(logErrorInvalidPMTs)<<
"Frame with invalid PMT id or wrong time ordering discarded. Total discarded: "<< numberOfFramesWithInvalidPMTs;
637 timesliceIterator->push_back(frame);
638 queueSize += timesliceIterator->rbegin()->getSize();
646 in >>
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
647 queueSize += timesliceIterator->rbegin()->getSize();
662 if (parameters.writeSummary()) {
668 datawriter->put(
object);
670 numberOfBytes +=
object.getSize();
677 timesliceRouter->configure(pending_slice);
679 if (parameters.trigger3DMuon.enabled ||
680 parameters.trigger3DShower.enabled ||
681 parameters.triggerMXShower.enabled ||
682 parameters.writeL1.prescale ||
683 parameters.writeL2.prescale ||
684 parameters.writeSN.prescale) {
692 for (JDAQTimesliceL0::const_iterator super_frame = pending_slice.begin(); super_frame != pending_slice.end(); ++super_frame) {
694 if (moduleRouter->hasModule(super_frame->getModuleID())) {
696 const JModule& module = moduleRouter->getModule(super_frame->getModuleID());
697 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*super_frame, module);
710 timesliceL1.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
711 super_frame->getModuleIdentifier(),
714 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
718 timesliceL2.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
719 super_frame->getModuleIdentifier(),
722 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
726 timesliceSN.push_back(
JSuperFrame1D_t(super_frame->getDAQChronometer(),
727 super_frame->getModuleIdentifier(),
730 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
734 JErrorStream(logErrorDetector) <<
"No detector information for module " << super_frame->getModuleID();
743 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
744 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
745 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
749 for (JTriggerOutput::const_iterator
event = trigger_output.begin();
event != trigger_output.end(); ++
event) {
755 datawriter->put(
object);
758 numberOfBytes +=
object.getSize();
765 if (parameters.writeL1()) {
771 datawriter->put(
object);
773 numberOfBytes +=
object.getSize();
780 if (parameters.writeL2()) {
786 datawriter->put(
object);
788 numberOfBytes +=
object.getSize();
795 if (parameters.writeSN()) {
801 datawriter->put(
object);
803 numberOfBytes +=
object.getSize();
811 if (parameters.writeTimeslices() || parameters.writeL0()) {
815 datawriter->put(pending_slice);
817 numberOfBytes += pending_slice.
getSize();
835 const double T_us = (double) timer.usec_wall;
837 JNoticeStream(logger) <<
"Reporting statistics for this datafilter.";
838 JNoticeStream(logger) <<
"Elapsed real (wall) time [s] " << T_us / 1e6;
839 JNoticeStream(logger) <<
"Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
840 JNoticeStream(logger) <<
"Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
841 JNoticeStream(logger) <<
"Number of packets received/discarded " << number_of_packets_received <<
"/" << number_of_packets_discarded;
842 JNoticeStream(logger) <<
"Number of events/MB sent " << numberOfEvents <<
"/" << numberOfBytes/1e6;
844 if (number_of_packets_received > 0) {
JNoticeStream(logger) <<
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received; }
846 JNoticeStream(logger) <<
"Current queue depth " << timeslices.size();
848 JNoticeStream(logger) <<
"Current number of frames per slice expected: " << frames_per_slice;
855 JNoticeStream(logger) <<
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed <<
"/" << numberOfIncompleteTimeslicesProcessed;
856 JNoticeStream(logger) <<
"Number of frames with invalid PMTs dropped " << numberOfFramesWithInvalidPMTs;
858 if (numberOfTimeslicesProcessed > 0) {
JNoticeStream(logger) <<
"Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed; }
859 if (numberOfTimeslicesProcessed > 0) {
JNoticeStream(logger) <<
"User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed; }
860 if (numberOfTimeslicesProcessed > 0) {
JNoticeStream(logger) <<
"System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed; }
862 const double processedSlicesTimeMus = numberOfTimeslicesProcessed *
getFrameTime() / 1000;
863 const double processedDetectorTimeMus = (maxFrameNumber - minFrameNumber) *
getFrameTime() / 1000;
865 if (processedSlicesTimeMus > 0) {
JNoticeStream(logger) <<
"Performance factor (inaccurate estimate): " << T_us / processedSlicesTimeMus; }
866 if (processedDetectorTimeMus > 0) {
JNoticeStream(logger) <<
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTimeMus; }
957 int main(
int argc,
char* argv[])
974 JParser<> zap(
"Application for real-time filtering of data.");
979 zap[
'u'] =
make_field(client_name) =
"JDataFilter";
988 catch(
const exception& error) {
989 FATAL(error.what() << endl);
Message logger with time scheduler.
Utility class to parse command line options.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
virtual const char * what() const
Get error message.
bool read(const JEquation &equation)
Read equation.
Data structure for all trigger parameters.
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
virtual void actionContinue(int length, const char *buffer)
Data structure for a composite optical module.
JBuildL2< hit_type > JBuildL2_t
std::string hostname
controlhost to send data to the datawriter
long long int numberOfFramesWithInvalidPMTs
Message logging based on std::ostream.
static const JNET::JTag RC_DFILTER
virtual void actionExit()
long long int numberOfEvents
Router for direct addressing of module data in detector data structure.
Interface for logging messages.
Message logging based on ControlHost.
long long int number_of_packets_received
Utility class to parse parameter values.
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
Simple data structure to support I/O of equations (see class JLANG::JEquation).
std::string getIPaddress(const int ip)
Get IP address (decimal-dot notation).
JChannelList_t channelList
std::vector< JSocketInputChannel_t > JChannelList_t
JMessageScheduler logErrorDetector
static const JNET::JTag IO_TRIGGER_PARAMETERS
JSinglePointer< JBuildL1_t > buildL1
JSuperFrame2D< hit_type > JSuperFrame2D_t
void run()
Run as run control client following command messages via JNET::JControlHost.
virtual void actionStop(int length, const char *buffer)
JPMT_t getPMT() const
Get PMT.
vector< JDAQProcess > dataFilters
int getRunNumber() const
Get run number.
list< JDAQTimesliceL0 > timeslices
JSinglePointer< JBuildL2_t > buildL2
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
Utility class to parse parameter values.
vector< JDAQProcess > dataQueues
unsigned int maxQueueDepth
int getFrameIndex() const
Get frame index.
virtual void actionQuit(int length, const char *buffer)
1-dimensional frame with time calibrated data from one optical module.
long long int numberOfTimeslicesProcessed
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e.
Basic data structure for L0 hit.
std::string index
index in process list
JBuildL1< hit_type > JBuildL1_t
std::string trim(const std::string &buffer)
Trim string.
Basic data structure for time and time over threshold information of hit.
The template JSinglePointer class can be used to hold a pointer to an object.
int getLength() const
Get length.
virtual void actionSelect(const JFileDescriptorMask &mask)
int getFileDescriptor() const
Get file descriptor.
JNET::JSocketInputChannel< KM3NETDAQ::JDAQAbstractPreamble > JSocketInputChannel_t
Scheduling of actions via fixed latency intervals.
void accept(const int server)
Accept connection from a server.
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
virtual void actionInit(int length, const char *buffer)
JMessageScheduler logErrorInvalidPMTs
Auxiliary class for itemization of process list.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JControlHost_t > datawriter
int backlog
serversocket port
JSinglePointer< JBuildL2_t > buildSN
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
JMessageScheduler logErrorIndex
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
const_iterator begin() const
double getFrameTime()
Get frame time duration.
static const size_t buffer_size
virtual void actionConfigure(int length, const char *buffer)
JTriggerParameters parameters
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQEvent for a triggered event.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
unsigned int frames_per_slice
Level specific message streamers.
Exception for ControlHost.
const JPosition3D & getPosition() const
Get position.
Simple datastructure for the DAQ preamble without ROOT functionality.
JTimeslice< hit_type > JTimeslice_t
virtual void actionReset(int length, const char *buffer)
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Match of two events considering overlap in time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
std::string getHostname()
Get host name.
JMessageScheduler logErrorRun
virtual void actionPause(int length, const char *buffer)
virtual void setSelect(JFileDescriptorMask &mask) const
vector< hit_type > JFrameL1_t
JSinglePointer< JTimesliceRouter > timesliceRouter
JSinglePointer< JTrigger3DShower > trigger3DShower
long long int totalCPURAM
virtual void actionStart(int length, const char *buffer)
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Run control client base class.
virtual int getSize() const
Get size of object.
void merge(const JMatch_t &match)
Merge events.
Utility class to parse command line options.
Implemenation of object output through ControlHost.
bool processIndexSorter(const JDAQProcess &a, const JDAQProcess &b)
JSinglePointer< JModuleRouter > moduleRouter
unsigned long long int getRAM()
Get RAM of this CPU.
long long int maxQueueSize
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size)
Constructor.
ROOT TTree parameter settings.
long long int numberOfIncompleteTimeslicesProcessed
long long int number_of_reads
void setNonBlocking(const bool on)
Set non-blocking of I/O.
virtual bool enter(const JArgs &args)
Enter the state machine.
2-dimensional frame with time calibrated data from one optical module.
JMessageScheduler logErrorIncomplete
const char * getName()
Get ROOT name of given data type.
static const JNET::JTag RC_CMD
virtual void actionEnter()
void setKeepAlive(const bool on)
Set keep alive of socket.
void processTimeSlice(const JDAQTimesliceL0 &pending_slice)
long long int number_of_packets_discarded
virtual const char * what() const
Get error message.
KM3NeT DAQ constants, bit handling, etc.
JSinglePointer< JServerSocket > serversocket
Hostname and IP address functions.
Data frame of one optical module.
JSinglePointer< JTrigger3DMuon > trigger3DMuon
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
unsigned int maximum_frames_per_slice
long long int number_of_bytes_received
Time slice with calibrated data.
JSuperFrame1D< hit_type > JSuperFrame1D_t
const_iterator end() const
long long int numberOfBytes
int main(int argc, char *argv[])