Jpp  17.0.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDAQDriver.hh
Go to the documentation of this file.
1 #ifndef __JRUNCONTROL__JDAQDRIVER__
2 #define __JRUNCONTROL__JDAQDRIVER__
3 
4 #include <string>
5 #include <iostream>
6 #include <sstream>
7 #include <limits>
8 #include <unistd.h>
9 
10 #include "JLang/JException.hh"
11 #include "JLang/JRedirectStream.hh"
13 
15 #include "JRuncontrol/JClient.hh"
17 
18 
19 /**
20  * \author mdejong
21  */
22 
23 namespace KM3NETDAQ {
24 
25  using JLANG::JException;
26  using JLANG::JIOException;
28 
29 
30  /**
31  * Simple driver for run control clients.
32  * This class can be used to start a set of run control clients,
33  * trigger events and eventually stop the clients.
34  */
35  class JDAQDriver :
36  public JDAQClient
37  {
38  public:
39  /**
40  * Constructor.
41  *
42  * \param name name of driver
43  * \param server name of command message server
44  * \param logger logger
45  * \param level debug level
46  * \param timeout_s timeout_s [s]
47  */
48  JDAQDriver(const std::string& name,
49  const std::string& server,
50  JLogger* logger,
51  const int level,
52  const int timeout_s) :
53  JDAQClient(name, server, logger, level),
54  timeout_s (timeout_s),
55  is_alive (false)
56  {}
57 
58 
59  /**
60  * Enter the state machine.
61  * The driver will subscribe to the ControlHost tags corresponding to born, died and
62  * reply messages of the clients instead of the standard tags for run control commands.
63  * The clients are started after the driver is ready to receive ControlHost messages.
64  * In case of an error, a message is printed on the terminal and the state machine
65  * is not entered.
 *
 * \return result of CHSM::machine::enter() if this driver was seen alive; else false
66  */
67  virtual bool enter() override
68  {
69  using namespace std;
70  using namespace JLANG;
71  using namespace JNET;
72 
73  if (server.is_valid() && logger.is_valid()) {
74 
75  try {
76 
  // NOTE(review): the Subscribe() expression below is cut off in this listing
  // (listing lines 78-80 are absent); restore the remaining operands from the
  // original header before compiling.
77  server->Subscribe(JSubscriptionAll(RC_REPLY) +
81  server->SendMeAlways();
82  server->MyId(getFullName());
83 
84  // check alive of this driver
  // is_alive is set by update(const bool) when a Born message containing the
  // full name of this driver arrives; the counter only advances (after a 1 s
  // sleep) when no message was pending.
85 
86  for (int i = 0; i != timeout_s && !is_alive; ) {
87  if (!update(true)) {
88  sleep(1);
89  ++i;
90  }
91  }
92 
93  if (is_alive) {
94 
95  {
  // start the clients and poll at most timeout_s times, one second apart
96  clientList.start();
97 
98  for (int i = 0; i != timeout_s; ++i) {
99 
  // process all pending messages before comparing the counts
100  while (update(true)) {}
101 
  // NOTE(review): count() without argument presumably counts the clients
  // that are alive - confirm against JClientList.
102  if (clientList.count() >= clientList.count(JClient::ACTIVE))
103  break;
104  else
105  sleep(1);
106  }
107  }
108 
  // on a mismatch, warn about each active client whose born value does not
  // exceed its died value
109  if (clientList.count() != clientList.count(JClient::ACTIVE)) {
110  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
111  if (i->isActive() && i->getBorn() <= i->getDied()) {
112  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
113  }
114  }
115  }
116 
  // the state machine is entered even if some clients did not start
117  return CHSM::machine::enter();
118 
119  } else {
120  cerr << "Timeout at subscription." << endl;
121  }
122  }
123  catch(const JControlHostException& error) {
124  cerr << error << endl;
125  }
126 
127  } else {
128  cerr << "Message server or logger not properly initialised." << endl;
129  }
130 
  // reached after any of the errors reported above
131  return false;
132  }
133 
134 
135  /**
136  * Exit the state machine.
137  *
138  * This method waits for the clients to terminate using the died message generated by ControlHost.
139  * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
140  */
141  void actionExit()
142  {
143  using namespace std;
144  using namespace JLANG;
145 
146  for (int i = 0; i != timeout_s; ++i) {
147 
148  while (update(true)) {}
149 
150  if (clientList.count() == 0)
151  break;
152  else
153  sleep(1);
154  }
155 
156  if (clientList.count() != 0) {
157  JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
158  }
159 
160  clientList.stop();
161  }
162 
163 
164  /**
165  * Action when entering state.
166  * This method waits for all clients to produce the enter state message.
167  * In case of a timeout, no specific action is taken but an error message is logged.
168  *
169  * \param state entered state
170  * \param event event that triggered transition
171  */
172  virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
173  {
174  for (int i = 0; i != timeout_s && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
175  if (!update(true)) {
176  sleep(1);
177  ++i;
178  }
179  }
180 
181  if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
182  JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
183  }
184  }
185 
186 
187  /**
188  * Update client list with incoming ControlHost message.
189  * This method receives and processes a message.
190  * The client list is updated accordingly.
191  * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
192  * The return value is then false.
193  * If the no-wait option is set to false, it waits until the next message is received.
194  *
195  * \param no_wait wait option
196  * \return true if message received; else false
197  */
198  bool update(const bool no_wait)
199  {
200  using namespace std;
201  using namespace JNET;
203 
204  try {
205 
206  string tag;
207  long long int length = 0;
208 
209  if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
210  if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
211 
212  char* data= new char[length];
213 
214  server->GetFullData(data, length);
215 
216  const string buffer(data, length);
217 
218  delete [] data;
219 
220  JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
221 
222  if (tag == RC_LOG) {
223 
224  rc_log = buffer;
225 
226  } else if (buffer.find(getFullName()) != string::npos) {
227 
228  if (tag == DISPTAG_Born)
229  is_alive = true;
230  else if (tag == DISPTAG_Died)
231  is_alive = false;
232 
233  } else {
234 
235  JClientList::iterator i = clientList.find(buffer);
236 
237  if (i != clientList.end()) {
238 
239  i->update(tag, buffer);
240 
241  } else {
242 
243  JErrorStream(logger) << "Message fom illegal client " << buffer;
244 
245  try {
246 
247  if (tag == DISPTAG_Born ||
248  tag == DISPTAG_Died ||
249  tag == RC_REPLY) {
250 
251  string key, hostname, name;
252 
253  istringstream is(buffer);
254 
255  const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
256 
257  is.imbue(loc);
258 
259  if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
260 
261  JClient client(name, hostname);
262 
263  client.update(tag, buffer);
264  client.setMode(JClient::ILLEGAL);
265 
266  clientList.insert(client);
267 
268  JWarningStream(logger) << "Added illegal client " << client.getFullName();
269 
270  } else {
271  throw JIOException("JClient: Error reading " + buffer);
272  }
273  }
274  }
275  catch(const JException& error) {
276  JErrorStream(logger) << error;
277  }
278  }
279  }
280 
281  return true;
282  }
283  catch(const JControlHostException& error) {
284  JErrorStream(logger) << error;
285  }
286 
287  return false;
288  }
289 
290  virtual void actionStart(int, const char*) override
291  {
292  rc_log = "";
293  }
294 
  /**
   * Action for stop.
   *
   * Logs an error if rc_log is empty, i.e. no message with tag RC_LOG was stored
   * by update(const bool) since rc_log was last cleared in actionStart().
   *
   * NOTE(review): listing line 298, the statement executed when rc_log is not empty,
   * is absent from this listing; restore it from the original header before compiling.
   */
295  virtual void actionStop(int, const char*) override
296  {
297  if (rc_log != "")
299  else
300  JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
301  }
302 
303  /**
304  * Run driver with user input.
305  */
306  void run()
307  {
308  run(std::cin);
309  }
310 
311 
312  /**
313  * Run driver.
314  *
315  * Example input format:
316  * <pre>
317  * # comment line.
318  *
319  * process <process name> <host name> [<start command>];
320  *
321  * # The following tokens in <start command> will be substituted:
322  * # $HOST$ by <host name>;
323  * # $NAME$ by <process name>;
324  * # $SERVER$ by JClient::SERVER;
325  * # $LOGGER$ by JClient::LOGGER;
326  * # $ARGS$ by part following '/' in <process name>;
327  *
328  * # enter state machine.
329  *
330  * enter
331  *
332  * # trigger event
333  * # data can be provided online and mixed with data from a separate file (optional).
334  * # multiple tags should be separated by a new line.
335  *
336  * event <event name> {
337  * [<tag 1> [data]]
338  * [<tag 2> [data][%<file name>%][data]]
339  * }
340  *
341  * # optionally quit before end of input
342  * [quit]
343  *
344  * # optionally kill processes that did not properly terminate.
345  * [exit]
346  * </pre>
347  *
348  * \param in input stream
349  */
350  void run(std::istream& in)
351  {
352  using namespace std;
353 
354  for (string key; in >> key; ) {
355 
356  if (key[0] == '#') {
357 
358  in.ignore(numeric_limits<streamsize>::max(), '\n');
359 
360  } else if (key == "enter") {
361 
362  enter();
363 
364  if (!active()) {
365  cerr << "State machine not entered; abort." << endl;
366  return;
367  }
368 
369  } else if (key == "exit") {
370 
371  timeout_s = 0;
372  exit();
373 
374  } else if (key == "quit") {
375 
376  break;
377 
378  } else if (key == "sleep") {
379 
380  int sec;
381 
382  if (in >> sec) {
383  sleep(sec);
384  }
385 
386  } else if (key == "process") {
387 
388  string buffer;
389 
390  getline(in, buffer, ';');
391 
392  istringstream is(buffer);
393 
394  JClient client;
395 
396  if (is >> client) {
397 
398  client.setMode(JClient::ACTIVE);
399 
400  if (!clientList.insert(client).second) {
401  JWarningStream(logger) << "Process already exists " << client;
402  }
403 
404  } else {
405  JErrorStream(logger) << "Error reading key word process.";
406  }
407 
408  } else if (key == "event" || key == "event*") {
409 
410  JEvent_t event;
411  char c;
412  string buffer;
413  const char eol = '\n';
414 
415  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
416 
417  if (clientList.count() != 0) {
418 
419  JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
420 
421  if (pev != NULL) {
422 
423  if (pev->active() || key == "event*") {
424 
425  istringstream is(buffer);
426 
427  for (string tag; is >> tag; ) {
428 
429  ostringstream os;
430 
431  os << event << getTokenDelimeter();
432 
433  copy(is, os, eol);
434 
435  server->PutFullString(tag, os.str());
436  }
437 
438  if (key != "event*") {
439  (*pev)(0, NULL); // trigger driver
440  }
441 
442  } else {
443  JErrorStream(logger) << "Inactive event " << event;
444  }
445 
446  } else {
447  JErrorStream(logger) << "Unknown event " << event;
448  }
449 
450  } else {
451  JErrorStream(logger) << "No active client to trigger event.";
452  }
453 
454  } else {
455  JErrorStream(logger) << "Error reading key word event.";
456  }
457 
458  } else if (key == "message") {
459 
460  string tag;
461  string buffer;
462 
463  if (in >> tag && getline(in, buffer, ';'))
464  server->PutFullString(tag, buffer);
465  else
466  JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
467 
468  } else if (key == "print") {
469 
470  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
471  JNoticeStream(logger) << i->getFullName() << ' '
472  << i->getStartCommand() << ' '
473  << i->getAlive() << ' '
474  << i->getStatename();
475  }
476 
477  } else if (key == "filter") {
478 
479  string client;
480  string buffer;
481 
482  getline(in, buffer, ';');
483 
484  for (istringstream is(buffer); is >> client; ) {
485  filter(client);
486  }
487 
488  } else if (key == "sync") {
489 
490  synchronise();
491 
492  } else {
493 
494  JErrorStream(logger) << "Unknown key: " << key;
495 
496  in.ignore(numeric_limits<streamsize>::max(), '\n');
497  }
498  }
499  }
500 
501 
502  /**
503  * Update client list with incoming ControlHost messages until the client list
504  * is synchronised with the current state or until the timeout.
505  */
506  void update()
507  {
508  using namespace std;
509 
510  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
511 
512  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
513 
514  if (state->active()) {
515  for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
516  if (!update(true)) {
517  sleep(1);
518  ++i;
519  }
520  }
521  }
522  }
523  }
524 
525 
526  /**
527  * Synchronise clients.
528  */
529  void synchronise()
530  {
531  using namespace std;
532 
533  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
534 
535  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
536 
537  if (state->active()) {
538 
539  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
540 
541  JDebugStream(logger) << "Synchronising " << state->name();
542 
543  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
544 
545  if (i->getMode() == JClient::ACTIVE) {
546 
547  if (!i->getAlive()) {
548 
549  try {
550 
551  string buffer;
552 
553  if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
554 
555  i->setAlive(true);
556 
557  if (buffer.find(i->getHostname()) == string::npos) {
558  JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
559  }
560  }
561  }
562  catch(const JControlHostException& error) {
563  JErrorStream(logger) << error;
564  }
565  }
566 
567  if (i->getAlive() && i->getStatename() != state->name()) {
568  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
569  }
570  }
571  }
572 
573 
574  for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
575  if (!update(true)) {
576  sleep(1);
577  ++i;
578  }
579  }
580 
581  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
582  JWarningStream(logger) << "Timeout at synchronisation.";
583  }
584  }
585  }
586  }
587  }
588 
589 
590  /**
591  * Filter client list by putting failing clients to sleep.
592  * In this, only clients with names that contain the given character sequence are considered.
593  *
594  * \param target target name of client(s)
595  */
596  void filter(const std::string& target = "")
597  {
598  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
599 
600  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
601 
602  if (state->active()) {
603 
604  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
605 
606  if (target == "" || i->getName().find(target) != std::string::npos) {
607 
608  if (!i->getAlive() || i->getStatename() != state->name()) {
609 
610  JNoticeStream(logger) << "Put to sleep " << i->getFullName();
611 
612  i->setMode(JClient::SLEEP);
613  }
614  }
615  }
616  }
617  }
618  }
619 
620  int timeout_s; //!< timeout of state transitions [s]
621 
622  protected:
623 
625  bool is_alive;
626  std::string rc_log;
627 
628  /**
629  * Copy data from input to output stream.
630  * Tagged file names are recursively expanded.
631  *
632  * \param in input stream
633  * \param out output stream
634  * \param eol end of line
635  */
636  static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
637  {
638  using namespace std;
639 
640  string buffer;
641 
642  if (getline(in, buffer, eol)) {
643 
644  for (string::size_type pos = 0; pos < buffer.length(); ) {
645 
646  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
647  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
648 
649  if (lpos != string::npos &&
650  rpos != string::npos) {
651 
652  out << buffer.substr(pos, lpos);
653 
654  lpos += FILENAME_PREFIX.length();
655  pos += lpos;
656 
657  ifstream file(buffer.substr(pos, rpos - lpos).c_str());
658 
659  copy(file, out, '\0');
660 
661  rpos += FILENAME_POSTFIX.length();
662  pos += rpos - lpos;
663 
664  } else {
665 
666  out << buffer.substr(pos);
667 
668  pos += buffer.substr(pos).length();
669  }
670  }
671  }
672  }
673  };
674 }
675 
676 #endif
static const std::string FILENAME_PREFIX
Definition: JDAQTags.hh:38
int timeout_s
timeout of state transitions [s]
Definition: JDAQDriver.hh:620
General exception.
Definition: JException.hh:23
const std::string & getName() const
Get event name.
Exceptions.
static const JTag DISPTAG_Died("Died")
JDAQEvent_t * findEvent(const JTag &tag, const std::string &event_name)
Find event in event table.
Definition: JDAQClient.hh:507
Target.
Definition: JHead.hh:296
static const std::string FILENAME_POSTFIX
Definition: JDAQTags.hh:39
ControlHost client manager.
Definition: JLigier.cc:243
JDAQStateMachine::state_Main Main
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
List of ControlHost client managers.
Definition: JLigier.cc:477
virtual void actionStart(int, const char *) override
Definition: JDAQDriver.hh:290
Interface for logging messages.
Definition: JLogger.hh:22
static void copy(std::istream &in, std::ostream &out, const char eol= '\n')
Copy data from input to output stream.
Definition: JDAQDriver.hh:636
std::string name
Definition: JDAQCHSM.chsm:154
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:800
static const std::string TOKEN_DELIMETER
Definition: JDAQTags.hh:36
Simple driver for run control clients.
Definition: JDAQDriver.hh:35
is
Definition: JDAQCHSM.chsm:167
void synchronise()
Synchronise clients.
Definition: JDAQDriver.hh:529
void run()
Run driver with user input.
Definition: JDAQDriver.hh:306
static const std::string RUN_CONTROL_CLIENT
Definition: JDAQTags.hh:22
void actionExit()
Exit the state machine.
Definition: JDAQDriver.hh:141
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:50
static const JNET::JTag RC_REPLY
Definition: JDAQTags.hh:45
then echo Test string reversed by client(hit< return > to continue)." $DIR/JProcess -c "$DIR/JEcho-r" -C fi if (( 1 ))
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
Definition: JDAQDriver.hh:198
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:478
Auxiliary class for handling event name and optional number.
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:468
virtual bool exit()
Exit the state machine.
Definition: JDAQClient.hh:456
char getTokenDelimeter()
Get the token delimeter for command messages.
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
void run(std::istream &in)
Run driver.
Definition: JDAQDriver.hh:350
JClientList clientList
Definition: JDAQDriver.hh:624
virtual void actionStop(int, const char *) override
Definition: JDAQDriver.hh:295
virtual void enterState(const CHSM::state &state, const CHSM::event &event) override
Action when entering state.
Definition: JDAQDriver.hh:172
JDAQStateMachine::ev_check_event ev_check
static const JTag DISPTAG_Born("Born")
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition: JDAQDriver.hh:48
bool is_valid() const
Check validity of logger object.
Auxiliary class to specify white space character(s) in current locale.
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:119
Control unit client base class.
Definition: JDAQClient.hh:272
&set_variable SERVER
Definition: JStopDAQ.sh:29
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
Definition: JDAQDriver.hh:596
JMessageLogger logger
message logger
Definition: JDAQClient.hh:801
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
$WORKDIR ev_configure_domsimulator txt echo process $DOM_SIMULATOR $i $SOURCE_HOST[$index] csh c(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&($DOM_SIMULATOR\-u\$NAME\$\-H\$SERVER\$\-M\$LOGGER\$\-d $DEBUG</dev/null > &/dev/null &))'
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
JDAQStateMachine::state_Main::state_RunControl RunControl
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
Definition: JDAQDriver.hh:506
char * loc(char *orig)
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
virtual bool enter() override
Enter the state machine.
Definition: JDAQDriver.hh:67
Exception for I/O.
Definition: JException.hh:324