Jpp  18.0.1-rc.2
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDAQDriver.hh
Go to the documentation of this file.
1 #ifndef __JRUNCONTROL__JDAQDRIVER__
2 #define __JRUNCONTROL__JDAQDRIVER__
3 
4 #include <string>
5 #include <iostream>
6 #include <sstream>
7 #include <limits>
8 #include <unistd.h>
9 
10 #include "JLang/JException.hh"
11 #include "JLang/JRedirectStream.hh"
13 
15 #include "JRuncontrol/JClient.hh"
17 
18 
19 /**
20  * \author mdejong
21  */
22 
23 namespace KM3NETDAQ {
24 
25  using JLANG::JException;
26  using JLANG::JIOException;
28 
29 
30  /**
31  * Simple driver for run control clients.
32  * This class can be used to start a set of run control clients,
33  * trigger events and eventually stop the clients.
34  */
35  class JDAQDriver :
36  public JDAQClient
37  {
38  public:
39 
40  static const int SLEEP_TIME_US = 500000;
41 
42  /**
43  * Constructor.
44  *
45  * \param name name of driver
46  * \param server name of command message server
47  * \param logger logger
48  * \param level debug level
49  * \param timeout_s timeout_s [s]
50  */
52  const std::string& server,
53  JLogger* logger,
54  const int level,
55  const int timeout_s) :
56  JDAQClient(name, server, logger, level),
57  timeout_us(timeout_s * 1000000),
58  is_alive (false)
59  {}
60 
61 
62  /**
63  * Enter the state machine.
64  * The driver will subscribe to the ControlHost tags corresponding to born, died and
65  * reply messages of the clients instead of the standard tags for run control commands.
66  * The clients are started after the driver is ready to receive ControlHost messages.
67  * In case of an error, a message is printed on the terminal and the state machine
68  * is not entered.
69  */
70  virtual bool enter() override
71  {
72  using namespace std;
73  using namespace JPP;
74 
75  if (server.is_valid() && logger.is_valid()) {
76 
77  try {
78 
79  server->Subscribe(JSubscriptionAll(RC_REPLY) +
83  server->SendMeAlways();
84  server->MyId(getFullName());
85 
86  // check alive of this driver
87 
88  for (int i = 0; i != timeout_us && !is_alive; ) {
89  if (!update(true)) {
90  usleep(SLEEP_TIME_US);
91  i += SLEEP_TIME_US;
92  }
93  }
94 
95  if (is_alive) {
96 
97  {
98  //clientList.start();
99 
100  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
101  if (i->isActive()) {
102 
103  JNoticeStream(logger) << "Start process " << *i;
104 
105  i->start();
106  }
107  }
108 
109  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
110 
111  while (update(true)) {
112  }
113 
114  if (clientList.count() >= clientList.count(JClient::ACTIVE))
115  break;
116  else
117  usleep(SLEEP_TIME_US);
118  }
119  }
120 
121  if (clientList.count() != clientList.count(JClient::ACTIVE)) {
122  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
123  if (i->isActive() && i->getBorn() <= i->getDied()) {
124  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
125  }
126  }
127  }
128 
129  return CHSM::machine::enter();
130 
131  } else {
132  cerr << "Timeout at subscription." << endl;
133  }
134  }
135  catch(const JControlHostException& error) {
136  cerr << error << endl;
137  }
138 
139  } else {
140  cerr << "Message server or logger not properly initialised." << endl;
141  }
142 
143  return false;
144  }
145 
146 
147  /**
148  * Exit the state machine.
149  *
150  * This method waits for the clients to terminate using the died message generated by ControlHost.
151  * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
152  */
153  void actionExit()
154  {
155  using namespace std;
156  using namespace JLANG;
157 
158  for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) {
159 
160  while (update(true)) {}
161 
162  if (clientList.count() == 0)
163  break;
164  else
165  usleep(SLEEP_TIME_US);
166  }
167 
168  if (clientList.count() != 0) {
169  JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
170  }
171 
172  clientList.stop();
173  }
174 
175 
176  /**
177  * Action when entering state.
178  * This method waits for all clients to produce the enter state message.
179  * In case of a timeout, no specific action is taken but an error message is logged.
180  *
181  * \param state entered state
182  * \param event event that triggered transition
183  */
184  virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
185  {
186  for (int i = 0; i != timeout_us && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
187  if (!update(true)) {
188  usleep(SLEEP_TIME_US);
189  i += SLEEP_TIME_US;
190  }
191  }
192 
193  if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
194  JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
195  }
196  }
197 
198 
199  /**
200  * Update client list with incoming ControlHost message.
201  * This method receives and processes a message.
202  * The client list is updated accordingly.
203  * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
204  * The return value is then false.
205  * If the no-wait option is set to false, it waits until the next message is received.
206  *
207  * \param no_wait wait option
208  * \return true if message received; else false
209  */
210  bool update(const bool no_wait)
211  {
212  using namespace std;
213  using namespace JNET;
215 
216  try {
217 
218  string tag;
219  long long int length = 0;
220 
221  if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
222  if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
223 
224  char* data= new char[length];
225 
226  server->GetFullData(data, length);
227 
228  const string buffer(data, length);
229 
230  delete [] data;
231 
232  JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
233 
234  if (tag == RC_LOG) {
235 
236  rc_log = buffer;
237 
238  } else if (buffer.find(getFullName()) != string::npos) {
239 
240  if (tag == DISPTAG_Born)
241  is_alive = true;
242  else if (tag == DISPTAG_Died)
243  is_alive = false;
244 
245  } else {
246 
247  JClientList::iterator i = clientList.find(buffer);
248 
249  if (i != clientList.end()) {
250 
251  i->update(tag, buffer);
252 
253  } else {
254 
255  JErrorStream(logger) << "Message fom illegal client " << buffer;
256 
257  try {
258 
259  if (tag == DISPTAG_Born ||
260  tag == DISPTAG_Died ||
261  tag == RC_REPLY) {
262 
263  string key, hostname, name;
264 
265  istringstream is(buffer);
266 
267  const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
268 
269  is.imbue(loc);
270 
271  if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
272 
273  JClient client(name, hostname);
274 
275  client.update(tag, buffer);
276  client.setMode(JClient::ILLEGAL);
277 
278  clientList.insert(client);
279 
280  JWarningStream(logger) << "Added illegal client " << client.getFullName();
281 
282  } else {
283  THROW(JIOException, "JClient: Error reading " << buffer);
284  }
285  }
286  }
287  catch(const JException& error) {
288  JErrorStream(logger) << error;
289  }
290  }
291  }
292 
293  return true;
294  }
295  catch(const JControlHostException& error) {
296  JErrorStream(logger) << error;
297  }
298 
299  return false;
300  }
301 
302  virtual void actionStart(int, const char*) override
303  {
304  rc_log = "";
305  }
306 
307  virtual void actionStop(int, const char*) override
308  {
309  if (rc_log != "")
311  else
312  JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
313  }
314 
315  /**
316  * Run driver with user input.
317  */
318  void run()
319  {
320  run(std::cin);
321  }
322 
323 
324  /**
325  * Run driver.
326  *
327  * Example input format:
328  * <pre>
329  * # comment line.
330  *
331  * process <process name> <host name> [<start command>];
332  *
333  * # The following tokens in <start command> will be substituted:
334  * # $HOST$ by <host name>;
335  * # $NAME$ by <process name>;
336  * # $SERVER$ by JClient::SERVER;
337  * # $LOGGER$ by JClient::LOGGER;
338  * # $ARGS$ by part following '/' in <process name>;
339  *
340  * # enter state machine.
341  *
342  * enter
343  *
344  * # trigger event
345  * # data can be provided online and mixed with data from a separate file (optional).
346  * # multiple tags should be separated by a new line.
347  *
348  * event <event name> {
349  * [<tag 1> [data]]
350  * [<tag 2> [data][%<file name>%][data]]
351  * }
352  *
353  * # optionally quit before end of input
354  * [quit]
355  *
356  * # optionally kill processes that did not properly terminate.
357  * [exit]
358  * </pre>
359  *
360  * \param in input stream
361  */
362  void run(std::istream& in)
363  {
364  using namespace std;
365 
366  for (string key; in >> key; ) {
367 
368  if (key[0] == '#') {
369 
370  in.ignore(numeric_limits<streamsize>::max(), '\n');
371 
372  } else if (key == "enter") {
373 
374  enter();
375 
376  if (!active()) {
377  cerr << "State machine not entered; abort." << endl;
378  return;
379  }
380 
381  } else if (key == "exit") {
382 
383  timeout_us = 0;
384  exit();
385 
386  } else if (key == "quit") {
387 
388  break;
389 
390  } else if (key == "sleep") {
391 
392  int sec;
393 
394  if (in >> sec) {
395  sleep(sec);
396  }
397 
398  } else if (key == "process") {
399 
400  string buffer;
401 
402  getline(in, buffer, ';');
403 
404  istringstream is(buffer);
405 
406  JClient client;
407 
408  if (is >> client) {
409 
410  client.setMode(JClient::ACTIVE);
411 
412  if (!clientList.insert(client).second) {
413  JWarningStream(logger) << "Process already exists " << client;
414  }
415 
416  } else {
417  JErrorStream(logger) << "Error reading key word process.";
418  }
419 
420  } else if (key == "event" || key == "event*") {
421 
422  JEvent_t event;
423  char c;
424  string buffer;
425  const char eol = '\n';
426 
427  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
428 
429  if (clientList.count() != 0) {
430 
431  JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
432 
433  if (pev != NULL) {
434 
435  if (pev->active() || key == "event*") {
436 
437  istringstream is(buffer);
438 
439  for (string tag; is >> tag; ) {
440 
441  ostringstream os;
442 
443  os << event << getTokenDelimeter();
444 
445  copy(is, os, eol);
446 
447  JNoticeStream(logger) << key << ' ' << tag << ' ' << event;
448 
449  server->PutFullString(tag, os.str());
450  }
451 
452  if (key != "event*") {
453  (*pev)(0, NULL); // trigger driver
454  }
455 
456  } else {
457  JErrorStream(logger) << "Inactive event " << event;
458  }
459 
460  } else {
461  JErrorStream(logger) << "Unknown event " << event;
462  }
463 
464  } else {
465  JErrorStream(logger) << "No active client to trigger event.";
466  }
467 
468  } else {
469  JErrorStream(logger) << "Error reading key word event.";
470  }
471 
472  } else if (key == "message") {
473 
474  string tag;
475  string buffer;
476 
477  if (in >> tag && getline(in, buffer, ';'))
478  server->PutFullString(tag, buffer);
479  else
480  JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
481 
482  } else if (key == "print") {
483 
484  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
485  JNoticeStream(logger) << i->getFullName() << ' '
486  << i->getStartCommand() << ' '
487  << i->getAlive() << ' '
488  << i->getStatename();
489  }
490 
491  } else if (key == "filter") {
492 
493  string client;
494  string buffer;
495 
496  getline(in, buffer, ';');
497 
498  for (istringstream is(buffer); is >> client; ) {
499  filter(client);
500  }
501 
502  } else if (key == "sync") {
503 
504  synchronise();
505 
506  } else {
507 
508  JErrorStream(logger) << "Unknown key: " << key;
509 
510  in.ignore(numeric_limits<streamsize>::max(), '\n');
511  }
512  }
513  }
514 
515 
516  /**
517  * Update client list with incoming ControlHost messages until the client list
518  * is synchronised with the current state or until the timeout.
519  */
520  void update()
521  {
522  using namespace std;
523 
524  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
525 
526  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
527 
528  if (state->active()) {
529  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
530  if (!update(true)) {
531  usleep(SLEEP_TIME_US);
532  i += SLEEP_TIME_US;
533  }
534  }
535  }
536  }
537  }
538 
539 
540  /**
541  * Synchronise clients.
542  */
543  void synchronise()
544  {
545  using namespace std;
546 
547  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
548 
549  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
550 
551  if (state->active()) {
552 
553  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
554 
555  JDebugStream(logger) << "Synchronising " << state->name();
556 
557  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
558 
559  if (i->getMode() == JClient::ACTIVE) {
560 
561  if (!i->getAlive()) {
562 
563  try {
564 
565  string buffer;
566 
567  if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
568 
569  i->setAlive(true);
570 
571  if (buffer.find(i->getHostname()) == string::npos) {
572  JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
573  }
574  }
575  }
576  catch(const JControlHostException& error) {
577  JErrorStream(logger) << error;
578  }
579  }
580 
581  if (i->getAlive() && i->getStatename() != state->name()) {
582  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
583  }
584  }
585  }
586 
587 
588  for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
589  if (!update(true)) {
590  usleep(SLEEP_TIME_US);
591  i += SLEEP_TIME_US;
592  }
593  }
594 
595  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
596  JWarningStream(logger) << "Timeout at synchronisation.";
597  }
598  }
599  }
600  }
601  }
602 
603 
604  /**
605  * Filter client list by putting failing clients to sleep.
606  * In this, only clients with names that contain the given character sequence are considered.
607  *
608  * \param target target name of client(s)
609  */
610  void filter(const std::string& target = "")
611  {
612  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
613 
614  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
615 
616  if (state->active()) {
617 
618  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
619 
620  if (target == "" || i->getName().find(target) != std::string::npos) {
621 
622  if (!i->getAlive() || i->getStatename() != state->name()) {
623 
624  JNoticeStream(logger) << "Put to sleep " << i->getFullName();
625 
626  i->setMode(JClient::SLEEP);
627  }
628  }
629  }
630  }
631  }
632  }
633 
634  int timeout_us; //!< timeout of state transitions [us]
635 
636  protected:
637 
639  bool is_alive;
641 
642  /**
643  * Copy data from input to output stream.
644  * Tagged file names are recursively expanded.
645  *
646  * \param in input stream
647  * \param out output stream
648  * \param eol end of line
649  */
650  static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
651  {
652  using namespace std;
653 
654  string buffer;
655 
656  if (getline(in, buffer, eol)) {
657 
658  for (string::size_type pos = 0; pos < buffer.length(); ) {
659 
660  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
661  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
662 
663  if (lpos != string::npos &&
664  rpos != string::npos) {
665 
666  out << buffer.substr(pos, lpos);
667 
668  lpos += FILENAME_PREFIX.length();
669  pos += lpos;
670 
671  ifstream file(buffer.substr(pos, rpos - lpos).c_str());
672 
673  copy(file, out, '\0');
674 
675  rpos += FILENAME_POSTFIX.length();
676  pos += rpos - lpos;
677 
678  } else {
679 
680  out << buffer.substr(pos);
681 
682  pos += buffer.substr(pos).length();
683  }
684  }
685  }
686  }
687  };
688 }
689 
690 #endif
static const std::string FILENAME_PREFIX
Definition: JDAQTags.hh:52
General exception.
Definition: JException.hh:23
const std::string & getName() const
Get event name.
Definition: JEvent_t.hh:65
Exceptions.
static const JTag DISPTAG_Died("Died")
JDAQEvent_t * findEvent(const JTag &tag, const std::string &event_name)
Find event in event table.
Definition: JDAQClient.hh:533
Target.
Definition: JHead.hh:298
static const std::string FILENAME_POSTFIX
Definition: JDAQTags.hh:53
then usage $script[< detector identifier >< run range >]< QA/QCfile > nExample script to produce data quality plots nWhen a detector identifier and run range are data are downloaded from the database nand subsequently stored in the given QA QC file
Definition: JDataQuality.sh:19
ControlHost client manager.
Definition: JLigier.cc:243
JDAQStateMachine::state_Main Main
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
List of ControlHost client managers.
Definition: JLigier.cc:477
#define THROW(JException_t, A)
Marco for throwing exception with std::ostream compatible message.
Definition: JException.hh:696
virtual void actionStart(int, const char *) override
Definition: JDAQDriver.hh:302
Interface for logging messages.
Definition: JLogger.hh:22
static void copy(std::istream &in, std::ostream &out, const char eol= '\n')
Copy data from input to output stream.
Definition: JDAQDriver.hh:650
std::string name
Definition: JDAQCHSM.chsm:154
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:826
static const std::string TOKEN_DELIMETER
Definition: JDAQTags.hh:50
Simple driver for run control clients.
Definition: JDAQDriver.hh:35
is
Definition: JDAQCHSM.chsm:167
void synchronise()
Synchronise clients.
Definition: JDAQDriver.hh:543
void run()
Run driver with user input.
Definition: JDAQDriver.hh:318
static const int SLEEP_TIME_US
Definition: JDAQDriver.hh:40
static const std::string RUN_CONTROL_CLIENT
Definition: JDAQTags.hh:36
void actionExit()
Exit the state machine.
Definition: JDAQDriver.hh:153
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:64
static const JNET::JTag RC_REPLY
Definition: JDAQTags.hh:59
then echo Test string reversed by client(hit< return > to continue)." $DIR/JProcess -c "$DIR/JEcho-r" -C fi if (( 1 ))
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
Definition: JDAQDriver.hh:210
then awk string
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:478
Auxiliary class for handling event name and optional static information.
Definition: JEvent_t.hh:23
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:468
virtual bool exit()
Exit the state machine.
Definition: JDAQClient.hh:482
char getTokenDelimeter()
Get the token delimeter for command messages.
Auxiliary class for all subscription.
Definition: JControlHost.hh:97
void run(std::istream &in)
Run driver.
Definition: JDAQDriver.hh:362
JClientList clientList
Definition: JDAQDriver.hh:638
virtual void actionStop(int, const char *) override
Definition: JDAQDriver.hh:307
virtual void enterState(const CHSM::state &state, const CHSM::event &event) override
Action when entering state.
Definition: JDAQDriver.hh:184
JDAQStateMachine::ev_check_event ev_check
static const JTag DISPTAG_Born("Born")
$WORKDIR ev_configure_dqsimulator txt echo process $DQ_SIMULATOR $i $SOURCE_HOST[$index] csh c(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&($DQ_SIMULATOR\-u\$NAME\$\-H\$SERVER\$\-M\$LOGGER\$\-d $DEBUG</dev/null > &/dev/null &))'
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition: JDAQDriver.hh:51
int timeout_us
timeout of state transitions [us]
Definition: JDAQDriver.hh:634
bool is_valid() const
Check validity of logger object.
Auxiliary class to specify white space character(s) in currect locale.
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:120
Control unit client base class.
Definition: JDAQClient.hh:298
&set_variable SERVER
Definition: JStopDAQ.sh:29
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
Definition: JDAQDriver.hh:610
JMessageLogger logger
message logger
Definition: JDAQClient.hh:827
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:58
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
JDAQStateMachine::state_Main::state_RunControl RunControl
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
Definition: JDAQDriver.hh:520
char * loc(char *orig)
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
virtual bool enter() override
Enter the state machine.
Definition: JDAQDriver.hh:70
Exception for I/O.
Definition: JException.hh:324