Jpp  18.3.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataWriter.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <sstream>
4 #include <iomanip>
5 #include <map>
6 
7 #include "Jeep/JParser.hh"
8 #include "Jeep/JProperties.hh"
9 #include "Jeep/JTimer.hh"
10 #include "Jeep/JTimekeeper.hh"
11 #include "Jeep/JPrint.hh"
12 #include "Jeep/JeepToolkit.hh"
13 #include "JNet/JControlHost.hh"
14 #include "JLang/JException.hh"
15 #include "JLang/JSharedPointer.hh"
18 #include "JDAQ/JDAQTags.hh"
19 #include "JDAQ/JDAQEventIO.hh"
20 #include "JDAQ/JDAQTimesliceIO.hh"
25 #include "JIO/JByteArrayIO.hh"
26 #include "JTools/JAutoMap.hh"
27 #include "JSupport/JMeta.hh"
28 #include "JSupport/JSupport.hh"
30 
31 
32 /**
33  * Type definition of auto map.
34  */
36 
37 
38 namespace JSUPPORT {
39 
40  /**
41  * Get key for given DAQ data type.
42  *
43  * \param type data type
44  * \return map element
45  */
46  template<>
47  template<class T>
49  {
50  return getTag<T>();
51  }
52 }
53 
54 
55 namespace KM3NETDAQ {
56 
57  /**
58  * Runcontrol client to write data to disk.
59  * In state running, this application will write ROOT formatted data from the data filters to disk.
60  */
61  class JDataWriter :
62  public JDAQClient
63  {
64  public:
65  /**
66  * Constructor.
67  *
68  * \param name name of client
69  * \param server name of command message server
70  * \param hostname name of data server
71  * \param logger pointer to logger
72  * \param level debug level
73  * \param path default path
74  */
76  const std::string& server,
77  const std::string& hostname,
78  JLogger* logger,
79  const int level,
80  const std::string& path) :
81  JDAQClient(name, server, logger, level),
82  datawriter(),
83  path (path),
84  hostname (hostname)
85  {
87 
88  JControlHost::Throw(true);
89 
90  // map ControlHost tag to TTree writer.
91 
93  }
94 
95 
96  virtual void actionInit(int length, const char* buffer) override
97  {
98  using namespace std;
99  using namespace JPP;
100 
101  // start server
102 
103  try {
104 
105  datawriter.reset(new JControlHost(hostname));
106 
107  datawriter->setReceiveBufferSize(DWRITER_RECEIVE_BUFFER_SIZE);
108 
109  datawriter->MyId(getFullName());
110 
112 
113  for (JTreeWriter_t::iterator i = writer.begin(); i != writer.end(); ++i) {
114  buffer.add(JSubscriptionAll(i->first));
115  }
116 
118 
119  datawriter->Subscribe(buffer);
120  datawriter->SendMeAlways();
121 
122  JNoticeStream(logger) << "Established connection to " << hostname;
123  }
124  catch(const std::exception& exception) {
125  JErrorStream(logger) << exception.what();
126  }
127  }
128 
129 
130  virtual void actionConfigure(int length, const char* buffer) override
131  {
132  using namespace std;
133 
134  long long int update_s = 10;
135  long long int logger_s = 5;
136 
137  JProperties properties(JEquationParameters("=", ";", "", ""));
138 
139  properties["path"] = path;
140  properties["update_s"] = update_s;
141  properties["logger_s"] = logger_s;
142 
143  properties.read(string(buffer, length));
144 
145  if (update_s <= 0) { update_s = 1; }
146  if (logger_s <= 0) { logger_s = 1; }
147 
148  setClockInterval(update_s * 1000000LL);
149 
150  JDebugStream(logger) << "Path <" << path << ">";
151  JDebugStream(logger) << "Update period [s] " << update_s;
152 
157 
158  numberOfEvents = 0;
159  numberOfBytes = 0;
160  }
161 
162 
163  virtual void actionReset(int length, const char* buffer) override
164  {
165  datawriter.reset();
166  }
167 
168 
169  virtual void actionQuit(int length, const char* buffer) override
170  {}
171 
172 
173  virtual void actionStart(int length, const char* buffer) override
174  {
175  using namespace std;
176  using namespace JPP;
177 
178  JStatusStream(logger) << "Start run " << getDetectorID() << ' ' << getRunNumber();
179 
180  if (writer.is_open()) {
181 
182  JErrorStream (logger) << "Previous file still open -> close";
183 
184  writer.close();
185  }
186 
187  ostringstream os;
188 
189  for (int i = 0; !writer.is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
190 
191  os.str("");
192 
193  os << getFullPath(path)
194  << "KM3NeT"
195  << "_" << FILL(8,'0') << getDetectorID()
196  << "_" << FILL(8,'0') << getRunNumber();
197 
198  if (i != 0) {
199  os << "_" << i;
200  }
201 
202  os << ".root";
203 
204  try {
205  writer.open(os.str().c_str());
206  }
207  catch(const std::exception& exception) {
208  JErrorStream(logger) << exception.what();
209  }
210  }
211 
212  if (writer.is_open())
213  JNoticeStream(logger) << "Output file " << os.str();
214  else
215  JErrorStream (logger) << "File not opened " << os.str();
216 
217  numberOfEvents = 0;
218  numberOfBytes = 0;
219 
221 
222  timer.reset();
223 
224  logErrorRun .reset();
225  logErrorFile .reset();
226  logErrorTag .reset();
228 
230  }
231 
232 
233  virtual void actionStop(int length, const char* buffer) override
234  {
235  typeout();
236 
237  if (timer.usec_wall > 0) {
238  JStatusStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s";
239  }
240 
241  if (!run_db.is_written(getRunNumber())) {
242  JErrorStream(logger) << "No trigger parameters written for run " << getRunNumber();
243  }
244 
245  writer.close();
246 
247  // Release resources.
248 
250 
251  this->buffer.swap(null);
252  }
253 
254 
255  virtual void setSelect(JFileDescriptorMask& mask) const override
256  {
257  if (datawriter.is_valid()) {
258  mask.set(*datawriter);
259  }
260  }
261 
262 
263  virtual void actionSelect(const JFileDescriptorMask& mask) override
264  {
265  using namespace std;
266  using namespace JPP;
267 
268  if (datawriter.is_valid() && mask.has(*datawriter)) {
269 
270  try {
271 
272  JPrefix prefix;
273 
274  datawriter->WaitHead(prefix);
275 
276  timer.start();
277 
278  buffer.resize(prefix.getSize());
279 
280  datawriter->GetFullData(buffer.data(), buffer.size());
281 
282 
283  if (prefix.getTag() == IO_TRIGGER_PARAMETERS) {
284 
285  try {
286  run_db.read(buffer.data(), buffer.size());
287  }
288  catch(const std::exception& error) {
289 
290  JErrorStream(logger) << "Fatal error reading trigger parameters \"" << error.what() << "\"; trigger ev_error.";
291 
292  ev_error();
293  }
294  }
295 
296 
297  if (isRunning()) {
298 
299  // Write trigger parameters for current run if not yet done
300 
302 
303  JTreeWriter_t::iterator i = writer.find(prefix.toString());
304 
305  if (i != writer.end()) {
306 
307  TFile* out = i->second->GetCurrentFile();
308 
309  if (out != NULL && out->IsOpen()) {
310 
311  JDAQPreamble preamble;
312  Version_t version;
313  JDAQHeader header;
314 
315  JByteArrayReader in(buffer.data(), buffer.size());
316 
317  in >> preamble >> version >> header;
318 
319  in.seekg(0); // rewind
320 
321  if (header.getRunNumber() == getRunNumber()) {
322 
323  const Int_t nb = i->second->copy(in);
324 
325  if (nb < (int) buffer.size() || in.tellg() != (int) buffer.size()) {
326  JWarningStream(logger) << "Inconsistency at copy of "
327  << prefix.toString() << ' '
328  << buffer.size() << ' '
329  << in.tellg() << ' '
330  << nb;
331  }
332 
333  if (prefix.getTag() == IO_EVENT)
334  numberOfEvents += 1;
335  numberOfBytes += buffer.size();
336 
337  if (prefix.getTag() == IO_EVENT && numberOfEvents == 1) {
338  typeout();
339  }
340 
341  } else {
342  JErrorStream(logErrorRun) << "Inconsistent run number "
343  << header.getRunNumber()
344  << " != "
345  << getRunNumber();
346  }
347  } else {
348  JErrorStream(logErrorFile) << "Output file not open";
349  }
350  } else {
351  if (prefix.getTag() != IO_TRIGGER_PARAMETERS) {
352  JErrorStream(logErrorTag) << "Unknown tag <" << prefix.toString() << ">, no data written";
353  }
354  }
355  } else {
356  JWarningStream(logErrorState) << "Not in running state <" << prefix.toString() << ">, no data written";
357  }
358 
359  timer.stop();
360  }
361  catch(const std::exception& error) {
362 
363  JErrorStream(logger) << "Fatal error \"" << error.what() << "\"; trigger ev_error.";
364 
365  ev_error();
366  }
367  }
368  }
369 
370 
371  virtual void actionRunning() override
372  {
373  typeout();
374  }
375 
376 
377  /**
378  * Report status of data writing.
379  */
380  void typeout()
381  {
382  std::ostringstream message;
383 
385 
386  logger.typeout(RC_LOG, message.str());
387  logger.status(message.str());
388  }
389 
390  JMeta meta; //!< meta data
391 
392  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
393 
394  private:
395 
397  std::string path; // directory for output file
398 
399  JEEP::JTimer timer; // timer for I/O measurement
400  Long64_t numberOfEvents; // total number of events
401  long long int numberOfBytes; // total number of bytes
402 
407 
408  std::string hostname; //!< host name of data server
409  JTreeWriter_t writer; //!< TTree writer
410  std::vector<char> buffer; //!< internal buffer for incoming data
411 
412 
413  /**
414  * Auxiliary data structure for I/O of trigger parameters.
415  */
416  struct JValue_t {
417  /**
418  * Default constructor.
419  */
421  count(0),
422  is_written(false)
423  {}
424 
425  JTriggerParameters parameters; //!< trigger parameters
426  int count; //!< reader count
427  bool is_written; //!< writer status
428  };
429 
430 
431  /**
432  * Map run number to trigger parameters.
433  */
434  struct JRunDB :
435  public std::map<int, JValue_t>
436  {
437  /**
438  * Remove all entries before given run.
439  *
440  * \param run run number
441  */
442  inline void reset(const int run)
443  {
444  while (!this->empty() && this->begin()->first < run) {
445  this->erase(this->begin());
446  }
447  }
448 
449  /**
450  * Check if trigger parameters have been written for given run.
451  *
452  * \param run run number
453  * \return true if written; else false.
454  */
455  inline bool is_written(const int run) const
456  {
457  const_iterator p = this->find(run);
458 
459  return p != this->end() && p->second.is_written;
460  }
461 
462  /**
463  * Read trigger parameters.
464  *
465  * \param data data
466  * \param size size
467  */
468  void read(const char* const data, const size_t size)
469  {
470  using namespace std;
471  using namespace JPP;
472 
473 
474  const string buffer(data, size);
475 
476  istringstream in(buffer);
477 
478  int run = -1;
480 
481  in >> run;
482 
483  if (!in) {
484  THROW(JIOException, "Error reading run number for trigger parameters " << run << endl << in.rdbuf());
485  }
486 
487  in >> parameters;
488 
489  in.clear(std::ios::eofbit);
490 
491  if (!in) {
492  THROW(JIOException, "Error reading trigger parameters " << in.rdbuf());
493  }
494 
495  JValue_t& value = (*this)[run];
496 
497  if (value.count == 0) {
498  value.parameters = parameters;
499  }
500 
501  value.count += 1;
502 
503  if (!parameters.equals(value.parameters)) {
504  THROW(JException, "Inconsistent trigger parameters " << endl << value.parameters << " != " << endl << parameters);
505  }
506  }
507 
508  /**
509  * Write trigger parameters for given run if not yet done.
510  *
511  * \param run run number
512  * \param file pointer to ROOT file
513  */
514  inline void write(const int run, TFile* file)
515  {
516  if (file != NULL) {
517 
518  iterator p = this->find(run);
519 
520  if (p != this->end() && p->second.count != 0 && !p->second.is_written) {
521 
522  file->WriteTObject(&p->second.parameters);
523 
524  p->second.is_written = true;
525  }
526  }
527  }
528  };
529 
531  };
532 }
533 
534 
535 /**
536  * \file
537  *
538  * Application for writing real-time data to disk.
539  * \author mdejong
540  */
541 int main(int argc, char* argv[])
542 {
543  using namespace std;
544  using namespace JPP;
545  using namespace KM3NETDAQ;
546 
547  string server;
548  string logger;
549  string hostname;
550  string client_name;
551  bool use_cout;
552  string path;
553  int debug;
554 
555  try {
556 
557  JParser<> zap("Application for writing real-time data to disk.");
558 
559  zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
560  zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
561  zap['D'] = make_field(hostname, "host name of server for incoming data from data filter") = "localhost";
562  zap['u'] = make_field(client_name, "client name") = "%";
563  zap['c'] = make_field(use_cout, "print to terminal");
564  zap['p'] = make_field(path, "directory for permanent archival of data") = "";
565  zap['d'] = make_field(debug, "debug level") = 0;
566 
567 
568  zap(argc, argv);
569  }
570  catch(const exception &error) {
571  FATAL(error.what() << endl);
572  }
573 
574 
575  JLogger* out = NULL;
576 
577  if (use_cout)
578  out = new JStreamLogger(cout);
579  else
580  out = new JControlHostLogger(logger);
581 
582  JDataWriter dwriter(getProcessName(client_name, argv[0]), server, hostname, out, debug, path);
583 
584  dwriter.meta = JMeta(argc, argv);
585 
586  dwriter.enter();
587  dwriter.run();
588 }
JDataWriter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const std::string &path)
Constructor.
Definition: JDataWriter.cc:75
ControlHost prefix.
Definition: JPrefix.hh:31
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Utility class to parse command line options.
Definition: JParser.hh:1514
virtual void actionStop(int length, const char *buffer) override
Definition: JDataWriter.cc:233
General exception.
Definition: JException.hh:24
Runcontrol client to write data to disk.
Definition: JDataWriter.cc:61
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:679
Exceptions.
std::vector< char > buffer
internal buffer for incoming data
Definition: JDataWriter.cc:410
void status(const JMessage_t &message)
int main(int argc, char *argv[])
Definition: Main.cc:15
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
Definition: JDataWriter.cc:392
ROOT TTree parameter settings of various packages.
virtual void actionQuit(int length, const char *buffer) override
Definition: JDataWriter.cc:169
JDAQStateMachine::ev_configure_event ev_configure
JDAQStateMachine::ev_error_event ev_error
ControlHost class.
then usage $script[< detector identifier >< run range >]< QA/QCfile > nExample script to produce data quality plots nWhen a detector identifier and run range are data are downloaded from the database nand subsequently stored in the given QA QC file
Definition: JDataQuality.sh:19
std::string getProcessName(const std::string &name, const std::string &process)
Get process name of run control client.
Message logging based on std::ostream.
version
Definition: JEditTuneHV.sh:5
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
void read(const char *const data, const size_t size)
Read trigger parameters.
Definition: JDataWriter.cc:468
void reset(const int run)
Remove all entries before given run.
Definition: JDataWriter.cc:442
JMessageScheduler logErrorState
Definition: JDataWriter.cc:406
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
Utility class to parse parameter values.
Definition: JProperties.hh:497
void setClockInterval(const long long int interval_us)
Set interval time.
Definition: JDAQClient.hh:165
Auxiliary data structure for I/O of trigger parameters.
Definition: JDataWriter.cc:416
std::string hostname
host name of data server
Definition: JDataWriter.cc:408
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:84
JTreeWriter_t writer
TTree writer.
Definition: JDataWriter.cc:409
std::string name
Definition: JDAQCHSM.chsm:154
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:521
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:834
Auxiliary class for a type holder.
Definition: JType.hh:19
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:690
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
Definition: JeepToolkit.hh:168
bool is_written(const int run) const
Check if trigger parameters have been written for given run.
Definition: JDataWriter.cc:455
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
Definition: JDataWriter.cc:255
int getSize() const
Get size.
Definition: JPrefix.hh:62
Subscription list.
void stop()
Stop timer.
Definition: JTimer.hh:113
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
Definition: JDataWriter.cc:263
Utility class to parse parameter values.
virtual bool is_open() const override
Check is file is open.
Definition: JRootFile.hh:77
then echo The file $DIR KM3NeT_00000001_00000000 root already please rename or remove it first
virtual void actionConfigure(int length, const char *buffer) override
Definition: JDataWriter.cc:130
long long int numberOfBytes
Definition: JDataWriter.cc:401
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataWriter.cc:371
void reset()
Reset timer.
Definition: JTimer.hh:76
Map run number to trigger parameters.
Definition: JDataWriter.cc:434
JMessageScheduler logErrorRun
Definition: JDataWriter.cc:403
Type list.
Definition: JTypeList.hh:22
int getDetectorID() const
Get detector identifier.
Definition: JDAQCHSM.chsm:89
static JKey_t getKey(JType< T > type)
Get key.
TFile * getFile() const
Get file.
Definition: JRootFile.hh:66
Scheduling of actions via fixed latency intervals.
virtual void actionInit(int length, const char *buffer) override
Definition: JDataWriter.cc:96
I/O formatting auxiliaries.
The template JSharedPointer class can be used to share a pointer to an object.
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:66
JMessageScheduler logErrorFile
Definition: JDataWriter.cc:404
JSUPPORT::JAutoTreeWriter< JNET::JTag > JTreeWriter_t
Type definition of auto map.
Definition: JDataWriter.cc:35
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
Byte array binary input.
Definition: JByteArrayIO.hh:25
JValue_t()
Default constructor.
Definition: JDataWriter.cc:420
Auxiliary methods for handling file names, type names and environment.
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
then awk string
JMessageScheduler logErrorTag
Definition: JDataWriter.cc:405
virtual void typeout(const std::string &tag, const std::string &message) override
Report message.
JSubscriptionList & add(const JSubscription &subscription)
Add subscription.
Level specific message streamers.
JTriggerParameters parameters
trigger parameters
Definition: JDataWriter.cc:425
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
char getTokenDelimeter()
Get the token delimeter for command messages.
bool isRunning() const
Check if this client is in runnig state.
Definition: JDAQClient.hh:508
virtual void close() override
Close file.
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
virtual void open(const char *file_name) override
Open file.
const JTag & getTag() const
Get tag.
Definition: JTag.hh:86
Auxiliary data structure for sequence of same character.
Definition: JManip.hh:328
JMeta meta
meta data
Definition: JDataWriter.cc:390
static const JNET::JTag RC_DWRITER
Definition: JDAQTags.hh:65
#define FATAL(A)
Definition: JMessage.hh:67
virtual void actionStart(int length, const char *buffer) override
Definition: JDataWriter.cc:173
void write(const int run, TFile *file)
Write trigger parameters for given run if not yet done.
Definition: JDataWriter.cc:514
std::string toString() const
Convert tag to string.
Definition: JTag.hh:171
int getRunNumber() const
Get run number.
Definition: JDAQCHSM.chsm:100
void insert()
Insert (list of) data type(s).
void typeout()
Report status of data writing.
Definition: JDataWriter.cc:380
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:120
Control unit client base class.
Definition: JDAQClient.hh:298
Utility class to parse command line options.
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:392
JMessageLogger logger
message logger
Definition: JDAQClient.hh:835
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
static const JNET::JTag IO_EVENT
Definition: JDAQTags.hh:82
virtual void actionReset(int length, const char *buffer) override
Definition: JDataWriter.cc:163
ControlHost tag.
Definition: JTag.hh:38
KM3NeT DAQ constants, bit handling, etc.
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
JLANG::JSharedPointer< JControlHost > datawriter
Definition: JDataWriter.cc:396
static void Throw(const bool option)
Enable/disable throw option.
Definition: JThrow.hh:37
esac $JPP_BIN JLogger sh $LOGGER until pgrep JGetMessage</dev/null > dev null
int debug
debug level
unsigned long long usec_wall
Definition: JTimer.hh:224
static const int DWRITER_RECEIVE_BUFFER_SIZE
socket JDataWriter.cc &lt;- JDataFilter.cc
Definition: JDAQTags.hh:33
void start()
Start timer.
Definition: JTimer.hh:89
Exception for I/O.
Definition: JException.hh:340