107 return number_of_frames;
110 const size_t n = (size_t) (number_of_frames * factor);
112 return (n == 0 ? 1 : n);
211 static const int MAXIMUM_FILE_NUMBER = 100;
242 gErrorIgnoreLevel = kFatal;
244 std::ostringstream os;
249 std::remove(os.str().c_str());
252 this->
open(os.str().c_str());
270 if (this->is_open()) {
272 const string file_name = this->
getFile()->GetName();
278 for (
int i = 0;
i != MAXIMUM_FILE_NUMBER; ++
i) {
284 <<
"_" << cal.getYear() <<
'-' <<
FILL(2,
'0') << cal.getMonth() <<
'-' <<
FILL(2,
'0') << cal.getDay()
304 std::remove(file_name.c_str());
329 return (sizeL0 > 0 ||
345 if (
object.is_open())
346 out <<
object.getFile()->GetName();
352 out <<
object.sizeL0 <<
'/'
353 <<
object.sizeL1 <<
'/'
354 <<
object.sizeL2 <<
'/'
355 <<
object.sizeSN <<
'/';
398 const std::string& server,
399 const std::string& hostname,
404 const std::string&
path,
417 current_slice_index = -1;
418 dataqueue_slice_index.clear();
431 if (c_buffer.is_open()) {
433 JStatusStream(logger) <<
"Close and remove circular buffer " << c_buffer;
435 c_buffer.close(
false);
442 virtual void actionInit(
int length,
const char* buffer)
override
444 JDebugStream(logger) <<
"actionInit() " << std::string(buffer,length);
450 if (serversocket.is_valid()) {
451 serversocket->shutdown();
456 catch(
const std::exception& error) {
457 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
467 JDebugStream(logger) <<
"actionConfigure() " << endl << std::string(buffer,length);
469 string _hostname_ =
"";
480 dumpLimit = numeric_limits<int>::max();
493 properties[
"dataWriter"] = _hostname_;
494 properties[
"numberOfFramesPerSlice"] = frames_per_slice = 1;
495 properties[
"factorOfFramesPerSlice"] = factor_per_slice = 1.0;
498 properties[
"queueSize"] = maxQueueSize = (totalCPURAM -
GIGABYTE);
499 properties[
"queueDepth"] = maxQueueDepth = 20;
500 properties[
"frameIndex"] = maximal_frame_index = 100000;
503 properties[
"JDataFilter"] = dataFilters;
504 properties[
"DataQueue"] = dataQueues;
505 properties[
"path"] = c_buffer.path;
506 properties[
"archive"] = c_buffer.archive;
507 properties[
"c_sizeL0"] = c_buffer.sizeL0;
508 properties[
"c_sizeL1"] = c_buffer.sizeL1;
509 properties[
"c_sizeL2"] = c_buffer.sizeL2;
510 properties[
"c_sizeSN"] = c_buffer.sizeSN;
511 properties[
"dumpLimit"] = dumpLimit;
512 properties[
"dumpMask"] = dumpMask;
515 properties.
read(
string(buffer, length));
517 catch(
const std::exception& error) {
524 setClockInterval(
update_s * 1000000LL);
526 _hostname_ =
trim(_hostname_);
528 if (_hostname_ !=
"" && _hostname_ != hostname) {
532 hostname = _hostname_;
535 bool status = datawriter.is_valid();
540 status = datawriter->Connected() == 0;
542 catch (
const exception&) {
558 if (dataFilters.empty()) {
559 JNoticeStream(logger) <<
"No DataFilters in process list, or no process list. "
560 <<
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
563 sort(dataFilters.begin(), dataFilters.end(), compare);
565 unsigned int numberOfDataFiltersOnThisMachine = 0;
572 if (find(IP.begin(), IP.end(),
i->hostname) != IP.end()) {
574 numberOfDataFiltersOnThisMachine++;
576 if (
i->port ==
this->port) {
582 if (numberOfDataFiltersOnThisMachine == 0) {
583 JNoticeStream(logger) <<
"Zero data filters on this machine according to process list (if it exists). "
584 <<
"Assuming one datafilter on this machine.";
585 numberOfDataFiltersOnThisMachine = 1;
588 if (thisProcess == dataFilters.end()) {
592 error <<
"This process cannot be found in the process list. Why do I exist?";
593 error <<
" my IP addresses:";
599 error <<
" my port: " << this->port;
600 error <<
" process list";
603 error <<
' ' <<
i->hostname <<
':' <<
i->port;
607 if (thisProcess != dataFilters.end() && thisProcess->index !=
getName()) {
608 JErrorStream(logger) <<
"Mismatch between given process names: "
610 <<
", but in the process list I am referred to as " << thisProcess->index;
613 if (dataFilters.begin() == thisProcess || dataFilters.empty()) {
617 if (maxQueueSize > (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine)) {
619 maxQueueSize = (totalCPURAM-
GIGABYTE)/(numberOfDataFiltersOnThisMachine);
621 JNoticeStream(logger) <<
"Maximum queue size is too large given the number of processes on this machine. "
622 <<
"Queue size reduced to "
623 << maxQueueSize <<
" bytes." ;
630 JNoticeStream(logger) <<
"Disabling high-rate veto of all PMTs.";
647 JNoticeStream(logger) <<
"This data filter process will report.";
651 JNoticeStream(logger) <<
"Update period [s]: " << getClockInterval();
663 if (buildL1.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL1."; }
664 if (buildL2.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildL2."; }
665 if (buildSN.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
666 if (buildNB.get() == NULL) {
JErrorStream(logger) <<
"Failed to allocate buildSN."; }
674 if (c_buffer.is_enabled()) {
676 if (!c_buffer.is_open()) {
680 if (c_buffer.is_open()) {
684 JStatusStream(logger) <<
"Created circular buffer " << c_buffer;
688 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << c_buffer.path <<
">; disable functionality.";
693 JNoticeStream(logger) <<
"Continue using circular buffer " << c_buffer;
697 if (c_buffer.is_open()) {
718 current_slice_index = -1;
719 dataqueue_slice_index.clear();
724 numberOfTimeslicesProcessed = 0;
725 numberOfIncompleteTimeslicesProcessed = 0;
727 number_of_packets_received = 0;
728 number_of_packets_discarded = 0;
729 number_of_bytes_received = 0;
732 minFrameNumber = numeric_limits<int>::max();
733 maxFrameNumber = numeric_limits<int>::min();
739 logErrorRun .reset();
740 logErrorDetector .reset();
741 logErrorIndex .reset();
742 logErrorIncomplete .reset();
743 logErrorOvercomplete.reset();
765 if (!timeslices.empty()) {
767 JNoticeStream(logger) <<
"Flushing " << timeslices.size() <<
" slices.";
769 for (deque<JDAQTimesliceL0>::const_iterator
i = timeslices.begin();
i != timeslices.end(); ++
i) {
778 deque<JDAQTimesliceL0> buffer;
780 timeslices.swap(buffer);
787 current_slice_index = -1;
788 dataqueue_slice_index.clear();
801 virtual void actionStop(
int length,
const char* buffer)
override
809 if (serversocket.is_valid()) {
810 serversocket->shutdown();
813 serversocket.reset();
817 virtual void actionQuit(
int length,
const char* buffer)
override
825 if (serversocket.is_valid()) {
826 mask.
set(*serversocket);
829 for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
830 if (!channel->isReady()) {
831 mask.
set(channel->getFileDescriptor());
841 for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
845 if (mask.
has(channel->getFileDescriptor())) {
849 if (channel->isReady()) {
851 number_of_packets_received += 1;
852 number_of_reads += channel->getCounter();
853 number_of_bytes_received += channel->size();
858 updateFrameQueue(*channel);
860 catch(
const std::exception& error) {
862 JErrorStream(logErrorRun) <<
"Update frame queue " << channel->getFileDescriptor() <<
' ' << channel->size() <<
' ' << error.what();
864 number_of_packets_discarded += 1;
869 JErrorStream(logErrorRun) <<
"Receiving data while not running.";
871 number_of_packets_discarded += 1;
879 catch(
const std::exception& error) {
882 JErrorStream(logger) <<
"Disconnect channel " << channel->getFileDescriptor() <<
' ' << error.what();
887 channel = channelList.erase(channel);
892 if (serversocket.is_valid()) {
894 if (mask.
has(*serversocket)) {
896 JTCPSocket socket(serversocket->getFileDescriptor());
900 socket.setKeepAlive (
true);
901 socket.setNonBlocking(
false);
903 JStatusStream(logger) <<
"New channel" <<
'[' << socket.getFileDescriptor() <<
']' <<
' ' << socket.getReceiveBufferSize();
910 if (!timeslices.empty()) {
912 const size_t number_of_frames =
getNumberOfFrames(frames_per_slice, factor_per_slice);
913 const size_t maximum_in_queue =
getMaximum(
make_array(next(timeslices.begin()), timeslices.end(), &JDAQTimesliceL0::size), (
size_t) 0);
915 if (((timeslices[0].size() >= number_of_frames &&
916 timeslices[0].
getFrameIndex() < dataqueue_slice_index.min()) ||
918 (maximum_in_queue >= number_of_frames &&
919 timeslices[0].
getFrameIndex() < dataqueue_slice_index.min()) ||
921 (timeslices.size() >= maxQueueDepth) ||
929 minFrameNumber = min(minFrameNumber, pending_slice.
getFrameIndex());
930 maxFrameNumber = max(maxFrameNumber, pending_slice.
getFrameIndex());
932 for (JDAQTimesliceL0::const_iterator
i = pending_slice.begin();
i != pending_slice.end(); ++
i) {
933 modules.insert(
i->getModuleID());
941 if (!pending_slice.empty()) {
942 processTimeSlice(pending_slice);
947 numberOfTimeslicesProcessed += 1;
953 JErrorStream(logErrorRun) <<
"Skip processing of data while not running.";
957 if (modules.size() > frames_per_slice) {
959 JErrorStream(logErrorOvercomplete) <<
"More active modules than expected "
960 << modules.size() <<
" > " << frames_per_slice
961 <<
" adjusting frames per slice to " << modules.size();
963 frames_per_slice = modules.size();
967 if (pending_slice.size() < number_of_frames) {
969 numberOfIncompleteTimeslicesProcessed += 1;
973 error <<
"Timeout -> processed incomplete timeslice: "
975 <<
"Size of timeslice = " << pending_slice.size() <<
';'
976 <<
"Queue depth = " << timeslices.size() <<
';'
978 <<
"DataQueue min = " << dataqueue_slice_index.min() <<
';'
979 <<
"DataQueue max = " << dataqueue_slice_index.max() <<
';';
981 if (maximum_in_queue >= number_of_frames) {
983 error <<
" intermittent problem -> continues as-is";
989 for (JDAQTimesliceL0::const_iterator
i = pending_slice.begin();
i != pending_slice.end(); ++
i) {
990 modules.insert(
i->getModuleID());
993 error <<
" adjusting frames per timeslice from " << frames_per_slice <<
" to " << modules.size();
995 frames_per_slice = modules.size();
1002 timeslices.pop_front();
1025 using namespace std;
1035 if (preamble.getLength() != channel.
size()) {
1037 JErrorStream(logErrorRun) <<
"Size of received data does not match size reported by preamble: "
1038 <<
"preamble.getLength() = " << preamble.getLength() <<
';'
1039 <<
"channel.size(): " << channel.
size() <<
';';
1041 number_of_packets_discarded += 1;
1048 JErrorStream(logErrorRun) <<
"Run number " << header.getRunNumber()
1050 <<
" -> Dropping frame.";
1052 number_of_packets_discarded += 1;
1057 if (header.getFrameIndex() <= current_slice_index) {
1059 number_of_packets_discarded += 1;
1061 if (modules.insert(header.getModuleID()).second) {
1063 frames_per_slice = modules.size();
1065 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" <= " << current_slice_index
1066 <<
" module " << header.getModuleID()
1067 <<
" -> dropping frame;"
1068 <<
" increase number of frames expected to: " << frames_per_slice;
1074 if (current_slice_index != -1 && header.getFrameIndex() > current_slice_index + maximal_frame_index) {
1076 number_of_packets_discarded += 1;
1078 JErrorStream(logErrorIndex) <<
"Frame index " << header.getFrameIndex() <<
" > " << current_slice_index + maximal_frame_index
1079 <<
" module " << header.getModuleID()
1080 <<
" -> Dropping frame.";
1085 deque<JDAQTimesliceL0>::iterator timesliceIterator = timeslices.begin();
1087 while (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex()) {
1088 ++timesliceIterator;
1091 if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
1099 timesliceIterator = timeslices.insert(timesliceIterator,
JDAQTimesliceL0());
1101 timesliceIterator->setDAQChronometer(header.getDAQChronometer());
1108 in >>
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
1123 using namespace std;
1127 timesliceRouter->configure(timeslice);
1141 c_buffer.is_enabled()) {
1150 for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
1152 if (moduleRouter->hasModule(frame->getModuleID())) {
1161 <<
"module = " << frame->getModuleID() <<
";"
1162 <<
"discard" << (dumpCount < dumpLimit ?
" and dump" :
"");
1164 if (dumpCount < dumpLimit && result.
has(dumpMask)) {
1165 timesliceTX.push_back(*frame);
1171 const JModule& module = moduleRouter->getModule(frame->getModuleID());
1172 JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*frame, module);
1188 if (buffer.begin() != __end) {
1191 frame->getModuleIdentifier(),
1196 (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
1198 (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
1205 frame->getModuleIdentifier(),
1208 (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
1213 frame->getModuleIdentifier(),
1216 (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
1221 frame->getModuleIdentifier(),
1224 (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
1228 JErrorStream(logErrorDetector) <<
"No detector information for module " << frame->getModuleID();
1232 if (!timesliceTX.empty()) {
1234 if (dumpCount < dumpLimit) {
1236 this->put(timesliceTX);
1248 for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
1269 (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
1270 (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
1271 (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
1275 for (JTriggerOutput::const_iterator event = trigger_output.begin();
event != trigger_output.end(); ++event) {
1281 numberOfEvents += 1;
1284 if (
parameters.writeL1() || c_buffer.sizeL1 > 0) {
1288 if (
parameters.writeL1) { this->put(
object); }
1289 if (c_buffer.sizeL1 > 0) { c_buffer.put(
object); }
1292 if (
parameters.writeL2() || c_buffer.sizeL2 > 0) {
1296 if (
parameters.writeL2) { this->put(
object); }
1297 if (c_buffer.sizeL2 > 0) { c_buffer.put(
object); }
1300 if (
parameters.writeSN() || c_buffer.sizeSN > 0) {
1304 if (
parameters.writeSN) { this->put(
object); }
1305 if (c_buffer.sizeSN > 0) { c_buffer.put(
object); }
1308 if (
parameters.writeL0() || c_buffer.sizeL0 > 0) {
1310 if (
parameters.writeL0) { this->put(timeslice); }
1311 if (c_buffer.sizeL0 > 0) { c_buffer.put(timeslice); }
1315 }
catch(
const std::exception& error) {
1317 JErrorStream(logger) <<
"Error = " << error.what() <<
";"
1320 <<
"time slice not correctly processed;"
1321 <<
"discard" << (dumpCount < dumpLimit ?
" and dump" :
"");
1323 if (dumpCount < dumpLimit) {
1325 this->put(static_cast<const JDAQTimeslice&>(timeslice));
1331 timesliceRouter->reset();
1342 const double T_us = (double) timer.usec_wall;
1344 JStatusStream(logger) <<
"Elapsed real (wall) time [s] " << T_us / 1e6;
1345 JStatusStream(logger) <<
"Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
1346 JStatusStream(logger) <<
"Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
1347 JStatusStream(logger) <<
"Elapsed real time per time slice [ms] " << Qt.getMean(0.0) * 1.0e-3 <<
" <= " << Qt.getXmax() * 1.0e-3;
1348 JStatusStream(logger) <<
"Number of packets received/discarded " << number_of_packets_received <<
"/" << number_of_packets_discarded;
1349 JStatusStream(logger) <<
"Number of events/MB/us " << numberOfEvents <<
"/" << numberOfBytes/1e6 <<
"/" << Qx.getMean(0.0);
1351 if (number_of_packets_received > 0) {
1352 JStatusStream(logger) <<
"Number of reads/packet " << (double) number_of_reads / (
double) number_of_packets_received;
1356 JStatusStream(logger) <<
"Current number of frames per slice expected: " << frames_per_slice <<
' ' <<
FIXED(5,3) << factor_per_slice;
1358 JStatusStream(logger) <<
"Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed <<
"/" << numberOfIncompleteTimeslicesProcessed;
1360 const double processedSlicesTime_us = numberOfTimeslicesProcessed *
getFrameTime() / 1000;
1361 const double processedDetectorTime_us = (maxFrameNumber - minFrameNumber) *
getFrameTime() / 1000;
1363 if (processedSlicesTime_us > 0) {
1364 JStatusStream(logger) <<
"Performance factor (inaccurate estimate): " << T_us / processedSlicesTime_us;
1366 if (processedDetectorTime_us > 0) {
1367 JStatusStream(logger) <<
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
1383 using namespace std;
1385 JDebugStream(logger) <<
"Received message <" << tag.
toString() <<
"> \"" << string(buffer, length) <<
"\"";
1389 if (c_buffer.is_open()) {
1391 JStatusStream(logger) <<
"Archive circular buffer in <" << c_buffer.archive <<
">";
1393 c_buffer.close(
true);
1396 if (c_buffer.is_enabled()) {
1400 if (c_buffer.is_open()) {
1402 JStatusStream(logger) <<
"Created circular buffer " << c_buffer;
1408 JErrorStream (logger) <<
"Failed to create circular buffer in directory <" << c_buffer.path <<
">; disable functionality.";
1440 datawriter->put(
object);
1448 catch(
const std::exception& error) {
1449 JErrorStream(logger) <<
"Error \"" << error.what() <<
"\"; trigger ev_error.";
1481 if (this->empty()) {
1487 int min = std::numeric_limits<int>::max();
1489 for (const_iterator
i = this->begin();
i != this->end(); ++
i) {
1490 if (
i->second < min) {
1506 if (this->empty()) {
1512 int max = std::numeric_limits<int>::lowest();
1514 for (const_iterator
i = this->begin();
i != this->end(); ++
i) {
1515 if (
i->second > max) {
1524 } dataqueue_slice_index;
1600 using namespace std;
1601 using namespace JPP;
1602 using namespace KM3NETDAQ;
1618 JParser<> zap(
"Application for real-time filtering of data.");
1620 zap[
'H'] =
make_field(server,
"host name of server for command messages") =
"localhost";
1621 zap[
'M'] =
make_field(logger,
"host name of server for logger messages") =
"localhost";
1622 zap[
'D'] =
make_field(hostname,
"host name of server of data writer") =
"";
1623 zap[
'u'] =
make_field(client_name,
"client name") =
"%";
1624 zap[
'P'] =
make_field(port,
"port to listen for incoming data from data queue");
1625 zap[
'q'] =
make_field(backlog,
"back log") = 1024;
1626 zap[
'c'] =
make_field(use_cout,
"print to terminal");
1627 zap[
'p'] =
make_field(
path,
"directory for temporary storage of circular buffer") =
"/tmp/";
1628 zap[
'A'] =
make_field(
archive,
"directory for permanent archival of circular buffer") =
"/tmp/";
1633 catch(
const std::exception& error) {
1634 FATAL(error.what() << endl);
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Exception for opening of file.
Message logger with time scheduler.
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
Utility class to parse command line options.
void processTimeSlice(const JDAQTimesliceL0 &timeslice)
Process time slice.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
bool read(const JEquation &equation)
Read equation.
JTriggerMask_t getTriggerMask(const unsigned int bit)
Convert trigger bit to trigger mask.
int main(int argc, char *argv[])
void close(const bool option)
Close file.
ROOT TTree parameter settings of various packages.
Data structure for a composite optical module.
JSinglePointer< JTriggerNB > triggerNB
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
long long int numberOfTimeslicesProcessed
JBuildL2< hit_type > JBuildL2_t
Message logging based on std::ostream.
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
JMessageScheduler logErrorIndex
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
JSuperFrame2D< hit_type > JSuperFrame2D_t
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
Long64_t sizeL1
Number of L1 time slices.
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_DFILTER
Match of two events considering overlap in time.
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
Router for direct addressing of module data in detector data structure.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Interface for logging messages.
Message logging based on ControlHost.
Utility class to parse parameter values.
JSinglePointer< JServerSocket > serversocket
server for data queue connections
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1).
ROOT TTree object output.
JMessageScheduler logErrorIncomplete
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Simple data structure to support I/O of equations (see class JLANG::JEquation).
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
static const JNET::JTag IO_TRIGGER_PARAMETERS
JTriggerParameters parameters
Auxiliary data structure for floating point format specification.
Main class for real-time filtering of data.
void run()
Run as run control client following command messages via JNET::JControlHost.
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Basic data structure for time and time over threshold information of hit.
long long int number_of_packets_discarded
int getRunNumber() const
Get run number.
Data structure for detector geometry and calibration.
T * open(const std::string &file_name)
Open file.
Utility class to parse parameter values.
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
static const JChecksum checksum
Function object to perform check-sum of raw data.
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
std::string path
Directory for temporary storage.
void disable()
Disable writing.
int getFrameIndex() const
Get frame index.
JBuildL1< hit_type > JBuildL1_t
1-dimensional frame with time calibrated data from one optical module.
JCircularBuffer_t c_buffer
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e. time between earliest and latest hit) of Monte Carlo event.
void typeout()
Report status to message logger.
bool is_enabled() const
Check whether writing of data is enabled.
Basic data structure for L0 hit.
std::string index
index in process list
JSuperFrame1D< hit_type > JSuperFrame1D_t
std::string trim(const std::string &buffer)
Trim string.
JSinglePointer< JBuildL2_t > buildL2
int getLength() const
Get length.
JSinglePointer< JTriggerMXShower > triggerMXShower
JSinglePointer< JBuildL1_t > buildL1
then fatal Invalid path $argv[1] fi setopt extendedglob typeset Z8 ID function archive()
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
int getFileDescriptor() const
Get file descriptor.
long long int totalCPURAM
JTimeslice< hit_type > JTimeslice_t
JSinglePointer< JTrigger3DShower > trigger3DShower
Scheduling of actions via fixed latency intervals.
then echo Variable JPP_DIR undefined exit fi source $JPP_DIR setenv sh $JPP_DIR set_variable DIR $JPP_DIR examples JPhysics set_variable DEBUG set_variable darkcount_Hz set_variable rateL0_Hz set_array rateL1_Hz if do_usage *then usage $script[working directory] fi function typeout()
TFile * getFile(const std::string &file_name, const std::string &option="exist")
Get TFile pointer corresponding to given file name.
I/O formatting auxiliaries.
virtual void actionContinue(int length, const char *buffer) override
long long int numberOfBytes
Auxiliary class for itemization of process list.
std::vector< JDAQProcess > dataFilters
void merge(const JMatch_t &match)
Merge events.
std::vector< JSocketInputChannel_t > JChannelList_t
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
bool has(const JStatus &mask) const
Check for errors with given error mask.
event< ev_daq > ev_configure
int getFrameIndex(const double t_ns)
Get frame index for a given time in ns.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
void close(std::istream *pf)
Close file.
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
double getFrameTime()
Get frame time duration.
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
do set_variable OUTPUT_DIRECTORY $WORKDIR T
int getRunNumber(const std::string &file_name)
Get run number for given file name of data taking run.
JMessageScheduler logErrorDetector
Auxiliary class for handling status.
Auxiliary class for method select.
void put(const T &object)
Auxiliary method to send object to data server.
JSinglePointer< JBuildL2_t > buildSN
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
long long int numberOfIncompleteTimeslicesProcessed
Auxiliary class to build JDAQEvent for a triggered event.
JTreeWriter object output.
JSinglePointer< JBuildL2_t > buildNB
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
JTag tag
Unique tag of this process.
Level specific message streamers.
long long int number_of_packets_received
$WORKDIR driver txt done cat $WORKDIR driver txt<< EOFprocess ${DATAFILTER}$FILTER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataFilter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAFILTER}-P $PORT</dev/null > &/dev/null &))';process ${DATAWRITER}$WRITER_HOST csh-c '(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&(JDataWriter-H\$SERVER\$-M\$LOGGER\$-d $DEBUG-u ${DATAWRITER}</dev/null > &/dev/null &))';print enterevent ev_init{RC_CMD}event ev_reset{RC_CMD}event ev_init{RC_CMD}event ev_configure{RC_DFLTR%<$WORKDIR/ev_configure_datafilter.txt > RC_DQSIM<$WORKDIR/ev_configure_dqsimulator.txt > RC_DWRT path
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
const JPosition3D & getPosition() const
Get position.
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
virtual void actionQuit(int length, const char *buffer) override
Long64_t sizeSN
Number of SN time slices.
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according to a given allowed fraction of active modules.
Auxiliary class for all subscription.
int port
server socket port
static void reset()
Reset counter of unique instance of this class object.
General purpose messaging.
Auxiliary data structure for sequence of same character.
long long int localtime_t
Type definition of local time.
void setReceiveBufferSize(const int size)
Set receive buffer size.
then fatal The output file must have the wildcard in the name
std::string hostname
host name of data server
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
static JStat getFileStatus
Function object for file status.
std::vector< JDAQProcess > dataQueues
long long int maxQueueSize
virtual void actionStart(int length, const char *buffer) override
const char * data() const
Get data.
std::string toString() const
Convert tag to string.
std::vector< value_type >::iterator iterator
virtual void actionReset(int length, const char *buffer) override
virtual void actionExit() override
JSinglePointer< JTimesliceRouter > timesliceRouter
static const long long int GIGABYTE
Number of bytes in a giga-byte.
Auxiliary class to build JDAQTimeslice for L1 timeslice.
JChannelList_t channelList
connections to data queue
Control unit client base class.
then fatal The output file must have the wildcard in the e g root fi eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Auxiliary class for date and time.
Utility class to parse command line options.
static const JNET::JTag RC_ALERT
Implementation of object output through ControlHost.
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
std::vector< std::string > getListOfIPaddresses()
Get list of IP addresses (decimal-dot notation).
unsigned long long int getRAM()
Get RAM of this CPU.
Long64_t sizeL0
Number of L0 time slices.
JSinglePointer< JTrigger3DMuon > trigger3DMuon
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
JMessageScheduler logErrorOvercomplete
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
JMessageScheduler logErrorRun
virtual bool enter(const JArgs &args)
Enter the state machine.
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
JSinglePointer< JModuleRouter > moduleRouter
2-dimensional frame with time calibrated data from one optical module.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
const char * getName()
Get ROOT name of given data type.
static const JNET::JTag RC_CMD
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
long long int number_of_bytes_received
virtual void actionStop(int length, const char *buffer) override
do set_variable DETECTOR_TXT $WORKDIR detector
KM3NeT DAQ constants, bit handling, etc.
std::string getFullName(const std::string &hostname, const std::string &name)
Get full name of run control client.
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
virtual void actionInit(int length, const char *buffer) override
Auxiliary data structure for result of checksum.
size_t getSizeof(const JDAQEvent &object)
Get size of object.
virtual void actionPause(int length, const char *buffer) override
Hostname and IP address functions.
Long64_t sizeL2
Number of L2 time slices.
Data frame of one optical module.
long long int number_of_reads
long long int numberOfEvents
Timeslice data structure for L0 data.
Basic data structure for L1 hit.
std::string archive
Directory for permanent archival.
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
virtual void actionConfigure(int length, const char *buffer) override
Time slice with calibrated data.
int size() const
Get size.