Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JDAQDriver.hh
Go to the documentation of this file.
1 #ifndef __JRUNCONTROL__JDAQDRIVER__
2 #define __JRUNCONTROL__JDAQDRIVER__
3 
4 #include <string>
5 #include <iostream>
6 #include <sstream>
7 #include <limits>
8 #include <unistd.h>
9 
10 #include "JLang/JException.hh"
11 #include "JLang/JRedirectStream.hh"
13 
15 #include "JRuncontrol/JClient.hh"
17 
18 
19 /**
20  * \author mdejong
21  */
22 
23 namespace KM3NETDAQ {
24 
25  using JLANG::JException;
26  using JLANG::JIOException;
28 
29 
30  /**
31  * Simple driver for run control clients.
32  * This class can be used to start a set of run control clients,
33  * trigger events and eventually stop the clients.
34  */
35  class JDAQDriver :
36  public JDAQClient
37  {
38  public:
39  /**
40  * Constructor.
41  *
42  * \param name name of driver
43  * \param server name of command message server
44  * \param logger logger
45  * \param level debug level
46  * \param timeout_s timeout_s [s]
47  */
48  JDAQDriver(const std::string& name,
49  const std::string& server,
50  JLogger* logger,
51  const int level,
52  const int timeout_s) :
53  JDAQClient(name, server, logger, level),
54  timeout_s (timeout_s),
55  is_alive (false)
56  {}
57 
58 
59  /**
60  * Enter the state machine.
61  * The driver will subscribe to the ControlHost tags corresponding to born, died and
62  * reply messages of the clients instead of the standard tags for run control commands.
63  * The clients are started after the driver is ready to receive ControlHost messages.
64  * In case of an error, a message is printed on the terminal and the state machine
65  * is not entered.
66  */
67  virtual bool enter()
68  {
69  using namespace std;
70  using namespace JLANG;
71  using namespace JNET;
72 
73  if (server.is_valid() && logger.is_valid()) {
74 
75  try {
76 
77  server->Subscribe(JSubscriptionAll(RC_REPLY) +
81  server->SendMeAlways();
82  server->MyId(getFullName());
83 
84  // check alive of this driver
85 
86  for (int i = 0; i != timeout_s && !is_alive; ) {
87  if (!update(true)) {
88  sleep(1);
89  ++i;
90  }
91  }
92 
93  if (is_alive) {
94 
95  {
96  // redirect standard output to message logger
97 
98  //JDebugStream out(logger);
99  //JRedirectStream redirect(cout, out);
100 
101  clientList.start();
102 
103  for (int i = 0; i != timeout_s; ++i) {
104 
105  while (update(true)) {}
106 
107  if (clientList.count() >= clientList.count(JClient::ACTIVE))
108  break;
109  else
110  sleep(1);
111  }
112  }
113 
114  if (clientList.count() != clientList.count(JClient::ACTIVE)) {
115  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
116  if (i->isActive() && i->getBorn() <= i->getDied()) {
117  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
118  }
119  }
120  }
121 
122  return CHSM::machine::enter();
123 
124  } else {
125  cerr << "Timeout at subscription." << endl;
126  }
127  }
128  catch(const JControlHostException& error) {
129  cerr << error << endl;
130  }
131 
132  } else {
133  cerr << "Message server or logger not properly initialised." << endl;
134  }
135 
136  return false;
137  }
138 
139 
140  /**
141  * Exit the state machine.
142  *
143  * This method waits for the clients to terminate using the died message generated by ControlHost.
144  * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
145  */
146  void actionExit()
147  {
148  using namespace std;
149  using namespace JLANG;
150 
151  for (int i = 0; i != timeout_s; ++i) {
152 
153  while (update(true)) {}
154 
155  if (clientList.count() == 0)
156  break;
157  else
158  sleep(1);
159  }
160 
161  if (clientList.count() != 0) {
162  JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
163  }
164 
165  // redirect standard output to message logger
166 
167  //JDebugStream out(logger);
168  //JRedirectStream redirect(cout, out);
169 
170  clientList.stop();
171  }
172 
173 
174  /**
175  * Action when entering state.
176  * This method waits for all clients to produce the enter state message.
177  * In case of a timeout, no specific action is taken but an error message is logged.
178  *
179  * \param state entered state
180  * \param event event that triggered transition
181  */
182  void enterState(const CHSM::state& state, const CHSM::event& event)
183  {
184  for (int i = 0; i != timeout_s && clientList.count(state) < clientList.count(JClient::ACTIVE); ) {
185  if (!update(true)) {
186  sleep(1);
187  ++i;
188  }
189  }
190 
191  if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
192  JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name();
193  }
194  }
195 
196 
197  /**
198  * Update client list with incoming ControlHost message.
199  * This method receives and processes a message.
200  * The client list is updated accordingly.
201  * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
202  * The return value is then false.
203  * If the no-wait option is set to false, it waits until the next message is received.
204  *
205  * \param no_wait wait option
206  * \return true if message received; else false
207  */
208  bool update(const bool no_wait)
209  {
210  using namespace std;
211  using namespace JNET;
213 
214  try {
215 
216  string tag;
217  int length = 0;
218 
219  if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
220  if (!no_wait && server->WaitHead (tag, length) < 0) { return false; }
221 
222  char* data= new char[length];
223 
224  server->GetFullData(data, length);
225 
226  const string buffer(data, length);
227 
228  delete [] data;
229 
230  JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
231 
232  if (tag == RC_LOG) {
233 
234  rc_log = buffer;
235 
236  } else if (buffer.find(getFullName()) != string::npos) {
237 
238  if (tag == DISPTAG_Born)
239  is_alive = true;
240  else if (tag == DISPTAG_Died)
241  is_alive = false;
242 
243  } else {
244 
245  JClientList::iterator i = clientList.find(buffer);
246 
247  if (i != clientList.end()) {
248 
249  i->update(tag, buffer);
250 
251  } else {
252 
253  JErrorStream(logger) << "Message fom illegal client " << buffer;
254 
255  try {
256 
257  if (tag == DISPTAG_Born ||
258  tag == DISPTAG_Died ||
259  tag == RC_REPLY) {
260 
261  string key, hostname, name;
262 
263  istringstream is(buffer);
264 
265  const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
266 
267  is.imbue(loc);
268 
269  if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {
270 
271  JClient client(name, hostname);
272 
273  client.update(tag, buffer);
274  client.setMode(JClient::ILLEGAL);
275 
276  clientList.insert(client);
277 
278  JWarningStream(logger) << "Added illegal client " << client.getFullName();
279 
280  } else {
281  throw JIOException("JClient: Error reading " + buffer);
282  }
283  }
284  }
285  catch(const JException& error) {
286  JErrorStream(logger) << error;
287  }
288  }
289  }
290 
291  return true;
292  }
293  catch(const JControlHostException& error) {
294  JErrorStream(logger) << error;
295  }
296 
297  return false;
298  }
299 
300  virtual void actionStart(int, const char*)
301  {
302  rc_log = "";
303  }
304 
305  virtual void actionStop(int, const char*)
306  {
307  if (rc_log != "")
309  else
310  JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
311  }
312 
313  /**
314  * Run driver with user input.
315  */
316  void run()
317  {
318  run(std::cin);
319  }
320 
321 
322  /**
323  * Run driver.
324  *
325  * Example input format:
326  * <pre>
327  * # comment line.
328  *
329  * process <process name> <host name> [<start command>];
330  *
331  * # The following tokens in <start command> will be substituted:
332  * # $HOST$ by <host name>;
333  * # $NAME$ by <process name>;
334  * # $SERVER$ by JClient::SERVER;
335  * # $LOGGER$ by JClient::LOGGER;
336  * # $ARGS$ by part following '/' in <process name>;
337  *
338  * # enter state machine.
339  *
340  * enter
341  *
342  * # trigger event
343  * # data can be provided online and mixed with data from a separate file (optional).
344  * # multiple tags should be separated by a new line.
345  *
346  * event <event name> {
347  * [<tag 1> [data]]
348  * [<tag 2> [data][%<file name>%][data]]
349  * }
350  *
351  * # optionally quit before end of input
352  * [quit]
353  *
354  * # optionally kill processes that did not properly terminate.
355  * [exit]
356  * </pre>
357  *
358  * \param in input stream
359  */
360  void run(std::istream& in)
361  {
362  using namespace std;
363 
364  for (string key; in >> key; ) {
365 
366  if (key[0] == '#') {
367 
368  in.ignore(numeric_limits<streamsize>::max(), '\n');
369 
370  } else if (key == "enter") {
371 
372  enter();
373 
374  if (!active()) {
375  cerr << "State machine not entered; abort." << endl;
376  return;
377  }
378 
379  } else if (key == "exit") {
380 
381  timeout_s = 0;
382  exit();
383 
384  } else if (key == "quit") {
385 
386  break;
387 
388  } else if (key == "sleep") {
389 
390  int sec;
391 
392  if (in >> sec) {
393  sleep(sec);
394  }
395 
396  } else if (key == "process") {
397 
398  string buffer;
399 
400  getline(in, buffer, ';');
401 
402  istringstream is(buffer);
403 
404  JClient client;
405 
406  if (is >> client) {
407 
408  client.setMode(JClient::ACTIVE);
409 
410  if (!clientList.insert(client).second) {
411  JWarningStream(logger) << "Process already exists " << client;
412  }
413 
414  } else {
415  JErrorStream(logger) << "Error reading key word process.";
416  }
417 
418  } else if (key == "event" || key == "event*") {
419 
420  JEvent_t event;
421  char c;
422  string buffer;
423  const char eol = '\n';
424 
425  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {
426 
427  JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
428 
429  if (pev != NULL) {
430 
431  if (pev->active() || key == "event*") {
432 
433  istringstream is(buffer);
434 
435  for (string tag; is >> tag; ) {
436 
437  ostringstream os;
438 
439  os << event << getTokenDelimeter();
440 
441  copy(is, os, eol);
442 
443  server->PutFullString(tag, os.str());
444  }
445 
446  if (key != "event*") {
447  (*pev)(0, NULL); // trigger driver
448  }
449 
450  } else {
451  JErrorStream(logger) << "Inactive event " << event;
452  }
453 
454  } else {
455  JErrorStream(logger) << "Unknown event " << event;
456  }
457 
458  } else {
459  JErrorStream(logger) << "Error reading key word event.";
460  }
461 
462  } else if (key == "print") {
463 
464  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
465  JNoticeStream(logger) << i->getFullName() << ' '
466  << i->getStartCommand() << ' '
467  << i->getAlive() << ' '
468  << i->getStatename();
469  }
470 
471  } else if (key == "filter") {
472 
473  string client;
474  string buffer;
475 
476  getline(in, buffer, ';');
477 
478  for (istringstream is(buffer); is >> client; ) {
479  filter(client);
480  }
481 
482  } else if (key == "sync") {
483 
484  synchronise();
485 
486  } else {
487 
488  JErrorStream(logger) << "Unknown key: " << key;
489 
490  in.ignore(numeric_limits<streamsize>::max(), '\n');
491  }
492  }
493  }
494 
495 
496  /**
497  * Update client list with incoming ControlHost messages until the client list
498  * is synchronised with the current state or until the timeout.
499  */
500  void update()
501  {
502  using namespace std;
503 
504  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
505 
506  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
507 
508  if (state->active()) {
509  for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
510  if (!update(true)) {
511  sleep(1);
512  ++i;
513  }
514  }
515  }
516  }
517  }
518 
519 
520  /**
521  * Synchronise clients.
522  */
523  void synchronise()
524  {
525  using namespace std;
526 
527  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
528 
529  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
530 
531  if (state->active()) {
532 
533  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
534 
535  JDebugStream(logger) << "Synchronising " << state->name();
536 
537  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
538 
539  if (i->getMode() == JClient::ACTIVE) {
540 
541  if (!i->getAlive()) {
542 
543  try {
544 
545  string buffer;
546 
547  if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {
548 
549  i->setAlive(true);
550 
551  if (buffer.find(i->getHostname()) == string::npos) {
552  JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
553  }
554  }
555  }
556  catch(const JControlHostException& error) {
557  JErrorStream(logger) << error;
558  }
559  }
560 
561  if (i->getAlive() && i->getStatename() != state->name()) {
562  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name());
563  }
564  }
565  }
566 
567 
568  for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
569  if (!update(true)) {
570  sleep(1);
571  ++i;
572  }
573  }
574 
575  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
576  JWarningStream(logger) << "Timeout at synchronisation.";
577  }
578  }
579  }
580  }
581  }
582 
583 
584  /**
585  * Filter client list by putting failing clients to sleep.
586  * In this, only clients with names that contain the given character sequence are considered.
587  *
588  * \param target target name of client(s)
589  */
590  void filter(const std::string& target = "")
591  {
592  const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);
593 
594  for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {
595 
596  if (state->active()) {
597 
598  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
599 
600  if (target == "" || i->getName().find(target) != std::string::npos) {
601 
602  if (!i->getAlive() || i->getStatename() != state->name()) {
603 
604  JNoticeStream(logger) << "Put to sleep " << i->getFullName();
605 
606  i->setMode(JClient::SLEEP);
607  }
608  }
609  }
610  }
611  }
612  }
613 
614  int timeout_s; //!< timeout of state transitions [s]
615 
616  protected:
617 
619  bool is_alive;
620  std::string rc_log;
621 
622  /**
623  * Copy data from input to output stream.
624  * Tagged file names are recursively expanded.
625  *
626  * \param in input stream
627  * \param out output stream
628  * \param eol end of line
629  */
630  static void copy(std::istream& in, std::ostream& out, const char eol = '\n')
631  {
632  using namespace std;
633 
634  string buffer;
635 
636  if (getline(in, buffer, eol)) {
637 
638  for (string::size_type pos = 0; pos < buffer.length(); ) {
639 
640  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
641  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
642 
643  if (lpos != string::npos &&
644  rpos != string::npos) {
645 
646  out << buffer.substr(pos, lpos);
647 
648  lpos += FILENAME_PREFIX.length();
649  pos += lpos;
650 
651  ifstream file(buffer.substr(pos, rpos - lpos).c_str());
652 
653  copy(file, out, '\0');
654 
655  rpos += FILENAME_POSTFIX.length();
656  pos += rpos - lpos;
657 
658  } else {
659 
660  out << buffer.substr(pos);
661 
662  pos += buffer.substr(pos).length();
663  }
664  }
665  }
666  }
667  };
668 }
669 
670 #endif
static const std::string FILENAME_PREFIX
Definition: JDAQTags.hh:38
int timeout_s
timeout of state transitions [s]
Definition: JDAQDriver.hh:614
General exception.
Definition: JException.hh:40
const std::string & getName() const
Get event name.
Exceptions.
static const JTag DISPTAG_Died("Died")
Target.
Definition: JHead.hh:146
static const std::string FILENAME_POSTFIX
Definition: JDAQTags.hh:39
ControlHost client manager.
Definition: JLigier.cc:241
virtual bool enter()
Enter the state machine.
Definition: JDAQDriver.hh:67
JTag getUniqueTag(const std::string &hostname, const std::string &name)
Get unique tag of run control client.
virtual void actionStop(int, const char *)
Definition: JDAQDriver.hh:305
List of ControlHost client managers.
Definition: JLigier.cc:453
virtual void actionStart(int, const char *)
Definition: JDAQDriver.hh:300
Interface for logging messages.
Definition: JLogger.hh:22
static void copy(std::istream &in, std::ostream &out, const char eol= '\n')
Copy data from input to output stream.
Definition: JDAQDriver.hh:630
Structure to store the ToT mean and standard deviation of the hits produced by a nanobeacon in a sour...
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
static const std::string TOKEN_DELIMETER
Definition: JDAQTags.hh:36
Simple driver for run control clients.
Definition: JDAQDriver.hh:35
void enterState(const CHSM::state &state, const CHSM::event &event)
Action when entering state.
Definition: JDAQDriver.hh:182
void synchronise()
Synchronise clients.
Definition: JDAQDriver.hh:523
void run()
Run driver with user input.
Definition: JDAQDriver.hh:316
static const std::string RUN_CONTROL_CLIENT
Definition: JDAQTags.hh:22
void actionExit()
Exit the state machine.
Definition: JDAQDriver.hh:146
static const JNET::JTag RC_LOG
Definition: JDAQTags.hh:49
static const JNET::JTag RC_REPLY
Definition: JDAQTags.hh:45
bool update(const bool no_wait)
Update client list with incoming ControlHost message.
Definition: JDAQDriver.hh:208
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:468
Auxiliary class for handling event name and optional number.
Level specific message streamers.
Exception for ControlHost.
Definition: JException.hh:432
virtual bool exit()
Exit the state machine.
Definition: JDAQClient.hh:225
char getTokenDelimeter()
Get the token delimeter for command messages.
Auxiliary class for all subscription.
Definition: JControlHost.hh:95
void run(std::istream &in)
Run driver.
Definition: JDAQDriver.hh:360
JClientList clientList
Definition: JDAQDriver.hh:618
static const JTag DISPTAG_Born("Born")
JDAQDriver(const std::string &name, const std::string &server, JLogger *logger, const int level, const int timeout_s)
Constructor.
Definition: JDAQDriver.hh:48
bool is_valid() const
Check validity of logger object.
Auxiliary class to specify white space character(s) in currect locale.
Run control client base class.
Definition: JDAQClient.hh:88
void filter(const std::string &target="")
Filter client list by putting failing clients to sleep.
Definition: JDAQDriver.hh:590
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
static int WhereIs(const std::string &host_name, const std::string &nick_name, std::string &answer)
Locate ControlHost client(s).
JDAQStateMachine::ev_daq_event JDAQEvent_t
Type definition of a DAQ event.
Definition: JEventTable.hh:22
const std::string & getFullName() const
Get full name of this run control client.
Definition: JDAQClient.hh:248
void update()
Update client list with incoming ControlHost messages until the client list is synchronised with the ...
Definition: JDAQDriver.hh:500
JDAQEvent_t * findEvent(const JNET::JTag &tag, const std::string &event_name)
Find event in event table.
Definition: JDAQClient.hh:309
std::string hostname
Definition: JDAQClient.hh:829
Exception for I/O.
Definition: JException.hh:306