Jpp  18.5.2
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDAQDriver.hh
Go to the documentation of this file.
1 #ifndef __JRUNCONTROL__JDAQDRIVER__
2 #define __JRUNCONTROL__JDAQDRIVER__
3 
4 #include <string>
5 #include <iostream>
6 #include <sstream>
7 #include <limits>
8 #include <unistd.h>
9 
10 #include "JLang/JException.hh"
11 #include "JLang/JRedirectStream.hh"
13 
15 #include "JRuncontrol/JClient.hh"
17 
18 
19 /**
20  * \author mdejong
21  */
22 
23 namespace KM3NETDAQ {
24 
25  using JLANG::JException;
26  using JLANG::JIOException;
28 
29 
30  /**
31  * Simple driver for run control clients.
32  * This class can be used to start a set of run control clients,
33  * trigger events and eventually stop the clients.
34  */
35  class JDAQDriver :
36  public JDAQClient
37  {
38  public:
39 
40  using JDAQClient::filter;
41 
42  static const int SLEEP_TIME_US = 500000;
43 
44  /**
45  * Constructor.
46  *
47  * \param name name of driver
48  * \param server name of command message server
49  * \param logger logger
50  * \param level debug level
51  * \param timeout_s timeout [s]
52  */
54  const std::string& server,
55  JLogger* logger,
56  const int level,
57  const int timeout_s) :
58  JDAQClient(name, server, logger, level),
59  timeout_us(timeout_s * 1000000),
60  is_alive (false)
61  {}
62 
63 
64  /**
65  * Enter the state machine.
66  * The driver will subscribe to the ControlHost tags corresponding to born, died and
67  * reply messages of the clients instead of the standard tags for run control commands.
68  * The clients are started after the driver is ready to receive ControlHost messages.
69  * In case of an error, a message is printed on the terminal and the state machine
70  * is not entered.
71  */
72  virtual bool enter() override
73  {
74  using namespace std;
75  using namespace JPP;
76 
77  if (server.is_valid() && logger.is_valid()) {
78 
79  try {
80 
81  server->Subscribe(JSubscriptionAll(RC_REPLY) +
85  server->SendMeAlways();
86  server->MyId(getFullName());
87 
88  // check alive of this driver
89 
90  for (int i = 0; i != timeout_us && !is_alive; ) {
91  if (!update(true)) {
92  usleep(SLEEP_TIME_US);
93  i += SLEEP_TIME_US;
94  }
95  }
96 
97  if (is_alive) {
98 
99  {
100  //clientList.start();
101 
102  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
103  if (i->isActive()) {
104 
105  JNoticeStream(logger) << "Start process " << *i;
106 
107  i->start();
108  }
109  }
110 
111  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
112 
113  while (update(true)) {
114  }
115 
116  if (clientList.count() >= clientList.count(JClient::ACTIVE))
117  break;
118  else
119  usleep(SLEEP_TIME_US);
120  }
121  }
122 
123  if (clientList.count() != clientList.count(JClient::ACTIVE)) {
124  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
125  if (i->isActive() && i->getBorn() <= i->getDied()) {
126  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
127  }
128  }
129  }
130 
131  return CHSM::machine::enter();
132 
133  } else {
134  cerr << "Timeout at subscription." << endl;
135  }
136  }
137  catch(const JControlHostException& error) {
138  cerr << error << endl;
139  }
140 
141  } else {
142  cerr << "Message server or logger not properly initialised." << endl;
143  }
144 
145  return false;
146  }
147 
148 
149  /**
150  * Exit the state machine.
151  *
152  * This method waits for the clients to terminate using the died message generated by ControlHost.
153  * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
154  */
155  virtual void actionExit() override
156  {
157  using namespace std;
158  using namespace JLANG;
159 
160  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
161 
162  while (update(true)) {}
163 
164  if (clientList.count() == 0)
165  break;
166  else
167  usleep(SLEEP_TIME_US);
168  }
169 
170  if (clientList.count() != 0) {
171  JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
172  }
173 
174  clientList.stop();
175  }
176 
177 
178  /**
179  * Action when entering state.
180  * This method waits for all clients to produce the enter state message.
181  * In case of a timeout, no specific action is taken but an error message is logged.
182  *
183  * \param state entered state
184  * \param event event that triggered transition
185  */
186  virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
187  {
188  for (int i = 0; i != timeout_us && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
189  if (!update(true)) {
190  usleep(SLEEP_TIME_US);
191  i += SLEEP_TIME_US;
192  }
193  }
194 
195  if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
196  JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
197  }
198  }
199 
200 
201  /**
202  * Update client list with incoming ControlHost message.
203  * This method receives and processes a message.
204  * The client list is updated accordingly.
205  * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
206  * The return value is then false.
207  * If the no-wait option is set to false, it waits until the next message is received.
208  *
209  * \param no_wait wait option
210  * \return true if message received; else false
211  */
212  bool update(const bool no_wait)
213  {
214  using namespace std;
215  using namespace JNET;
217 
218  try {
219 
220  string tag;
221  long long int length = 0;
222 
223  if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
224  if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
225 
226  char* data= new char[length];
227 
228  server->GetFullData(data, length);
229 
230  const string buffer(data, length);
231 
232  delete [] data;
233 
234  JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
235 
236  if (tag == RC_LOG) {
237 
238  rc_log = buffer;
239 
240  } else if (buffer.find(getFullName()) != string::npos) {
241 
242  if (tag == DISPTAG_Born)
243  is_alive = true;
244  else if (tag == DISPTAG_Died)
245  is_alive = false;
246 
247  } else {
248 
249  JClientList::iterator i = clientList.find(buffer);
250 
251  if (i != clientList.end()) {
252 
253  i->update(tag, buffer);
254 
255  } else {
256 
257  JErrorStream(logger) << "Message fom illegal client " << buffer;
258 
259  try {
260 
261  if (tag == DISPTAG_Born ||
262  tag == DISPTAG_Died ||
263  tag == RC_REPLY) {
264 
265  string key, hostname, name;
266 
267  istringstream is(buffer);
268 
269  const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
270 
271  is.imbue(loc);
272 
273  if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
274 
275  JClient client(name, hostname);
276 
277  client.update(tag, buffer);
278  client.setMode(JClient::ILLEGAL);
279 
280  clientList.insert(client);
281 
282  JWarningStream(logger) << "Added illegal client " << client.getFullName();
283 
284  } else {
285  THROW(JIOException, "JClient: Error reading " << buffer);
286  }
287  }
288  }
289  catch(const JException& error) {
290  JErrorStream(logger) << error;
291  }
292  }
293  }
294 
295  return true;
296  }
297  catch(const JControlHostException& error) {
298  JErrorStream(logger) << error;
299  }
300 
301  return false;
302  }
303 
304  virtual void actionStart(int, const char*) override
305  {
306  rc_log = "";
307  }
308 
309  virtual void actionStop(int, const char*) override
310  {
311  if (rc_log != "")
313  else
314  JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
315  }
316 
317  /**
318  * Run driver with user input.
319  */
320  void run()
321  {
322  run(std::cin);
323  }
324 
325 
326  /**
327  * Run driver.
328  *
329  * Example input format:
330  * <pre>
331  * # comment line.
332  *
333  * process <process name> <host name> [<start command>];
334  *
335  * # The following tokens in <start command> will be substituted:
336  * # $HOST$ by <host name>;
337  * # $NAME$ by <process name>;
338  * # $SERVER$ by JClient::SERVER;
339  * # $LOGGER$ by JClient::LOGGER;
340  * # $ARGS$ by part following '/' in <process name>;
341  *
342  * # enter state machine.
343  *
344  * enter
345  *
346  * # trigger event
347  * # data can be provided online and mixed with data from a separate file (optional).
348  * # multiple tags should be separated by a new line.
349  *
350  * event <event name> {
351  * [<tag 1> [data]]
352  * [<tag 2> [data][%<file name>%][data]]
353  * }
354  *
355  * # optionally quit before end of input
356  * [quit]
357  *
358  * # optionally kill processes that did not properly terminate.
359  * [exit]
360  * </pre>
361  *
362  * \param in input stream
363  */
364  void run(std::istream& in)
365  {
366  using namespace std;
367 
368  for (string key; in >> key; ) {
369 
370  if (key[0] == '#') {
371 
372  in.ignore(numeric_limits<streamsize>::max(), '\n');
373 
374  } else if (key == "enter") {
375 
376  enter();
377 
378  if (!active()) {
379  cerr << "State machine not entered; abort." << endl;
380  return;
381  }
382 
383  } else if (key == "exit") {
384 
385  timeout_us = 0;
386  exit();
387 
388  } else if (key == "quit") {
389 
390  break;
391 
392  } else if (key == "sleep") {
393 
394  int sec;
395 
396  if (in >> sec) {
397  sleep(sec);
398  }
399 
400  } else if (key == "process") {
401 
402  string buffer;
403 
404  getline(in, buffer, ';');
405 
406  istringstream is(buffer);
407 
408  JClient client;
409 
410  if (is >> client) {
411 
412  client.setMode(JClient::ACTIVE);
413 
414  if (!clientList.insert(client).second) {
415  JWarningStream(logger) << "Process already exists " << client;
416  }
417 
418  } else {
419  JErrorStream(logger) << "Error reading key word process.";
420  }
421 
422  } else if (key == "event" || key == "event*") {
423 
424  JEvent_t event;
425  char c;
426  string buffer;
427  const char eol = '\n';
428 
429  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
430 
431  if (clientList.count() != 0) {
432 
433  JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
434 
435  if (pev != NULL) {
436 
437  if (pev->active() || key == "event*") {
438 
439  istringstream is(buffer);
440 
441  for (string tag; is >> tag; ) {
442 
443  ostringstream os;
444 
445  os << event << getTokenDelimeter();
446 
447  copy(is, os, eol);
448 
449  JNoticeStream(logger) << key << ' ' << tag << ' ' << event;
450 
451  server->PutFullString(tag, os.str());
452  }
453 
454  if (key != "event*") {
455  (*pev)(0, NULL); // trigger driver
456  }
457 
458  } else {
459  JErrorStream(logger) << "Inactive event " << event;
460  }
461 
462  } else {
463  JErrorStream(logger) << "Unknown event " << event;
464  }
465 
466  } else {
467  JErrorStream(logger) << "No active client to trigger event.";
468  }
469 
470  } else {
471  JErrorStream(logger) << "Error reading key word event.";
472  }
473 
474  } else if (key == "message") {
475 
476  string tag;
477  string buffer;
478 
479  if (in >> tag && getline(in, buffer, ';'))
480  server->PutFullString(tag, buffer);
481  else
482  JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
483 
484  } else if (key == "print") {
485 
486  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
487  JNoticeStream(logger) << i->getFullName() << ' '
488  << i->getStartCommand() << ' '
489  << i->getAlive() << ' '
490  << i->getStatename();
491  }
492 
493  } else if (key == "filter") {
494 
495  string client;
496  string buffer;
497 
498  getline(in, buffer, ';');
499 
500  for (istringstream is(buffer); is >> client; ) {
501  filter(client);
502  }
503 
504  } else if (key == "sync") {
505 
506  synchronise();
507 
508  } else {
509 
510  JErrorStream(logger) << "Unknown key: " << key;
511 
512  in.ignore(numeric_limits<streamsize>::max(), '\n');
513  }
514  }
515  }
516 
517 
518  /**
519  * Update client list with incoming ControlHost messages until the client list
520  * is synchronised with the current state or until the timeout.
521  */
522  void update()
523  {
524  using namespace std;
525 
526  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
527 
528  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
529 
530  if (state->active()) {
531  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
532  if (!update(true)) {
533  usleep(SLEEP_TIME_US);
534  i += SLEEP_TIME_US;
535  }
536  }
537  }
538  }
539  }
540 
541 
542  /**
543  * Synchronise clients.
544  */
545  void synchronise()
546  {
547  using namespace std;
548 
549  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
550 
551  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
552 
553  if (state->active()) {
554 
555  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
556 
557  JDebugStream(logger) << "Synchronising " << state->name();
558 
559  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
560 
561  if (i->getMode() == JClient::ACTIVE) {
562 
563  if (!i->getAlive()) {
564 
565  try {
566 
567  string buffer;
568 
569  if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
570 
571  i->setAlive(true);
572 
573  if (buffer.find(i->getHostname()) == string::npos) {
574  JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
575  }
576  }
577  }
578  catch(const JControlHostException& error) {
579  JErrorStream(logger) << error;
580  }
581  }
582 
583  if (i->getAlive() && i->getStatename() != state->name()) {
584  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
585  }
586  }
587  }
588 
589 
590  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
591  if (!update(true)) {
592  usleep(SLEEP_TIME_US);
593  i += SLEEP_TIME_US;
594  }
595  }
596 
597  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
598  JWarningStream(logger) << "Timeout at synchronisation.";
599  }
600  }
601  }
602  }
603  }
604 
605 
606  /**
607  * Filter client list by putting failing clients to sleep.
608  * In this, only clients with names that contain the given character sequence are considered.
609  *
610  * \param target target name of client(s)
611  */
612  void filter(const std::string& target = "")
613  {
614  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
615 
616  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
617 
618  if (state->active()) {
619 
620  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
621 
622  if (target == "" || i->getName().find(target) != std::string::npos) {
623 
624  if (!i->getAlive() || i->getStatename() != state->name()) {
625 
626  JNoticeStream(logger) << "Put to sleep " << i->getFullName();
627 
628  i->setMode(JClient::SLEEP);
629  }
630  }
631  }
632  }
633  }
634  }
635 
636  int timeout_us; //!< timeout of state transitions [us]
637 
638  protected:
639 
641  bool is_alive;
643 
644  /**
645  * Copy data from input to output stream.
646  * Tagged file names are recursively expanded.
647  *
648  * \param in input stream
649  * \param out output stream
650  * \param eol end of line
651  */
652  static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
653  {
654  using namespace std;
655 
656  string buffer;
657 
658  if (getline(in, buffer, eol)) {
659 
660  for (string::size_type pos = 0; pos < buffer.length(); ) {
661 
662  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
663  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
664 
665  if (lpos != string::npos &&
666  rpos != string::npos) {
667 
668  out << buffer.substr(pos, lpos);
669 
670  lpos += FILENAME_PREFIX.length();
671  pos += lpos;
672 
673  ifstream file(buffer.substr(pos, rpos - lpos).c_str());
674 
675  copy(file, out, '\0');
676 
677  rpos += FILENAME_POSTFIX.length();
678  pos += rpos - lpos;
679 
680  } else {
681 
682  out << buffer.substr(pos);
683 
684  pos += buffer.substr(pos).length();
685  }
686  }
687  }
688  }
689  };
690 }
691 
692 #endif
static const std::string FILENAME_PREFIX
Definition: JDAQTags.hh:54
General exception.
Definition: JException.hh:24
const std::string & getName() const
Get event name.
Definition: JEvent_t.hh:65
Exceptions.
static const JTag DISPTAG_Died("Died")
JDAQEvent_t * findEvent(const JTag &tag, const std::string &event_name)
Find event in event table.
Definition: JDAQClient.hh:536
Target.
Definition: JHead.hh:298
static const std::string FILENAME_POSTFIX
Definition: JDAQTags.hh:55
then usage $script[< detector identifier >< run range >]< QA/QCfile > nExample script to produce data quality plots nWhen a detector identifier and run range are data are downloaded from the database nand subsequently stored in the given QA QC file
Definition: JDataQuality.sh:19
ControlHost client manager.
Definition: JLigier.cc:243
JDAQStateMachine::state_Main Main
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
List of ControlHost client managers.
Definition: JLigier.cc:477
#define THROW(JException_t, A)
Macro for throwing exception with std::ostream compatible message.
Definition: JException.hh:712
virtual void actionStart(int, const char *) override
Definition: JDAQDriver.hh:304
Interface for logging messages.
Definition: JLogger.hh:22
static void copy(std::istream &in, std::ostream &out, const char eol= '\n')
Copy data from input to output stream.
Definition: JDAQDriver.hh:652
virtual bool exit() override
Exit the state machine.
Definition: JDAQClient.hh:485
std::string name
Definition: JDAQCHSM.chsm:154
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:834
static const std::string TOKEN_DELIMETER
Definition: JDAQTags.hh:52
Simple driver for run control clients.
Definition: JDAQDriver.hh:35
is
Definition: JDAQCHSM.chsm:167
virtual bool filter(const JTag &tag, int length, const char *buffer)
Filter message.
Definition: JDAQClient.hh:662
void synchronise()
Synchronise clients.
Definition: JDAQDriver.hh:545
void run()
Run driver with user input.
Definition: JDAQDriver.hh:320
static const int SLEEP_TIME_US
Definition: JDAQDriver.hh:42
static const std::string RUN_CONTROL_CLIENT
Definition: JDAQTags.hh:38
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:66
virtual void actionExit() override
Exit the state machine.
Definition: JDAQDriver.hh:155
static const JNET::JTag RC_REPLY
Definition: JDAQTags.hh:61
then echo Test string reversed by client(hit< return > to continue)." $DIR/JProcess -c "$DIR/JEcho-r" -C fi if (( 1 ))
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
Definition: JDAQDriver.hh:212
then awk string
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:478
Auxiliary class for handling event name and optional static information.
Definition: JEvent_t.hh:23
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:484
char getTokenDelimeter()
Get the token delimeter for command messages.
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
void run(std::istream &in)
Run driver.
Definition: JDAQDriver.hh:364
JClientList clientList
Definition: JDAQDriver.hh:640
virtual void actionStop(int, const char *) override
Definition: JDAQDriver.hh:309
virtual void enterState(const CHSM::state &state, const CHSM::event &event) override
Action when entering state.
Definition: JDAQDriver.hh:186
JDAQStateMachine::ev_check_event ev_check
static const JTag DISPTAG_Born("Born")
$WORKDIR ev_configure_dqsimulator txt echo process $DQ_SIMULATOR $i $SOURCE_HOST[$index] csh c(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&($DQ_SIMULATOR\-u\$NAME\$\-H\$SERVER\$\-M\$LOGGER\$\-d $DEBUG</dev/null > &/dev/null &))'
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition: JDAQDriver.hh:53
int timeout_us
timeout of state transitions [us]
Definition: JDAQDriver.hh:636
bool is_valid() const
Check validity of logger object.
Auxiliary class to specify white space character(s) in current locale.
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:120
Control unit client base class.
Definition: JDAQClient.hh:298
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
&set_variable SERVER
Definition: JStopDAQ.sh:29
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
Definition: JDAQDriver.hh:612
JMessageLogger logger
message logger
Definition: JDAQClient.hh:835
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:60
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
JDAQStateMachine::state_Main::state_RunControl RunControl
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
Definition: JDAQDriver.hh:522
char * loc(char *orig)
virtual bool enter() override
Enter the state machine.
Definition: JDAQDriver.hh:72
Exception for I/O.
Definition: JException.hh:340