Jpp test-rotations-old
the software that should make you happy
Loading...
Searching...
No Matches
JDAQDriver.hh
Go to the documentation of this file.
1#ifndef __JRUNCONTROL__JDAQDRIVER__
2#define __JRUNCONTROL__JDAQDRIVER__
3
4#include <string>
5#include <iostream>
6#include <sstream>
7#include <limits>
8#include <unistd.h>
9
10#include "JLang/JException.hh"
14
18
19
20/**
21 * \author mdejong
22 */
23
24namespace KM3NETDAQ {
25
29
30
31 /**
32 * Simple driver for run control clients.
33 * This class can be used to start a set of run control clients,
34 * trigger events and eventually stop the clients.
35 */
36 class JDAQDriver :
37 public JDAQClient
38 {
39 public:
40
42
43 static const int SLEEP_TIME_US = 500000;
44
45 /**
46 * Constructor.
47 *
48 * \param name name of driver
49 * \param server name of command message server
50 * \param logger logger
51 * \param level debug level
52 * \param timeout_s timeout of state transitions [s]
53 */
// Forwards name, server, logger and level to the JDAQClient base class,
// stores the timeout converted to microseconds and marks the driver as not alive.
// NOTE(review): `logger` is passed to JDAQClient below, but no `logger` parameter
// is visible in this listing (the parameter list appears to have lost a line) — confirm against the original header.
54 JDAQDriver(const std::string& name,
55 const std::string& server,
57 const int level,
58 const int timeout_s) :
59 JDAQClient(name, server, logger, level),
// seconds -> microseconds, evaluated in int arithmetic.
// NOTE(review): presumably overflows for timeout_s > 2147 when int is 32 bits — confirm the expected range.
60 timeout_us(timeout_s * 1000000),
61 is_alive (false)
62 {}
63
64
65 /**
66 * Enter the state machine.
67 * The driver will subscribe to the ControlHost tags corresponding to born, died and
68 * reply messages of the clients instead of the standard tags for run control commands.
69 * The clients are started after the driver is ready to receive ControlHost messages.
70 * In case of an error, a message is printed on the terminal and the state machine
71 * is not entered.
72 */
// Returns the result of CHSM::machine::enter() when the driver was seen alive;
// returns false after printing a message on cerr when the server or logger is invalid,
// when the driver was not seen alive before the timeout, or when a JControlHostException is caught.
73 virtual bool enter() override
74 {
75 using namespace std;
76 using namespace JPP;
77
78 if (server.is_valid() && logger.is_valid()) {
79
80 try {
81
// NOTE(review): the Subscribe(...) statement below is truncated in this listing
// (the remaining operands of the '+' expression are not visible) — confirm against the original header.
82 server->Subscribe(JSubscriptionAll(RC_REPLY) +
86 server->SendMeAlways();
87 server->MyId(getFullName());
88
89 // check alive of this driver
90
// Poll for messages until update() has set is_alive;
// the elapsed time only advances when no message was pending.
91 for (int i = 0; i != timeout_us && !is_alive; ) {
92 if (!update(true)) {
93 usleep(SLEEP_TIME_US);
94 i += SLEEP_TIME_US;
95 }
96 }
97
98 if (is_alive) {
99
100 {
101 //clientList.start();
102
// Start every client for which isActive() holds, logging each start.
103 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
104 if (i->isActive()) {
105
106 JNoticeStream(logger) << "Start process " << *i;
107
108 i->start();
109 }
110 }
111
// Drain all pending messages, then stop waiting as soon as count() has reached
// the number of clients in mode JClient::ACTIVE; otherwise sleep and retry until the timeout.
112 for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
113
114 while (update(true)) {
115 }
116
117 if (clientList.count() >= clientList.count(JClient::ACTIVE))
118 break;
119 else
120 usleep(SLEEP_TIME_US);
121 }
122 }
123
// Warn about every active client whose getBorn() value does not exceed its getDied() value.
// The state machine is entered regardless of these warnings.
124 if (clientList.count() != clientList.count(JClient::ACTIVE)) {
125 for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
126 if (i->isActive() && i->getBorn() <= i->getDied()) {
127 JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
128 }
129 }
130 }
131
132 return CHSM::machine::enter();
133
134 } else {
135 cerr << "Timeout at subscription." << endl;
136 }
137 }
138 catch(const JControlHostException& error) {
139 cerr << error << endl;
140 }
141
142 } else {
143 cerr << "Message server or logger not properly initialised." << endl;
144 }
145
146 return false;
147 }
148
149
150 /**
151 * Exit the state machine.
152 *
153 * This method waits for the clients to terminate using the died message generated by ControlHost.
154 * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
155 */
156 virtual void actionExit() override
157 {
158 using namespace std;
159 using namespace JLANG;
160
161 for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
162
163 while (update(true)) {}
164
165 if (clientList.count() == 0)
166 break;
167 else
168 usleep(SLEEP_TIME_US);
169 }
170
171 if (clientList.count() != 0) {
172 JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
173 }
174
175 clientList.stop();
176 }
177
178
179 /**
180 * Action when entering state.
181 * This method waits for all clients to produce the enter state message.
182 * In case of a timeout, no specific action is taken but an error message is logged.
183 *
184 * \param state entered state
185 * \param event event that triggered transition
186 */
187 virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
188 {
189 for (int i = 0; i != timeout_us && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
190 if (!update(true)) {
191 usleep(SLEEP_TIME_US);
192 i += SLEEP_TIME_US;
193 }
194 }
195
196 if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
197 JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
198 }
199 }
200
201
202 /**
203 * Update client list with incoming ControlHost message.
204 * This method receives and processes a message.
205 * The client list is updated accordingly.
206 * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
207 * The return value is then false.
208 * If the no-wait option is set to false, it waits until the next message is received.
209 *
210 * \param no_wait wait option
211 * \return true if message received; else false
212 */
213 bool update(const bool no_wait)
214 {
215 using namespace std;
216 using namespace JNET;
218
219 try {
220
221 string tag;
222 long long int length = 0;
223
224 if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
225 if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
226
227 char* data= new char[length];
228
229 server->GetFullData(data, length);
230
231 const string buffer(data, length);
232
233 delete [] data;
234
235 JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
236
237 if (tag == RC_LOG) {
238
239 rc_log = buffer;
240
241 } else if (buffer.find(getFullName()) != string::npos) {
242
243 if (tag == DISPTAG_Born)
244 is_alive = true;
245 else if (tag == DISPTAG_Died)
246 is_alive = false;
247
248 } else {
249
250 JClientList::iterator i = clientList.find(buffer);
251
252 if (i != clientList.end()) {
253
254 i->update(tag, buffer);
255
256 } else {
257
258 JErrorStream(logger) << "Message fom illegal client " << buffer;
259
260 try {
261
262 if (tag == DISPTAG_Born ||
263 tag == DISPTAG_Died ||
264 tag == RC_REPLY) {
265
266 string key, hostname, name;
267
268 istringstream is(buffer);
269
270 const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
271
272 is.imbue(loc);
273
274 if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
275
276 JClient client(name, hostname);
277
278 client.update(tag, buffer);
279 client.setMode(JClient::ILLEGAL);
280
281 clientList.insert(client);
282
283 JWarningStream(logger) << "Added illegal client " << client.getFullName();
284
285 } else {
286 THROW(JIOException, "JClient: Error reading " << buffer);
287 }
288 }
289 }
290 catch(const JException& error) {
291 JErrorStream(logger) << error;
292 }
293 }
294 }
295
296 return true;
297 }
298 catch(const JControlHostException& error) {
299 JErrorStream(logger) << error;
300 }
301
302 return false;
303 }
304
305 virtual void actionStart(int, const char*) override
306 {
307 rc_log = "";
308 }
309
// Action at stop of run: checks whether a message with tag RC_LOG was stored in rc_log
// and logs an error if not.
310 virtual void actionStop(int, const char*) override
311 {
// NOTE(review): the statement of the if-branch is not visible in this listing
// (a line appears to be missing between the condition and the else) — confirm against the original header.
312 if (rc_log != "")
314 else
315 JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
316 }
317
318 /**
319 * Run driver with user input.
320 */
321 void run()
322 {
323 run(std::cin);
324 }
325
326
327 /**
328 * Run driver.
329 *
330 * Example input format:
331 * <pre>
332 * # comment line.
333 *
334 * process <process name> <host name> [<start command>];
335 *
336 * # The following tokens in <start command> will be substituted:
337 * # $HOST$ by <host name>;
338 * # $NAME$ by <process name>;
339 * # $SERVER$ by JClient::SERVER;
340 * # $LOGGER$ by JClient::LOGGER;
341 * # $ARGS$ by part following '/' in <process name>;
342 *
343 * # enter state machine.
344 *
345 * enter
346 *
347 * # trigger event
348 * # data can be provided online and mixed with data from a separate file (optional).
349 * # multiple tags should be separated by a new line.
350 *
351 * event <event name> {
352 * [<tag 1> [data]]
353 * [<tag 2> [data][%<file name>%][data]]
354 * }
355 *
356 * # optionally quit before end of input
357 * [quit]
358 *
359 * # optionally kill processes that did not properly terminate.
360 * [exit]
361 * </pre>
362 *
363 * \param in input stream
364 */
// Reads white space separated key words from the input stream until end of input,
// the key word "quit" or a failed "enter", and executes the corresponding action.
365 void run(std::istream& in)
366 {
367 using namespace std;
368
369 for (string key; in >> key; ) {
370
// comment: skip remainder of line
371 if (key[0] == '#') {
372
373 in.ignore(numeric_limits<streamsize>::max(), '\n');
374
375 } else if (key == "enter") {
376
377 enter();
378
// stop processing input when the state machine could not be entered
379 if (!active()) {
380 cerr << "State machine not entered; abort." << endl;
381 return;
382 }
383
384 } else if (key == "exit") {
385
// the timeout is set to zero before exiting;
// presumably so that the exit does not wait for clients to terminate — TODO confirm
386 timeout_us = 0;
387 exit();
388
389 } else if (key == "quit") {
390
391 break;
392
393 } else if (key == "sleep") {
394
// sleep for the given number of seconds; silently ignored if no integer can be read
395 int sec;
396
397 if (in >> sec) {
398 sleep(sec);
399 }
400
401 } else if (key == "process") {
402
// a process definition extends up to the next ';'
403 string buffer;
404
405 getline(in, buffer, ';');
406
407 istringstream is(buffer);
408
409 JClient client;
410
411 if (is >> client) {
412
413 client.setMode(JClient::ACTIVE);
414
415 if (!clientList.insert(client).second) {
416 JWarningStream(logger) << "Process already exists " << client;
417 }
418
419 } else {
420 JErrorStream(logger) << "Error reading key word process.";
421 }
422
423 } else if (key == "event" || key == "event*") {
424
// with "event*", the messages are also sent when the event is not active
// and the event of the driver itself is not triggered
425 JEvent_t event;
426 char c;
427 string buffer;
428 const char eol = '\n';
429
// the event data are enclosed in '{' and '}'
430 if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
431
432 if (clientList.count() != 0) {
433
434 JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
435
436 if (pev != NULL) {
437
438 if (pev->active() || key == "event*") {
439
440 istringstream is(buffer);
441
// each line of the event data starts with a tag;
// the message consists of the event, the token delimiter and the remainder of the line
// as processed by copy()
442 for (string tag; is >> tag; ) {
443
444 ostringstream os;
445
446 os << event << getTokenDelimeter();
447
448 copy(is, os, eol);
449
450 JNoticeStream(logger) << key << ' ' << tag << ' ' << event;
451
452 server->PutFullString(tag, os.str());
453 }
454
455 if (key != "event*") {
456 (*pev)(0, NULL); // trigger driver
457 }
458
459 } else {
460 JErrorStream(logger) << "Inactive event " << event;
461 }
462
463 } else {
464 JErrorStream(logger) << "Unknown event " << event;
465 }
466
467 } else {
468 JErrorStream(logger) << "No active client to trigger event.";
469 }
470
471 } else {
472 JErrorStream(logger) << "Error reading key word event.";
473 }
474
475 } else if (key == "message") {
476
// send the text up to the next ';' with the given tag
477 string tag;
478 string buffer;
479
480 if (in >> tag && getline(in, buffer, ';'))
481 server->PutFullString(tag, buffer);
482 else
483 JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
484
485 } else if (key == "print") {
486
// log name, start command, alive status and state name of every client
487 for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
488 JNoticeStream(logger) << i->getFullName() << ' '
489 << i->getStartCommand() << ' '
490 << i->getAlive() << ' '
491 << i->getStatename();
492 }
493
494 } else if (key == "filter") {
495
// apply filter() to each white space separated name up to the next ';'
496 string client;
497 string buffer;
498
499 getline(in, buffer, ';');
500
501 for (istringstream is(buffer); is >> client; ) {
502 filter(client);
503 }
504
505 } else if (key == "sync") {
506
507 synchronise();
508
509 } else {
510
// unknown key word: log error and skip remainder of line
511 JErrorStream(logger) << "Unknown key: " << key;
512
513 in.ignore(numeric_limits<streamsize>::max(), '\n');
514 }
515 }
516 }
517
518
519 /**
520 * Update client list with incoming ControlHost messages until the client list
521 * is synchronised with the current state or until the timeout.
522 */
523 void update()
524 {
525 using namespace std;
526
527 const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
528
529 for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
530
531 if (state->active()) {
532 for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
533 if (!update(true)) {
534 usleep(SLEEP_TIME_US);
535 i += SLEEP_TIME_US;
536 }
537 }
538 }
539 }
540 }
541
542
543 /**
544 * Synchronise clients.
545 */
// Body of the method that synchronises the clients with the active state of Main.RunControl.
// NOTE(review): the line with the method signature is not visible in this listing — confirm against the original header.
547 {
548 using namespace std;
549
550 const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
551
552 for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
553
554 if (state->active()) {
555
// only act when fewer clients are counted in this state than there are clients in mode JClient::ACTIVE
556 if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
557
558 JDebugStream(logger) << "Synchronising " << state->name();
559
560 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
561
562 if (i->getMode() == JClient::ACTIVE) {
563
// a client that is not marked alive is looked up via JControlHost::WhereIs();
// when found, it is marked alive and an error is logged if the answer does not contain its host name
564 if (!i->getAlive()) {
565
566 try {
567
568 string buffer;
569
570 if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
571
572 i->setAlive(true);
573
574 if (buffer.find(i->getHostname()) == string::npos) {
575 JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
576 }
577 }
578 }
579 catch(const JControlHostException& error) {
580 JErrorStream(logger) << error;
581 }
582 }
583
// send the name of ev_check to every alive client that reports a different state name
584 if (i->getAlive() && i->getStatename() != state->name()) {
585 server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
586 }
587 }
588 }
589
590
// process incoming messages until all active clients are counted in this state or until the timeout;
// the elapsed time only advances when no message was pending
591 for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
592 if (!update(true)) {
593 usleep(SLEEP_TIME_US);
594 i += SLEEP_TIME_US;
595 }
596 }
597
598 if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
599 JWarningStream(logger) << "Timeout at synchronisation.";
600 }
601 }
602 }
603 }
604 }
605
606
607 /**
608 * Filter client list by putting failing clients to sleep.
609 * In this, only clients with names that contain the given character sequence are considered.
610 *
611 * \param target target name of client(s)
612 */
613 void filter(const std::string& target = "")
614 {
615 const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
616
617 for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
618
619 if (state->active()) {
620
621 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
622
623 if (target == "" || i->getName().find(target) != std::string::npos) {
624
625 if (!i->getAlive() || i->getStatename() != state->name()) {
626
627 JNoticeStream(logger) << "Put to sleep " << i->getFullName();
628
629 i->setMode(JClient::SLEEP);
630 }
631 }
632 }
633 }
634 }
635 }
636
637 int timeout_us; //!< timeout of state transitions [us]
638
639 protected:
640
643 std::string rc_log;
644
645 /**
646 * Copy data from input to output stream.
647 * Tagged file names are recursively expanded.
648 *
649 * \param in input stream
650 * \param out output stream
651 * \param eol end of line
652 */
653 static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
654 {
655 using namespace std;
656 using namespace JPP;
657
658 const JEquationParameters parameters("=", ";", "", "");
659
660 string buffer;
661
662 if (getline(in, buffer, eol)) {
663
664 for (string::size_type pos = 0; pos < buffer.length(); ) {
665
666 string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
667 string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
668
669 if (lpos != string::npos &&
670 rpos != string::npos) {
671
672 out << buffer.substr(pos, lpos);
673
674 lpos += FILENAME_PREFIX.length();
675 pos += lpos;
676
677 ifstream file(buffer.substr(pos, rpos - lpos).c_str());
678
679 copy(file, out, '\0');
680
681 rpos += FILENAME_POSTFIX.length();
682 pos += rpos - lpos;
683
684 } else {
685
686 out << buffer.substr(pos);
687
688 pos += buffer.substr(pos).length();
689 }
690 }
691 }
692 }
693 };
694}
695
696#endif
Exceptions.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::state_Main::state_RunControl RunControl
JDAQStateMachine::ev_check_event ev_check
JDAQStateMachine::state_Main Main
Exception for ControlHost.
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition JException.hh:24
Exception for I/O.
Auxiliary class to specify white space character(s) in current locale.
Interface for logging messages.
Definition JLogger.hh:22
bool is_valid() const
Check validity of logger object.
List of ControlHost client managers.
Definition JLigier.cc:479
ControlHost client manager.
Definition JLigier.cc:245
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
static std::string SERVER
host name of message server
Definition JClient.hh:32
Control unit client base class.
JSharedPointer< JControlHost > server
message server
virtual bool filter(const JTag &tag, int length, const char *buffer)
Filter message.
virtual bool exit() override
Exit the state machine.
JDAQEvent_t * findEvent(const JTag &tag, const std::string &event_name)
Find event in event table.
JMessageLogger logger
message logger
Simple driver for run control clients.
Definition JDAQDriver.hh:38
virtual bool enter() override
Enter the state machine.
Definition JDAQDriver.hh:73
virtual void actionStart(int, const char *) override
void run(std::istream &in)
Run driver.
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition JDAQDriver.hh:54
virtual void actionStop(int, const char *) override
virtual void enterState(const CHSM::state &state, const CHSM::event &event) override
Action when entering state.
int timeout_us
timeout of state transitions [us]
void run()
Run driver with user input.
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
void synchronise()
Synchronise clients.
virtual void actionExit() override
Exit the state machine.
static void copy(std::istream &in, std::ostream &out, const char eol='\n')
Copy data from input to output stream.
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
static const int SLEEP_TIME_US
Definition JDAQDriver.hh:43
char * loc(char *orig)
Auxiliary classes and methods for language specific functionality.
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition JString.hh:478
static const JTag DISPTAG_Born("Born")
static const JTag DISPTAG_Died("Died")
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
static const std::string FILENAME_PREFIX
Definition JDAQTags.hh:60
static const JNET::JTag RC_REPLY
Definition JDAQTags.hh:67
static const std::string RUN_CONTROL_CLIENT
Definition JDAQTags.hh:44
static const std::string TOKEN_DELIMETER
Definition JDAQTags.hh:58
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const JNET::JTag RC_LOG
Definition JDAQTags.hh:72
char getTokenDelimeter()
Get the token delimeter for command messages.
static const std::string FILENAME_POSTFIX
Definition JDAQTags.hh:61
Target.
Definition JHead.hh:300
Level specific message streamers.
Auxiliary class for all subscription.
const std::string & getFullName() const
Get full name of this run control client.
Auxiliary class for handling event name and optional static information.
Definition JEvent_t.hh:23
const std::string & getName() const
Get event name.
Definition JEvent_t.hh:65