Jpp 19.3.0-rc.3
the software that should make you happy
Loading...
Searching...
No Matches
JDataWriter.cc
Go to the documentation of this file.
1#include <string>
2#include <iostream>
3#include <sstream>
4#include <iomanip>
5#include <map>
6
7#include "Jeep/JParser.hh"
8#include "Jeep/JProperties.hh"
9#include "Jeep/JTimer.hh"
10#include "Jeep/JTimekeeper.hh"
11#include "Jeep/JPrint.hh"
12#include "Jeep/JeepToolkit.hh"
13#include "JNet/JControlHost.hh"
14#include "JLang/JException.hh"
18#include "JDAQ/JDAQTags.hh"
19#include "JDAQ/JDAQEventIO.hh"
25#include "JIO/JByteArrayIO.hh"
26#include "JTools/JAutoMap.hh"
27#include "JSupport/JMeta.hh"
28#include "JSupport/JSupport.hh"
30
31
32/**
33 * Type definition of auto map.
34 */
36
37
38namespace JSUPPORT {
39
40 /**
41 * Get key for given DAQ data type.
42 *
43 * \param type data type
44 * \return map element
45 */
46 template<>
47 template<class T>
49 {
50 return getTag<T>();
51 }
52}
53
54
55namespace KM3NETDAQ {
56
57 /**
58 * Runcontrol client to write data to disk.
59 * In state running, this application will write ROOT formatted data from the data filters to disk.
60 */
62 public JDAQClient
63 {
64 public:
65 /**
66 * Constructor.
67 *
68 * \param name name of client
69 * \param server name of command message server
70 * \param hostname name of data server
71 * \param logger pointer to logger
72 * \param level debug level
73 * \param path default path
74 */
75 JDataWriter(const std::string& name,
76 const std::string& server,
77 const std::string& hostname,
79 const int level,
80 const std::string& path) :
81 JDAQClient(name, server, logger, level),
82 datawriter(),
83 path (path),
85 {
87
89
90 // map ControlHost tag to TTree writer.
91
93 }
94
95
96 virtual void actionInit(int length, const char* buffer) override
97 {
98 using namespace std;
99 using namespace JPP;
100
101 // start server
102
103 try {
104
105 datawriter.reset(new JControlHost(hostname));
106
107 datawriter->setReceiveBufferSize(DWRITER_RECEIVE_BUFFER_SIZE);
108
109 datawriter->MyId(getFullName());
110
112
113 for (JTreeWriter_t::iterator i = writer.begin(); i != writer.end(); ++i) {
114 buffer.add(JSubscriptionAll(i->first));
115 }
116
118
119 datawriter->Subscribe(buffer);
120 datawriter->SendMeAlways();
121
122 JNoticeStream(logger) << "Established connection to " << hostname;
123 }
124 catch(const std::exception& exception) {
125 JErrorStream(logger) << exception.what();
126 }
127 }
128
129
130 virtual void actionConfigure(int length, const char* buffer) override
131 {
132 using namespace std;
133
134 long long int update_s = 10;
135 long long int logger_s = 5;
136
137 JProperties properties(JEquationParameters("=", ";", "", ""));
138
139 properties["path"] = path;
140 properties["update_s"] = update_s;
141 properties["logger_s"] = logger_s;
142
143 properties.read(string(buffer, length));
144
145 if (update_s <= 0) { update_s = 1; }
146 if (logger_s <= 0) { logger_s = 1; }
147
148 setClockInterval(update_s * 1000000LL);
149
150 JDebugStream(logger) << "Path <" << path << ">";
151 JDebugStream(logger) << "Update period [s] " << update_s;
152
153 logErrorRun = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
154 logErrorFile = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
155 logErrorTag = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
156 logErrorState = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
157
158 numberOfEvents = 0;
159 numberOfBytes = 0;
160 }
161
162
163 virtual void actionReset(int length, const char* buffer) override
164 {
165 datawriter.reset();
166 }
167
168
169 virtual void actionQuit(int length, const char* buffer) override
170 {}
171
172
173 virtual void actionStart(int length, const char* buffer) override
174 {
175 using namespace std;
176 using namespace JPP;
177
178 JStatusStream(logger) << "Start run " << getDetectorID() << ' ' << getRunNumber();
179
180 if (writer.is_open()) {
181
182 JErrorStream (logger) << "Previous file still open -> close";
183
184 writer.close();
185 }
186
187 ostringstream os;
188
189 for (int i = 0; !writer.is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {
190
191 os.str("");
192
193 os << getFullPath(path)
194 << "KM3NeT"
195 << "_" << FILL(8,'0') << getDetectorID()
196 << "_" << FILL(8,'0') << getRunNumber();
197
198 if (i != 0) {
199 os << "_" << i;
200 }
201
202 os << ".root";
203
204 try {
205 writer.open(os.str().c_str());
206 }
207 catch(const std::exception& exception) {
208 JErrorStream(logger) << exception.what();
209 }
210 }
211
212 if (writer.is_open())
213 JNoticeStream(logger) << "Output file " << os.str();
214 else
215 JErrorStream (logger) << "File not opened " << os.str();
216
217 numberOfEvents = 0;
218 numberOfBytes = 0;
219
221
222 timer.reset();
223
228
230 }
231
232
233 virtual void actionStop(int length, const char* buffer) override
234 {
235 typeout();
236
237 if (timer.usec_wall > 0) {
238 JStatusStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s";
239 }
240
242 JErrorStream(logger) << "No trigger parameters written for run " << getRunNumber();
243 }
244
245 writer.close();
246
247 // Release resources.
248
250
251 this->buffer.swap(null);
252 }
253
254
255 virtual void setSelect(JFileDescriptorMask& mask) const override
256 {
257 if (datawriter.is_valid()) {
258 mask.set(*datawriter);
259 }
260 }
261
262
263 virtual void actionSelect(const JFileDescriptorMask& mask) override
264 {
265 using namespace std;
266 using namespace JPP;
267
268 if (datawriter.is_valid() && mask.has(*datawriter)) {
269
270 try {
271
272 JPrefix prefix;
273
274 try {
275
276 datawriter->WaitHead(prefix);
277
278 timer.start();
279
280 buffer.resize(prefix.getSize());
281
282 datawriter->GetFullData(buffer.data(), buffer.size());
283 }
284 catch(const JSocketException& error) {
285
286 JErrorStream(logger) << "Fatal error receiving data: \"" << error.what() << "\" -> disconnect and trigger ev_error.";
287
288 datawriter.reset();
289
290 ev_error();
291 }
292
293 if (prefix.getTag() == IO_TRIGGER_PARAMETERS) {
294
295 try {
296 run_db.read(buffer.data(), buffer.size());
297 }
298 catch(const std::exception& error) {
299
300 JErrorStream(logger) << "Fatal error reading trigger parameters \"" << error.what() << "\"; trigger ev_error.";
301
302 ev_error();
303 }
304 }
305
306
307 if (isRunning()) {
308
309 // Write trigger parameters for current run if not yet done
310
312
313 JTreeWriter_t::iterator i = writer.find(prefix.toString());
314
315 if (i != writer.end()) {
316
317 TFile* out = i->second->GetCurrentFile();
318
319 if (out != NULL && out->IsOpen()) {
320
321 JDAQPreamble preamble;
322 Version_t version;
323 JDAQHeader header;
324
325 JByteArrayReader in(buffer.data(), buffer.size());
326
327 in >> preamble >> version >> header;
328
329 in.seekg(0); // rewind
330
331 if (header.getRunNumber() == getRunNumber()) {
332
333 const Int_t nb = i->second->copy(in);
334
335 if (nb < (int) buffer.size() || in.tellg() != (int) buffer.size()) {
336 JWarningStream(logger) << "Inconsistency at copy of "
337 << prefix.toString() << ' '
338 << buffer.size() << ' '
339 << in.tellg() << ' '
340 << nb;
341 }
342
343 if (prefix.getTag() == IO_EVENT)
344 numberOfEvents += 1;
345 numberOfBytes += buffer.size();
346
347 if (prefix.getTag() == IO_EVENT && numberOfEvents == 1) {
348 typeout();
349 }
350
351 } else {
352 JErrorStream(logErrorRun) << "Inconsistent run number "
353 << header.getRunNumber()
354 << " != "
355 << getRunNumber();
356 }
357 } else {
358 JErrorStream(logErrorFile) << "Output file not open";
359 }
360 } else {
361 if (prefix.getTag() != IO_TRIGGER_PARAMETERS) {
362 JErrorStream(logErrorTag) << "Unknown tag <" << prefix.toString() << ">, no data written";
363 }
364 }
365 } else {
366 JWarningStream(logErrorState) << "Not in running state <" << prefix.toString() << ">, no data written";
367 }
368
369 timer.stop();
370 }
371 catch(const std::exception& error) {
372
373 JErrorStream(logger) << "Fatal error \"" << error.what() << "\"; trigger ev_error.";
374
375 ev_error();
376 }
377 }
378 }
379
380
381 virtual void actionRunning() override
382 {
383 typeout();
384 }
385
386
387 /**
388 * Report status of data writing.
389 */
390 void typeout()
391 {
392 std::ostringstream message;
393
395
396 logger.typeout(RC_LOG, message.str());
397 logger.status(message.str());
398 }
399
400 JMeta meta; //!< meta data
401
402 static const int MAXIMUM_FILE_NUMBER = 100; //!< maximum file number for overwrite protection.
403
404 private:
405
407 std::string path; // directory for output file
408
409 JEEP::JTimer timer; // timer for I/O measurement
410 Long64_t numberOfEvents; // total number of events
411 long long int numberOfBytes; // total number of bytes
412
417
418 std::string hostname; //!< host name of data server
419 JTreeWriter_t writer; //!< TTree writer
420 std::vector<char> buffer; //!< internal buffer for incoming data
421
422
423 /**
424 * Auxiliary data structure for I/O of trigger parameters.
425 */
426 struct JValue_t {
427 /**
428 * Default constructor.
429 */
431 count(0),
432 is_written(false)
433 {}
434
435 JTriggerParameters parameters; //!< trigger parameters
436 int count; //!< reader count
437 bool is_written; //!< writer status
438 };
439
440
441 /**
442 * Map run number to trigger parameters.
443 */
444 struct JRunDB :
445 public std::map<int, JValue_t>
446 {
447 /**
448 * Remove all entries before given run.
449 *
450 * \param run run number
451 */
452 inline void reset(const int run)
453 {
454 while (!this->empty() && this->begin()->first < run) {
455 this->erase(this->begin());
456 }
457 }
458
459 /**
460 * Check if trigger parameters have been written for given run.
461 *
462 * \param run run number
463 * \return true if written; else false.
464 */
465 inline bool is_written(const int run) const
466 {
467 const_iterator p = this->find(run);
468
469 return p != this->end() && p->second.is_written;
470 }
471
472 /**
473 * Read trigger parameters.
474 *
475 * \param data data
476 * \param size size
477 */
478 void read(const char* const data, const size_t size)
479 {
480 using namespace std;
481 using namespace JPP;
482
483
484 const string buffer(data, size);
485
486 istringstream in(buffer);
487
488 int run = -1;
489 JTriggerParameters parameters;
490
491 in >> run;
492
493 if (!in) {
494 THROW(JIOException, "Error reading run number for trigger parameters " << run << endl << in.rdbuf());
495 }
496
497 in >> parameters;
498
499 in.clear(std::ios::eofbit);
500
501 if (!in) {
502 THROW(JIOException, "Error reading trigger parameters " << in.rdbuf());
503 }
504
505 JValue_t& value = (*this)[run];
506
507 if (value.count == 0) {
508 value.parameters = parameters;
509 }
510
511 value.count += 1;
512
513 if (!parameters.equals(value.parameters)) {
514 THROW(JException, "Inconsistent trigger parameters " << endl << value.parameters << " != " << endl << parameters);
515 }
516 }
517
518 /**
519 * Write trigger parameters for given run if not yet done.
520 *
521 * \param run run number
522 * \param file pointer to ROOT file
523 */
524 inline void write(const int run, TFile* file)
525 {
526 if (file != NULL) {
527
528 iterator p = this->find(run);
529
530 if (p != this->end() && p->second.count != 0 && !p->second.is_written) {
531
532 file->WriteTObject(&p->second.parameters);
533
534 p->second.is_written = true;
535 }
536 }
537 }
538 };
539
541 };
542}
543
544
545/**
546 * \file
547 *
548 * Application for writing real-time data to disk.
549 * \author mdejong
550 */
551int main(int argc, char* argv[])
552{
553 using namespace std;
554 using namespace JPP;
555 using namespace KM3NETDAQ;
556
557 string server;
558 string logger;
559 string hostname;
560 string client_name;
561 bool use_cout;
562 string path;
563 int debug;
564
565 try {
566
567 JParser<> zap("Application for writing real-time data to disk.");
568
569 zap['H'] = make_field(server, "host name of server for command messages") = "localhost";
570 zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost";
571 zap['D'] = make_field(hostname, "host name of server for incoming data from data filter") = "localhost";
572 zap['u'] = make_field(client_name, "client name") = "%";
573 zap['c'] = make_field(use_cout, "print to terminal");
574 zap['p'] = make_field(path, "directory for permanent archival of data") = "";
575 zap['d'] = make_field(debug, "debug level") = 0;
576
577
578 zap(argc, argv);
579 }
580 catch(const exception &error) {
581 FATAL(error.what() << endl);
582 }
583
584
585 JLogger* out = NULL;
586
587 if (use_cout)
588 out = new JStreamLogger(cout);
589 else
590 out = new JControlHostLogger(logger);
591
592 JDataWriter dwriter(getProcessName(client_name, argv[0]), server, hostname, out, debug, path);
593
594 dwriter.meta = JMeta(argc, argv);
595
596 dwriter.enter();
597 dwriter.run();
598}
Fixed parameters and ControlHost tags for KM3NeT DAQ.
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
JSUPPORT::JAutoTreeWriter< JNET::JTag > JTreeWriter_t
Type definition of auto map.
Exceptions.
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:72
ROOT I/O of application specific meta data.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
I/O formatting auxiliaries.
Utility class to parse parameter values.
ROOT TTree parameter settings of various packages.
Scheduling of actions via fixed latency intervals.
Auxiliary methods for handling file names, type names and environment.
int getDetectorID() const
Get detector identifier.
Definition JDAQCHSM.hh:100
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_error_event ev_error
JDAQStateMachine::ev_configure_event ev_configure
Utility class to parse parameter values.
bool read(const JEquation &equation)
Read equation.
Time keeper.
Auxiliary class for CPU timing and usage.
Definition JTimer.hh:33
unsigned long long usec_wall
Definition JTimer.hh:238
void stop()
Stop timer.
Definition JTimer.hh:127
void reset()
Reset timer.
Definition JTimer.hh:93
void start()
Start timer.
Definition JTimer.hh:106
Byte array binary input.
int tellg() const
Get read position.
void seekg(const int pos)
Set read position.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition JException.hh:24
virtual const char * what() const override
Get error message.
Definition JException.hh:64
Auxiliary class for method select.
void set(const int file_descriptor)
Set file descriptor.
bool has(const int file_descriptor) const
Has file descriptor.
Exception for I/O.
The template JSharedPointer class can be used to share a pointer to an object.
Exception for socket.
static void Throw(const bool option)
Definition JThrow.hh:37
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
virtual void typeout(const std::string &tag, const std::string &message) override
Report message.
void status(const JMessage_t &message)
Message logger with time scheduler.
Message logging based on std::ostream.
ControlHost class.
ControlHost prefix.
Definition JPrefix.hh:33
int getSize() const
Get size.
Definition JPrefix.hh:62
Subscription list.
ControlHost tag.
Definition JTag.hh:38
std::string toString() const
Convert tag to string.
Definition JTag.hh:171
const JTag & getTag() const
Get tag.
Definition JTag.hh:86
Utility class to parse command line options.
Definition JParser.hh:1698
TFile * getFile() const
Get file.
Definition JRootFile.hh:66
virtual bool is_open() const override
Check is file is open.
Definition JRootFile.hh:77
Auxiliary class to copy input data to corresponding TTree.
virtual void open(const char *file_name) override
Open file.
void insert()
Insert (list of) data type(s).
virtual void close() override
Close file.
static JKey_t getKey(JType< T > type)
Get key.
int getRunNumber() const
Get run number.
Control unit client base class.
JSharedPointer< JControlHost > server
message server
bool isRunning() const
Check if this client is in runnig state.
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
Runcontrol client to write data to disk.
virtual void actionStop(int length, const char *buffer) override
void typeout()
Report status of data writing.
std::string hostname
host name of data server
long long int numberOfBytes
virtual void actionSelect(const JFileDescriptorMask &mask) override
Action method following last select call.
virtual void actionInit(int length, const char *buffer) override
JTreeWriter_t writer
TTree writer.
JLANG::JSharedPointer< JControlHost > datawriter
JMessageScheduler logErrorFile
JMessageScheduler logErrorRun
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
virtual void actionQuit(int length, const char *buffer) override
virtual void setSelect(JFileDescriptorMask &mask) const override
Set the file descriptor mask for the select call.
JMessageScheduler logErrorState
std::vector< char > buffer
internal buffer for incoming data
virtual void actionReset(int length, const char *buffer) override
JDataWriter(const std::string &name, const std::string &server, const std::string &hostname, JLogger *logger, const int level, const std::string &path)
Constructor.
virtual void actionStart(int length, const char *buffer) override
virtual void actionConfigure(int length, const char *buffer) override
static const int MAXIMUM_FILE_NUMBER
maximum file number for overwrite protection.
JMessageScheduler logErrorTag
std::string getFullPath(const std::string &path)
Get full path, i.e. add JEEP::PATHNAME_SEPARATOR if necessary.
static JNullStream null
Null I/O stream.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
bool putObject(TDirectory &dir, const TObject &object)
Write object to ROOT directory.
Support classes and methods for experiment specific I/O.
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
static const JNET::JTag RC_DWRITER
Definition JDAQTags.hh:71
static const JNET::JTag IO_EVENT
Definition JDAQTags.hh:88
static const int DWRITER_RECEIVE_BUFFER_SIZE
socket JDataWriter.cc <- JDataFilter.cc
Definition JDAQTags.hh:39
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const JNET::JTag RC_LOG
Definition JDAQTags.hh:72
std::string getProcessName(const std::string &name, const std::string &process)
Get process name of run control client.
char getTokenDelimeter()
Get the token delimeter for command messages.
static const JNET::JTag IO_TRIGGER_PARAMETERS
Definition JDAQTags.hh:90
Auxiliary data structure for sequence of same character.
Definition JManip.hh:330
Type list.
Definition JTypeList.hh:23
Auxiliary class for a type holder.
Definition JType.hh:19
Level specific message streamers.
Auxiliary class for all subscription.
Auxiliary class for ROOT I/O of application specific meta data.
Definition JMeta.hh:72
const std::string & getFullName() const
Get full name of this run control client.
void setClockInterval(const long long int interval_us)
Set interval time.
Map run number to trigger parameters.
void reset(const int run)
Remove all entries before given run.
void write(const int run, TFile *file)
Write trigger parameters for given run if not yet done.
bool is_written(const int run) const
Check if trigger parameters have been written for given run.
void read(const char *const data, const size_t size)
Read trigger parameters.
Auxiliary data structure for I/O of trigger parameters.
JValue_t()
Default constructor.
JTriggerParameters parameters
trigger parameters