Jpp test-rotations-old
the software that should make you happy
Loading...
Searching...
No Matches
JDQSimulator.cc
Go to the documentation of this file.
1#include <string>
2#include <iostream>
3#include <iomanip>
4#include <vector>
5#include <limits>
6#include <map>
7
8#include "TRandom3.h"
9#include "JMath/JRandom.hh"
10
11#include "Jeep/JParser.hh"
12#include "Jeep/JProperties.hh"
13#include "Jeep/JTimer.hh"
14#include "Jeep/JTimekeeper.hh"
16#include "JDAQ/JDAQHeaderIO.hh"
19#include "JSupport/JSupport.hh"
20#include "JLang/JException.hh"
23#include "JIO/JByteArrayIO.hh"
24#include "JNet/JSocket.hh"
25#include "JNet/JHostname.hh"
27#include "JTools/JRange.hh"
28
29
30namespace KM3NETDAQ {
31
32
33 using namespace JPP;
34
35
36 /**
37 * Data structure for configuration of JDQSimulator.
38 */
39 class JSource :
40 public std::string
41 {
42 public:
43 /**
44 * Default constructor.
45 */
47 std::string()
48 {}
49
50
51 /**
52 * Read JSource from input stream.
53 *
54 * \param in input stream
55 * \param source JSource
56 * \return input stream
57 */
58 friend inline std::istream& operator>>(std::istream& in, JSource& source)
59 {
60 int index;
61
62 in >> index >> static_cast<std::string&>(source);
63
64 return in;
65 }
66
67
68 /**
69 * Write JSource to output stream.
70 *
71 * \param out output stream
72 * \param source JSource
73 * \return output stream
74 */
75 friend inline std::ostream& operator<<(std::ostream& out, const JSource& source)
76 {
77 out << static_cast<const std::string&>(source);
78
79 return out;
80 }
81 };
82
83
84 /**
85 * Data structure for configuration of JDataFilter.
86 */
87 class JTarget :
88 public JSocketBlocking
89 {
90 public:
91 /**
92 * Default constructor.
93 */
96 {}
97
98
99 /**
100 * Read JTarget from input stream.
101 *
102 * \param in input stream
103 * \param target JTarget
104 * \return input stream
105 */
106 friend inline std::istream& operator>>(std::istream& in, JTarget& target)
107 {
108 using namespace std;
109 using namespace JPP;
110
111 int index;
112 JHostname hostname;
113
114 if (in >> index >> hostname) {
115
116 try {
117
118 target.connect(hostname.hostname, hostname.port);
119
120 target.setTcpNoDelay (true);
121 target.setReuseAddress(true);
122 target.setKeepAlive (true);
123 target.setReceiveBufferSize(1024);
124 target.setSendBufferSize (1024*1024);
125 target.setNonBlocking (false);
126 }
127 catch(const JException& error) {
128 cout << error << endl;
129 target.close();
130 }
131 }
132
133 return in;
134 }
135
136
137 /**
138 * Write JTarget to output stream.
139 *
140 * \param out output stream
141 * \param target JTarget
142 * \return output stream
143 */
144 friend inline std::ostream& operator<<(std::ostream& out, const JTarget& target)
145 {
146 using namespace std;
147
148 out << "TCP no-delay " << target.getTcpNoDelay() << endl;
149 out << "Reuse address " << target.getReuseAddress() << endl;
150 out << "Keep alive " << target.getKeepAlive() << endl;
151 out << "Receive buffer " << target.getReceiveBufferSize() << endl;
152 out << "Send buffer " << target.getSendBufferSize() << endl;
153 out << "Non blocking " << target.getNonBlocking() << endl;
154
155 return out;
156 }
157 };
158
159
160 typedef JRange<int> range_type; // frame index range
161 typedef std::map<int, range_type> map_type; // module inactivity
162
163 /**
164 * Runcontrol client to simulate data queue.
165 * In state running, this application will send raw data to the data filters
166 * in a round robin way, based on the frame index.
167 */
169 public JDAQClient
170 {
171 public:
172 /**
173 * Constructor.
174 *
175 * \param name name of client
176 * \param server name of command message server
177 * \param logger pointer to logger
178 * \param level debug level
179 */
180 JDQSimulator(const std::string& name,
181 const std::string& server,
183 const int level) :
184 JDAQClient(name, server, logger, level)
185 {
187 }
188
189
190 virtual void actionConfigure(int length, const char* buffer) override
191 {
192 using namespace std;
193 using namespace KM3NETDAQ;
194 using namespace JPP;
195
196
198 Long64_t numberOfEvents = 1;
199 int numberOfFrames = numeric_limits<int>::max();
200 double P = 0.0;
201
202 JProperties properties(JEquationParameters("=", ";", "", ""));
203
204 properties["source"] = source;
205 properties["target"] = target;
206 properties["snooze"] = snooze;
207 properties["inputFile"] = inputFile;
208 properties["numberOfEvents"] = numberOfEvents;
209 properties["numberOfFrames"] = numberOfFrames;
210 properties["probability"] = P;
211
212 properties.read(string(buffer, length));
213
214
215 for (vector<JTarget>::iterator i = target.begin(); i != target.end(); ) {
216 if (i->is_open())
217 ++i;
218 else
219 i = target.erase(i);
220 }
221
222 if (inputFile.empty()) { JErrorStream(logger) << "No input files"; }
223 if (target .empty()) { JErrorStream(logger) << "No targets"; }
224
225 if (!snooze.empty()) {
226
227 ostringstream os;
228
229 os << "snooze";
230
231 for (map_type::const_iterator i = snooze.begin(); i != snooze.end(); ++i) {
232 os << ' ' << setw(8) << i->first << " [" << FILL(6,'0') << i->second.getLowerLimit() << "," << FILL(6,'0') << i->second.getUpperLimit() << "]";
233 }
234
235 JNoticeStream(logger) << os.str();
236 }
237
238 const unsigned int index = distance(source.begin(), find(source.begin(), source.end(), getName()));
239
240 int number_of_hits = 0;
241 int number_of_errors = 0;
242
243 if (index < source.size()) {
244
245 while (inputFile.hasNext()) {
246
247 JDAQTimeslice* timeslice = inputFile.next();
248
249 int i1 = (timeslice->size() * (index + 0)) / source.size();
250 int i2 = (timeslice->size() * (index + 1)) / source.size();
251
252 if (i2 - i1 > numberOfFrames) {
253 i2 = i1 + numberOfFrames;
254 }
255
256 for (int i = i1; i != i2; ++i) {
257
258 JDAQSuperFrame& frame = timeslice->at(i);
259
260 for (JDAQSuperFrame::iterator hit = frame.begin(); hit != frame.end(); ++hit) {
261
262 ++number_of_hits;
263
264 if (gRandom->Rndm() <= P) {
265
266 *hit = JDAQHit(hit->getPMT(), getRandom<JDAQHit::JTDC_t>(), hit->getToT());
267
268 ++number_of_errors;
269 }
270 }
271 }
272
273 JDebugStream(logger) << "Processing timeslice: " << inputFile.getCounter() << " [" << i1 << "," << i2 << "]";
274
275 data.push_back(JTimeslice(i2 - i1));
276
277 for (int i = i1; i != i2; ++i) {
278 data.rbegin()->at(i - i1) << timeslice->at(i);
279 }
280 }
281
282 JNoticeStream(logger) << "Number of errors / hits " << number_of_errors << " / " << number_of_hits << " for P = " << P;
283
284 } else {
285
286 JErrorStream(logger) << "Source not found in configuration data: " << getName();
287 }
288
289 setClockInterval((long long int) (1e-3 * getFrameTime()));
290 }
291
292
293 virtual void actionReset(int length, const char* buffer) override
294 {
295 for (std::vector<JTarget>::iterator i = target.begin(); i != target.end(); ++i) {
296 i->close();
297 }
298
299 target.clear();
300 source.clear();
301 data .clear();
302 }
303
304
305 virtual void actionQuit(int length, const char* buffer) override
306 {
307 actionReset(0, NULL);
308 }
309
310
311 virtual void actionStart(int length, const char* buffer) override
312 {
313 numberOfSlices = 0;
314 numberOfBytes = 0;
315
317 data.reset();
318
319 timer.reset();
320
321 resetClock();
322 }
323
324
325 virtual void actionStop(int length, const char* buffer) override
326 {
327 if (timer.usec_wall > 0) { JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s"; }
328 if (numberOfSlices > 0) { JNoticeStream(logger) << "Delay/slice " << (int) (getClockDelay() / numberOfSlices) << " us"; }
329 }
330
331
332 virtual void actionRunning() override
333 {
334 if (!data.empty() && !target.empty()) {
335
336 timer.start();
337
338 try {
339
340 JDAQPreamble preamble;
342
343 for (JTimeslice::const_iterator frame = data->begin(); frame != data->end(); ++frame) {
344
346
347 in >> preamble;
348 in >> header;
349
350 if (snooze.count(header.getModuleID()) == 0 || !snooze[header.getModuleID()](header.getFrameIndex())) {
351
352 JSocketBlocking& socket = target[header.getFrameIndex() % target.size()];
353
354 socket.write(frame->data(), frame->size());
355
356 numberOfBytes += frame->size();
357 }
358 }
359
360 numberOfSlices += 1;
361 }
362 catch(const JException& exception) {
363 JErrorStream(logger) << exception;
364 }
365
366 data.next();
367
368 timer.stop();
369 }
370 }
371
372 private:
376
377
378 /**
379 * Memory management for sending of raw data.
380 */
383
384 class JData :
385 public std::vector<JTimeslice>
386 {
387 public:
388 /**
389 * Default constructor.
390 */
393 {}
394
395
396 /**
397 * Set run number.
398 *
399 * \param run_number run number
400 */
402 {
404
405 for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
406
407 for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
408
410
412
413 in >> header;
414
415 header.setRunNumber(run_number);
416
417 frame->seekp(getSizeof<JDAQPreamble>());
418
419 *frame << header;
420 }
421 }
422 }
423
424
425 /**
426 * Reset internal iterator to begin.
427 */
428 void reset()
429 {
430 page = begin();
431 }
432
433
434 /**
435 * Increment internal iterator.
436 * When the internal iterator reaches the end of the data,
437 * the frame indices of the data are increased and
438 * the internal iterator is reset to the begin of data.
439 */
440 void next()
441 {
442 if (page != end() && ++page == end()) {
443
445
446 for (iterator timeslice = begin(); timeslice != end(); ++timeslice) {
447
448 for (JTimeslice::iterator frame = timeslice->begin(); frame != timeslice->end(); ++frame) {
449
451
453
454 in >> header;
455
456 header.setFrameIndex(header.getFrameIndex() + this->size());
457 header.setTimesliceStart(JDAQUTCExtended(getTimeOfFrame(header.getFrameIndex())));
458
459 frame->seekp(getSizeof<JDAQPreamble>());
460
461 *frame << header;
462 }
463 }
464
465 reset();
466 }
467 }
468
469
470 /**
471 * Smart pointer operator.
472 *
473 * \return current iterator
474 */
475 const_iterator operator->()
476 {
477 return page;
478 }
479
480
481 private:
482 JData::const_iterator page;
483 };
484
485
487
488 long long int numberOfSlices; // total number of timeslices
489 long long int numberOfBytes; // total number of bytes
490
492 };
493}
494
495
496/**
497 * \file
498 *
499 * Program for real-time simulation of data queue.
500 * \author rbruijn
501 */
502int main(int argc, char* argv[])
503{
504 using namespace std;
505
506 string server;
507 string logger;
508 string client_name;
509 bool use_cout;
510 int debug;
511
512 try {
513
514 JParser<> zap("Program for real-time simulation of data queue.");
515
516 zap['H'] = make_field(server) = "localhost";
517 zap['M'] = make_field(logger) = "localhost";
518 zap['u'] = make_field(client_name) = "JDQSimulator";
519 zap['c'] = make_field(use_cout);
520 zap['d'] = make_field(debug) = 3;
521
522 zap(argc, argv);
523 }
524 catch(const exception &error) {
525 FATAL(error.what() << endl);
526 }
527
528
529 using namespace KM3NETDAQ;
530 using namespace JPP;
531
532 JLogger* out = NULL;
533
534 if (use_cout)
535 out = new JStreamLogger(cout);
536 else
537 out = new JControlHostLogger(logger);
538
539 JDQSimulator simbad(client_name, server, out, debug);
540
541 simbad.enter();
542 simbad.run();
543}
KM3NeT DAQ constants, bit handling, etc.
int main(int argc, char *argv[])
Exceptions.
Definition of random value generator.
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:72
Scanning of objects from multiple files according a format that follows from the extension of each fi...
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
Utility class to parse parameter values.
Auxiliary class to define a range between two values.
Base class for interprocess communication.
ROOT TTree parameter settings of various packages.
Scheduling of actions via fixed latency intervals.
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
int run_number
Definition JDAQCHSM.hh:167
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_configure_event ev_configure
Utility class to parse parameter values.
bool read(const JEquation &equation)
Read equation.
Auxiliary class for CPU timing and usage.
Definition JTimer.hh:33
unsigned long long usec_wall
Definition JTimer.hh:238
void stop()
Stop timer.
Definition JTimer.hh:127
void reset()
Reset timer.
Definition JTimer.hh:93
void start()
Start timer.
Definition JTimer.hh:106
Byte array binary input.
void seekg(const int pos)
Set read position.
Byte array binary output.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition JException.hh:24
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
Message logging based on std::ostream.
Blocking socket I/O.
int write(const char *buffer, const int length) override
Write data to socket.
Utility class to parse command line options.
Definition JParser.hh:1698
General purpose class for object reading from a list of file names.
virtual bool hasNext() override
Check availability of next element.
counter_type getCounter() const
Get counter.
virtual const pointer_type & next() override
Get next element.
Range of values.
Definition JRange.hh:42
std::vector< value_type >::iterator iterator
Definition JTimeslice.hh:33
void setFrameIndex(const int frame_index)
Set frame index.
void setRunNumber(const int run)
Set run number.
Control unit client base class.
JSharedPointer< JControlHost > server
message server
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
const_iterator end() const
Definition JDAQFrame.hh:166
const_iterator begin() const
Definition JDAQFrame.hh:165
Hit data structure.
Definition JDAQHit.hh:35
Data frame of one optical module.
Data structure for UTC time.
void reset()
Reset internal iterator to begin.
JData()
Default constructor.
JData::const_iterator page
void setRunNumber(int run_number)
Set run number.
void next()
Increment internal iterator.
const_iterator operator->()
Smart pointer operator.
Runcontrol client to simulate data queue.
long long int numberOfBytes
JIO::JByteArrayWriter JFrame
Memory management for sending of raw data.
virtual void actionConfigure(int length, const char *buffer) override
virtual void actionStart(int length, const char *buffer) override
virtual void actionRunning() override
This method is repeatedly called when this client machine is in state Running and the clock interval ...
virtual void actionStop(int length, const char *buffer) override
std::vector< JTarget > target
virtual void actionReset(int length, const char *buffer) override
std::vector< JFrame > JTimeslice
virtual void actionQuit(int length, const char *buffer) override
JDQSimulator(const std::string &name, const std::string &server, JLogger *logger, const int level)
Constructor.
long long int numberOfSlices
std::vector< JSource > source
Data structure for configuration of JDQSimulator.
JSource()
Default constructor.
friend std::istream & operator>>(std::istream &in, JSource &source)
Read JSource from input stream.
friend std::ostream & operator<<(std::ostream &out, const JSource &source)
Write JSource to output stream.
Data structure for configuration of JDataFilter.
friend std::istream & operator>>(std::istream &in, JTarget &target)
Read JTarget from input stream.
friend std::ostream & operator<<(std::ostream &out, const JTarget &target)
Write JTarget to output stream.
JTarget()
Default constructor.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
const char * getName()
Get ROOT name of given data type.
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
double getFrameTime()
Get frame time duration.
Definition JDAQClock.hh:162
double getTimeOfFrame(const int frame_index)
Get start time of frame in ns since start of run for a given frame index.
Definition JDAQClock.hh:185
JRange< int > range_type
size_t getSizeof< JDAQSuperFrameHeader >()
Get size of type.
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
size_t getSizeof< JDAQPreamble >()
Get size of type.
std::map< int, range_type > map_type
static const JNET::JTag RC_DQSIMULATOR
Definition JDAQTags.hh:69
Auxiliary data structure for sequence of same character.
Definition JManip.hh:330
Target.
Definition JHead.hh:300
Level specific message streamers.
Auxiliary data structure for hostname and port number.
Definition JHostname.hh:35
std::string hostname
Definition JHostname.hh:171
void setClockInterval(const long long int interval_us)
Set interval time.
long long int getClockDelay() const
Get total delay time.
void resetClock()
Reset clock.