Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDataWriter.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <sstream>
4 #include <iomanip>
5 #include <map>
6 
7 #include "Jeep/JParser.hh"
8 #include "Jeep/JProperties.hh"
9 #include "Jeep/JTimer.hh"
10 #include "Jeep/JTimekeeper.hh"
11 #include "JNet/JControlHost.hh"
12 #include "JLang/JException.hh"
13 #include "JLang/JSharedPointer.hh"
15 #include "JDAQ/JDAQ.hh"
16 #include "JDAQ/JDAQTags.hh"
17 #include "JDAQ/JDAQEvent.hh"
18 #include "JDAQ/JDAQTimeslice.hh"
19 #include "JDAQ/JDAQSummaryslice.hh"
23 #include "JIO/JByteArrayIO.hh"
24 #include "JTools/JAutoMap.hh"
25 #include "JSupport/JMeta.hh"
26 #include "JSupport/JSupport.hh"
28 
29 
30 /**
31  * Type definition of auto map.
32  */
34 
35 
36 namespace JSUPPORT {
37 
38  /**
39  * Get key for given DAQ data type.
40  *
 * The key is whatever getTag<T>() yields for the template argument T.
 * NOTE(review): the declarator of this specialisation (listing line 46) is
 * absent from this listing; only the template headers and the body are visible.
 *
41  * \param type data type
42  * \return key obtained from getTag<T>()
43  */
44  template<>
45  template<class T>
47  {
48  return getTag<T>();
49  }
50 }
51 
52 
53 namespace KM3NETDAQ {
54 
55  /**
56  * Runcontrol client to write data to disk.
57  * In state running, this application will write ROOT formatted data from the data filters to disk.
58  */
59  class JDataWriter :
60  public JDAQClient
61  {
62  public:
63  /**
64  * Constructor.
65  *
 * Forwards name, server, logger and level to the JDAQClient base class,
 * stores the output path and the data server host name, and leaves the
 * pointer to the data server default-constructed (it is set in actionInit()).
 *
66  * \param name name of client
67  * \param server name of command message server
68  * \param hostname name of data server
69  * \param logger pointer to logger
70  * \param level debug level
71  * \param path default path
72  */
73  JDataWriter(const std::string& name,
74  const std::string& server,
75  const std::string& hostname,
76  JLogger* logger,
77  const int level,
78  const std::string& path) :
79  JDAQClient(name, server, logger, level),
80  datawriter(),
81  path (path),
82  hostname (hostname)
83  {
  // Replace tag RC_CMD by RC_DWRITER for the configure event in the event table.
84  replaceEvent(RC_CMD, RC_DWRITER, ev_configure);
85 
  // Enable the throw option of JControlHost; the action methods of this class
  // catch JControlHostException.
86  JControlHost::Throw(true);
87 
88  // map ControlHost tag to TTree writer.
  // NOTE(review): the statement announced by the comment above (listing line 90)
  // is absent from this listing.
89 
91  }
92 
93 
94  virtual void actionInit(int length, const char* buffer)
95  {
  // Connect to the data server at `hostname` and subscribe to one
  // JSubscriptionAll per key of `writer`.
  // A JControlHostException is logged as an error and not rethrown.
  // Neither `length` nor the `buffer` parameter is read in the visible lines.
96  using namespace std;
97  using namespace JPP;
98 
99  // start server
100 
101  try {
102 
103  datawriter.reset(new JControlHost(hostname));
104 
  // NOTE(review): listing lines 105 and 111 are absent from this listing.
  // `buffer.add(...)` below implies a local subscription list named `buffer`
  // that shadows the parameter; its declaration is not visible here.
106 
107  for (JTreeWriter_t::iterator i = writer.begin(); i != writer.end(); ++i) {
108  buffer.add(JSubscriptionAll(i->first));
109  }
110 
112 
113  datawriter->Subscribe(buffer);
114  datawriter->SendMeAlways();
115 
116  JNoticeStream(logger) << "Established connection to " << hostname;
117  }
118  catch(const JControlHostException& exception) {
119  JErrorStream(logger) << exception;
120  }
121  }
122 
123 
124  virtual void actionConfigure(int length, const char* buffer)
125  {
  // Parse the configuration text (`length` bytes at `buffer`) for the keys
  // "path", "update_s" and "logger_s", then set the clock interval, the
  // rate-limited error loggers and the I/O counters accordingly.
126  using namespace std;
127 
  // Initial values in seconds; presumably overwritten by properties.read()
  // when the corresponding key is present - confirm against JProperties.
128  long long int logger_s = 5;
129  long long int update_s = 10;
130 
  // NOTE(review): presumably "=" separates key and value and ";" terminates
  // an entry - confirm against JEquationParameters.
131  JProperties properties(JEquationParameters("=", ";", "", ""));
132 
133  properties["path"] = path;
134  properties["update_s"] = update_s;
135  properties["logger_s"] = logger_s;
136 
137  properties.read(string(buffer, length));
138 
  // Fall back to the current directory and to a period of 1 s for
  // empty or non-positive settings.
139  if (path == "") path = "./";
140  if (update_s <= 0) update_s = 1;
141  if (logger_s <= 0) logger_s = 1;
142 
  // setClockInterval() takes microseconds, hence the conversion from seconds.
143  setClockInterval(update_s * 1000000LL);
144 
145  JDebugStream(logger) << "Path <" << path << ">";
146  JDebugStream(logger) << "Update period [s] " << update_s;
147 
  // One message scheduler per error category, each with a time keeper built
  // from logger_s * 10^6 (presumably microseconds - confirm against JTimekeeper).
148  logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
149  logErrorFile = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
150  logErrorTag = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
151  logErrorState = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
152 
153  numberOfEvents = 0;
154  numberOfBytes = 0;
155  }
156 
157 
158  virtual void actionReset(int length, const char* buffer)
159  {
  // Reset the shared pointer to the data server created in actionInit();
  // afterwards datawriter.is_valid() is expected to be false, so setSelect()
  // and actionSelect() skip it. The arguments are not used.
160  datawriter.reset();
161  }
162 
163 
164  virtual void actionQuit(int length, const char* buffer)
165  {} // intentionally empty: nothing is done on quit; the arguments are not used
166 
167 
168  virtual void actionStart(int length, const char* buffer)
169  {
  // Open the output file for the current run and reset the I/O bookkeeping.
  // The message (`length` bytes at `buffer`) is only logged.
170  using namespace std;
171  using namespace JPP;
172 
173  JNoticeStream(logger) << string(buffer, length);
174 
175  ostringstream os;
176 
  // Try at most MAXIMUM_FILE_NUMBER candidate names until the writer is open:
  //   <path>/KM3NeT_<detector ID, 8 digits>_<run number, 8 digits>[_<i>].root
  // where the suffix _<i> is appended for every attempt after the first.
177  for (int i = 0; !writer.is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
178 
179  os.str("");
180 
181  os << path << '/'
182  << "KM3NeT"
183  << "_" << setw(8) << setfill('0') << getDetectorID()
184  << "_" << setw(8) << setfill('0') << getRunNumber();
185 
186  if (i != 0) {
187  os << "_" << i;
188  }
189 
190  os << ".root";
191 
  // A JException from open() is logged and the next candidate name is tried.
192  try {
193  writer.open(os.str().c_str());
194  }
195  catch(JException& exception) {
196  JErrorStream(logger) << exception;
197  }
198  }
199 
  // os holds the last name tried, i.e. the opened file on success.
200  if (writer.is_open())
201  JNoticeStream(logger) << "Output file " << os.str();
202  else
203  JErrorStream (logger) << "File not opened " << os.str();
204 
205  numberOfEvents = 0;
206  numberOfBytes = 0;
207 
  // NOTE(review): listing lines 208 and 215 are absent from this listing.
209 
210  timer.reset();
211 
212  logErrorRun .reset();
213  logErrorFile .reset();
214  logErrorTag .reset();
216 
  // Drop trigger parameters of runs before the current one.
217  run_db.reset(getRunNumber());
218  }
219 
220 
221  virtual void actionStop(int length, const char* buffer)
222  {
  // Report the final status, log the average I/O rate, check that trigger
  // parameters were written for this run, close the output file and release
  // the receive buffer. The arguments are not used.
223  typeout();
224 
  // Bytes per microsecond of wall time, reported as MB/s; skipped when no
  // wall time was accumulated to avoid a division by zero.
225  if (timer.usec_wall > 0) {
226  JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s";
227  }
228 
229  if (!run_db.is_written(getRunNumber())) {
230  JErrorStream(logger) << "No trigger parameters written for run " << getRunNumber();
231  }
232 
233  writer.close();
234 
235  // Release resources.
236 
  // NOTE(review): the declaration of `null` (listing line 237) is absent from
  // this listing; presumably an empty std::vector<char> swapped with the member
  // buffer so that its memory is released - confirm.
238 
  // this-> selects the member buffer, which is hidden by the parameter `buffer`.
239  this->buffer.swap(null);
240  }
241 
242 
243  virtual void setSelect(JFileDescriptorMask& mask) const
244  {
  // Add the data server connection to the file descriptor mask of the select
  // call, but only while the connection pointer is valid.
245  if (datawriter.is_valid()) {
246  mask.set(*datawriter);
247  }
248  }
249 
250 
251  virtual void actionSelect(const JFileDescriptorMask& mask)
252  {
  // Receive one message from the data server, if the mask reports it, and
  // process it:
  //  - trigger parameters (tag IO_TRIGGER_PARAMETERS) are read into run_db;
  //  - in state running, data with a tag known to `writer` and a matching run
  //    number are copied to the output; every other case is reported through
  //    the corresponding rate-limited error logger.
  // A JControlHostException is logged as an error and not rethrown.
253  using namespace std;
254  using namespace JPP;
255 
256  if (datawriter.is_valid() && mask.has(*datawriter)) {
257 
258  try {
259 
260  JPrefix prefix;
261 
262  datawriter->WaitHead(prefix);
263 
  // NOTE(review): if GetFullData() below throws, control jumps to the catch
  // clause and timer.stop() is not called for this timer.start().
264  timer.start();
265 
  // Size the member buffer to the size announced in the prefix and fill it.
266  buffer.resize(prefix.getSize());
267 
268  datawriter->GetFullData(buffer.data(), buffer.size());
269 
270 
271  if (prefix.getTag() == IO_TRIGGER_PARAMETERS) {
272 
273  const string __str__(buffer.data(), buffer.size());
274 
275  istringstream in(__str__);
276 
  // NOTE(review): after ev_error() execution continues with the
  // isRunning() test below - confirm that this is intended.
277  if (!run_db.read(in)) {
278 
279  JErrorStream(logger) << "Fatal error reading trigger parameters " << __str__;
280 
281  ev_error();
282  }
283  }
284 
285 
286  if (isRunning()) {
287 
288  // Write trigger parameters for current run if not yet done
289 
290  run_db.write(getRunNumber(), writer.getFile());
291 
  // Look up the TTree writer by the string form of the message tag.
292  JTreeWriter_t::iterator i = writer.find(prefix.toString());
293 
294  if (i != writer.end()) {
295 
296  TFile* out = i->second->GetCurrentFile();
297 
298  if (out != NULL && out->IsOpen()) {
299 
300  JDAQPreamble preamble;
301  JDAQHeader header;
302 
303  JByteArrayReader in(buffer.data(), buffer.size());
304 
  // Read preamble and header only to obtain the run number, then rewind
  // so that copy() below starts at the beginning of the data.
305  in >> preamble >> header;
306 
307  in.seekg(0); // rewind
308 
309  if (header.getRunNumber() == getRunNumber()) {
310 
311  const Int_t nb = i->second->copy(in);
312 
  // Warn when copy() returned less than the buffer size or when the
  // read position is not at the end of the buffer.
313  if (nb < (int) buffer.size() || in.tellg() != (int) buffer.size()) {
314  JWarningStream(logger) << "Inconsistency at copy of "
315  << prefix.toString() << ' '
316  << buffer.size() << ' '
317  << in.tellg() << ' '
318  << nb;
319  }
320 
  // Only the event counter is conditional on the tag; the byte counter
  // is incremented for every copied message.
321  if (prefix.getTag() == IO_EVENT)
322  numberOfEvents += 1;
323  numberOfBytes += buffer.size();
324 
  // Report the status as soon as the first event has been written.
325  if (prefix.getTag() == IO_EVENT && numberOfEvents == 1) {
326  typeout();
327  }
328 
329  } else {
330  JErrorStream(logErrorRun) << "Inconsistent run number "
331  << header.getRunNumber()
332  << " != "
333  << getRunNumber();
334  }
335  } else {
336  JErrorStream(logErrorFile) << "Output file not open";
337  }
338  } else {
  // Trigger parameters were already handled above and are no error here.
339  if (prefix.getTag() != IO_TRIGGER_PARAMETERS) {
340  JErrorStream(logErrorTag) << "Unknown tag <" << prefix.toString() << ">, no data written";
341  }
342  }
343  } else {
344  JErrorStream(logErrorState) << "Not in running state <" << prefix.toString() << ">, no data written";
345  }
346 
347  timer.stop();
348  }
349  catch(const JControlHostException& exception) {
350  JErrorStream(logger) << exception;
351  }
352  }
353  }
354 
355 
356  virtual void actionRunning()
357  {
  // Periodic status report; the period is the clock interval that
  // actionConfigure() sets via setClockInterval().
358  typeout();
359  }
360 
361 
362  /**
363  * Report status of data writing.
 *
 * The same message text is sent to the logger with tag RC_LOG and
 * passed to the status method of the logger.
364  */
365  void typeout()
366  {
367  std::ostringstream message;
368 
  // NOTE(review): the statement that composes the message (listing line 369)
  // is absent from this listing.
370 
371  logger.typeout(RC_LOG, message.str());
372  logger.status(message.str());
373  }
374 
375  JMeta meta; //!< meta data; public, assigned by main() from the command line arguments
376 
377  static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection; bounds the number of file names tried in actionStart().
378 
379  private:
380 
  // NOTE(review): listing lines 381 and 388-391 are absent from this listing;
  // the methods above use datawriter, logErrorRun, logErrorFile, logErrorTag
  // and logErrorState, which are presumably declared there.
382  std::string path; //!< directory for output file
383 
384  JEEP::JTimer timer; //!< timer for I/O measurement
385  Long64_t numberOfEvents; //!< total number of events
386  long long int numberOfBytes; //!< total number of bytes
387 
392 
393  std::string hostname; //!< host name of data server
394  JTreeWriter_t writer; //!< TTree writer
395  std::vector<char> buffer; //!< internal buffer for incoming data
396 
397 
398  /**
399  * Auxiliary data structure for I/O of trigger parameters.
 *
 * Holds the trigger parameters of one run together with the number of
 * times they have been read and a flag telling whether they have been
 * written to file.
400  */
401  struct JValue_t {
402  /**
403  * Default constructor.
 *
 * Sets the reader count to zero and the writer status to false.
 * NOTE(review): the constructor declarator (listing line 405) is absent
 * from this listing; only its initialiser list is visible.
404  */
406  count(0),
407  is_written(false)
408  {}
409 
410  JTriggerParameters parameters; //!< trigger parameters
411  int count; //!< reader count
412  bool is_written; //!< writer status
413  };
414 
415 
416  /**
417  * Map run number to trigger parameters.
 *
 * The map is keyed by run number, so begin() always refers to the
 * lowest run number stored.
418  */
419  struct JRunDB :
420  public std::map<int, JValue_t>
421  {
422  /**
423  * Remove all entries before given run.
424  *
 * Entries are erased from the front of the map for as long as the
 * lowest stored run number is smaller than the given run.
 *
425  * \param run run number
426  */
427  inline void reset(const int run)
428  {
429  while (!this->empty() && this->begin()->first < run) {
430  this->erase(this->begin());
431  }
432  }
433 
434  /**
435  * Check if trigger parameters have been written for given run.
436  *
437  * \param run run number
438  * \return true if written; else false.
439  */
440  inline bool is_written(const int run) const
441  {
  // find() is used so that a missing run does not create an entry.
442  const_iterator p = this->find(run);
443 
444  return p != this->end() && p->second.is_written;
445  }
446 
447  /**
448  * Read trigger parameters.
449  *
 * The input is a run number followed by the trigger parameters.
 * The first parameters read for a run are stored; every accepted read
 * increments the reader count of that run. The return value tells whether
 * the parameters just read equal the stored ones.
 *
450  * \param in input stream
451  * \return true if OK; else false
452  */
453  bool read(std::istream& in)
454  {
  // -1 marks "no run number read".
455  int run = -1;
456  JTriggerParameters parameters;
457 
458  in >> run >> parameters;
459 
  // NOTE(review): clear(state) replaces the complete stream state, so any
  // failbit/badbit set by the extraction above is dropped here and the test
  // on `in` below cannot fail; only `run != -1` rejects input - confirm
  // that this is intended.
460  in.clear(std::ios::eofbit);
461 
462  if (in && run != -1) {
463 
  // operator[] creates a default entry (count 0, not written) for a new run.
464  JValue_t& value = (*this)[run];
465 
466  if (value.count == 0) {
467  value.parameters = parameters;
468  }
469 
470  value.count += 1;
471 
472  return parameters.equals(value.parameters);
473  }
474 
475  return false;
476  }
477 
478  /**
479  * Write trigger parameters for given run if not yet done.
480  *
 * Nothing is written when the file pointer is NULL, when there is no
 * entry for the run, when the entry has not been read at least once,
 * or when it has been written before.
 *
481  * \param run run number
482  * \param file pointer to ROOT file
483  */
484  inline void write(const int run, TFile* file)
485  {
486  if (file != NULL) {
487 
488  iterator p = this->find(run);
489 
490  if (p != this->end() && p->second.count != 0 && !p->second.is_written) {
491 
492  file->WriteTObject(&p->second.parameters);
493 
  // Mark as written so that subsequent calls for this run do nothing.
494  p->second.is_written = true;
495  }
496  }
497  }
498  };
499 
501  };
502 }
503 
504 
505 /**
506  * \file
507  *
508  * Application for writing real-time data to disk.
509  * \author mdejong
510  */
511 int main(int argc, char* argv[])
512 {
  // Parse the command line, create the logger and run the data writer as
  // run control client.
513  using namespace std;
514 
  // NOTE(review): use_cout and debug have no initialiser; they are only
  // assigned through the parser below - confirm that FATAL() does not return
  // when parsing fails.
515  string server;
516  string logger;
517  string hostname;
518  string client_name;
519  bool use_cout;
520  string path;
521  int debug;
522 
523  try {
524 
525  JParser<> zap("Application for writing real-time data to disk.");
526 
  // Options: -H command message server, -M logger host, -D data server,
  // -u client name, -c log to standard output, -p output path, -d debug level.
527  zap['H'] = make_field(server) = "localhost";
528  zap['M'] = make_field(logger) = "localhost";
529  zap['D'] = make_field(hostname) = "localhost";
530  zap['u'] = make_field(client_name) = "%";
531  zap['c'] = make_field(use_cout);
532  zap['p'] = make_field(path) = "";
533  zap['d'] = make_field(debug) = 3;
534 
535  zap(argc, argv);
536  }
537  catch(const exception &error) {
538  FATAL(error.what() << endl);
539  }
540 
541 
542  using namespace KM3NETDAQ;
543  using namespace JPP;
544 
  // NOTE(review): the logger is allocated with new and not deleted in main();
  // presumably the client takes ownership of the pointer - confirm against
  // JDAQClient.
545  JLogger* out = NULL;
546 
547  if (use_cout)
548  out = new JStreamLogger(cout);
549  else
550  out = new JControlHostLogger(logger);
551 
552  JDataWriter dwriter(getProcessName(client_name, argv[0]), server, hostname, out, debug, path);
553 
  // Keep the command line as meta data of this application.
554  dwriter.meta = JMeta(argc, argv);
555 
556  dwriter.enter();
557  dwriter.run();
558 }
JDataWriter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const std::string &path)
Constructor.
Definition: JDataWriter.cc:73
ControlHost prefix.
Definition: JPrefix.hh:31
Message logger with time scheduler.
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:71
Utility class to parse command line options.
Definition: JParser.hh:1410
General exception.
Definition: JException.hh:40
Runcontrol client to write data to disk.
Definition: JDataWriter.cc:59
bool read(const JEquation &equation)
Read equation.
Definition: JProperties.hh:665
Exceptions.
Data structure for all trigger parameters.
std::vector< char > buffer
internal buffer for incoming data
Definition: JDataWriter.cc:395
void status(const JMessage_t &message)
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
Definition: JDataWriter.cc:377
ControlHost class.
virtual void actionInit(int length, const char *buffer)
Definition: JDataWriter.cc:94
std::string getProcessName(const std::string &name, const std::string &process)
Get process name of run control client.
bool has(const int file_descriptor) const
Has file descriptor.
void set(const int file_descriptor)
Set file descriptor.
virtual void setSelect(JFileDescriptorMask &mask) const
Set the file descriptor mask for the select call.
Definition: JDataWriter.cc:243
void reset(const int run)
Remove all entries before given run.
Definition: JDataWriter.cc:427
virtual void close()
Close file.
JMessageScheduler logErrorState
Definition: JDataWriter.cc:391
Interface for logging messages.
Definition: JLogger.hh:22
Utility class to parse parameter values.
Definition: JProperties.hh:484
void setClockInterval(const long long int interval_us)
Set interval time.
Definition: JDAQClient.hh:385
Auxiliary data structure for I/O of trigger parameters.
Definition: JDataWriter.cc:401
std::string hostname
host name of data server
Definition: JDataWriter.cc:393
Simple data structure to support I/O of equations (see class JLANG::JEquation).
Time keeper.
Definition: JTimekeeper.hh:34
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition: JDAQTags.hh:66
JTreeWriter_t writer
TTree writer.
Definition: JDataWriter.cc:394
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
Auxiliary class for a type holder.
Definition: JType.hh:19
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:493
bool is_written(const int run) const
Check if trigger parameters have been written for given run.
Definition: JDataWriter.cc:440
int getSize() const
Get size.
Definition: JPrefix.hh:63
Subscription list.
void stop()
Stop timer.
Definition: JTimer.hh:113
Utility class to parse parameter values.
virtual void actionReset(int length, const char *buffer)
Definition: JDataWriter.cc:158
long long int numberOfBytes
Definition: JDataWriter.cc:386
void reset()
Reset timer.
Definition: JTimer.hh:76
Map run number to trigger parameters.
Definition: JDataWriter.cc:419
JMessageScheduler logErrorRun
Definition: JDataWriter.cc:388
virtual void actionConfigure(int length, const char *buffer)
Definition: JDataWriter.cc:124
static JKey_t getKey(JType< T > type)
Get key.
TFile * getFile() const
Get file.
Definition: JRootFile.hh:65
virtual void actionQuit(int length, const char *buffer)
Definition: JDataWriter.cc:164
Scheduling of actions via fixed latency intervals.
The template JSharedPointer class can be used to share a pointer to an object.
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:49
JMessageScheduler logErrorFile
Definition: JDataWriter.cc:389
virtual void actionStart(int length, const char *buffer)
Definition: JDataWriter.cc:168
JSUPPORT::JAutoTreeWriter< JNET::JTag > JTreeWriter_t
Type definition of auto map.
Definition: JDataWriter.cc:33
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1836
Byte array binary input.
Definition: JByteArrayIO.hh:25
virtual void open(const char *file_name)
Open file.
JValue_t()
Default constructor.
Definition: JDataWriter.cc:405
Auxiliary class for method select.
Auxiliary class for CPU timing and usage.
Definition: JTimer.hh:32
ROOT I/O of application specific meta data.
JMessageScheduler logErrorTag
Definition: JDataWriter.cc:390
virtual void actionStop(int length, const char *buffer)
Definition: JDataWriter.cc:221
JSubscriptionList & add(const JSubscription &subscription)
Add subscription.
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:432
int debug
debug level
Definition: JSirene.cc:59
JTriggerParameters parameters
trigger parameters
Definition: JDataWriter.cc:410
char getTokenDelimeter()
Get the token delimiter for command messages.
bool isRunning() const
Check if this client is in running state.
Definition: JDAQClient.hh:281
Auxiliary class for all subscription.
Definition: JControlHost.hh:95
const JTag & getTag() const
Get tag.
Definition: JTag.hh:82
JMeta meta
meta data
Definition: JDataWriter.cc:375
static const JNET::JTag RC_DWRITER
Definition: JDAQTags.hh:48
#define FATAL(A)
Definition: JMessage.hh:65
virtual bool is_open() const
Check if file is open.
Definition: JRootFile.hh:76
void write(const int run, TFile *file)
Write trigger parameters for given run if not yet done.
Definition: JDataWriter.cc:484
std::string toString() const
Convert tag to string.
Definition: JTag.hh:167
void insert()
Insert (list of) data type(s).
void typeout()
Report status of data writing.
Definition: JDataWriter.cc:365
Run control client base class.
Definition: JDAQClient.hh:88
Utility class to parse command line options.
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: JDataWriter.cc:356
JTYPELIST< JDAQTimesliceTypes_t, JDAQEvent, JDAQSummaryslice >::typelist JDAQTypes_t
Type list of DAQ data types for I/O.
Definition: JSupport.hh:275
ROOT TTree parameter settings.
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:143
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
void replaceEvent(const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:294
Fixed parameters for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
static const JNET::JTag IO_EVENT
Definition: JDAQTags.hh:64
virtual void typeout(const std::string &tag, const std::string &message)
Report message.
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:248
KM3NeT DAQ constants, bit handling, etc.
static JNullStream null
Null I/O stream.
Definition: JNullStream.hh:51
JLANG::JSharedPointer< JControlHost > datawriter
Definition: JDataWriter.cc:381
virtual void actionSelect(const JFileDescriptorMask &mask)
Action method following last select call.
Definition: JDataWriter.cc:251
static void Throw(const bool option)
Enable/disable throw option.
Definition: JThrow.hh:37
unsigned long long usec_wall
Definition: JTimer.hh:224
void start()
Start timer.
Definition: JTimer.hh:89
bool read(std::istream &in)
Read trigger parameters.
Definition: JDataWriter.cc:453
bool putObject(TDirectory *dir, const T &object)
Write object to ROOT directory.
int main(int argc, char *argv[])
Definition: Main.cpp:15