107      return number_of_frames;
 
  110    const size_t n = (size_t) (number_of_frames * factor);
 
  112    return (
n == 0 ? 1 : 
n);
 
 
  242        gErrorIgnoreLevel = kFatal;
 
  244        std::ostringstream os;
 
  249          std::remove(os.str().c_str());
 
  252        this->
open(os.str().c_str());
 
 
  272          const string file_name = this->
getFile()->GetName();
 
  304            std::remove(file_name.c_str());
 
 
  346          out << 
object.getFile()->GetName();
 
  352        out << 
object.sizeL0 << 
'/' 
  353            << 
object.sizeL1 << 
'/' 
  354            << 
object.sizeL2 << 
'/' 
  355            << 
object.sizeSN << 
'/';
 
 
 
  398                const std::string& 
server,
 
  404                const std::string& path,
 
  405                const std::string& archive) :
 
 
  442    virtual void actionInit(
int length, 
const char* buffer)
 override 
  456      catch(
const std::exception& error) {
 
 
  469      string _hostname_ = 
"";
 
  471      long long int update_s = 20;
 
  472      long long int logger_s = 10;
 
  493      properties[
"dataWriter"]             = _hostname_;
 
  501      properties[
"logger_s"]               = logger_s;
 
  502      properties[
"update_s"]               = update_s;
 
  515        properties.
read(
string(buffer, length));
 
  517      catch(
const std::exception& error) {
 
  521      if (update_s <=  0) { update_s = 20; }
 
  522      if (logger_s <=  0) { logger_s = 10; }
 
  526      _hostname_ = 
trim(_hostname_);
 
  528      if (_hostname_ != 
"" && _hostname_ != 
hostname) {
 
  542        catch (
const exception&) {
 
  560                              << 
"Assuming that this process is the only process on this CPU and setting parameters accordingly.";
 
  565      unsigned int numberOfDataFiltersOnThisMachine = 0;
 
  572        if (find(IP.begin(), IP.end(), i->hostname) != IP.end()) {
 
  574          numberOfDataFiltersOnThisMachine++;
 
  576          if (i->port == this->port) {
 
  582      if (numberOfDataFiltersOnThisMachine == 0) {
 
  583        JNoticeStream(
logger) << 
"Zero data filters on this machine according to process list (if it exists). " 
  584                              << 
"Assuming one datafilter on this machine.";
 
  585        numberOfDataFiltersOnThisMachine = 1;
 
  592        error << 
"This process cannot be found in the process list. Why do I exist?";
 
  593        error << 
" my IP addresses:";
 
  595        for (vector<string>::const_iterator i = IP.begin(); i != IP.end(); ++i) {
 
  599        error << 
" my port: " << this->
port;
 
  600        error << 
" process list";
 
  603          error << 
' ' << i->hostname << 
':' << i->port;
 
  610                             << 
", but in the process list I am referred to as " << thisProcess->index;
 
  621        JNoticeStream(
logger) << 
"Maximum queue size is too large given the number of processes on this machine. " 
  622                              << 
"Queue size reduced to "  
 
  769        for (deque<JDAQTimesliceL0>::const_iterator i = 
timeslices.begin(); i != 
timeslices.end(); ++i) {
 
  778        deque<JDAQTimesliceL0> buffer;
 
 
  801    virtual void actionStop(
int length, 
const char* buffer)
 override 
 
  817    virtual void actionQuit(
int length, 
const char* buffer)
 override 
 
  829      for (JChannelList_t::const_iterator channel = 
channelList.begin(); channel != 
channelList.end(); ++channel) {      
 
  830        if (!channel->isReady()) {
 
  831          mask.
set(channel->getFileDescriptor());   
 
 
  845          if (mask.
has(channel->getFileDescriptor())) {
 
  849          if (channel->isReady()) {
 
  860              catch(
const std::exception& error) {
 
  862                JErrorStream(
logErrorRun) << 
"Update frame queue " << channel->getFileDescriptor() << 
' ' << channel->size() << 
' ' << error.what(); 
 
  879        catch(
const std::exception& error) {
 
  882            JErrorStream(
logger) << 
"Disconnect channel " << channel->getFileDescriptor() << 
' ' << error.what(); 
 
  915        if (((
timeslices[0].size()          >=  number_of_frames &&                
 
  918             (maximum_in_queue              >=  number_of_frames &&                
 
  932          for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
 
  933            modules.insert(i->getModuleID());
 
  941            if (!pending_slice.empty()) {
 
  961                                               << 
" adjusting frames per slice to " << 
modules.size();
 
  967          if (pending_slice.size() < number_of_frames) {
 
  973            error << 
"Timeout -> processed incomplete timeslice: "  
  975                  << 
"Size of timeslice = " << pending_slice.size()          << 
';' 
  976                  << 
"Queue depth = "       << 
timeslices.size()             << 
';' 
  981            if (maximum_in_queue >= number_of_frames) {
 
  983              error << 
" intermittent problem -> continues as-is";
 
  989              for (JDAQTimesliceL0::const_iterator i = pending_slice.begin(); i != pending_slice.end(); ++i) {
 
  990                modules.insert(i->getModuleID());
 
 
 1025      using namespace std;
 
 1038                                  << 
"preamble.getLength() = " << preamble.
getLength() << 
';' 
 1039                                  << 
"channel.size(): "        << channel.
size()       << 
';';
 
 1050                                  << 
" -> Dropping frame.";
 
 1067                                      << 
" -> dropping frame;"  
 1080                                    << 
" -> Dropping frame.";
 
 1085      deque<JDAQTimesliceL0>::iterator timesliceIterator = 
timeslices.begin();
 
 1087      while (timesliceIterator != 
timeslices.end() && timesliceIterator->getFrameIndex() < header.
getFrameIndex()) {
 
 1088        ++timesliceIterator;
 
 1108      in >> 
static_cast<JDAQFrame&
>(*(timesliceIterator->rbegin()));
 
 
 1123      using namespace std;
 
 1150          for (JDAQTimesliceL0::const_iterator frame = timeslice.begin(); frame != timeslice.end(); ++frame) {
 
 1161                                       << 
"module = "      << frame->getModuleID()      << 
";" 
 1165                  timesliceTX.push_back(*frame);
 
 1188                if (buffer.begin() != __end) {
 
 1191                                                        frame->getModuleIdentifier(),
 
 1196                  (*buildL1)(buffer.begin(), __end , back_inserter(zbuf));
 
 1198                  (*buildNB)(buffer.begin() , __end, zbuf, back_inserter(*timesliceNB.rbegin()));
 
 1205                                                    frame->getModuleIdentifier(),
 
 1208              (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
 
 1213                                                    frame->getModuleIdentifier(),
 
 1216              (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
 
 1221                                                    frame->getModuleIdentifier(),
 
 1224              (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
 
 1232          if (!timesliceTX.empty()) {
 
 1236              this->
put(timesliceTX);
 
 1248            for (JTriggerInput::const_iterator hit = trigger_input.begin(); hit != trigger_input.end(); ++hit) {
 
 1253                                    getTriggerMask(
triggerNB->getTriggerBit()),
 
 1269          (*trigger3DMuon)  (trigger_input, back_inserter(trigger_output));
 
 1270          (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
 
 1271          (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
 
 1275          for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
 
 1315      } 
catch(
const std::exception& error) {
 
 1320                             << 
"time slice not correctly processed;" 
 
 1363      if (processedSlicesTime_us   > 0) {
 
 1364        JStatusStream(
logger) << 
"Performance factor (inaccurate estimate): "                << T_us / processedSlicesTime_us;
 
 1366      if (processedDetectorTime_us > 0) {
 
 1367        JStatusStream(
logger) << 
"Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTime_us;
 
 
 1383      using namespace std;
 
 
 1448      catch(
const std::exception& error) {
 
 
 1481        if (this->empty()) {
 
 1487          int min = std::numeric_limits<int>::max();
 
 1489          for (const_iterator i = this->begin(); i != this->end(); ++i) {
 
 1490            if (i->second < min) {
 
 1506        if (this->empty()) {
 
 1512          int max = std::numeric_limits<int>::lowest();
 
 1514          for (const_iterator i = this->begin(); i != this->end(); ++i) {
 
 1515            if (i->second > max) {
 
 
 1600  using namespace std;
 
 1601  using namespace JPP;
 
 1618    JParser<> zap(
"Application for real-time filtering of data.");
 
 1620    zap[
'H'] = 
make_field(server,      
"host name of server for command messages")               = 
"localhost";
 
 1621    zap[
'M'] = 
make_field(logger,      
"host name of server for logger messages")                = 
"localhost";
 
 1622    zap[
'D'] = 
make_field(hostname,    
"host name of server of data writer")                     = 
"";
 
 1623    zap[
'u'] = 
make_field(client_name, 
"client name")                                            = 
"%";
 
 1624    zap[
'P'] = 
make_field(port,        
"port to listen for incoming data from data queue");
 
 1625    zap[
'q'] = 
make_field(backlog,     
"back log")                                               =   1024;
 
 1626    zap[
'c'] = 
make_field(use_cout,    
"print to terminal");
 
 1627    zap[
'p'] = 
make_field(path,        
"directory for temporary storage of circular buffer")     = 
"/tmp/";
 
 1628    zap[
'A'] = 
make_field(archive,     
"directory for permanent archival of circular buffer")    = 
"/tmp/";
 
 1633  catch(
const std::exception& error) {
 
 1634    FATAL(error.what() << endl);
 
 
KM3NeT DAQ constants, bit handling, etc.
 
int main(int argc, char *argv[])
 
Data structure for detector geometry and calibration.
 
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
 
Basic data structure for L0 hit.
 
Basic data structure for L1 hit.
 
General purpose messaging.
 
Hostname and IP address functions.
 
Utility class to parse command line options.
 
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
 
I/O formatting auxiliaries.
 
Utility class to parse parameter values.
 
ROOT TTree parameter settings of various packages.
 
Scheduling of actions via fixed latency intervals.
 
Basic data structure for time and time over threshold information of hit.
 
Auxiliary methods to convert data members or return values of member methods of a set of objects to a...
 
void merge(const JMatch_t &match)
Merge events.
 
int getRunNumber() const
Get run number.
 
JDAQStateMachine::ev_error_event ev_error
 
JDAQStateMachine::ev_configure_event ev_configure
 
Router for direct addressing of module data in detector data structure.
 
Data structure for a composite optical module.
 
Utility class to parse parameter values.
 
bool read(const JEquation &equation)
Read equation.
 
Auxiliary class for CPU timing and usage.
 
unsigned long long usec_ucpu
 
unsigned long long usec_wall
 
unsigned long long usec_scpu
 
const JPosition3D & getPosition() const
Get position.
 
const char * data() const
Get data.
 
int size() const
Get size.
 
int getFileDescriptor() const
Get file descriptor.
 
virtual bool is_open() const =0
Check is device is open.
 
Simple data structure to support I/O of equations (see class JLANG::JEquation).
 
Auxiliary class for method select.
 
void set(const int file_descriptor)
Set file descriptor.
 
bool has(const int file_descriptor) const
Has file descriptor.
 
Exception for opening of file.
 
The template JSinglePointer class can be used to hold a pointer to an object.
 
Message logging based on ControlHost.
 
Interface for logging messages.
 
Message logger with time scheduler.
 
Message logging based on std::ostream.
 
Implemenation of object output through ControlHost.
 
void setReceiveBufferSize(const int size)
Set receive buffer size.
 
int getReceiveBufferSize() const
Set receive buffer size.
 
void setKeepAlive(const bool on)
Set keep alive of socket.
 
void setNonBlocking(const bool on)
Set non-blocking of I/O.
 
std::string toString() const
Convert tag to string.
 
Utility class to parse command line options.
 
TFile * getFile() const
Get file.
 
virtual bool is_open() const override
Check is file is open.
 
JTreeWriter object output.
 
JTreeWriter< T, JRootCreateFlatTree< T >::value > * out
 
virtual bool put(const T &object) override
Object output.
 
ROOT TTree object output.
 
virtual void close() override
 
1-dimensional frame with time calibrated data from one optical module.
 
2-dimensional frame with time calibrated data from one optical module.
 
static JSuperFrame2D< JElement_t, JAllocator_t > demultiplex
Demultiplexer.
 
std::vector< value_type >::iterator iterator
 
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
 
Auxiliary class to build JDAQTimeslice for L1 timeslice.
 
Time slice with calibrated data.
 
Auxiliary class to build JDAQEvent for a triggered event.
 
Simple data structure for the DAQ preamble required for a correct calculation of the object size for ...
 
int getLength() const
Get length.
 
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
 
int getRunNumber() const
Get run number.
 
int getFrameIndex() const
Get frame index.
 
Control unit client base class.
 
JSharedPointer< JControlHost > server
message server
 
bool isRunning() const
Check if this client is in runnig state.
 
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
 
void run()
Run as run control client following command messages via JNET::JControlHost.
 
JMessageLogger logger
message logger
 
virtual bool enter(const JArgs &args)
Enter the state machine.
 
void addSubscription(const JSubscription &subscription)
Add custom subscription.
 
int getModuleID() const
Get module identifier.
 
Auxiliary class for itemization of process list.
 
std::string index
index in process list
 
Data frame of one optical module.
 
static void reset()
Reset counter of unique instance of this class object.
 
Main class for real-time filtering of data.
 
long long int number_of_packets_received
 
JMessageScheduler logErrorRun
 
JSinglePointer< JServerSocket > serversocket
server for data queue connections
 
JMessageScheduler logErrorDetector
 
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
 
JCircularBuffer_t c_buffer
 
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
 
long long int numberOfBytes
 
long long int number_of_packets_discarded
 
JMessageScheduler logErrorIndex
 
static bool compare(const JDAQProcess &first, const JDAQProcess &second)
Sort DAQ process by index.
 
virtual void actionInit(int length, const char *buffer) override
 
void put(const T &object)
Auxiliary method to send object to data server.
 
JSocketInputChannel< JDAQAbstractPreamble > JSocketInputChannel_t
 
long long int number_of_bytes_received
 
virtual void actionContinue(int length, const char *buffer) override
 
long long int maxQueueSize
 
int port
server socket port
 
long long int totalCPURAM
 
void processTimeSlice(const JDAQTimesliceL0 ×lice)
Process time slice.
 
std::deque< JDAQTimesliceL0 > timeslices
buffer with pending time slice data
 
std::vector< JDAQProcess > dataQueues
 
JMessageScheduler logErrorOvercomplete
 
long long int number_of_reads
 
std::vector< JSocketInputChannel_t > JChannelList_t
 
virtual void actionStop(int length, const char *buffer) override
 
JSinglePointer< JBuildL2_t > buildNB
 
JSinglePointer< JBuildL2_t > buildL2
 
JBuildL1< hit_type > JBuildL1_t
 
JSinglePointer< JBuildL1_t > buildL1
 
virtual void actionExit() override
 
virtual void actionQuit(int length, const char *buffer) override
 
virtual void actionTagged(const JTag &tag, int length, const char *buffer) override
Tagged action to handle alerts.
 
std::string hostname
host name of data server
 
virtual void actionConfigure(int length, const char *buffer) override
 
long long int numberOfTimeslicesProcessed
 
void updateFrameQueue(const JSocketInputChannel_t &channel)
Update queue with data frames.
 
JChannelList_t channelList
connections to data queue
 
long long int numberOfIncompleteTimeslicesProcessed
 
JSinglePointer< JTrigger3DMuon > trigger3DMuon
 
JDataFilter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const std::string &path, const std::string &archive)
Constructor.
 
JSuperFrame1D< hit_type > JSuperFrame1D_t
 
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
 
JMessageScheduler logErrorIncomplete
 
JSuperFrame2D< hit_type > JSuperFrame2D_t
 
virtual void actionEnter() override
Interface methods for actions corresponding to state transitions.
 
JTimeslice< hit_type > JTimeslice_t
 
JSinglePointer< JModuleRouter > moduleRouter
 
JSinglePointer< JControlHost_t > datawriter
controlhost of data server (to which data writer should be connected)
 
JSinglePointer< JTimesliceRouter > timesliceRouter
 
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
 
JSinglePointer< JTriggerNB > triggerNB
 
JBuildL2< hit_type > JBuildL2_t
 
long long int numberOfEvents
 
std::vector< JDAQProcess > dataFilters
 
JSinglePointer< JTrigger3DShower > trigger3DShower
 
virtual void actionStart(int length, const char *buffer) override
 
virtual void actionPause(int length, const char *buffer) override
 
JSinglePointer< JTriggerMXShower > triggerMXShower
 
virtual void actionReset(int length, const char *buffer) override
 
JSinglePointer< JBuildL2_t > buildSN
 
JTriggerParameters parameters
 
void typeout()
Report status to message logger.
 
double getMaximalDistance(const JDetector &detector, const bool option=false)
Get maximal distance between modules in detector.
 
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
 
const array_type< JValue_t > & make_array(const JValue_t(&array)[N])
Method to create array of values.
 
T getMaximum(const array_type< T > &buffer, const T value)
Get maximum of values.
 
std::string trim(const std::string &buffer)
Trim string.
 
static const long long int GIGABYTE
Number of bytes in a mega-byte.
 
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
 
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
 
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
 
const char * getName()
Get ROOT name of given data type.
 
static const JLocalTime getLocalTime
Function object to get local time in micro seconds.
 
long long int localtime_t
Type definition of local time.
 
unsigned long long int getRAM()
Get RAM of this CPU.
 
std::vector< std::string > getListOfIPaddresses()
Get list of IP address (decimal-dot notation).
 
int rename(const std::string &inputFile, const std::string &outputFile)
Rename file across file systems.
 
static JStat getFileStatus
Function object for file status.
 
static const JChecksum checksum
Function object to perform check-sum of raw data.
 
KM3NeT DAQ data structures and auxiliaries.
 
size_t getNumberOfFrames(const size_t number_of_frames, const double factor)
Get expected number of frames according a given allowed fraction of active modules.
 
static const JNET::JTag RC_DFILTER
 
double getFrameTime()
Get frame time duration.
 
static const JNET::JTag RC_ALERT
 
static const int DFILTER_DQUEUE_BUFFER_SIZE
socket JDataFilter.cc <- DataQueue.cc
 
static const int DFILTER_RECEIVE_BUFFER_SIZE
socket JDataFilter.cc <- JLigier.cc
 
static const JNET::JTag RC_CMD
 
static const int DFILTER_SEND_BUFFER_SIZE
socket JDataFilter.cc -> JDataWriter.cc
 
static const JNET::JTag IO_TRIGGER_PARAMETERS
 
size_t getSizeof()
Definition of method to get size of data type.
 
static const int HIGH_RATE_VETO_DISABLE
Enable (disable) use of high-rate veto test if this status bit is 0 (1);.
 
Auxiliary data structure for sequence of same character.
 
Auxiliary data structure for floating point format specification.
 
Match of two events considering overlap in time and position.
 
Auxiliary class for handling status.
 
void set(const int bit)
Set PMT status.
 
Level specific message streamers.
 
Auxiliary class for all subscription.
 
Auxiliary class for date and time.
 
int getYear() const
year a.d.
 
int getDay() const
day of the month [1-31]
 
int getMonth() const
month of the year [1-12]
 
Auxiliary data structure for result of checksum.
 
@ EPMT_t
PMT number error.
 
@ TIME_t
Time order error.
 
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
 
const JTag & getUniqueTag() const
Get unique tag of this run control client.
 
const std::string & getFullName() const
Get full name of this run control client.
 
void setClockInterval(const long long int interval_us)
Set interval time.
 
long long int getClockInterval() const
Get interval time.
 
Timeslice data structure for L0 data.
 
std::string archive
Directory for permanent archival.
 
std::string path
Directory for temporary storage.
 
Long64_t sizeL1
Number of L1 time slices.
 
Long64_t sizeSN
Number of SN time slices.
 
friend std::ostream & operator<<(std::ostream &out, const JCircularBuffer_t &object)
Write circular buffer to output stream.
 
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
 
JTag tag
Unique tag of this process.
 
Long64_t sizeL0
Number of L0 time slices.
 
void close(const bool option)
Close file.
 
Long64_t sizeL2
Number of L2 time slices.
 
void disable()
Disable writing.
 
JCircularBuffer_t(const std::string &path, const std::string &archive, const JTag &tag)
Constructor.
 
bool is_enabled() const
Check whether writing of data is enabled.