Jpp  19.1.0-rc.1
the software that should make you happy
JDAQDriver.hh
Go to the documentation of this file.
1 #ifndef __JRUNCONTROL__JDAQDRIVER__
2 #define __JRUNCONTROL__JDAQDRIVER__
3 
4 #include <string>
5 #include <iostream>
6 #include <sstream>
7 #include <limits>
8 #include <unistd.h>
9 
10 #include "JLang/JException.hh"
11 #include "JLang/JRedirectStream.hh"
14 
16 #include "JRuncontrol/JClient.hh"
18 
19 
20 /**
21  * \author mdejong
22  */
23 
24 namespace KM3NETDAQ {
25 
26  using JLANG::JException;
27  using JLANG::JIOException;
29 
30 
31  /**
32  * Simple driver for run control clients.
33  * This class can be used to start a set of run control clients,
34  * trigger events and eventually stop the clients.
35  */
36  class JDAQDriver :
37  public JDAQClient
38  {
39  public:
40 
41  using JDAQClient::filter;
42 
43  static const int SLEEP_TIME_US = 500000;
44 
45  /**
46  * Constructor.
47  *
48  * \param name name of driver
49  * \param server name of command message server
50  * \param logger logger
51  * \param level debug level
52  * \param timeout_s timeout_s [s]
53  */
54  JDAQDriver(const std::string& name,
55  const std::string& server,
56  JLogger* logger,
57  const int level,
58  const int timeout_s) :
59  JDAQClient(name, server, logger, level),
60  timeout_us(timeout_s * 1000000),
61  is_alive (false)
62  {}
63 
64 
65  /**
66  * Enter the state machine.
67  * The driver will subscribe to the ControlHost tags corresponding to born, died and
68  * reply messages of the clients instead of the standard tags for run control commands.
69  * The clients are started after the driver is ready to receive ControlHost messages.
70  * In case of an error, a message is printed on the terminal and the state machine
71  * is not entered.
72  */
73  virtual bool enter() override
74  {
75  using namespace std;
76  using namespace JPP;
77 
78  if (server.is_valid() && logger.is_valid()) {
79 
80  try {
81 
82  server->Subscribe(JSubscriptionAll(RC_REPLY) +
86  server->SendMeAlways();
87  server->MyId(getFullName());
88 
89  // check alive of this driver
90 
91  for (int i = 0; i != timeout_us && !is_alive; ) {
92  if (!update(true)) {
93  usleep(SLEEP_TIME_US);
94  i += SLEEP_TIME_US;
95  }
96  }
97 
98  if (is_alive) {
99 
100  {
101  //clientList.start();
102 
103  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
104  if (i->isActive()) {
105 
106  JNoticeStream(logger) << "Start process " << *i;
107 
108  i->start();
109  }
110  }
111 
112  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
113 
114  while (update(true)) {
115  }
116 
117  if (clientList.count() >= clientList.count(JClient::ACTIVE))
118  break;
119  else
120  usleep(SLEEP_TIME_US);
121  }
122  }
123 
124  if (clientList.count() != clientList.count(JClient::ACTIVE)) {
125  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
126  if (i->isActive() && i->getBorn() <= i->getDied()) {
127  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
128  }
129  }
130  }
131 
132  return CHSM::machine::enter();
133 
134  } else {
135  cerr << "Timeout at subscription." << endl;
136  }
137  }
138  catch(const JControlHostException& error) {
139  cerr << error << endl;
140  }
141 
142  } else {
143  cerr << "Message server or logger not properly initialised." << endl;
144  }
145 
146  return false;
147  }
148 
149 
150  /**
151  * Exit the state machine.
152  *
153  * This method waits for the clients to terminate using the died message generated by ControlHost.
154  * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
155  */
156  virtual void actionExit() override
157  {
158  using namespace std;
159  using namespace JLANG;
160 
161  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
162 
163  while (update(true)) {}
164 
165  if (clientList.count() == 0)
166  break;
167  else
168  usleep(SLEEP_TIME_US);
169  }
170 
171  if (clientList.count() != 0) {
172  JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
173  }
174 
175  clientList.stop();
176  }
177 
178 
179  /**
180  * Action when entering state.
181  * This method waits for all clients to produce the enter state message.
182  * In case of a timeout, no specific action is taken but an error message is logged.
183  *
184  * \param state entered state
185  * \param event event that triggered transition
186  */
187  virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
188  {
189  for (int i = 0; i != timeout_us && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
190  if (!update(true)) {
191  usleep(SLEEP_TIME_US);
192  i += SLEEP_TIME_US;
193  }
194  }
195 
196  if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
197  JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
198  }
199  }
200 
201 
202  /**
203  * Update client list with incoming ControlHost message.
204  * This method receives and processes a message.
205  * The client list is updated accordingly.
206  * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
207  * The return value is then false.
208  * If the no-wait option is set to false, it waits until the next message is received.
209  *
210  * \param no_wait wait option
211  * \return true if message received; else false
212  */
213  bool update(const bool no_wait)
214  {
215  using namespace std;
216  using namespace JNET;
218 
219  try {
220 
221  string tag;
222  long long int length = 0;
223 
224  if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
225  if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
226 
227  char* data= new char[length];
228 
229  server->GetFullData(data, length);
230 
231  const string buffer(data, length);
232 
233  delete [] data;
234 
235  JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
236 
237  if (tag == RC_LOG) {
238 
239  rc_log = buffer;
240 
241  } else if (buffer.find(getFullName()) != string::npos) {
242 
243  if (tag == DISPTAG_Born)
244  is_alive = true;
245  else if (tag == DISPTAG_Died)
246  is_alive = false;
247 
248  } else {
249 
250  JClientList::iterator i = clientList.find(buffer);
251 
252  if (i != clientList.end()) {
253 
254  i->update(tag, buffer);
255 
256  } else {
257 
258  JErrorStream(logger) << "Message fom illegal client " << buffer;
259 
260  try {
261 
262  if (tag == DISPTAG_Born ||
263  tag == DISPTAG_Died ||
264  tag == RC_REPLY) {
265 
266  string key, hostname, name;
267 
268  istringstream is(buffer);
269 
270  const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
271 
272  is.imbue(loc);
273 
274  if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
275 
276  JClient client(name, hostname);
277 
278  client.update(tag, buffer);
279  client.setMode(JClient::ILLEGAL);
280 
281  clientList.insert(client);
282 
283  JWarningStream(logger) << "Added illegal client " << client.getFullName();
284 
285  } else {
286  THROW(JIOException, "JClient: Error reading " << buffer);
287  }
288  }
289  }
290  catch(const JException& error) {
291  JErrorStream(logger) << error;
292  }
293  }
294  }
295 
296  return true;
297  }
298  catch(const JControlHostException& error) {
299  JErrorStream(logger) << error;
300  }
301 
302  return false;
303  }
304 
305  virtual void actionStart(int, const char*) override
306  {
307  rc_log = "";
308  }
309 
310  virtual void actionStop(int, const char*) override
311  {
312  if (rc_log != "")
314  else
315  JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
316  }
317 
318  /**
319  * Run driver with user input.
320  */
321  void run()
322  {
323  run(std::cin);
324  }
325 
326 
327  /**
328  * Run driver.
329  *
330  * Example input format:
331  * <pre>
332  * # comment line.
333  *
334  * process <process name> <host name> [<start command>];
335  *
336  * # The following tokens in <start command> will be substituted:
337  * # $HOST$ by <host name>;
338  * # $NAME$ by <process name>;
339  * # $SERVER$ by JClient::SERVER;
340  * # $LOGGER$ by JClient::LOGGER;
341  * # $ARGS$ by part following '/' in <process name>;
342  *
343  * # enter state machine.
344  *
345  * enter
346  *
347  * # trigger event
348  * # data can be provided online and mixed with data from a separate file (optional).
349  * # multiple tags should be separated by a new line.
350  *
351  * event <event name> {
352  * [<tag 1> [data]]
353  * [<tag 2> [data][%<file name>%][data]]
354  * }
355  *
356  * # optionally quit before end of input
357  * [quit]
358  *
359  * # optionally kill processes that did not properly terminate.
360  * [exit]
361  * </pre>
362  *
363  * \param in input stream
364  */
365  void run(std::istream& in)
366  {
367  using namespace std;
368 
369  for (string key; in >> key; ) {
370 
371  if (key[0] == '#') {
372 
373  in.ignore(numeric_limits<streamsize>::max(), '\n');
374 
375  } else if (key == "enter") {
376 
377  enter();
378 
379  if (!active()) {
380  cerr << "State machine not entered; abort." << endl;
381  return;
382  }
383 
384  } else if (key == "exit") {
385 
386  timeout_us = 0;
387  exit();
388 
389  } else if (key == "quit") {
390 
391  break;
392 
393  } else if (key == "sleep") {
394 
395  int sec;
396 
397  if (in >> sec) {
398  sleep(sec);
399  }
400 
401  } else if (key == "process") {
402 
403  string buffer;
404 
405  getline(in, buffer, ';');
406 
407  istringstream is(buffer);
408 
409  JClient client;
410 
411  if (is >> client) {
412 
413  client.setMode(JClient::ACTIVE);
414 
415  if (!clientList.insert(client).second) {
416  JWarningStream(logger) << "Process already exists " << client;
417  }
418 
419  } else {
420  JErrorStream(logger) << "Error reading key word process.";
421  }
422 
423  } else if (key == "event" || key == "event*") {
424 
425  JEvent_t event;
426  char c;
427  string buffer;
428  const char eol = '\n';
429 
430  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
431 
432  if (clientList.count() != 0) {
433 
434  JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
435 
436  if (pev != NULL) {
437 
438  if (pev->active() || key == "event*") {
439 
440  istringstream is(buffer);
441 
442  for (string tag; is >> tag; ) {
443 
444  ostringstream os;
445 
446  os << event << getTokenDelimeter();
447 
448  copy(is, os, eol);
449 
450  JNoticeStream(logger) << key << ' ' << tag << ' ' << event;
451 
452  server->PutFullString(tag, os.str());
453  }
454 
455  if (key != "event*") {
456  (*pev)(0, NULL); // trigger driver
457  }
458 
459  } else {
460  JErrorStream(logger) << "Inactive event " << event;
461  }
462 
463  } else {
464  JErrorStream(logger) << "Unknown event " << event;
465  }
466 
467  } else {
468  JErrorStream(logger) << "No active client to trigger event.";
469  }
470 
471  } else {
472  JErrorStream(logger) << "Error reading key word event.";
473  }
474 
475  } else if (key == "message") {
476 
477  string tag;
478  string buffer;
479 
480  if (in >> tag && getline(in, buffer, ';'))
481  server->PutFullString(tag, buffer);
482  else
483  JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
484 
485  } else if (key == "print") {
486 
487  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
488  JNoticeStream(logger) << i->getFullName() << ' '
489  << i->getStartCommand() << ' '
490  << i->getAlive() << ' '
491  << i->getStatename();
492  }
493 
494  } else if (key == "filter") {
495 
496  string client;
497  string buffer;
498 
499  getline(in, buffer, ';');
500 
501  for (istringstream is(buffer); is >> client; ) {
502  filter(client);
503  }
504 
505  } else if (key == "sync") {
506 
507  synchronise();
508 
509  } else {
510 
511  JErrorStream(logger) << "Unknown key: " << key;
512 
513  in.ignore(numeric_limits<streamsize>::max(), '\n');
514  }
515  }
516  }
517 
518 
519  /**
520  * Update client list with incoming ControlHost messages until the client list
521  * is synchronised with the current state or until the timeout.
522  */
523  void update()
524  {
525  using namespace std;
526 
527  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
528 
529  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
530 
531  if (state->active()) {
532  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
533  if (!update(true)) {
534  usleep(SLEEP_TIME_US);
535  i += SLEEP_TIME_US;
536  }
537  }
538  }
539  }
540  }
541 
542 
543  /**
544  * Synchronise clients.
545  */
546  void synchronise()
547  {
548  using namespace std;
549 
550  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
551 
552  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
553 
554  if (state->active()) {
555 
556  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
557 
558  JDebugStream(logger) << "Synchronising " << state->name();
559 
560  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
561 
562  if (i->getMode() == JClient::ACTIVE) {
563 
564  if (!i->getAlive()) {
565 
566  try {
567 
568  string buffer;
569 
570  if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
571 
572  i->setAlive(true);
573 
574  if (buffer.find(i->getHostname()) == string::npos) {
575  JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
576  }
577  }
578  }
579  catch(const JControlHostException& error) {
580  JErrorStream(logger) << error;
581  }
582  }
583 
584  if (i->getAlive() && i->getStatename() != state->name()) {
585  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
586  }
587  }
588  }
589 
590 
591  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
592  if (!update(true)) {
593  usleep(SLEEP_TIME_US);
594  i += SLEEP_TIME_US;
595  }
596  }
597 
598  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
599  JWarningStream(logger) << "Timeout at synchronisation.";
600  }
601  }
602  }
603  }
604  }
605 
606 
607  /**
608  * Filter client list by putting failing clients to sleep.
609  * In this, only clients with names that contain the given character sequence are considered.
610  *
611  * \param target target name of client(s)
612  */
613  void filter(const std::string& target = "")
614  {
615  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
616 
617  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
618 
619  if (state->active()) {
620 
621  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
622 
623  if (target == "" || i->getName().find(target) != std::string::npos) {
624 
625  if (!i->getAlive() || i->getStatename() != state->name()) {
626 
627  JNoticeStream(logger) << "Put to sleep " << i->getFullName();
628 
629  i->setMode(JClient::SLEEP);
630  }
631  }
632  }
633  }
634  }
635  }
636 
637  int timeout_us; //!< timeout of state transitions [us]
638 
639  protected:
640 
642  bool is_alive;
643  std::string rc_log;
644 
645  /**
646  * Copy data from input to output stream.
647  * Tagged file names are recursively expanded.
648  *
649  * \param in input stream
650  * \param out output stream
651  * \param eol end of line
652  */
653  static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
654  {
655  using namespace std;
656  using namespace JPP;
657 
658  const JEquationParameters parameters("=", ";", "", "");
659 
660  string buffer;
661 
662  if (getline(in, buffer, eol)) {
663 
664  for (string::size_type pos = 0; pos < buffer.length(); ) {
665 
666  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
667  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
668 
669  if (lpos != string::npos &&
670  rpos != string::npos) {
671 
672  out << buffer.substr(pos, lpos);
673 
674  lpos += FILENAME_PREFIX.length();
675  pos += lpos;
676 
677  ifstream file(buffer.substr(pos, rpos - lpos).c_str());
678 
679  copy(file, out, '\0');
680 
681  rpos += FILENAME_POSTFIX.length();
682  pos += rpos - lpos;
683 
684  } else {
685 
686  out << buffer.substr(pos);
687 
688  pos += buffer.substr(pos).length();
689  }
690  }
691  }
692  }
693  };
694 }
695 
696 #endif
Exceptions.
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
std::string name
Definition: JDAQCHSM.hh:165
JDAQStateMachine::state_Main::state_RunControl RunControl
JDAQStateMachine::ev_check_event ev_check
JDAQStateMachine::state_Main Main
Exception for ControlHost.
Definition: JException.hh:486
Simple data structure to support I/O of equations (see class JLANG::JEquation).
General exception.
Definition: JException.hh:24
Exception for I/O.
Definition: JException.hh:342
Auxiliary class to specify white space character(s) in current locale.
Interface for logging messages.
Definition: JLogger.hh:22
bool is_valid() const
Check validity of logger object.
List of ControlHost client managers.
Definition: JLigier.cc:479
ControlHost client manager.
Definition: JLigier.cc:245
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
Control unit client base class.
Definition: JDAQClient.hh:301
JDAQEvent_t * findEvent(const JTag &tag, const std::string &event_name)
Find event in event table.
Definition: JDAQClient.hh:536
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:834
virtual bool filter(const JTag &tag, int length, const char *buffer)
Filter message.
Definition: JDAQClient.hh:662
virtual bool exit() override
Exit the state machine.
Definition: JDAQClient.hh:485
JMessageLogger logger
message logger
Definition: JDAQClient.hh:835
Simple driver for run control clients.
Definition: JDAQDriver.hh:38
virtual bool enter() override
Enter the state machine.
Definition: JDAQDriver.hh:73
virtual void actionStart(int, const char *) override
Definition: JDAQDriver.hh:305
virtual bool filter(const JTag &tag, int length, const char *buffer)
Filter message.
Definition: JDAQClient.hh:662
void run(std::istream &in)
Run driver.
Definition: JDAQDriver.hh:365
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition: JDAQDriver.hh:54
JClientList clientList
Definition: JDAQDriver.hh:641
virtual void actionStop(int, const char *) override
Definition: JDAQDriver.hh:310
virtual void enterState(const CHSM::state &state, const CHSM::event &event) override
Action when entering state.
Definition: JDAQDriver.hh:187
int timeout_us
timeout of state transitions [us]
Definition: JDAQDriver.hh:637
void run()
Run driver with user input.
Definition: JDAQDriver.hh:321
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
Definition: JDAQDriver.hh:523
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
Definition: JDAQDriver.hh:613
void synchronise()
Synchronise clients.
Definition: JDAQDriver.hh:546
virtual void actionExit() override
Exit the state machine.
Definition: JDAQDriver.hh:156
static void copy(std::istream &in, std::ostream &out, const char eol='\n')
Copy data from input to output stream.
Definition: JDAQDriver.hh:653
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
Definition: JDAQDriver.hh:213
static const int SLEEP_TIME_US
Definition: JDAQDriver.hh:43
char * loc(char *orig)
Auxiliary classes and methods for language specific functionality.
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:478
static const JTag DISPTAG_Born("Born")
static const JTag DISPTAG_Died("Died")
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
static const std::string FILENAME_PREFIX
Definition: JDAQTags.hh:60
static const JNET::JTag RC_REPLY
Definition: JDAQTags.hh:67
static const std::string RUN_CONTROL_CLIENT
Definition: JDAQTags.hh:44
static const std::string TOKEN_DELIMETER
Definition: JDAQTags.hh:58
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:66
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:72
char getTokenDelimeter()
Get the token delimiter for command messages.
static const std::string FILENAME_POSTFIX
Definition: JDAQTags.hh:61
Definition: JSTDTypes.hh:14
Target.
Definition: JHead.hh:300
Level specific message streamers.
Auxiliary class for all subscription.
Definition: JControlHost.hh:99
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:120
Auxiliary class for handling event name and optional static information.
Definition: JEvent_t.hh:23
const std::string & getName() const
Get event name.
Definition: JEvent_t.hh:65