Jpp  master_rocky-43-ge265d140c
the software that should make you happy
JDataWriter.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <sstream>
4 #include <iomanip>
5 #include <map>
6 
7 #include "Jeep/JParser.hh"
8 #include "Jeep/JProperties.hh"
9 #include "Jeep/JTimer.hh"
10 #include "Jeep/JTimekeeper.hh"
11 #include "Jeep/JPrint.hh"
12 #include "Jeep/JeepToolkit.hh"
13 #include "JNet/JControlHost.hh"
14 #include "JLang/JException.hh"
15 #include "JLang/JSharedPointer.hh"
18 #include "JDAQ/JDAQTags.hh"
19 #include "JDAQ/JDAQEventIO.hh"
20 #include "JDAQ/JDAQTimesliceIO.hh"
25 #include "JIO/JByteArrayIO.hh"
26 #include "JTools/JAutoMap.hh"
27 #include "JSupport/JMeta.hh"
28 #include "JSupport/JSupport.hh"
30 
31 
32 /**
33  * Type definition of auto map.
34  */
36 
37 
38 namespace JSUPPORT {
39 
40  /**
41  * Get key for given DAQ data type.
42  *
43  * \param type data type
44  * \return map element
45  */
46  template<>
47  template<class T>
49  {
50  return getTag<T>();
51  }
52 }
53 
54 
55 namespace KM3NETDAQ {
56 
57  /**
58  * Runcontrol client to write data to disk.
59  * In state running, this application will write ROOT formatted data from the data filters to disk.
60  */
61  class JDataWriter :
62  public JDAQClient
63  {
64  public:
65  /**
66  * Constructor.
67  *
68  * \param name name of client
69  * \param server name of command message server
70  * \param hostname name of data server
71  * \param logger pointer to logger
72  * \param level debug level
73  * \param path default path
74  */
75  JDataWriter(const std::string& name,
76  const std::string& server,
77  const std::string& hostname,
78  JLogger* logger,
79  const int level,
80  const std::string& path) :
81  JDAQClient(name, server, logger, level),
82  datawriter(),
83  path (path),
85  {
87 
88  JControlHost::Throw(true);
89 
90  // map ControlHost tag to TTree writer.
91 
93  }
94 
95 
96  virtual void actionInit(int length, const char* buffer) override
97  {
98  using namespace std;
99  using namespace JPP;
100 
101  // start server
102 
103  try {
104 
105  datawriter.reset(new JControlHost(hostname));
106 
107  datawriter->setReceiveBufferSize(DWRITER_RECEIVE_BUFFER_SIZE);
108 
109  datawriter->MyId(getFullName());
110 
112 
113  for (JTreeWriter_t::iterator i = writer.begin(); i != writer.end(); ++i) {
114  buffer.add(JSubscriptionAll(i->first));
115  }
116 
118 
119  datawriter->Subscribe(buffer);
120  datawriter->SendMeAlways();
121 
122  JNoticeStream(logger) << "Established connection to " << hostname;
123  }
124  catch(const std::exception& exception) {
125  JErrorStream(logger) << exception.what();
126  }
127  }
128 
129 
/**
 * Configure action.
 *
 * Reads the settings "path", "update_s" and "logger_s" from the configuration message
 * and applies them: the clock interval is set from update_s, the four rate-limited
 * error loggers are re-created from logger_s, and the event/byte counters are zeroed.
 *
 * \param length number of bytes in buffer
 * \param buffer configuration message (length is passed explicitly when it is copied)
 */
130  virtual void actionConfigure(int length, const char* buffer) override
131  {
132  using namespace std;
133 
134  long long int update_s = 10; // default update period [s]
135  long long int logger_s = 5; // default period for the scheduled error loggers [s]
136 
137  JProperties properties(JEquationParameters("=", ";", "", "")); // presumably key=value pairs terminated by a semicolon -- TODO confirm argument order
138 
139  properties["path"] = path; // the member path is the default and is overwritten when the message sets it
140  properties["update_s"] = update_s;
141  properties["logger_s"] = logger_s;
142 
143  properties.read(string(buffer, length));
144 
145  if (update_s <= 0) { update_s = 1; } // clamp non-positive periods to one second
146  if (logger_s <= 0) { logger_s = 1; }
147 
148  setClockInterval(update_s * 1000000LL); // seconds -> microseconds (setClockInterval takes interval_us)
149 
150  JDebugStream(logger) << "Path <" << path << ">";
151  JDebugStream(logger) << "Update period [s] " << update_s;
152 
153  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL)); // presumably also in microseconds -- TODO confirm JTimekeeper unit
154  logErrorFile = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
155  logErrorTag = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
156  logErrorState = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
157 
158  numberOfEvents = 0;
159  numberOfBytes = 0;
160  }
161 
162 
/**
 * Reset action.
 *
 * Resets the shared pointer to the ControlHost connection that actionInit created.
 * Both arguments are unused.
 *
 * \param length number of bytes in buffer (unused)
 * \param buffer message data (unused)
 */
163  virtual void actionReset(int length, const char* buffer) override
164  {
165  datawriter.reset(); // afterwards is_valid() guards in setSelect/actionSelect skip the connection
166  }
167 
168 
/**
 * Quit action.
 *
 * Intentionally empty: this client does nothing on quit.
 *
 * \param length number of bytes in buffer (unused)
 * \param buffer message data (unused)
 */
169  virtual void actionQuit(int length, const char* buffer) override
170  {}
171 
172 
173  virtual void actionStart(int length, const char* buffer) override
174  {
175  using namespace std;
176  using namespace JPP;
177 
178  JStatusStream(logger) << "Start run " << getDetectorID() << ' ' << getRunNumber();
179 
180  if (writer.is_open()) {
181 
182  JErrorStream (logger) << "Previous file still open -> close";
183 
184  writer.close();
185  }
186 
187  ostringstream os;
188 
189  for (int i = 0; !writer.is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
190 
191  os.str("");
192 
193  os << getFullPath(path)
194  << "KM3NeT"
195  << "_" << FILL(8,'0') << getDetectorID()
196  << "_" << FILL(8,'0') << getRunNumber();
197 
198  if (i != 0) {
199  os << "_" << i;
200  }
201 
202  os << ".root";
203 
204  try {
205  writer.open(os.str().c_str());
206  }
207  catch(const std::exception& exception) {
208  JErrorStream(logger) << exception.what();
209  }
210  }
211 
212  if (writer.is_open())
213  JNoticeStream(logger) << "Output file " << os.str();
214  else
215  JErrorStream (logger) << "File not opened " << os.str();
216 
217  numberOfEvents = 0;
218  numberOfBytes = 0;
219 
221 
222  timer.reset();
223 
224  logErrorRun .reset();
225  logErrorFile .reset();
226  logErrorTag .reset();
228 
230  }
231 
232 
/**
 * Stop action.
 *
 * Reports the final status, logs the average I/O rate, logs an error if no trigger
 * parameters were written for the current run, closes the output file and releases
 * the memory held by the receive buffer.
 *
 * \param length number of bytes in buffer (unused)
 * \param buffer message data (unused; note that it shadows the member of the same name)
 */
233  virtual void actionStop(int length, const char* buffer) override
234  {
235  typeout();
236 
237  if (timer.usec_wall > 0) { // guard against division by zero when nothing was timed
238  JStatusStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s"; // bytes per microsecond, integer division
239  }
240 
241  if (!run_db.is_written(getRunNumber())) {
242  JErrorStream(logger) << "No trigger parameters written for run " << getRunNumber();
243  }
244 
245  writer.close();
246 
247  // Release resources.
248 
249  std::vector<char> null;
250 
251  this->buffer.swap(null); // this-> is required because the parameter buffer hides the member; swapping with an empty vector frees the capacity
252  }
253 
254 
/**
 * Set the file descriptor mask for the select call.
 *
 * Adds the data connection to the mask, but only while a connection exists.
 *
 * \param mask file descriptor mask to be updated
 */
255  virtual void setSelect(JFileDescriptorMask& mask) const override
256  {
257  if (datawriter.is_valid()) { // no connection before actionInit or after actionReset / a socket error
258  mask.set(*datawriter);
259  }
260  }
261 
262 
/**
 * Action method following last select call.
 *
 * When the data connection is flagged in the mask, one message (prefix plus payload)
 * is received into the internal buffer. Trigger parameter messages are handed to run_db.
 * In running state, a message whose tag has a matching TTree writer is copied to the
 * output file, provided the file is open and the run number in the message header
 * equals the current run number. All other cases are logged through the rate-limited
 * loggers. Any std::exception escaping the processing triggers ev_error().
 *
 * \param mask file descriptor mask returned by the select call
 */
263  virtual void actionSelect(const JFileDescriptorMask& mask) override
264  {
265  using namespace std;
266  using namespace JPP;
267 
268  if (datawriter.is_valid() && mask.has(*datawriter)) {
269 
270  try {
271 
272  JPrefix prefix;
273 
274  try {
275 
276  datawriter->WaitHead(prefix);
277 
278  timer.start(); // timing starts after the header arrived; stopped at the end of this method
279 
280  buffer.resize(prefix.getSize());
281 
282  datawriter->GetFullData(buffer.data(), buffer.size());
283  }
284  catch(const JSocketException& error) {
285 
286  JErrorStream(logger) << "Fatal error receiving data: \"" << error.what() << "\" -> disconnect and trigger ev_error.";
287 
288  datawriter.reset();
289 
290  ev_error();
291  }
// NOTE(review): there is no return after the socket error above, so execution continues
// below with whatever prefix and buffer contain at that point (possibly a default prefix
// or partially received data). Confirm whether an early exit was intended here.
292 
293  if (prefix.getTag() == IO_TRIGGER_PARAMETERS) {
294 
295  try {
296  run_db.read(buffer.data(), buffer.size());
297  }
298  catch(const std::exception& error) {
299 
300  JErrorStream(logger) << "Fatal error reading trigger parameters \"" << error.what() << "\"; trigger ev_error.";
301 
302  ev_error();
303  }
304  }
305 
306 
307  if (isRunning()) {
308 
309  // Write trigger parameters for current run if not yet done
// NOTE(review): the statement belonging to the comment above is not visible in this listing.
310 
312 
313  JTreeWriter_t::iterator i = writer.find(prefix.toString()); // look up the TTree writer by ControlHost tag
314 
315  if (i != writer.end()) {
316 
317  TFile* out = i->second->GetCurrentFile();
318 
319  if (out != NULL && out->IsOpen()) {
320 
321  JDAQPreamble preamble;
322  Version_t version;
323  JDAQHeader header;
324 
325  JByteArrayReader in(buffer.data(), buffer.size());
326 
327  in >> preamble >> version >> header; // peek at the header only to obtain the run number
328 
329  in.seekg(0); // rewind
330 
331  if (header.getRunNumber() == getRunNumber()) {
332 
333  const Int_t nb = i->second->copy(in); // presumably the number of bytes copied -- TODO confirm
334 
335  if (nb < (int) buffer.size() || in.tellg() != (int) buffer.size()) {
336  JWarningStream(logger) << "Inconsistency at copy of "
337  << prefix.toString() << ' '
338  << buffer.size() << ' '
339  << in.tellg() << ' '
340  << nb;
341  }
342 
343  if (prefix.getTag() == IO_EVENT) // brace-less if: governs only the next statement
344  numberOfEvents += 1;
345  numberOfBytes += buffer.size(); // executed for every copied message, not only for events
346 
347  if (prefix.getTag() == IO_EVENT && numberOfEvents == 1) { // report immediately on the first event
348  typeout();
349  }
350 
351  } else {
352  JErrorStream(logErrorRun) << "Inconsistent run number "
353  << header.getRunNumber()
354  << " != "
355  << getRunNumber();
356  }
357  } else {
358  JErrorStream(logErrorFile) << "Output file not open";
359  }
360  } else {
361  if (prefix.getTag() != IO_TRIGGER_PARAMETERS) { // trigger parameters have no TTree writer but were handled above
362  JErrorStream(logErrorTag) << "Unknown tag <" << prefix.toString() << ">, no data written";
363  }
364  }
365  } else {
366  JWarningStream(logErrorState) << "Not in running state <" << prefix.toString() << ">, no data written";
367  }
368 
369  timer.stop();
370  }
371  catch(const std::exception& error) {
372 
373  JErrorStream(logger) << "Fatal error \"" << error.what() << "\"; trigger ev_error.";
374 
375  ev_error();
376  }
377  }
378  }
379 
380 
/**
 * Action repeatedly called while this client is in state Running.
 *
 * Reports the current status of data writing via typeout().
 */
381  virtual void actionRunning() override
382  {
383  typeout();
384  }
385 
386 
387  /**
388  * Report status of data writing.
389  */
390  void typeout()
391  {
392  std::ostringstream message;
393 
395 
396  logger.typeout(RC_LOG, message.str());
397  logger.status(message.str());
398  }
399 
400  JMeta meta; //!< meta data
401 
402  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
403 
404  private:
405 
407  std::string path; // directory for output file
408 
409  JEEP::JTimer timer; // timer for I/O measurement
410  Long64_t numberOfEvents; // total number of events
411  long long int numberOfBytes; // total number of bytes
412 
417 
418  std::string hostname; //!< host name of data server
419  JTreeWriter_t writer; //!< TTree writer
420  std::vector<char> buffer; //!< internal buffer for incoming data
421 
422 
423  /**
424  * Auxiliary data structure for I/O of trigger parameters.
425  */
426  struct JValue_t {
427  /**
428  * Default constructor.
429  */
431  count(0),
432  is_written(false)
433  {}
434 
435  JTriggerParameters parameters; //!< trigger parameters
436  int count; //!< reader count
437  bool is_written; //!< writer status
438  };
439 
440 
441  /**
442  * Map run number to trigger parameters.
443  */
444  struct JRunDB :
445  public std::map<int, JValue_t>
446  {
447  /**
448  * Remove all entries before given run.
 *
 * Entries are erased from the front of the map (lowest run number first)
 * until the first remaining key is not smaller than the given run.
449  *
450  * \param run run number
451  */
452  inline void reset(const int run)
453  {
454  while (!this->empty() && this->begin()->first < run) {
455  this->erase(this->begin());
456  }
457  }
458 
459  /**
460  * Check if trigger parameters have been written for given run.
 *
 * A run without an entry counts as not written.
461  *
462  * \param run run number
463  * \return true if written; else false.
464  */
465  inline bool is_written(const int run) const
466  {
467  const_iterator p = this->find(run);
468 
469  return p != this->end() && p->second.is_written;
470  }
471 
472  /**
473  * Read trigger parameters.
 *
 * The data are parsed as text: first the run number, then the trigger parameters.
 * The parameters of the first message for a run are stored; every message for that
 * run increments the reader count, including a message that is then rejected as
 * inconsistent with the stored parameters.
 *
 * Throws JIOException if the run number cannot be read and JException if the
 * parameters differ from those already stored for the run.
474  *
475  * \param data data
476  * \param size size
477  */
478  void read(const char* const data, const size_t size)
479  {
480  using namespace std;
481  using namespace JPP;
482 
483 
484  const string buffer(data, size); // copy, since data need not be null-terminated
485 
486  istringstream in(buffer);
487 
488  int run = -1;
489  JTriggerParameters parameters;
490 
491  in >> run;
492 
493  if (!in) {
494  THROW(JIOException, "Error reading run number for trigger parameters " << run << endl << in.rdbuf());
495  }
496 
497  in >> parameters;
498 
// NOTE(review): clear(state) replaces the complete stream state, so after the next
// statement only eofbit is set and the check that follows appears unreachable in practice.
// Presumably the intent was to ignore the failbit raised by reaching end of input;
// confirm whether a genuine parse error of the parameters should be detected here.
499  in.clear(std::ios::eofbit);
500 
501  if (!in) {
502  THROW(JIOException, "Error reading trigger parameters " << in.rdbuf());
503  }
504 
505  JValue_t& value = (*this)[run]; // operator[] inserts a default entry (count 0, not written) for a new run
506 
507  if (value.count == 0) {
508  value.parameters = parameters; // first message for this run defines the reference parameters
509  }
510 
511  value.count += 1;
512 
513  if (!parameters.equals(value.parameters)) {
514  THROW(JException, "Inconsistent trigger parameters " << endl << value.parameters << " != " << endl << parameters);
515  }
516  }
517 
518  /**
519  * Write trigger parameters for given run if not yet done.
 *
 * Nothing is written when the file pointer is null, when no parameters have been
 * read for the run, or when they have been written before.
520  *
521  * \param run run number
522  * \param file pointer to ROOT file
523  */
524  inline void write(const int run, TFile* file)
525  {
526  if (file != NULL) {
527 
528  iterator p = this->find(run);
529 
530  if (p != this->end() && p->second.count != 0 && !p->second.is_written) {
531 
532  file->WriteTObject(&p->second.parameters);
533 
534  p->second.is_written = true; // ensures the parameters are written at most once per run
535  }
536  }
537  }
538  };
539 
541  };
542 }
543 
544 
545 /**
546  * \file
547  *
548  * Application for writing real-time data to disk.
549  * \author mdejong
550  */
/**
 * Program entry point.
 *
 * Parses the command line options, creates a logger that writes either to the terminal
 * or to a ControlHost server, constructs the JDataWriter client, stores the command line
 * as meta data in it and then enters and runs the client.
 *
 * \param argc number of command line arguments
 * \param argv command line arguments
 */
551 int main(int argc, char* argv[])
552 {
553  using namespace std;
554  using namespace JPP;
555  using namespace KM3NETDAQ;
556 
557  string server;
558  string logger;
559  string hostname;
560  string client_name;
561  bool use_cout; // not initialised here; set by the parser below
562  string path;
563  int debug; // not initialised here; set by the parser below (default 0)
564 
565  try {
566 
567  JParser<> zap("Application for writing real-time data to disk.");
568 
569  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
570  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
571  zap['D'] = make_field(hostname, "host name of server for incoming data from data filter") = "localhost";
572  zap['u'] = make_field(client_name, "client name") = "%";
573  zap['c'] = make_field(use_cout, "print to terminal");
574  zap['p'] = make_field(path, "directory for permanent archival of data") = "";
575  zap['d'] = make_field(debug, "debug level") = 0;
576 
577 
578  zap(argc, argv);
579  }
580  catch(const exception &error) {
581  FATAL(error.what() << endl); // presumably terminates the program -- TODO confirm; otherwise use_cout would be read uninitialised below
582  }
583 
584 
585  JLogger* out = NULL;
586 
587  if (use_cout)
588  out = new JStreamLogger(cout);
589  else
590  out = new JControlHostLogger(logger);
591 
// NOTE(review): out is allocated with new and never deleted in this function;
// presumably the client takes ownership of the logger -- confirm.
592  JDataWriter dwriter(getProcessName(client_name, argv[0]), server, hostname, out, debug, path);
593 
594  dwriter.meta = JMeta(argc, argv);
595 
596  dwriter.enter();
597  dwriter.run();
598 }
Fixed parameters and ControlHost tags for KM3NeT DAQ.
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Definition: JDataWriter.cc:551
JSUPPORT::JAutoTreeWriter< JNET::JTag > JTreeWriter_t
Type definition of auto map.
Definition: JDataWriter.cc:35
Exceptions.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
#define FATAL(A)
Definition: JMessage.hh:67
int debug
debug level
Definition: JSirene.cc:69
ROOT I/O of application specific meta data.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2142
I/O formatting auxiliaries.
Utility class to parse parameter values.
ROOT TTree parameter settings of various packages.
Scheduling of actions via fixed latency intervals.
Auxiliary methods for handling file names, type names and environment.
int getDetectorID() const
Get detector identifier.
Definition: JDAQCHSM.hh:100
int getRunNumber() const
Get run number.
Definition: JDAQCHSM.hh:111
std::string name
Definition: JDAQCHSM.hh:165
JDAQStateMachine::ev_error_event ev_error
JDAQStateMachine::ev_configure_event ev_configure
Utility class to parse parameter values.
Definition: JProperties.hh:501
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Time keeper.
Definition: JTimekeeper.hh:34
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:33
unsigned long long usec_wall
Definition: JTimer.hh:238
void stop()
Stop timer.
Definition: JTimer.hh:127
void reset()
Reset timer.
Definition: JTimer.hh:93
void start()
Start timer.
Definition: JTimer.hh:106
Byte array binary input.
Definition: JByteArrayIO.hh:27
int tellg() const
Get read position.
void seekg(const int pos)
Set read position.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition: JException.hh:24
virtual const char * what() const override
Get error message.
Definition: JException.hh:64
Auxiliary class for method select.
void set(const int file_descriptor)
Set file descriptor.
bool has(const int file_descriptor) const
Has file descriptor.
Exception for I/O.
Definition: JException.hh:342
The template JSharedPointer class can be used to share a pointer to an object.
Exception for socket.
Definition: JException.hh:468
static void Throw(const bool option)
Enable/disable throw option.
Definition: JThrow.hh:37
Message logging based on ControlHost.
Interface for logging messages.
Definition: JLogger.hh:22
virtual void typeout(const std::string &tag, const std::string &message) override
Report message.
void status(const JMessage_t &message)
Message logger with time scheduler.
Message logging based on std::ostream.
ControlHost class.
ControlHost prefix.
Definition: JPrefix.hh:33
int getSize() const
Get size.
Definition: JPrefix.hh:62
Subscription list.
ControlHost tag.
Definition: JTag.hh:38
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
const JTag & getTag() const
Get tag.
Definition: JTag.hh:86
Utility class to parse command line options.
Definition: JParser.hh:1698
TFile * getFile() const
Get file.
Definition: JRootFile.hh:66
virtual bool is_open() const override
Check if file is open.
Definition: JRootFile.hh:77
virtual void open(const char *file_name) override
Open file.
void insert()
Insert (list of) data type(s).
virtual void close() override
Close file.
static JKey_t getKey(JType< T > type)
Get key.
int getRunNumber() const
Get run number.
Control unit client base class.
Definition: JDAQClient.hh:301
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:834
bool isRunning() const
Check if this client is in running state.
Definition: JDAQClient.hh:508
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:521
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
JMessageLogger logger
message logger
Definition: JDAQClient.hh:835
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
Runcontrol client to write data to disk.
Definition: JDataWriter.cc:63
virtual void actionStop(int length, const char *buffer) override
Definition: JDataWriter.cc:233
void typeout()
Report status of data writing.
Definition: JDataWriter.cc:390
std::string hostname
host name of data server
Definition: JDataWriter.cc:418
long long int numberOfBytes
Definition: JDataWriter.cc:411
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataWriter.cc:263
virtual void actionInit(int length, const char *buffer) override
Definition: JDataWriter.cc:96
JTreeWriter_t writer
TTree writer.
Definition: JDataWriter.cc:419
JLANG::JSharedPointer< JControlHost > datawriter
Definition: JDataWriter.cc:406
JMessageScheduler logErrorFile
Definition: JDataWriter.cc:414
JMessageScheduler logErrorRun
Definition: JDataWriter.cc:413
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataWriter.cc:381
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataWriter.cc:169
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataWriter.cc:255
JMessageScheduler logErrorState
Definition: JDataWriter.cc:416
std::vector< char > buffer
internal buffer for incoming data
Definition: JDataWriter.cc:420
virtual void actionReset(int length, const char *buffer) override
Definition: JDataWriter.cc:163
JMeta meta
meta data
Definition: JDataWriter.cc:400
JDataWriter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const std::string &path)
Constructor.
Definition: JDataWriter.cc:75
virtual void actionStart(int length, const char *buffer) override
Definition: JDataWriter.cc:173
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataWriter.cc:130
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
Definition: JDataWriter.cc:402
JMessageScheduler logErrorTag
Definition: JDataWriter.cc:415
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
Support classes and methods for experiment specific I/O.
Definition: JDataWriter.cc:38
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
static const JNET::JTag RC_DWRITER
Definition: JDAQTags.hh:71
static const JNET::JTag IO_EVENT
Definition: JDAQTags.hh:88
static const int DWRITER_RECEIVE_BUFFER_SIZE
socket JDataWriter.cc <- JDataFilter.cc
Definition: JDAQTags.hh:39
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:66
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:72
std::string getProcessName(const std::string &name, const std::string &process)
Get process name of run control client.
char getTokenDelimeter()
Get the token delimiter for command messages.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:90
Definition: JSTDTypes.hh:14
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:330
Type list.
Definition: JTypeList.hh:23
Auxiliary class for a type holder.
Definition: JType.hh:19
Level specific message streamers.
Auxiliary class for all subscription.
Definition: JControlHost.hh:99
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:72
void setClockInterval(const long long int interval_us)
Set interval time.
Definition: JDAQClient.hh:165
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:120
Map run number to trigger parameters.
Definition: JDataWriter.cc:446
void reset(const int run)
Remove all entries before given run.
Definition: JDataWriter.cc:452
void write(const int run, TFile *file)
Write trigger parameters for given run if not yet done.
Definition: JDataWriter.cc:524
bool is_written(const int run) const
Check if trigger parameters have been written for given run.
Definition: JDataWriter.cc:465
void read(const char *const data, const size_t size)
Read trigger parameters.
Definition: JDataWriter.cc:478
Auxiliary data structure for I/O of trigger parameters.
Definition: JDataWriter.cc:426
JValue_t()
Default constructor.
Definition: JDataWriter.cc:430
JTriggerParameters parameters
trigger parameters
Definition: JDataWriter.cc:435