179 static const int MAXIMUM_FILE_NUMBER = 100;
210 gErrorIgnoreLevel = kFatal;
212 std::ostringstream os;
217 std::remove(os.str().c_str());
220 this->
open(os.str().c_str());
238 if (this->is_open()) {
240 const string file_name = this->
getFile()->GetName();
246 for (
int i = 0;
i != MAXIMUM_FILE_NUMBER; ++
i) {
252 <<
"_" << cal.getYear() <<
'-' <<
FILL(2,
'0') << cal.getMonth() <<
'-' <<
FILL(2,
'0') << cal.getDay()
272 std::remove(file_name.c_str());
297 return (sizeL0 > 0 ||
313 if (
object.is_open())
314 out <<
object.getFile()->GetName();
320 out <<
object.sizeL0 <<
'/'
321 <<
object.sizeL1 <<
'/'
322 <<
object.sizeL2 <<
'/'
323 <<
object.sizeSN <<
'/';
385 current_slice_index = -1;
398 if (c_buffer.is_open()) {
400 JStatusStream(logger) <<
"Close and remove circular buffer " << c_buffer;
402 c_buffer.close(
false);
409 virtual void actionInit(
int length,
const char* buffer)
override
417 if (serversocket.is_valid()) {
418 serversocket->shutdown();
423 catch(
const std::exception& error) {
424 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
436 string _hostname_ =
"";
447 dumpLimit = numeric_limits<int>::max();
453 properties[
"dataWriter"] = _hostname_;
454 properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
457 properties[
"queueSize"] = maxQueueSize = (totalCPURAM -
GIGABYTE);
458 properties[
"queueDepth"] = maxQueueDepth = 20;
459 properties[
"frameIndex"] = maximal_frame_index = 100000;
462 properties[
"JDataFilter"] = dataFilters;
463 properties[
"DataQueue"] = dataQueues;
464 properties[
"path"] = c_buffer.path;
465 properties[
"archive"] = c_buffer.archive;
466 properties[
"c_sizeL0"] = c_buffer.sizeL0;
467 properties[
"c_sizeL1"] = c_buffer.sizeL1;
468 properties[
"c_sizeL2"] = c_buffer.sizeL2;
469 properties[
"c_sizeSN"] = c_buffer.sizeSN;
470 properties[
"dumpLimit"] = dumpLimit;
473 properties.
read(
string(buffer, length));
475 catch(
const std::exception& error) {
482 setClockInterval(
update_s * 1000000LL);
484 _hostname_ =
trim(_hostname_);
486 if (_hostname_ !=
"" && _hostname_ != hostname) {
490 hostname = _hostname_;
493 if (!datawriter.is_valid()) {
502 maximum_frames_per_slice = frames_per_slice;
506 if (dataFilters.empty()) {
507 JNoticeStream(logger) <<
"No DataFilters in process list, or no process list. "
508 <<
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
511 sort(dataFilters.begin(), dataFilters.end(), compare);
513 unsigned int numberOfDataFiltersOnThisMachine = 0;
521 notice <<
"My IP addresses:";
530 JDebugStream(logger) <<
"Test IP address \"" <<
i->hostname <<
"\" " << (find(IP.begin(), IP.end(),
i->hostname) != IP.end());
532 if (find(IP.begin(), IP.end(),
i->hostname) != IP.end()) {
534 numberOfDataFiltersOnThisMachine++;
536 if (
i->port ==
this->port) {
542 if (numberOfDataFiltersOnThisMachine == 0) {
543 JNoticeStream(logger) <<
"Zero data filters on this machine according to process list (if it exists). "
544 <<
"Assuming one datafilter on this machine.";
545 numberOfDataFiltersOnThisMachine = 1;
548 if (thisProcess == dataFilters.end()) {
549 JErrorStream(logger) <<
"This process cannot be found in the process list. Why do I exist?";
552 if (thisProcess != dataFilters.end() && thisProcess->index !=
getName()) {
553 JErrorStream(logger) <<
"Mismatch between given process names: "
555 <<
", but in the process list I am referred to as " << thisProcess->index;
558 if (dataFilters.begin() == thisProcess || dataFilters.empty()) {
562 if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {
564 maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
566 JNoticeStream(logger) <<
"Maximum queue size is too large given the number of processes on this machine. "
567 <<
"Queue size reduced to "
568 << maxQueueSize <<
" bytes." ;
575 JNoticeStream(logger) <<
"Disabling high-rate veto of all PMTs.";
592 JNoticeStream(logger) <<
"This data filter process will report.";
596 JNoticeStream(logger) <<
"Update period [s]: " << getClockInterval();
608 if (buildL1.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL1."; }
609 if (buildL2.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL2."; }
610 if (buildSN.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
611 if (buildNB.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
618 if (c_buffer.is_enabled()) {
620 if (!c_buffer.is_open()) {
624 if (c_buffer.is_open()) {
628 JStatusStream(logger) <<
"Created circular buffer " << c_buffer;
632 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << c_buffer.path <<
">; disable functionality.";
637 JNoticeStream(logger) <<
"Continue using circular buffer " << c_buffer;
641 if (c_buffer.is_open()) {
662 current_slice_index = -1;
667 numberOfTimeslicesProcessed = 0;
668 numberOfIncompleteTimeslicesProcessed = 0;
670 number_of_packets_received = 0;
671 number_of_packets_discarded = 0;
672 number_of_bytes_received = 0;
675 minFrameNumber = numeric_limits<int>::max();
676 maxFrameNumber = numeric_limits<int>::min();
682 logErrorRun .reset();
683 logErrorDetector .reset();
684 logErrorIndex .reset();
685 logErrorIncomplete.reset();
707 if (!timeslices.empty()) {
709 JNoticeStream(logger) <<
"Flushing " << timeslices.size() <<
" slices.";
711 for (deque<JDAQTimesliceL0>::const_iterator
i = timeslices.begin();
i != timeslices.end(); ++
i) {
720 deque<JDAQTimesliceL0> buffer;
722 timeslices.swap(buffer);
729 current_slice_index = -1;
742 virtual void actionStop(
int length,
const char* buffer)
override
752 if (serversocket.is_valid()) {
753 serversocket->shutdown();
756 serversocket.reset();
760 virtual void actionQuit(
int length,
const char* buffer)
override
768 if (serversocket.is_valid()) {
769 mask.
set(*serversocket);
772 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
773 if (!channel->isReady()) {
774 mask.
set(channel->getFileDescriptor());
784 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
788 if (mask.
has(channel->getFileDescriptor())) {
792 if (channel->isReady()) {
794 number_of_packets_received += 1;
795 number_of_reads += channel->getCounter();
796 number_of_bytes_received += channel->size();
801 updateFrameQueue(channel);
803 catch(
const std::exception& error) {
805 JErrorStream(logErrorRun) <<
"Update frame queue " << channel->getFileDescriptor() <<
' ' << channel->size() <<
' ' << error.what();
807 number_of_packets_discarded += 1;
812 JErrorStream(logErrorRun) <<
"Receiving data while not running.";
814 number_of_packets_discarded += 1;
822 catch(
const std::exception& error) {
825 JErrorStream(logger) <<
"Disconnect channel " << channel->getFileDescriptor() <<
' ' << error.what();
830 channel = channelList.erase(channel);
835 if (serversocket.is_valid()) {
837 if (mask.
has(*serversocket)) {
839 JTCPSocket socket(serversocket->getFileDescriptor());
843 socket.setKeepAlive (
true);
844 socket.setNonBlocking(
true);
846 JStatusStream(logger) <<
"New channel" <<
'[' << socket.getFileDescriptor() <<
']';
853 if (!timeslices.empty() && ((timeslices[0].size() >= frames_per_slice) ||
855 (timeslices.size() >= 2
u &&
856 timeslices[1].size() >= frames_per_slice) ||
858 (timeslices.size() >= maxQueueDepth) ||
865 minFrameNumber = min(minFrameNumber, pending_slice.
getFrameIndex());
866 maxFrameNumber = max(maxFrameNumber, pending_slice.
getFrameIndex());
868 if (pending_slice.size() > frames_per_slice) {
872 error <<
"More frames in timeslice than expected "
873 << pending_slice.size() <<
" > " << frames_per_slice;
875 if (pending_slice.size() <= maximum_frames_per_slice) {
877 error <<
" adjusting expected frames per timeslice";
879 frames_per_slice = pending_slice.size();
885 if (!pending_slice.empty()) {
891 processTimeSlice(pending_slice);
895 numberOfTimeslicesProcessed += 1;
901 JErrorStream(logErrorRun) <<
"Skip processing of data while not running.";
904 if (pending_slice.size() < frames_per_slice) {
906 numberOfIncompleteTimeslicesProcessed += 1;
910 error <<
"Timeout -> processed incomplete timeslice: "
912 <<
"Size of timeslice = " << pending_slice.size() <<
';'
913 <<
"Queue depth = " << timeslices.size() <<
';'
916 if ((timeslices.size() >= 2
u &&
917 timeslices[1].size() >= frames_per_slice)) {
919 error <<
" intermittent problem -> continues as-is";
923 error <<
" adjusting expected frames per timeslice from " << frames_per_slice <<
" to " << pending_slice.size();
925 frames_per_slice = pending_slice.size();
932 timeslices.pop_front();
964 if (preamble.getLength() != channel->size()) {
966 JErrorStream(logErrorRun) <<
"Size of received data does not match size reported by preamble: "
967 <<
"preamble.getLength() = " << preamble.getLength() <<
';'
968 <<
"channel->size(): " << channel->size() <<
';';
970 number_of_packets_discarded += 1;
977 JErrorStream(logErrorRun) <<
"Run number " << header.getRunNumber()
979 <<
" -> Dropping frame.";
981 number_of_packets_discarded += 1;
986 if (header.getFrameIndex() <= current_slice_index) {
988 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" <= " << current_slice_index
989 <<
" module " << header.getModuleID()
990 <<
" -> Dropping frame.";
992 number_of_packets_discarded += 1;
994 if (frames_per_slice < maximum_frames_per_slice) {
998 JErrorStream(logErrorIndex) <<
"Increase number of frames expected to: " << frames_per_slice;
1004 if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
1006 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" > " << current_slice_index + maximal_frame_index
1007 <<
" module " << header.getModuleID()
1008 <<
" -> Dropping frame.";
1010 number_of_packets_discarded += 1;
1015 deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1017 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1018 ++timesliceIterator;
1021 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1029 timesliceIterator = timeslices.insert(timesliceIterator,
JDAQTimesliceL0());
1031 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1038 in >>
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
1051 using namespace std;
1055 timesliceRouter->configure(timeslice);
1069 c_buffer.is_enabled()) {
1078 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1080 if (moduleRouter->hasModule(frame->getModuleID())) {
1087 <<
"module = " << frame->getModuleID() <<
";"
1088 <<
"discard" << (dumpCount < dumpLimit ?
" and dump" :
"");
1090 if (dumpCount < dumpLimit) {
1091 timesliceTX.push_back(*frame);
1097 const JModule& module = moduleRouter->getModule(frame->getModuleID());
1098 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1114 if (buffer.begin() != __end) {
1117 frame->getModuleIdentifier(),
1122 (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1124 (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1131 frame->getModuleIdentifier(),
1134 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1139 frame->getModuleIdentifier(),
1142 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1147 frame->getModuleIdentifier(),
1150 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1154 JErrorStream(logErrorDetector) <<
"No detector information for module " << frame->getModuleID();
1158 if (!timesliceTX.empty()) {
1160 if (dumpCount < dumpLimit) {
1162 this->put(timesliceTX);
1174 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1195 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1196 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1197 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1201 for (JTriggerOutput::const_iterator event = trigger_output.begin();
event != trigger_output.end(); ++event) {
1207 numberOfEvents += 1;
1210 if (
parameters.writeL1() || c_buffer.sizeL1 > 0) {
1214 if (
parameters.writeL1) { this->put(
object); }
1215 if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
1218 if (
parameters.writeL2() || c_buffer.sizeL2 > 0) {
1222 if (
parameters.writeL2) { this->put(
object); }
1223 if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
1226 if (
parameters.writeSN() || c_buffer.sizeSN > 0) {
1230 if (
parameters.writeSN) { this->put(
object); }
1231 if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
1234 if (
parameters.writeL0() || c_buffer.sizeL0 > 0) {
1236 if (
parameters.writeL0) { this->put(timeslice); }
1237 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1241 }
catch(
const std::exception& error) {
1243 JErrorStream(logger) <<
"Error = " << error.what() <<
";"
1246 <<
"time slice not correctly processed;"
1247 <<
"discard" << (dumpCount < dumpLimit ?
" and dump" :
"");
1249 if (dumpCount < dumpLimit) {
1251 this->put(static_cast<const JDAQTimeslice&>(timeslice));
1257 timesliceRouter->reset();
1268 const double T_us = (double) timer.usec_wall;
1270 JStatusStream(logger) <<
"Elapsed real (wall) time [s] " << T_us / 1e6;
1271 JStatusStream(logger) <<
"Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1272 JStatusStream(logger) <<
"Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1273 JStatusStream(logger) <<
"Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 <<
" <= " << Qt.getXmax() * 1.0e-3;
1274 JStatusStream(logger) <<
"Number of packets received/discarded " << number_of_packets_received <<
"/" << number_of_packets_discarded;
1275 JStatusStream(logger) <<
"Number of events/MB/us " << numberOfEvents <<
"/" << numberOfBytes/1e6 <<
"/" << Qx.getMean(0.0);
1277 if (number_of_packets_received > 0) {
1278 JStatusStream(logger) <<
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
1282 JStatusStream(logger) <<
"Current number of frames per slice expected: " << frames_per_slice;
1284 JStatusStream(logger) <<
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed <<
"/" << numberOfIncompleteTimeslicesProcessed;
1286 const double processedSlicesTime_us = numberOfTimeslicesProcessed *
getFrameTime() / 1000;
1287 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) *
getFrameTime() / 1000;
1289 if (processedSlicesTime_us > 0) {
1290 JStatusStream(logger) <<
"Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1292 if (processedDetectorTime_us > 0) {
1293 JStatusStream(logger) <<
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1309 using namespace std;
1315 if (c_buffer.is_open()) {
1317 JStatusStream(logger) <<
"Archive circular buffer in <" << c_buffer.archive <<
">";
1319 c_buffer.close(
true);
1322 if (c_buffer.is_enabled()) {
1326 if (c_buffer.is_open()) {
1328 JStatusStream(logger) <<
"Created circular buffer " << c_buffer;
1334 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << c_buffer.path <<
">; disable functionality.";
1366 datawriter->put(
object);
1374 catch(
const std::exception& error) {
1375 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
1468 using namespace std;
1469 using namespace JPP;
1470 using namespace KM3NETDAQ;
1486 JParser<> zap(
"Application for real-time filtering of data.");
1488 zap[
'H'] =
make_field(server,
"host name of server for command messages") =
"localhost";
1489 zap[
'M'] =
make_field(logger,
"host name of server for logger messages") =
"localhost";
1490 zap[
'D'] =
make_field(hostname,
"host name of server of data writer") =
"";
1491 zap[
'u'] =
make_field(client_name,
"client name") =
"%";
1492 zap[
'P'] =
make_field(port,
"port to listen for incoming data from data queue");
1493 zap[
'q'] =
make_field(backlog,
"back log") = 1024;
1494 zap[
'c'] =
make_field(use_cout,
"print to terminal");
1495 zap[
'p'] =
make_field(
path,
"directory for temporary storage of circular buffer") =
"/tmp/";
1496 zap[
'A'] =
make_field(
archive,
"directory for permanent archival of circular buffer") =
"/tmp/";
1501 catch(
const std::exception& error) {
1502 FATAL(error.what() << endl);
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Exception for opening of file.
Message logger with time scheduler.
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Utility class to parse command line options.
void processTimeSlice(const JDAQTimesliceL0 ×lice)
Process time slice.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
bool read(const JEquation &equation)
Read equation.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
int main(int argc, char *argv[])
void close(const bool option)
Close file.
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
JSuperFrame2D< hit_type > JSuperFrame2D_t
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Long64_t sizeL1
Number of L1 time slices.
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Interface for logging messages.
Message logging based on ControlHost.
then echo Enter input within $TIMEOUT_S seconds echo n User name
Utility class to parse parameter values.
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
unsigned int maximum_frames_per_slice
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Simple data structure to support I/O of equations (see class JLANG::JEquation).
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
JTriggerParameters parameters
Main class for real-time filtering of data.
void run()
Run as run control client following command messages via JNET::JControlHost.
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
static const JChecksum checksum
Function object to perform check-sum of raw data.
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
std::string path
Directory for temporary storage.
void disable()
Disable writing.
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
std::string trim(const std::string &buffer)
Trim string.
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to give file name.
I/O formatting auxiliaries.
virtual void actionContinue(int length, const char *buffer) override
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
event< ev_daq > ev_configure
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
void close(std::istream *pf)
Close file.
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
double getFrameTime()
Get frame time duration.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Auxiliary class to build JDAQEvent for a triggered event.
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Update queue with data frames.
JTreeWriter object output.
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
JTag tag
Unique tag of this process.
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Long64_t sizeSN
Number of SN time slices.
Auxiliary class for all subscription.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
std::string hostname
host name of data server
unsigned int maxQueueDepth
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
static JStat getFileStatus
Function object for file status.
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
std::string toString() const
Convert tag to string.
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
virtual void actionExit() override
JSinglePointer< JTimesliceRouter > timesliceRouter
Auxiliary data structure for average.
static const long long int GIGABYTE
Number of bytes in a mega-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
JChannelList_t channelList
connections to data queue
Control unit client base class.
Auxiliary class for date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Implemenation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
const char * getName()
Get ROOT name of given data type.
static const JNET::JTag RC_CMD
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
virtual void actionInit(int length, const char *buffer) override
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Match of two events considering overlap in time.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
virtual void actionPause(int length, const char *buffer) override
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
unsigned int frames_per_slice
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
std::string archive
Directory for permanent archival.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
virtual void actionConfigure(int length, const char *buffer) override
Time slice with calibrated data.