Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Public Member Functions | Static Public Attributes | Protected Attributes | Private Types | Private Attributes | List of all members
JDataFilter Class Reference
Inheritance diagram for JDataFilter:
KM3NETDAQ::JDAQClient

Public Member Functions

 JDataFilter (const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const int port, const int backlog, const int buffer_size)
 Constructor. More...
 
virtual void actionEnter ()
 
virtual void actionExit ()
 
virtual void actionInit (int length, const char *buffer)
 
virtual void actionConfigure (int length, const char *buffer)
 
virtual void actionStart (int length, const char *buffer)
 
virtual void actionPause (int length, const char *buffer)
 
virtual void actionContinue (int length, const char *buffer)
 
virtual void actionStop (int length, const char *buffer)
 
virtual void actionReset (int length, const char *buffer)
 
virtual void actionQuit (int length, const char *buffer)
 
virtual void setSelect (JFileDescriptorMask &mask) const
 
virtual void actionSelect (const JFileDescriptorMask &mask)
 
virtual void actionRunning ()
 This method is repeatedly called when this client machine is in state Running and the clock interval time is non-zero. More...
 
void updateFrameQueue (const JChannelList_t::const_iterator channel)
 
void processTimeSlice (const JDAQTimesliceL0 &pending_slice)
 
void typeout ()
 
virtual bool enter (const JArgs &args)
 Enter the state machine. More...
 
virtual bool enter ()
 Enter the state machine. More...
 
virtual bool exit ()
 Exit the state machine. More...
 
const std::string & getFullName () const
 Get full name of this run control client. More...
 
const JNET::JTaggetUniqueTag () const
 Get unique tag of this run control client. More...
 
int getEventNumber () const
 Get last event number. More...
 
bool isRunning () const
 Check if this client is in runnig state. More...
 
void replaceEvent (const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
 Replace tag of given event in event table. More...
 
JDAQEvent_t * findEvent (const JNET::JTag &tag, const std::string &event_name)
 Find event in event table. More...
 
template<class T >
void addParameter (const char option, T &parameter)
 Add parameter to parser used in method enter(). More...
 
template<class T >
void addParameter (const char option, T &parameter, const T &value)
 Add parameter to parser used in method enter(). More...
 
long long int getClockDelay () const
 Get total delay time. More...
 
const std::string & getHostname () const
 Get hostname. More...
 
long long int getClockInterval () const
 Get interval time. More...
 
void setClockInterval (const long long int interval_us)
 Set interval time. More...
 
void resetClock ()
 Reset clock. More...
 
void setSelect ()
 Set the file descriptor mask for the select call. More...
 
virtual void setSelect (JFileDescriptorMask &mask) const
 Set the file descriptor mask for the select call. More...
 
virtual void actionSelect (const JFileDescriptorMask &mask)
 Action method following last select call. More...
 
virtual void actionInput (int length, const char *buffer)
 This method is called at ev_input. More...
 
virtual bool filter (const std::string &tag, int length, const char *buffer)
 Filter message. More...
 
void run ()
 Run as run control client following command messages via JNET::JControlHost. More...
 
void run (const int port)
 Run for ever. More...
 
void run (std::istream &in)
 Run client with commands from input stream (e.g for debugging). More...
 

Static Public Attributes

static const int TIMEOUT_S = 1
 time out of update [s] More...
 

Protected Attributes

JSharedPointer< JControlHostserver
 message server More...
 
JMessageLogger logger
 message logger More...
 

Private Types

typedef
JControlHostObjectOutput
< JDAQTypes_t
JControlHost_t
 

Private Attributes

JSinglePointer< JControlHost_tdatawriter
 
std::string hostname
 controlhost to send data to the datawriter More...
 
int port
 
int backlog
 serversocket port More...
 
int buffer_size
 
JSinglePointer< JServerSocketserversocket
 
JChannelList_t channelList
 
JTimer timer
 
list< JDAQTimesliceL0timeslices
 
int current_slice_index
 
unsigned int frames_per_slice
 
unsigned int maximum_frames_per_slice
 
JTriggerParameters parameters
 
string interfaceInput
 
JSinglePointer< JTimesliceRoutertimesliceRouter
 
JDetector detector
 
JSinglePointer< JModuleRoutermoduleRouter
 
JSinglePointer< JBuildL1_tbuildL1
 
JSinglePointer< JBuildL2_tbuildL2
 
JSinglePointer< JBuildL2_tbuildSN
 
JSinglePointer< JTrigger3DMuontrigger3DMuon
 
JSinglePointer< JTrigger3DShowertrigger3DShower
 
JSinglePointer< JTriggerMXShowertriggerMXShower
 
JMessageScheduler logErrorRun
 
JMessageScheduler logErrorDetector
 
JMessageScheduler logErrorIndex
 
JMessageScheduler logErrorIncomplete
 
JMessageScheduler logErrorInvalidPMTs
 
vector< JDAQProcessdataFilters
 
vector< JDAQProcessdataQueues
 
long long int totalCPURAM
 
unsigned int maxQueueDepth
 
long long int maxQueueSize
 
long long int queueSize
 
bool reporting
 
long long int numberOfEvents
 
long long int numberOfBytes
 
long long int numberOfTimeslicesProcessed
 
long long int numberOfIncompleteTimeslicesProcessed
 
long long int numberOfFramesWithInvalidPMTs
 
int minFrameNumber
 
int maxFrameNumber
 
long long int number_of_packets_received
 
long long int number_of_packets_discarded
 
long long int number_of_reads
 
long long int number_of_bytes_received
 
bool checkForInvalidPMTs
 
int maxHitsFrame
 

Detailed Description

Definition at line 99 of file JDataFilter.cc.

Member Typedef Documentation

Definition at line 874 of file JDataFilter.cc.

Constructor & Destructor Documentation

JDataFilter::JDataFilter ( const std::string &  name,
const std::string &  server,
const std::string &  hostname,
JLogger logger,
const int  level,
const int  port,
const int  backlog,
const int  buffer_size 
)
inline

Constructor.

Parameters
namename of client
servername of command message server
hostnamename of data server
loggerpointer to logger
leveldebug level
portserver port
backlogserver backlog
buffer_sizeserver buffer

replace tag to receive dfilter specific messages in configure

Definition at line 115 of file JDataFilter.cc.

122  :
123  JDAQClient(name,server,logger,level),
124  hostname (hostname),
125  parameters()
126  {
127  this->port = port;
128  this->backlog = backlog;
129  this->buffer_size = buffer_size;
130 
131  replaceEvent(RC_CMD, RC_DFILTER, ev_configure); /// replace tag to receive dfilter specific messages in configure
132 
133  totalCPURAM = getRAM();
134 
135  current_slice_index = -1;
136 
137  reporting = false;
138  }
int current_slice_index
Definition: JDataFilter.cc:888
std::string hostname
controlhost to send data to the datawriter
Definition: JDataFilter.cc:877
static const JNET::JTag RC_DFILTER
Definition: JDAQTags.hh:50
JDAQClient(const std::string &name)
Constructor.
Definition: JDAQClient.hh:97
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
int backlog
serversocket port
Definition: JDataFilter.cc:879
JTriggerParameters parameters
Definition: JDataFilter.cc:895
long long int totalCPURAM
Definition: JDataFilter.cc:921
unsigned long long int getRAM()
Get RAM of this CPU.
void replaceEvent(const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:294
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44

Member Function Documentation

virtual void JDataFilter::actionEnter ( )
inlinevirtual

Definition at line 141 of file JDataFilter.cc.

142  {
143  try {
144 
145  JDebugStream(logger) << "Start server.";
146 
148  }
149  catch(const std::exception& error) {
150  JErrorStream(logger) << error.what();
151  }
152  }
TCP Server socket.
int backlog
serversocket port
Definition: JDataFilter.cc:879
Level specific message streamers.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
JSinglePointer< JServerSocket > serversocket
Definition: JDataFilter.cc:882
virtual void JDataFilter::actionExit ( )
inlinevirtual

Definition at line 155 of file JDataFilter.cc.

156  {
157  JDebugStream(logger) << "Stop server.";
158 
159  serversocket.reset();
160  datawriter .reset();
161  }
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
Level specific message streamers.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
JSinglePointer< JServerSocket > serversocket
Definition: JDataFilter.cc:882
virtual void JDataFilter::actionInit ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 164 of file JDataFilter.cc.

165  {
166  JDebugStream(logger) << "actionInit() " << std::string(buffer,length);
167  }
Level specific message streamers.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
virtual void JDataFilter::actionConfigure ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 170 of file JDataFilter.cc.

171  {
172 
173  // todo:
174 
175  // 3) receive and set site id
176 
177 
178  using namespace std;
179  using namespace JPP;
180 
181  JDebugStream(logger) << "actionConfigure() " << std::string(buffer,length);
182 
183  long long int logger_s = 5;
184 
185  parameters .reset();
186 
187  dataFilters.clear();
188  dataQueues .clear();
189 
190  reporting = false;
191 
192  JProperties properties(JEquationParameters("=", ";", "", ""), 0);
193 
194  properties["dataWriter"] = hostname;
195  properties["numberOfFramesPerSlice"] = frames_per_slice = 1;
196  properties["detector"] = detector;
197  properties["triggerParameters"] = parameters;
198  properties["queueSize"] = maxQueueSize = (totalCPURAM - GIGABYTE); // leave 1 GB left ...
199  properties["queueDepth"] = maxQueueDepth = 20; // to be optimized
200  properties["logger_s"] = logger_s;
201  properties["JDataFilter"] = dataFilters;
202  properties["DataQueue"] = dataQueues;
203  properties["pmtIdCheck"] = checkForInvalidPMTs = false;
204  properties["maxHitsFrame"] = maxHitsFrame = 31*2000; // default is 20 kHz
205 
206  try {
207  properties.read(string(buffer, length));
208  }
209  catch(const exception& error) {
210  JErrorStream(logger) << error.what();
211  }
212 
214 
215  if (hostname != "")
216  datawriter.reset(new JControlHost_t(hostname)); // may throw an error
217  else
218  throw JException("Undefined data writer host name.");
219 
221 
222  // process processlist
223 
224  if(dataFilters.empty()) {
225  JNoticeStream(logger) << "No DataFilters in process list, or no process list. Assuming that this process is the only process on this CPU and setting parameters accordingly.";
226  }
227 
228 
229  sort(dataFilters.begin(), dataFilters.end(), processIndexSorter);
230 
231  unsigned int numberOfDataFiltersOnThisMachine = 0;
232  vector<JDAQProcess>::iterator thisProcess = dataFilters.end();
233 
234  for(vector<JDAQProcess>::iterator i = dataFilters.begin(); i != dataFilters.end(); ++i) {
235 
236  if(i->hostname == getHostname() ||
237  i->hostname == getIPaddress()) {
238 
239  numberOfDataFiltersOnThisMachine++;
240 
241  if (i->port == this->port) {
242  thisProcess = i;
243  }
244  }
245  }
246 
247  if (numberOfDataFiltersOnThisMachine == 0) {
248  JNoticeStream(logger) << "Zero datafilters on this machine according to process list (if it exists). Assuming one datafilter on this machine.";
249  numberOfDataFiltersOnThisMachine = 1;
250  }
251 
252  if (thisProcess == dataFilters.end()) {
253  JErrorStream(logger) << "This process cannot be found in the process list. Why do I exist?";
254  }
255 
256  if (thisProcess != dataFilters.end() && thisProcess->index != getName()) {
257  JErrorStream(logger) << "Mismatch between given process names: "
258  << "I am called " << getName() << ", but in the process list I am referred to as " << thisProcess->index;
259  }
260 
261  if (dataFilters.begin() == thisProcess || dataFilters.empty()) { // set reporting
262  reporting = true;
263  }
264 
265  if (maxQueueSize > (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine)) { // check maximum size of queue
266 
267  maxQueueSize = (totalCPURAM-GIGABYTE)/(numberOfDataFiltersOnThisMachine);
268 
269  JNoticeStream(logger) << "Maximum queue size is too large given the number of processes on this machine. Queue size reduced to "
270  << maxQueueSize << " bytes." ;
271  }
272 
273  // trigger parameters
274 
276 
280 
281  moduleRouter.reset(new JModuleRouter(detector));
282 
283  if (reporting) {
284  JNoticeStream(logger) << "This JDataFilter process will report.";
285  JNoticeStream(logger) << "Number of modules: " << (*moduleRouter)->size();
286  JNoticeStream(logger) << "Trigger parameters: " << parameters;
287  JNoticeStream(logger) << "Detector description: " << endl << detector;
288  JNoticeStream(logger) << "Clock interval: " << getClockInterval();
289  }
290 
292 
293  // set L1, L2 and SN builders
294 
295  buildL1.reset(new JBuildL1_t(parameters));
296  buildL2.reset(new JBuildL2_t(parameters.L2));
297  buildSN.reset(new JBuildL2_t(parameters.SN));
298 
299  if (buildL1.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL1."; }
300  if (buildL2.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildL2."; }
301  if (buildSN.get() == NULL) { JErrorStream(logger) << "Failed to allocate buildSN."; }
302 
303  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
304  logErrorDetector = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
305  logErrorIndex = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
306  logErrorIncomplete = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
307  logErrorInvalidPMTs = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
308  }
Message logger with time scheduler.
void reset()
Reset trigger parameters.
General exception.
Definition: JException.hh:40
JBuildL2< hit_type > JBuildL2_t
Definition: JDataFilter.cc:96
std::string hostname
controlhost to send data to the datawriter
Definition: JDataFilter.cc:877
Router for direct addressing of module data in detector data structure.
JL2Parameters L2
L2 processing.
int numberOfBins
number of bins for lookup table of timeslice
JDetector detector
Definition: JDataFilter.cc:899
Utility class to parse parameter values.
Definition: JProperties.hh:484
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
std::string getIPaddress(const int ip)
Get IP address (decimal-dot notation).
Definition: JNetwork.hh:150
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:910
JSinglePointer< JBuildL1_t > buildL1
Definition: JDataFilter.cc:901
vector< JDAQProcess > dataFilters
Definition: JDataFilter.cc:916
static const long long int GIGABYTE
Number of bytes in a megabyte.
Definition: JConstants.hh:81
JSinglePointer< JBuildL2_t > buildL2
Definition: JDataFilter.cc:902
JControlHostObjectOutput< JDAQTypes_t > JControlHost_t
Definition: JDataFilter.cc:874
vector< JDAQProcess > dataQueues
Definition: JDataFilter.cc:917
unsigned int maxQueueDepth
Definition: JDataFilter.cc:922
JBuildL1< hit_type > JBuildL1_t
Definition: JDataFilter.cc:95
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
double getMaximalDistance(const JDetector &detector)
Get maximal distance between modules in detector.
JL2Parameters SN
Supernova trigger.
Detector file.
Definition: JHead.hh:126
JMessageScheduler logErrorInvalidPMTs
Definition: JDataFilter.cc:913
JSinglePointer< JTriggerMXShower > triggerMXShower
Definition: JDataFilter.cc:907
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
JSinglePointer< JBuildL2_t > buildSN
Definition: JDataFilter.cc:903
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:911
Router for fast addressing of hits in KM3NETDAQ::JDAQTimeslice data structure as a function of the op...
JTriggerParameters parameters
Definition: JDataFilter.cc:895
unsigned int frames_per_slice
Definition: JDataFilter.cc:890
Level specific message streamers.
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:909
JSinglePointer< JTimesliceRouter > timesliceRouter
Definition: JDataFilter.cc:897
JSinglePointer< JTrigger3DShower > trigger3DShower
Definition: JDataFilter.cc:906
long long int totalCPURAM
Definition: JDataFilter.cc:921
bool processIndexSorter(const JDAQProcess &a, const JDAQProcess &b)
Definition: JDataFilter.cc:73
JSinglePointer< JModuleRouter > moduleRouter
Definition: JDataFilter.cc:900
long long int maxQueueSize
Definition: JDataFilter.cc:923
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:912
const char * getName()
Get ROOT name of given data type.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
bool checkForInvalidPMTs
Definition: JDataFilter.cc:946
long long int getClockInterval() const
Get interval time.
Definition: JDAQClient.hh:374
JSinglePointer< JTrigger3DMuon > trigger3DMuon
Definition: JDataFilter.cc:905
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:891
const std::string & getHostname() const
Get hostname.
Definition: JDAQClient.hh:363
void set(const double DMax_m=0.0)
Set dependent trigger parameters.
virtual void JDataFilter::actionStart ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 311 of file JDataFilter.cc.

312  {
313  if (reporting) {
314  JNoticeStream(logger) << "Start run " << getRunNumber();
315  }
316 
317  timeslices.clear();
318 
319  current_slice_index = -1;
320  queueSize = 0;
321 
322  numberOfEvents = 0;
323  numberOfBytes = 0;
326 
328 
332  number_of_reads = 0;
333 
334  minFrameNumber = numeric_limits<int>::max();
335  maxFrameNumber = numeric_limits<int>::min();
336 
337  // Reset global trigger counter.
338 
340 
341  logErrorRun .reset();
343  logErrorIndex .reset();
346 
347  timer.reset();
348  timer.start();
349 
350  // send trigger parameters to the datawriter
351 
352  ostringstream os;
353 
354  os << getRunNumber() << ' ' << parameters;
355 
356  datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str());
357  }
int current_slice_index
Definition: JDataFilter.cc:888
int maxFrameNumber
Definition: JDataFilter.cc:937
long long int numberOfFramesWithInvalidPMTs
Definition: JDataFilter.cc:934
int minFrameNumber
Definition: JDataFilter.cc:936
long long int numberOfEvents
Definition: JDataFilter.cc:930
long long int number_of_packets_received
Definition: JDataFilter.cc:941
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:910
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:66
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:932
JTimer timer
Definition: JDataFilter.cc:885
JMessageScheduler logErrorInvalidPMTs
Definition: JDataFilter.cc:913
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:911
JTriggerParameters parameters
Definition: JDataFilter.cc:895
static void reset()
Reset counter of unique instance of this class object.
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:909
long long int queueSize
Definition: JDataFilter.cc:924
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:933
long long int number_of_reads
Definition: JDataFilter.cc:943
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:912
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
long long int number_of_packets_discarded
Definition: JDataFilter.cc:942
long long int number_of_bytes_received
Definition: JDataFilter.cc:944
long long int numberOfBytes
Definition: JDataFilter.cc:931
virtual void JDataFilter::actionPause ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 360 of file JDataFilter.cc.

361  {
362  if (!timeslices.empty()) {
363 
364  JNoticeStream(logger) << "Flushing " << timeslices.size() << " slices.";
365 
366  for (list<JDAQTimesliceL0>::const_iterator i = timeslices.begin(); i != timeslices.begin(); ++i) {
367  queueSize -= i->getSize();
368  }
369 
370  timeslices.clear();
371  }
372 
373  if (queueSize != 0) {
374  JWarningStream(logger) << "Pending data in queue [B] " << queueSize;
375  }
376 
377  current_slice_index = -1;
378  queueSize = 0;
379 
380  timer.stop();
381  }
int current_slice_index
Definition: JDataFilter.cc:888
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
JTimer timer
Definition: JDataFilter.cc:885
long long int queueSize
Definition: JDataFilter.cc:924
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
virtual void JDataFilter::actionContinue ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 384 of file JDataFilter.cc.

385  {
386  timer.start();
387  }
JTimer timer
Definition: JDataFilter.cc:885
virtual void JDataFilter::actionStop ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 390 of file JDataFilter.cc.

391  {
392  typeout();
393  }
void typeout()
Definition: JDataFilter.cc:831
virtual void JDataFilter::actionReset ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 396 of file JDataFilter.cc.

397  {}
virtual void JDataFilter::actionQuit ( int  length,
const char *  buffer 
)
inlinevirtual

Definition at line 400 of file JDataFilter.cc.

401  {
402  datawriter.reset();
403  }
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
virtual void JDataFilter::setSelect ( JFileDescriptorMask &  mask) const
inlinevirtual

Definition at line 406 of file JDataFilter.cc.

407  {
408  if (serversocket.is_valid()) {
409  mask.set(*serversocket);
410  }
411 
412  for (JChannelList_t::const_iterator channel = channelList.begin(); channel != channelList.end(); ++channel) {
413  if (!channel->isReady()) {
414  mask.set(channel->getFileDescriptor());
415  }
416  }
417  }
void set(const int file_descriptor)
Set file descriptor.
JChannelList_t channelList
Definition: JDataFilter.cc:883
JSinglePointer< JServerSocket > serversocket
Definition: JDataFilter.cc:882
virtual void JDataFilter::actionSelect ( const JFileDescriptorMask &  mask)
inlinevirtual

Definition at line 420 of file JDataFilter.cc.

421  {
422  for (JChannelList_t::iterator channel = channelList.begin(); channel != channelList.end(); ) {
423 
424  try {
425 
426  if (mask.has(channel->getFileDescriptor())) {
427  channel->read();
428  }
429 
430  if (channel->isReady()) {
431 
433  number_of_reads += channel->getCounter();
434  number_of_bytes_received += channel->size();
435 
436  if (isRunning()) {
437 
438  updateFrameQueue(channel);
439 
440  } else {
441 
442  JErrorStream(logErrorRun) << "Receiving data while not running.";
443 
445  }
446 
447  channel->reset();
448  }
449 
450  ++channel;
451  }
452  catch(const JSocketException& error) {
453 
454  JNoticeStream(logger) << "Disconnecting channel " << channel->getFileDescriptor() << ' ' << error.what();
455 
456  channel->shutdown();
457 
458  channel = channelList.erase(channel);
459  }
460  }
461 
462 
463  if (serversocket.is_valid()) {
464 
465  if (mask.has(*serversocket)) {
466 
467  JSocket socket;
468 
469  socket.accept(serversocket->getFileDescriptor());
470 
471  //socket.setSendBufferSize (buffer_size);
473 
474  socket.setKeepAlive (true);
475  socket.setNonBlocking(true);
476 
477  JNoticeStream(logger) <<"New channel" << '[' << socket.getFileDescriptor() << ']';
478 
479  channelList.push_back(JSocketInputChannel_t(socket));
480  }
481  }
482 
483 
484  if (!timeslices.empty() && (timeslices.front().size() >= frames_per_slice ||
485  timeslices.size() >= maxQueueDepth ||
486  queueSize >= maxQueueSize)) {
487 
488  JDAQTimesliceL0 pending_slice = timeslices.front();
489  queueSize -= pending_slice.getSize();
490  timeslices.pop_front();
491 
492  current_slice_index = pending_slice.getFrameIndex();
493  minFrameNumber = min(minFrameNumber, pending_slice.getFrameIndex());
494  maxFrameNumber = max(maxFrameNumber, pending_slice.getFrameIndex());
495 
496  if(pending_slice.size()>frames_per_slice) {
497 
498  JErrorStream(logger) << "More frames in timeslice than expected : " << pending_slice.size();
499 
500  if(pending_slice.size()<=maximum_frames_per_slice) {
501 
502  JErrorStream(logger) << "Adjusting expected frames per timeslice to " << pending_slice.size();
503 
504  frames_per_slice = pending_slice.size();
505  }
506  }
507 
508  if (!pending_slice.empty()) {
509 
510  processTimeSlice(pending_slice);
511 
513 
514  if (pending_slice.size() < frames_per_slice) {
516  JErrorStream(logErrorIncomplete) << "Timeout -> processed incomplete timeslice "
517  << "Timeslice frameindex " << pending_slice.getFrameIndex() << ' '
518  << "Size of timeslice " << pending_slice.size() << ' '
519  << "Queue depth " << timeslices.size() << ' '
520  << "Queue size " << queueSize;
521 
522  if(!timeslices.empty()) {
523  JErrorStream(logger) << "Adjusting expected frames per timeslice from " << frames_per_slice << " to " << pending_slice.size();
524  frames_per_slice = pending_slice.size();
525  }
526  }
527  }
528  }
529  }
int current_slice_index
Definition: JDataFilter.cc:888
int maxFrameNumber
Definition: JDataFilter.cc:937
bool has(const int file_descriptor) const
Has file descriptor.
int minFrameNumber
Definition: JDataFilter.cc:936
long long int number_of_packets_received
Definition: JDataFilter.cc:941
JChannelList_t channelList
Definition: JDataFilter.cc:883
Socket class.
Definition: JSocket.hh:42
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
unsigned int maxQueueDepth
Definition: JDataFilter.cc:922
int getFrameIndex() const
Get frame index.
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:932
int getFileDescriptor() const
Get file descriptor.
JNET::JSocketInputChannel< KM3NETDAQ::JDAQAbstractPreamble > JSocketInputChannel_t
Definition: JDataFilter.cc:87
void accept(const int server)
Accept connection from a server.
Definition: JSocket.hh:372
void updateFrameQueue(const JChannelList_t::const_iterator channel)
Definition: JDataFilter.cc:540
unsigned int frames_per_slice
Definition: JDataFilter.cc:890
bool isRunning() const
Check if this client is in runnig state.
Definition: JDAQClient.hh:281
void setReceiveBufferSize(const int size)
Set receive buffer size.
Definition: JSocket.hh:250
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:909
virtual int getSize() const
Get size of object.
long long int queueSize
Definition: JDataFilter.cc:924
long long int maxQueueSize
Definition: JDataFilter.cc:923
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:933
Exception for socket.
Definition: JException.hh:414
long long int number_of_reads
Definition: JDataFilter.cc:943
void setNonBlocking(const bool on)
Set non-blocking of I/O.
Definition: JSocket.hh:109
JMessageScheduler logErrorIncomplete
Definition: JDataFilter.cc:912
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
void setKeepAlive(const bool on)
Set keep alive of socket.
Definition: JSocket.hh:151
void processTimeSlice(const JDAQTimesliceL0 &pending_slice)
Definition: JDataFilter.cc:653
long long int number_of_packets_discarded
Definition: JDataFilter.cc:942
virtual const char * what() const
Get error message.
Definition: JException.hh:65
JSinglePointer< JServerSocket > serversocket
Definition: JDataFilter.cc:882
Timeslice data structure for L0 data.
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:891
long long int number_of_bytes_received
Definition: JDataFilter.cc:944
virtual void JDataFilter::actionRunning ( )
inlinevirtual

This method is repeatedly called when this client machine is in state Running and the clock interval time is non-zero.

This implementation does nothing but may be redefined by the derived class. Care has to be taken so that the time needed to execute this method should be less than the specified clock interval time (see method setClockInterval()).

Reimplemented from KM3NETDAQ::JDAQClient.

Definition at line 532 of file JDataFilter.cc.

533  {
534  if (reporting) {
535  typeout();
536  }
537  }
void typeout()
Definition: JDataFilter.cc:831
void JDataFilter::updateFrameQueue ( const JChannelList_t::const_iterator  channel)
inline

Definition at line 540 of file JDataFilter.cc.

541  {
542  JIO::JByteArrayReader in(channel->data(), channel->size());
543 
544  JDAQPreamble preamble;
545  JDAQSuperFrameHeader header;
546 
547  in >> preamble;
548  in >> header;
549 
550  if(preamble.getLength()!=channel->size()) {
551 
552  JErrorStream(logErrorRun) << "Size of received data does not match size reported by preamble."
553  << " preamble.getLength(): " << preamble.getLength()
554  << " channel->size(): " << channel->size() ;
556 
557  return;
558  }
559 
560  if (header.getRunNumber() != getRunNumber()) {
561 
562  JErrorStream(logErrorRun) << "Run number " << header.getRunNumber() << " unequal to current run " << getRunNumber() << " -> Dropping frame.";
563 
565 
566  return;
567  }
568 
569  if (header.getFrameIndex() <= current_slice_index) {
570 
571  JErrorStream(logErrorIndex) << "FrameIndex " << header.getFrameIndex() <<" already processed, dropping frame.";
572 
574 
576 
578 
579  JErrorStream(logErrorIndex) << "Increased number of frames expected to: " << frames_per_slice;
580  }
581 
582  return;
583  }
584 
585  list<JDAQTimesliceL0>::iterator timesliceIterator;
586 
587  for (timesliceIterator = timeslices.begin();
588  timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() < header.getFrameIndex();
589  ++timesliceIterator) ;
590 
591  if (timesliceIterator != timeslices.end() && timesliceIterator->getFrameIndex() == header.getFrameIndex()) {
592 
593  } else {
594 
595  // This is the first frame of its slice; insert a new slice at the right position in the list
596 
597  timesliceIterator = timeslices.insert(timesliceIterator, JDAQTimesliceL0());
598  timesliceIterator->setDAQChronometer(header.getDAQChronometer());
599 
600  queueSize += timesliceIterator->getSize();
601  }
602 
603 
604  if (checkForInvalidPMTs) {
605 
606  JDAQSuperFrame frame(header);
607 
608  in >> static_cast<JDAQFrame&> (frame);
609 
610  if(frame.size()>maxHitsFrame) {
611  JErrorStream(logger) << "Frame size exceeds limit. Size: " << frame.size()<< " Frame discarded.";
612  return;
613  }
614  JDAQSuperFrame::const_iterator hit = frame.begin() ;
615 
616  // std::vector<unsigned int> prevHitT(30,0);
617 
618  for (; hit!=frame.end(); ++hit) {
619 
620  if(hit->getPMT()>30) //||
621  // hit->getT()<prevHitT[hit->getPMT()])
622  {
623  break;
624  }
625 
626  // prevHitT[hit->getPMT()]=hit->getT();
627  }
628 
629  if(hit!=frame.end()) {
631  JErrorStream(logErrorInvalidPMTs)<< "Frame with invalid PMT id or wrong time ordering discarded. Total discarded: "<< numberOfFramesWithInvalidPMTs;
632  }
633  else
634  {
635 
636 
637  timesliceIterator->push_back(frame);
638  queueSize += timesliceIterator->rbegin()->getSize();
639  }
640  }
641  else {
642 
643 
644  timesliceIterator->push_back(JDAQSuperFrame(header));
645 
646  in >> static_cast<JDAQFrame&>(*(timesliceIterator->rbegin()));
647  queueSize += timesliceIterator->rbegin()->getSize();
648  }
649 
650  }
int current_slice_index
Definition: JDataFilter.cc:888
long long int numberOfFramesWithInvalidPMTs
Definition: JDataFilter.cc:934
JPMT_t getPMT() const
Get PMT.
Definition: JDAQHit.hh:74
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
JMessageScheduler logErrorInvalidPMTs
Definition: JDataFilter.cc:913
Hit data structure.
Definition: JDAQHit.hh:36
Byte array binary input.
Definition: JByteArrayIO.hh:25
JMessageScheduler logErrorIndex
Definition: JDataFilter.cc:911
Data frame.
Definition: JDAQFrame.hh:70
unsigned int frames_per_slice
Definition: JDataFilter.cc:890
JMessageScheduler logErrorRun
Definition: JDataFilter.cc:909
long long int queueSize
Definition: JDataFilter.cc:924
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
bool checkForInvalidPMTs
Definition: JDataFilter.cc:946
long long int number_of_packets_discarded
Definition: JDataFilter.cc:942
Data frame of one optical module.
Timeslice data structure for L0 data.
unsigned int maximum_frames_per_slice
Definition: JDataFilter.cc:891
void JDataFilter::processTimeSlice ( const JDAQTimesliceL0 pending_slice)
inline

Definition at line 653 of file JDataFilter.cc.

654  {
655  //JDebugStream(logger) << "Processing slice: "
656  // << pending_slice.getRunNumber() << " "
657  // << pending_slice.getFrameIndex() << " "
658  // << pending_slice.size();
659 
660  try {
661 
662  if (parameters.writeSummary()) {
663 
664  try {
665 
666  JDAQSummaryslice object(pending_slice);
667 
668  datawriter->put(object);
669 
670  numberOfBytes += object.getSize();
671  }
672  catch(const JControlHostException& error) {
673  JErrorStream(logger) << error;
674  }
675  }
676 
677  timesliceRouter->configure(pending_slice);
678 
685 
686 
687  JTimeslice_t timesliceL0(pending_slice.getDAQChronometer());
688  JTimeslice_t timesliceL1(pending_slice.getDAQChronometer());
689  JTimeslice_t timesliceL2(pending_slice.getDAQChronometer());
690  JTimeslice_t timesliceSN(pending_slice.getDAQChronometer());
691 
692  for (JDAQTimesliceL0::const_iterator super_frame = pending_slice.begin(); super_frame != pending_slice.end(); ++super_frame) {
693 
694  if (moduleRouter->hasModule(super_frame->getModuleID())) {
695 
696  const JModule& module = moduleRouter->getModule(super_frame->getModuleID());
697  JSuperFrame2D_t& buffer = JSuperFrame2D_t::demultiplex(*super_frame, module);
698 
699  // Apply high-rate veto
700 
702 
703  // L0
704 
705  timesliceL0.push_back(JSuperFrame1D_t(buffer));
706 
707  // L1
708 
709 
710  timesliceL1.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
711  super_frame->getModuleIdentifier(),
712  module.getPosition()));
713 
714  (*buildL1)(*timesliceL0.rbegin(), back_inserter(*timesliceL1.rbegin()));
715 
716  // L2
717 
718  timesliceL2.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
719  super_frame->getModuleIdentifier(),
720  module.getPosition()));
721 
722  (*buildL2)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceL2.rbegin()));
723 
724  // SN
725 
726  timesliceSN.push_back(JSuperFrame1D_t(super_frame->getDAQChronometer(),
727  super_frame->getModuleIdentifier(),
728  module.getPosition()));
729 
730  (*buildSN)(buffer, *timesliceL1.rbegin(), back_inserter(*timesliceSN.rbegin()));
731 
732  } else {
733 
734  JErrorStream(logErrorDetector) << "No detector information for module " << super_frame->getModuleID();
735  }
736  }
737 
738  // Trigger
739 
740  JTriggerInput trigger_input(timesliceL2);
741  JTriggerOutput trigger_output;
742 
743  (*trigger3DMuon) (trigger_input, back_inserter(trigger_output));
744  (*trigger3DShower)(trigger_input, back_inserter(trigger_output));
745  (*triggerMXShower)(trigger_input, timesliceL0, back_inserter(trigger_output));
746 
747  trigger_output.merge(JEventOverlap(parameters.TMaxEvent_ns));
748 
749  for (JTriggerOutput::const_iterator event = trigger_output.begin(); event != trigger_output.end(); ++event) {
750 
751  try {
752 
754 
755  datawriter->put(object);
756 
757  numberOfEvents += 1;
758  numberOfBytes += object.getSize();
759  }
760  catch(const JControlHostException& error) {
761  JErrorStream(logger) << error;
762  }
763  }
764 
765  if (parameters.writeL1()) {
766 
767  try {
768 
770 
771  datawriter->put(object);
772 
773  numberOfBytes += object.getSize();
774  }
775  catch(const JControlHostException& error) {
776  JErrorStream(logger) << error;
777  }
778  }
779 
780  if (parameters.writeL2()) {
781 
782  try {
783 
785 
786  datawriter->put(object);
787 
788  numberOfBytes += object.getSize();
789  }
790  catch(const JControlHostException& error) {
791  JErrorStream(logger) << error;
792  }
793  }
794 
795  if (parameters.writeSN()) {
796 
797  try {
798 
800 
801  datawriter->put(object);
802 
803  numberOfBytes += object.getSize();
804  }
805  catch(const JControlHostException& error) {
806  JErrorStream(logger) << error;
807  }
808  }
809  }
810 
811  if (parameters.writeTimeslices() || parameters.writeL0()) { // backward compatible, remove later
812 
813  try {
814 
815  datawriter->put(pending_slice);
816 
817  numberOfBytes += pending_slice.getSize();
818  }
819  catch(const JControlHostException& error) {
820  JErrorStream(logger)<<error;
821  }
822  }
823 
824  } catch(const JTriggerException& error) {
825  JErrorStream(logger) << error.what();
826  JErrorStream(logger) << "Run: " << pending_slice.getRunNumber() << " frame index: " << pending_slice.getFrameIndex() << " not processed!";
827  }
828  }
virtual const char * what() const
Get error message.
bool enabled
enabled status of trigger
Data structure for a composite optical module.
Definition: JModule.hh:47
JPrescaler writeTimeslices
write JDAQTimeslice with L0 data (deprecated)
Set of triggered events.
long long int numberOfEvents
Definition: JDataFilter.cc:930
JL2Parameters L2
L2 processing.
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
JMessageScheduler logErrorDetector
Definition: JDataFilter.cc:910
int getRunNumber() const
Get run number.
double TMaxLocal_ns
maximal time difference between L0 hits for L1
int getFrameIndex() const
Get frame index.
JTimeRange getTimeRange(const Evt &event)
Get time range (i.e.
JTriggerMXShower_t::JParameters triggerMXShower
long long int prescale
Definition: JPrescaler.hh:86
double TMaxEvent_ns
maximal time before and after event for snapshot
JL2Parameters SN
Supernova trigger.
JSinglePointer< JControlHost_t > datawriter
Definition: JDataFilter.cc:875
JPrescaler writeSummary
write JDAQSummaryslice
double highRateVeto_Hz
high-rate veto [Hz]
JTrigger3DShower_t::JParameters trigger3DShower
JTriggerParameters parameters
Definition: JDataFilter.cc:895
const JDAQChronometer & getDAQChronometer() const
Get DAQ chronometer.
void applyHighRateVeto(const double rate_Hz)
Apply high-rate veto.
Auxiliary class to build JDAQEvent for a triggered event.
Exception for ControlHost.
Definition: JException.hh:432
const JPosition3D & getPosition() const
Get position.
Definition: JPosition3D.hh:129
JPrescaler writeL2
write JDAQTimeslice with L2 data
bool enabled
enabled status of trigger
Match of two events considering overlap in time.
JPrescaler writeL1
write JDAQTimeslice with L1 data
JPrescaler writeL0
write JDAQTimeslice with L0 data
JSinglePointer< JTimesliceRouter > timesliceRouter
Definition: JDataFilter.cc:897
Auxiliary class to build JDAQTimeslice for L1 timeslice.
Definition: JTimesliceL1.hh:36
virtual int getSize() const
Get size of object.
void merge(const JMatch_t &match)
Merge events.
bool enabled
enabled status of trigger
JSinglePointer< JModuleRouter > moduleRouter
Definition: JDataFilter.cc:900
double TMaxLocal_ns
maximal time difference [ns]
2-dimensional frame with time calibrated data from one optical module.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
Data structure for input to trigger algorithm.
JTrigger3DMuon_t::JParameters trigger3DMuon
JPrescaler writeSN
write JDAQTimeslice with SN data
Time slice with calibrated data.
Definition: JTimeslice.hh:26
JSuperFrame1D< hit_type > JSuperFrame1D_t
Definition: JDataFilter.cc:92
long long int numberOfBytes
Definition: JDataFilter.cc:931
void JDataFilter::typeout ( )
inline

Definition at line 831 of file JDataFilter.cc.

832  {
833  timer.stop();
834 
835  const double T_us = (double) timer.usec_wall;
836 
837  JNoticeStream(logger) << "Reporting statistics for this datafilter.";
838  JNoticeStream(logger) << "Elapsed real (wall) time [s] " << T_us / 1e6;
839  JNoticeStream(logger) << "Elapsed user CPU time [s] " << (double) timer.usec_ucpu/ 1e6;
840  JNoticeStream(logger) << "Elapsed system CPU time [s] " << (double) timer.usec_scpu/ 1e6;
841  JNoticeStream(logger) << "Number of packets received/discarded " << number_of_packets_received << "/" << number_of_packets_discarded;
842  JNoticeStream(logger) << "Number of events/MB sent " << numberOfEvents << "/" << numberOfBytes/1e6;
843 
844  if (number_of_packets_received > 0) { JNoticeStream(logger) << "Number of reads/packet " << (double) number_of_reads / (double) number_of_packets_received; }
845 
846  JNoticeStream(logger) << "Current queue depth " << timeslices.size();
847  JNoticeStream(logger) << "Current queue size " << queueSize;
848  JNoticeStream(logger) << "Current number of frames per slice expected: " << frames_per_slice;
849 
850  //if (T_us > 0) { JNoticeStream(logger) << "Packet rate [kHz] " << number_of_packets_received * 1000 / T_us; }
851  //if (T_us > 0) { JNoticeStream(logger) << "Data rate [MB/s] " << number_of_bytes_received / T_us; }
852  //if (T_us > 0) { JNoticeStream(logger) << "Event rate [Hz] " << numberOfEvents * 1e6 / T_us; }
853  //if (T_us > 0) { JNoticeStream(logger) << "Output data rate [MB/s] " << numberOfBytes / T_us; }
854 
855  JNoticeStream(logger) << "Number of timeslices processed total/incomplete " << numberOfTimeslicesProcessed << "/" << numberOfIncompleteTimeslicesProcessed;
856  JNoticeStream(logger) << "Number of frames with invalid PMTs dropped " << numberOfFramesWithInvalidPMTs;
857 
858  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "Real time per timeslice [ms] " << timer.usec_wall / 1000 / numberOfTimeslicesProcessed; }
859  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "User CPU time per timeslice [ms] " << timer.usec_ucpu / 1000 / numberOfTimeslicesProcessed; }
860  if (numberOfTimeslicesProcessed > 0) { JNoticeStream(logger) << "System CPU time per timeslice [ms] " << timer.usec_scpu / 1000 / numberOfTimeslicesProcessed; }
861 
862  const double processedSlicesTimeMus = numberOfTimeslicesProcessed * getFrameTime() / 1000;
863  const double processedDetectorTimeMus = (maxFrameNumber - minFrameNumber) * getFrameTime() / 1000;
864 
865  if (processedSlicesTimeMus > 0) { JNoticeStream(logger) << "Performance factor (inaccurate estimate): " << T_us / processedSlicesTimeMus; }
866  if (processedDetectorTimeMus > 0) { JNoticeStream(logger) << "Performance factor whole detector (inaccurate estimate): " << T_us / processedDetectorTimeMus; }
867 
868  timer.start();
869  }
int maxFrameNumber
Definition: JDataFilter.cc:937
long long int numberOfFramesWithInvalidPMTs
Definition: JDataFilter.cc:934
int minFrameNumber
Definition: JDataFilter.cc:936
long long int numberOfEvents
Definition: JDataFilter.cc:930
long long int number_of_packets_received
Definition: JDataFilter.cc:941
list< JDAQTimesliceL0 > timeslices
Definition: JDataFilter.cc:887
long long int numberOfTimeslicesProcessed
Definition: JDataFilter.cc:932
JTimer timer
Definition: JDataFilter.cc:885
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
unsigned int frames_per_slice
Definition: JDataFilter.cc:890
long long int queueSize
Definition: JDataFilter.cc:924
long long int numberOfIncompleteTimeslicesProcessed
Definition: JDataFilter.cc:933
long long int number_of_reads
Definition: JDataFilter.cc:943
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
long long int number_of_packets_discarded
Definition: JDataFilter.cc:942
long long int numberOfBytes
Definition: JDataFilter.cc:931
virtual bool KM3NETDAQ::JDAQClient::enter ( const JArgs args)
inlinevirtualinherited

Enter the state machine.

This overloaded method enter reproduces the constructor. All necessary input is parsed from the list of arguments. In case of an error, the state machine is not entered.

Parameters
argsarray of command line arguments

Definition at line 143 of file JDAQClient.hh.

144  {
145  using namespace std;
146 
147  string server;
148  string logger;
149  int level;
150  bool use_cout;
151 
152  try {
153 
154  parser['H'] = make_field(server) = "localhost";
155  parser['M'] = make_field(logger) = "localhost";
156  parser['d'] = make_field(level) = 0;
157  parser['c'] = make_field(use_cout);
158 
159  if (parser.read(args) != 0) {
160  return false;
161  }
162  }
163  catch(const exception &error) {
164  cerr << error.what() << endl;
165  return false;
166  }
167 
168  try {
169 
170  JLogger* out = NULL;
171 
172  if (use_cout)
173  out = new JStreamLogger(cout);
174  else
175  out = new JControlHostLogger(logger);
176 
177  this->logger = JMessageLogger(out, getName(), level);
178 
179  this->server.reset(new JControlHost(server));
180 
181  return enter();
182  }
183  catch(const JControlHostException& error) {
184  cerr << error << endl;
185  return false;
186  }
187  }
JParser parser
parser method enter()
Definition: JDAQClient.hh:834
ControlHost class.
Message logging based on std::ostream.
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1836
virtual bool enter()
Enter the state machine.
Definition: JDAQClient.hh:195
Exception for ControlHost.
Definition: JException.hh:432
int read(const int argc, const char *const argv[])
Parse the program&#39;s command line options.
Definition: JParser.hh:1673
const char * getName()
Get ROOT name of given data type.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
virtual bool KM3NETDAQ::JDAQClient::enter ( )
inlinevirtualinherited

Enter the state machine.

This method activates the subscription to JNET::JControlHost messages. In case of an error, the state machine is not entered.

Reimplemented in KM3NETDAQ::JDAQDriver.

Definition at line 195 of file JDAQClient.hh.

196  {
197  using namespace std;
198 
199  if (server.is_valid() && logger.is_valid()) {
200 
201  try {
202 
203  server->Subscribe(getSubscription(eventTable));
204  server->SendMeAlways();
205  server->MyId(getFullName());
206 
207  return CHSM::machine::enter();
208  }
209  catch(const JControlHostException& error) {
210  JErrorStream(logger) << error;
211  }
212 
213  } else {
214  cerr << "Message server or logger not properly initialised." << endl;
215  }
216 
217  return false;
218  }
JNET::JSubscriptionList getSubscription(const JEventTable &event_table)
Convert event table to ControlHost subscription.
Definition: JEventTable.hh:125
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
Exception for ControlHost.
Definition: JException.hh:432
bool is_valid() const
Check validity of logger object.
JEventTable eventTable
event table
Definition: JDAQClient.hh:828
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:248
virtual bool KM3NETDAQ::JDAQClient::exit ( )
inlinevirtualinherited

Exit the state machine.

This method releases the various resources.

Definition at line 225 of file JDAQClient.hh.

226  {
227  try {
228  if (server.is_valid()) server.reset(NULL);
229  }
230  catch(const JControlHostException& error) {
231  }
232 
233  try {
234  if (logger.is_valid()) logger.reset(NULL);
235  }
236  catch(const JControlHostException& error) {
237  }
238 
239  return CHSM::machine::exit();
240  }
void reset(JLogger *__logger=NULL)
Reset logger.
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
Exception for ControlHost.
Definition: JException.hh:432
bool is_valid() const
Check validity of logger object.
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
const std::string& KM3NETDAQ::JDAQClient::getFullName ( ) const
inlineinherited

Get full name of this run control client.

Returns
full name

Definition at line 248 of file JDAQClient.hh.

249  {
250  return full_name;
251  }
std::string full_name
Definition: JDAQClient.hh:830
const JNET::JTag& KM3NETDAQ::JDAQClient::getUniqueTag ( ) const
inlineinherited

Get unique tag of this run control client.

Returns
unique tag

Definition at line 259 of file JDAQClient.hh.

260  {
261  return unique_tag;
262  }
int KM3NETDAQ::JDAQClient::getEventNumber ( ) const
inlineinherited

Get last event number.

Returns
event number

Definition at line 270 of file JDAQClient.hh.

271  {
272  return event_number;
273  }
int event_number
number of last event
Definition: JDAQClient.hh:835
bool KM3NETDAQ::JDAQClient::isRunning ( ) const
inlineinherited

Check if this client is in runnig state.

Returns
true if running; else false

Definition at line 281 of file JDAQClient.hh.

282  {
283  return Main.RunControl.Operational.Running.active();
284  }
void KM3NETDAQ::JDAQClient::replaceEvent ( const JNET::JTag oldTag,
const JNET::JTag newTag,
JDAQEvent_t event 
)
inlineinherited

Replace tag of given event in event table.

Parameters
oldTagold tag
newTagnew tag
eventevent

Definition at line 294 of file JDAQClient.hh.

297  {
298  eventTable.replace(oldTag, newTag, event);
299  }
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
JEventTable eventTable
event table
Definition: JDAQClient.hh:828
void replace(const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
Replace entry in table.
Definition: JEventTable.hh:93
JDAQEvent_t* KM3NETDAQ::JDAQClient::findEvent ( const JNET::JTag tag,
const std::string &  event_name 
)
inlineinherited

Find event in event table.

Parameters
tagtag
event_nameevent name
Returns
pointer to event or NULL

Definition at line 309 of file JDAQClient.hh.

310  {
311  JEventTable::const_iterator i = eventTable.find(tag, event_name);
312 
313  if (i != eventTable.end())
314  return i->second;
315  else
316  return NULL;
317  }
const_iterator find(const JNET::JTag &tag, const std::string &event_name) const
Find entry.
Definition: JEventTable.hh:112
JEventTable eventTable
event table
Definition: JDAQClient.hh:828
template<class T >
void KM3NETDAQ::JDAQClient::addParameter ( const char  option,
T &  parameter 
)
inlineinherited

Add parameter to parser used in method enter().

Parameters
optionoption
parameterparameter

Definition at line 327 of file JDAQClient.hh.

328  {
329  parser[option] = make_field(parameter);
330  }
JParser parser
parser method enter()
Definition: JDAQClient.hh:834
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1836
template<class T >
void KM3NETDAQ::JDAQClient::addParameter ( const char  option,
T &  parameter,
const T &  value 
)
inlineinherited

Add parameter to parser used in method enter().

Parameters
optionoption
parameterparameter
valuedefault value

Definition at line 341 of file JDAQClient.hh.

342  {
343  parser[option] = make_field(parameter) = value;
344  }
JParser parser
parser method enter()
Definition: JDAQClient.hh:834
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1836
long long int KM3NETDAQ::JDAQClient::getClockDelay ( ) const
inlineinherited

Get total delay time.

Returns
delay time [us]

Definition at line 352 of file JDAQClient.hh.

353  {
354  return clock.getDelay();
355  }
long long int getDelay() const
Get total delay time.
Definition: JTimekeeper.hh:78
JTimekeeper clock
central clock
Definition: JDAQClient.hh:832
const std::string& KM3NETDAQ::JDAQClient::getHostname ( ) const
inlineinherited

Get hostname.

Returns
host name

Definition at line 363 of file JDAQClient.hh.

364  {
365  return hostname;
366  }
std::string hostname
Definition: JDAQClient.hh:829
long long int KM3NETDAQ::JDAQClient::getClockInterval ( ) const
inlineinherited

Get interval time.

Returns
interval time [us]

Definition at line 374 of file JDAQClient.hh.

375  {
376  return clock.getInterval();
377  }
JTimekeeper clock
central clock
Definition: JDAQClient.hh:832
long long int getInterval() const
Get interval time.
Definition: JTimekeeper.hh:67
void KM3NETDAQ::JDAQClient::setClockInterval ( const long long int  interval_us)
inlineinherited

Set interval time.

Parameters
interval_usinterval time [us]

Definition at line 385 of file JDAQClient.hh.

386  {
387  clock.setInterval(interval_us);
388  }
JTimekeeper clock
central clock
Definition: JDAQClient.hh:832
void setInterval(const long long int interval_us)
Set interval time.
Definition: JTimekeeper.hh:89
void KM3NETDAQ::JDAQClient::resetClock ( )
inlineinherited

Reset clock.

Definition at line 394 of file JDAQClient.hh.

395  {
396  clock.reset();
397  }
void reset(const long long int t0)
Reset time.
Definition: JTimekeeper.hh:100
JTimekeeper clock
central clock
Definition: JDAQClient.hh:832
void KM3NETDAQ::JDAQClient::setSelect ( )
inlineinherited

Set the file descriptor mask for the select call.

Definition at line 403 of file JDAQClient.hh.

404  {
405  select.reset();
406 
408 
410  }
JSelectReader select
select call
Definition: JDAQClient.hh:833
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
JFileDescriptorMask & getReaderMask()
Get reader mask.
void setReaderMask(const JAbstractFile &file)
Set reader mask.
void reset()
Reset.
void setSelect()
Set the file descriptor mask for the select call.
Definition: JDAQClient.hh:403
virtual void KM3NETDAQ::JDAQClient::setSelect ( JFileDescriptorMask mask) const
inlinevirtualinherited

Set the file descriptor mask for the select call.

This implementation does nothing but may be redefined by the derived class.

Parameters
maskfile descriptor mask

Reimplemented in KM3NETDAQ::JDataWriter, and KM3NETDAQ::MonitorRouter.

Definition at line 419 of file JDAQClient.hh.

420  {}
virtual void KM3NETDAQ::JDAQClient::actionSelect ( const JFileDescriptorMask mask)
inlinevirtualinherited

Action method following last select call.

This implementation does nothing but may be redefined by the derived class.

Parameters
maskfile descriptor mask

Reimplemented in KM3NETDAQ::DataQueue, KM3NETDAQ::JDataWriter, and KM3NETDAQ::MonitorRouter.

Definition at line 429 of file JDAQClient.hh.

430  {}
virtual void KM3NETDAQ::JDAQClient::actionInput ( int  length,
const char *  buffer 
)
inlinevirtualinherited

This method is called at ev_input.

Parameters
lengthlength of data
bufferpointer to data

Reimplemented in AcousticDataFilter.

Definition at line 450 of file JDAQClient.hh.

451  {
452  using namespace std;
453 
454  JProperties properties(JEquationParameters("=", ";", "", ""), 1);
455 
456  int level = this->logger.getLevel();
457 
458  properties["debug"] = level;
459 
460  properties.read(string(buffer, length));
461 
462  this->logger.setLevel(level);
463  }
Utility class to parse parameter values.
Definition: JProperties.hh:484
Simple data structure to support I/O of equations (see class JLANG::JEquation).
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
void setLevel(const int __level)
Set debug level.
JLevel_t getLevel()
Get debug level.
virtual bool KM3NETDAQ::JDAQClient::filter ( const std::string &  tag,
int  length,
const char *  buffer 
)
inlinevirtualinherited

Filter message.

The filter method can be overloaded so that a specific action is made before the corresponding message is processed by the state machine. The message is ignored if true is returned, else it is normally processed.

Parameters
tagtag
lengthnumber of characters
buffermessage
Returns
skip message or not

Definition at line 477 of file JDAQClient.hh.

478  {
479  return false;
480  }
void KM3NETDAQ::JDAQClient::run ( )
inlineinherited

Run as run control client following command messages via JNET::JControlHost.

This method can be called once the state machine is entered. It returns when the state machine is exited. If the clock interval is non-zero, the method actionRunning() is repeatedly called when this client machine is in state Running. The file descriptor mask can be set to interrupt the timeout of the select call and clock method wait() in this calling sequence (see methods setSelect() and actionSelect()).

Definition at line 493 of file JDAQClient.hh.

494  {
495  using namespace std;
496 
497  while (active()) {
498 
499  try {
500 
501  setSelect();
502 
503  if (select(JTimeval(TIMEOUT_S,0)) > 0) {
504 
505  if (select.hasReaderMask(*server)) {
506  update();
507  }
508 
510 
511  } else {
512 
513  continue;
514  }
515 
516 
517  if (isRunning() && clock.getInterval() != 0LL) {
518 
519  long long int numberOfCalls = 0;
520 
521  clock.reset();
522 
523  do {
524 
525  ++numberOfCalls;
526 
527  setSelect();
528 
529  if (clock.wait(select.getReaderMask())) {
530 
531  if (select.hasReaderMask(*server)) {
532  update();
533  }
534 
536 
537  } else {
538 
539  try {
540  actionRunning();
541  }
542  catch(const exception& error) {
543  logger.error(error.what());
544  }
545  }
546 
547  } while (isRunning());
548 
549  if (numberOfCalls != 0) {
550  JNoticeStream(logger) << "Delay per call " << clock.getDelay() / numberOfCalls / 1000 << " ms";
551  }
552  }
553  }
554  catch(const exception& error) {
555  JErrorStream(logger) << "method run(): " << error.what();
556  }
557  }
558  }
long long int getDelay() const
Get total delay time.
Definition: JTimekeeper.hh:78
JSelectReader select
select call
Definition: JDAQClient.hh:833
void reset(const long long int t0)
Reset time.
Definition: JTimekeeper.hh:100
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
JFileDescriptorMask & getReaderMask()
Get reader mask.
JTimekeeper clock
central clock
Definition: JDAQClient.hh:832
void wait() const
Wait until the number of time intervals has elapsed since the last call to the reset method...
Definition: JTimekeeper.hh:145
Auxiliary class for time values.
Definition: JTimeval.hh:26
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDAQClient.hh:440
bool isRunning() const
Check if this client is in runnig state.
Definition: JDAQClient.hh:281
void update()
Update state machine.
Definition: JDAQClient.hh:643
long long int getInterval() const
Get interval time.
Definition: JTimekeeper.hh:67
static const int TIMEOUT_S
time out of update [s]
Definition: JDAQClient.hh:631
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
void error(const JMessage_t &message)
void setSelect()
Set the file descriptor mask for the select call.
Definition: JDAQClient.hh:403
virtual void actionSelect(const JFileDescriptorMask &mask)
Action method following last select call.
Definition: JDAQClient.hh:429
bool hasReaderMask(const JAbstractFile &file) const
Has reader file.
void KM3NETDAQ::JDAQClient::run ( const int  port)
inlineinherited

Run for ever.

This method can be used when the run control client is started before the run control (e.g. at boot time of the host processor). This method should be called before the state machine is entered. It launches a server which accepts a JNET::JControlHost connection from a designated application e.g. the JDAQClientStarter.cc program. The state machine is entered using the available data in the JNET::JControlHost message. After the state machine is exited, it accepts a new a JNET::JControlHost connection.

Parameters
portport number

Definition at line 573 of file JDAQClient.hh.

574  {
576 
577  for ( ; ; ) {
578 
579  JControlHost* ps = server.AcceptClient();
580 
581  ps->Connected();
582 
583  JNET::JPrefix prefix;
584 
585  ps->WaitHead(prefix);
586 
587  const int length = prefix.getSize();
588 
589  char* buffer = new char[length];
590 
591  ps->GetFullData(buffer, length);
592  ps->PutFullData(prefix.toString(), buffer, length);
593 
594  delete ps;
595 
596  enter(JArgs(std::string(buffer, length)));
597 
598  delete [] buffer;
599 
600  run();
601 
602  exit();
603  }
604  }
ControlHost prefix.
Definition: JPrefix.hh:31
ControlHost class.
int Connected()
Send version.
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:493
int getSize() const
Get size.
Definition: JPrefix.hh:63
virtual bool enter()
Enter the state machine.
Definition: JDAQClient.hh:195
virtual bool exit()
Exit the state machine.
Definition: JDAQClient.hh:225
int WaitHead(JPrefix &prefix)
Wait for header.
std::string toString() const
Convert tag to string.
Definition: JTag.hh:167
int GetFullData(void *buffer, int length)
Receive data.
Light-weight wrapper class around server socket.
int PutFullData(const JTag &tag, const void *buffer, const int length)
Send data.
void KM3NETDAQ::JDAQClient::run ( std::istream &  in)
inlineinherited

Run client with commands from input stream (e.g for debugging).

Example input format:

<tag> <event name>[#data];
<tag> <event name>[#data];
Parameters
ininput stream

Definition at line 618 of file JDAQClient.hh.

619  {
620  using std::string;
621 
622  string tag;
623  string buffer;
624 
625  while (in >> tag && getline(in, buffer, ';')) {
626  update(tag, buffer.length(), buffer.data());
627  }
628  }
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:468
void update()
Update state machine.
Definition: JDAQClient.hh:643

Member Data Documentation

JSinglePointer<JControlHost_t> JDataFilter::datawriter
private

Definition at line 875 of file JDataFilter.cc.

std::string JDataFilter::hostname
private

controlhost to send data to the datawriter

host name of data server

Definition at line 877 of file JDataFilter.cc.

int JDataFilter::port
private

Definition at line 878 of file JDataFilter.cc.

int JDataFilter::backlog
private

serversocket port

Definition at line 879 of file JDataFilter.cc.

int JDataFilter::buffer_size
private

Definition at line 880 of file JDataFilter.cc.

JSinglePointer<JServerSocket> JDataFilter::serversocket
private

Definition at line 882 of file JDataFilter.cc.

JChannelList_t JDataFilter::channelList
private

Definition at line 883 of file JDataFilter.cc.

JTimer JDataFilter::timer
private

Definition at line 885 of file JDataFilter.cc.

list<JDAQTimesliceL0> JDataFilter::timeslices
private

Definition at line 887 of file JDataFilter.cc.

int JDataFilter::current_slice_index
private

Definition at line 888 of file JDataFilter.cc.

unsigned int JDataFilter::frames_per_slice
private

Definition at line 890 of file JDataFilter.cc.

unsigned int JDataFilter::maximum_frames_per_slice
private

Definition at line 891 of file JDataFilter.cc.

JTriggerParameters JDataFilter::parameters
private

Definition at line 895 of file JDataFilter.cc.

string JDataFilter::interfaceInput
private

Definition at line 896 of file JDataFilter.cc.

JSinglePointer<JTimesliceRouter> JDataFilter::timesliceRouter
private

Definition at line 897 of file JDataFilter.cc.

JDetector JDataFilter::detector
private

Definition at line 899 of file JDataFilter.cc.

JSinglePointer<JModuleRouter> JDataFilter::moduleRouter
private

Definition at line 900 of file JDataFilter.cc.

JSinglePointer<JBuildL1_t> JDataFilter::buildL1
private

Definition at line 901 of file JDataFilter.cc.

JSinglePointer<JBuildL2_t> JDataFilter::buildL2
private

Definition at line 902 of file JDataFilter.cc.

JSinglePointer<JBuildL2_t> JDataFilter::buildSN
private

Definition at line 903 of file JDataFilter.cc.

JSinglePointer<JTrigger3DMuon> JDataFilter::trigger3DMuon
private

Definition at line 905 of file JDataFilter.cc.

JSinglePointer<JTrigger3DShower> JDataFilter::trigger3DShower
private

Definition at line 906 of file JDataFilter.cc.

JSinglePointer<JTriggerMXShower> JDataFilter::triggerMXShower
private

Definition at line 907 of file JDataFilter.cc.

JMessageScheduler JDataFilter::logErrorRun
private

Definition at line 909 of file JDataFilter.cc.

JMessageScheduler JDataFilter::logErrorDetector
private

Definition at line 910 of file JDataFilter.cc.

JMessageScheduler JDataFilter::logErrorIndex
private

Definition at line 911 of file JDataFilter.cc.

JMessageScheduler JDataFilter::logErrorIncomplete
private

Definition at line 912 of file JDataFilter.cc.

JMessageScheduler JDataFilter::logErrorInvalidPMTs
private

Definition at line 913 of file JDataFilter.cc.

vector<JDAQProcess> JDataFilter::dataFilters
private

Definition at line 916 of file JDataFilter.cc.

vector<JDAQProcess> JDataFilter::dataQueues
private

Definition at line 917 of file JDataFilter.cc.

long long int JDataFilter::totalCPURAM
private

Definition at line 921 of file JDataFilter.cc.

unsigned int JDataFilter::maxQueueDepth
private

Definition at line 922 of file JDataFilter.cc.

long long int JDataFilter::maxQueueSize
private

Definition at line 923 of file JDataFilter.cc.

long long int JDataFilter::queueSize
private

Definition at line 924 of file JDataFilter.cc.

bool JDataFilter::reporting
private

Definition at line 928 of file JDataFilter.cc.

long long int JDataFilter::numberOfEvents
private

Definition at line 930 of file JDataFilter.cc.

long long int JDataFilter::numberOfBytes
private

Definition at line 931 of file JDataFilter.cc.

long long int JDataFilter::numberOfTimeslicesProcessed
private

Definition at line 932 of file JDataFilter.cc.

long long int JDataFilter::numberOfIncompleteTimeslicesProcessed
private

Definition at line 933 of file JDataFilter.cc.

long long int JDataFilter::numberOfFramesWithInvalidPMTs
private

Definition at line 934 of file JDataFilter.cc.

int JDataFilter::minFrameNumber
private

Definition at line 936 of file JDataFilter.cc.

int JDataFilter::maxFrameNumber
private

Definition at line 937 of file JDataFilter.cc.

long long int JDataFilter::number_of_packets_received
private

Definition at line 941 of file JDataFilter.cc.

long long int JDataFilter::number_of_packets_discarded
private

Definition at line 942 of file JDataFilter.cc.

long long int JDataFilter::number_of_reads
private

Definition at line 943 of file JDataFilter.cc.

long long int JDataFilter::number_of_bytes_received
private

Definition at line 944 of file JDataFilter.cc.

bool JDataFilter::checkForInvalidPMTs
private

Definition at line 946 of file JDataFilter.cc.

int JDataFilter::maxHitsFrame
private

Definition at line 947 of file JDataFilter.cc.

const int KM3NETDAQ::JDAQClient::TIMEOUT_S = 1
staticinherited

time out of update [s]

Definition at line 631 of file JDAQClient.hh.

JSharedPointer<JControlHost> KM3NETDAQ::JDAQClient::server
protectedinherited

message server

Definition at line 635 of file JDAQClient.hh.

JMessageLogger KM3NETDAQ::JDAQClient::logger
protectedinherited

message logger

Definition at line 636 of file JDAQClient.hh.


The documentation for this class was generated from the following file: