Jpp  pmt_effective_area_update
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DataQueue.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <string>
3 #include <cassert>
4 
8 #include <JDAQ/JDAQTags.hh>
9 
13 
14 #include "debug_abrt.hh"
15 #include "log.hh"
16 
17 #include <boost/program_options.hpp>
18 /**
19  * \author cpellegrino
20  */
21 
22 namespace po = boost::program_options;
23 #include <boost/foreach.hpp>
24 #include <boost/scoped_ptr.hpp>
25 #include "configure.hh"
26 #include "version.hpp"
27 
// Forward declaration only; no definition is visible in this file (the
// cross-reference index of this listing places it in jlog.cc). Called from
// DataQueue::actionInput() with the client's current logger level.
29 void setLogLevel(int level);
30 
/**
 * Write one notice-level log entry with the current object counters.
 *
 * The entry contains PuzzledFrame::n_obj, CLBDataGram::n_obj and whatever
 * Log::Counter::get() streams. File-local (static); called from
 * DataQueue::actionRunning() and DataQueue::actionSelect().
 *
 * NOTE(review): the whole message is one stream expression on LOG_NOTICE;
 * the macro expansion is not visible here, so keep it as a single statement.
 */
31 static
32 void monitor()
33 {
34  LOG_NOTICE << "# pframes: " << PuzzledFrame::n_obj
35  << " - # dgrams: " << CLBDataGram::n_obj << " - "
36  << Log::Counter::get();
37 }
38 
39 namespace KM3NETDAQ {
40 
/**
 * Run-control client for the data queue.
 *
 * Derives from JDAQClient and provides the action*() methods below.
 * Depending on the two flags fixed at construction (acoustic and/or
 * optical) it owns one FrameFarm, one RecipientsHandler and one DFInterface
 * per enabled flavour, a single DataInputInterface shared by both flavours,
 * and the threads that run the farms and the DF interfaces.
 *
 * The owned objects are created in actionConfigure() and released in
 * actionQuit(). actionStop() ends by calling actionQuit(), and
 * actionStart()/actionStop()/actionPause()/actionContinue() dereference the
 * owned pointers without a null check, so they rely on actionConfigure()
 * having run first.
 */
41 class DataQueue : public JDAQClient
42 {
    // Flavour switches, fixed for the lifetime of the object.
43  bool const m_acou;
44  bool const m_opto;
45 
46  // Objects
    // All of these stay null until actionConfigure() runs.
47  boost::scoped_ptr<DataInputInterface> m_input;
48 
49  boost::scoped_ptr<FrameFarm> m_afarm;
50  boost::scoped_ptr<FrameFarm> m_ofarm;
51 
52  boost::scoped_ptr<RecipientsHandler> m_orecipients;
53  boost::scoped_ptr<RecipientsHandler> m_arecipients;
54 
55  boost::scoped_ptr<DFInterface> m_adfi;
56  boost::scoped_ptr<DFInterface> m_odfi;
57 
58  // Threads
    // Created in actionStart(), joined and released in actionStop().
59  boost::scoped_ptr<boost::thread> m_aff_th;
60  boost::scoped_ptr<boost::thread> m_off_th;
61  boost::scoped_ptr<boost::thread> m_adfi_th;
62  boost::scoped_ptr<boost::thread> m_odfi_th;
63 
64  public:
65 
    /**
     * Constructor.
     *
     * \param name    client name, forwarded to JDAQClient
     * \param server  server address, forwarded to JDAQClient
     * \param logger  logger, forwarded to JDAQClient
     * \param level   debug level, forwarded to JDAQClient
     * \param acou    enable the acoustic flavour
     * \param opto    enable the optical flavour
     *
     * NOTE(review): the listing numbering jumps from 76 to 78, so a statement
     * of the constructor body may be missing from this rendering.
     */
66  DataQueue(const std::string& name,
67  const std::string& server,
68  JLogger* logger,
69  const int level,
70  bool acou,
71  bool opto)
72  :
73  JDAQClient(name, server, logger, level),
74  m_acou(acou),
75  m_opto(opto)
76  {
78  }
79 
80  /**
81  * Interface methods for actions corresponding to state transitions.
82  */
    // Enter, exit, init and reset are intentionally empty for this client.
83  virtual void actionEnter() {}
84  virtual void actionExit() {}
85 
86  virtual void actionInit(int length, const char* buffer) {}
87  virtual void actionReset(int length, const char* buffer) {}
88 
    /**
     * Build all processing objects from the configuration text.
     *
     * \param length  number of bytes in buffer
     * \param buffer  configuration text; copied with an explicit length, so it
     *                does not have to be NUL-terminated
     *
     * Keys read from the parsed tree: "timeslice_duration", "run_start_time",
     * "max_dump_size", "dump_file_prefix", "dump_file_postfix", plus
     * "acou_ports"/"acou_recipient" when the acoustic flavour is enabled and
     * "opto_ports"/"opto_recipients" when the optical flavour is enabled.
     *
     * boost::property_tree::ptree::get<T>(path) throws when a key is missing
     * or cannot be converted; nothing in this method catches that.
     */
89  virtual void actionConfigure(int length, const char* buffer)
90  {
91  // parse the configuration
92  boost::property_tree::ptree const conf = detail::parse(
93  std::string(buffer, length));
94 
95  // initialize some global flavor-independent variables
96  unsigned int const delta_ts = conf.get<unsigned int>(
97  "timeslice_duration");
98 
99  uint64_t const run_start_time = conf.get<uint64_t>("run_start_time");
100 
101  std::size_t const max_dump_size = conf.get<std::size_t>("max_dump_size");
102  std::string const prefix = conf.get<std::string>("dump_file_prefix");
103  std::string const postfix = conf.get<std::string>("dump_file_postfix");
104 
     // Port lists stay empty for a flavour that is not enabled.
105  std::vector<int> acou_ports;
106  std::vector<int> opto_ports;
107 
108  if (m_acou) {
109  acou_ports = detail::vectorize<int>(conf.get<std::string>("acou_ports"));
110  }
111 
112  if (m_opto) {
113  opto_ports = detail::vectorize<int>(conf.get<std::string>("opto_ports"));
114  }
115 
     // NOTE(review): the sum is a std::size_t that is narrowed to int here.
116  int const n_channels = acou_ports.size() + opto_ports.size();
117 
118  // create DataInputInterface
119  m_input.reset(new DataInputInterface(n_channels));
120 
121  // create the FrameFarms
     // The two farms differ only in the dump-file prefix suffix ("_a" / "_o").
122  if (m_acou) {
123  m_afarm.reset(
124  new FrameFarm(
125  delta_ts,
126  run_start_time,
127  max_dump_size,
128  prefix + "_a",
129  postfix));
130  }
131 
132  if (m_opto) {
133  m_ofarm.reset(
134  new FrameFarm(
135  delta_ts,
136  run_start_time,
137  max_dump_size,
138  prefix + "_o",
139  postfix));
140  }
141 
142  // create the RecipientsHandlers and the DFInterfaces
     // Each DFInterface is constructed from references to its farm and its
     // recipients handler, so both must exist before it is created.
     // NOTE(review): the meaning of the RecipientsHandler argument 10 is not
     // visible in this file.
143  if (m_acou) {
     // The acoustic flavour takes a single recipient.
144  std::string const acou_recipient = conf.get<std::string>("acou_recipient");
145 
146  m_arecipients.reset(new RecipientsHandler(10));
147  m_arecipients->add(acou_recipient);
148  m_adfi.reset(new DFInterface(*m_afarm, *m_arecipients));
149  }
150 
151  if (m_opto) {
     // The optical flavour takes a list of recipients.
152  std::vector<std::string> const opto_recipients = detail::vectorize<std::string>(
153  conf.get<std::string>("opto_recipients"));
154 
155  m_orecipients.reset(new RecipientsHandler(10));
     // NOTE(review): the loop variable is taken by value, so every recipient
     // string is copied once per iteration; a const reference would avoid it.
156  BOOST_FOREACH(std::string s, opto_recipients)
157  {
158  m_orecipients->add(s);
159  }
160  m_odfi.reset(new DFInterface(*m_ofarm, *m_orecipients));
161  }
162 
163  // add the channels to DataInputInterface
     // Each port is registered together with a reference to the farm of its
     // flavour.
164  if (m_acou) {
165  BOOST_FOREACH(int port, acou_ports)
166  {
167  m_input->add_channel(port, *m_afarm);
168  }
169  }
170 
171  if (m_opto) {
172  BOOST_FOREACH(int port, opto_ports)
173  {
174  m_input->add_channel(port, *m_ofarm);
175  }
176  }
177  }
178 
    /**
     * Release every object created by actionConfigure().
     *
     * The parameters are not used. The DF interfaces go first because they
     * were constructed from references to the farms and recipients handlers.
     *
     * NOTE(review): m_input was handed references to the farms in
     * actionConfigure() but is released after them; confirm that
     * DataInputInterface does not touch the farms while being destroyed.
     */
179  virtual void actionQuit(int length, const char* buffer)
180  {
181  // reset the DataFilter interfaces
182  m_odfi.reset();
183  m_adfi.reset();
184 
185  // reset the RecipientsHandlers
186  m_orecipients.reset();
187  m_arecipients.reset();
188 
189  // reset the FrameFarms
190  m_ofarm.reset();
191  m_afarm.reset();
192 
193  // reset the DataInputInterface
194  m_input.reset();
195  }
196 
    /**
     * Start a run: hand run number and detector identifier to each enabled
     * farm, launch the farm and DF-interface threads, then start the input.
     *
     * The parameters are not used. Requires a preceding actionConfigure(),
     * since the owned pointers are dereferenced unconditionally.
     *
     * NOTE(review): the listing numbering jumps from 204 to 206, so a
     * statement may be missing from this rendering.
     */
197  virtual void actionStart(int length, const char* buffer)
198  {
199  int const run_number = getRunNumber();
200 
201  int const detector_id = getDetectorID();
202 
     // Debug-build check only; assert is compiled out when NDEBUG is defined.
203  assert(run_number >= 0);
204 
206 
207  // launch the FrameFarm threads
     // boost::ref makes each thread operate on the owned object itself
     // rather than on a copy of it.
208  if (m_acou) {
209  m_afarm->runNumber(run_number);
210  m_afarm->detectorId(detector_id);
211  m_aff_th.reset(new boost::thread(boost::ref(*m_afarm)));
212  }
213  if (m_opto) {
214  m_ofarm->runNumber(run_number);
215  m_ofarm->detectorId(detector_id);
216  m_off_th.reset(new boost::thread(boost::ref(*m_ofarm)));
217  }
218 
219  // launch the DFInterface threads
220  if (m_acou) m_adfi_th.reset(new boost::thread(boost::ref(*m_adfi)));
221  if (m_opto) m_odfi_th.reset(new boost::thread(boost::ref(*m_odfi)));
222 
223  // launch the DataInputInterface
     // The input is started last, after all the threads have been created.
224  m_input->start();
225  }
226 
    /**
     * Stop a run and drop back to the unconfigured state.
     *
     * Sequence: stop the input, stop the farms, stop the DF interfaces, join
     * the DF-interface threads, join the farm threads, release the configured
     * objects through actionQuit(), and finally release the thread handles.
     *
     * \param length  forwarded unchanged to actionQuit()
     * \param buffer  forwarded unchanged to actionQuit()
     */
227  virtual void actionStop(int length, const char* buffer)
228  {
229  // stop the DataInputInterface
230  m_input->stop();
231 
232  // stop the FrameFarms
233  if (m_opto) m_ofarm->stop();
234  if (m_acou) m_afarm->stop();
235 
236  // stop the DataFilter interfaces
237  if (m_opto) m_odfi->stop();
238  if (m_acou) m_adfi->stop();
239 
240  // join the DataFilter interfaces
241  if (m_opto) m_odfi_th->join();
242  if (m_acou) m_adfi_th->join();
243 
244  // join the FrameFarms
245  if (m_opto) m_off_th->join();
246  if (m_acou) m_aff_th->join();
247 
248  // call reset to go in stand-by
     // All threads have been joined at this point, so the objects they were
     // running can be destroyed.
249  actionQuit(length, buffer);
250 
251  // reset the DataFilter interfaces threads
252  m_odfi_th.reset();
253  m_adfi_th.reset();
254 
255  // reset the FrameFarms threads
256  m_off_th.reset();
257  m_aff_th.reset();
258  }
259 
    // Pause: forwarded to the input interface only; the parameters are unused.
260  virtual void actionPause (int length, const char* buffer)
261  {
262  m_input->pause();
263  }
264 
    // Continue: forwarded to the input interface only; the parameters are unused.
265  virtual void actionContinue (int length, const char* buffer)
266  {
267  m_input->cont();
268  }
269 
    // Logs the object counters through monitor().
270  virtual void actionRunning()
271  {
272  monitor();
273  }
274 
    // Logs the object counters through monitor(); the mask is not inspected.
275  virtual void actionSelect(const JFileDescriptorMask& /*mask*/)
276  {
277  monitor();
278  }
279 
    /**
     * Run the base-class input handling, then pass the client's current
     * logger level to setLogLevel().
     */
280  virtual void actionInput(int length, const char* buffer)
281  {
282  // this to preserve future compatibility
283  this->JDAQClient::actionInput(length, buffer);
284 
285  setLogLevel(this->logger.getLevel());
286  }
287 
    /**
     * Select the ControlHost tag that matches the enabled flavours.
     *
     * \return RC_DQUEUE when both flavours are enabled, RC_DQUEUE_ACS for
     *         acoustic only, RC_DQUEUE_OPT for optical only, otherwise a
     *         default-constructed tag held in a function-local static
     */
288  const JNET::JTag & clientTag() const
289  {
     // Fallback returned by reference when neither flavour is enabled.
290  static const JNET::JTag tag;
291 
292  if (m_acou && m_opto)
293  {
294  return RC_DQUEUE;
295  }
296  else if (m_acou)
297  {
298  return RC_DQUEUE_ACS;
299  }
300  else if (m_opto)
301  {
302  return RC_DQUEUE_OPT;
303  } else
304  {
305  return tag;
306  }
307 
308  //assert(!"No DQ mode set.");
309  }
310 };
311 
312 } // ns KM3NETDAQ
313 
/**
 * Program entry point.
 *
 * Parses the command line, requires at least one of the optical/acoustic
 * modes, builds the loggers and a KM3NETDAQ::DataQueue, and hands control to
 * the client's enter()/run().
 *
 * \param argc  argument count
 * \param argv  argument values
 * \return      EXIT_SUCCESS after --help or --version, EXIT_FAILURE on a
 *              command-line error or when no mode is selected
 *
 * NOTE(review): the listing numbering skips 391 and 394-395, so part of this
 * function (including the start of the declaration of `log`) is missing from
 * this rendering.
 */
314 int main(int argc, char* argv[])
315 {
     // NOTE(review): presumably aborts when sizeof(T) differs from the given
     // byte count; the helper is not defined in this file, so confirm.
316  __debug_abort_on_wrong_size_<CLBCommonHeader>(40);
317  __debug_abort_on_wrong_size_<DAQCommonHeader>(56);
318  __debug_abort_on_wrong_size_<UTCTime>(8);
319 
     // Defaults; each can be overridden by the option bound to it below.
320  std::string server("localhost");
321  std::string logger("localhost");
322  std::string client_name("DataQueue");
323  int debug = 0;
324 
     // -H, -M, -u and -d are declared with a short name only.
325  po::options_description desc("Options");
326  desc.add_options()
327  ("help,h", "Print this help and exit.")
328  ("version,v", "Print the version and exit.")
329  ("optical,o", "Set the optical mode.")
330  ("acoustic,a", "Set the acoustic mode.")
331  (",H",
332  po::value<std::string>(&server)->default_value(server),
333  "Set the address of the SM server.")
334  (",M",
335  po::value<std::string>(&logger)->default_value(logger),
336  "Set the address of the logger server.")
     // NOTE(review): the help text below says "address of the client name",
     // but the option sets the client name itself.
337  (",u",
338  po::value<std::string>(&client_name)->default_value(client_name),
339  "Set the address of the client name.")
340  (",d",
341  po::value<int>(&debug)->default_value(debug),
342  "Set the debug level.");
343 
344  bool acou = false, opto = false;
345 
346  try
347  {
348  po::variables_map vm;
349  po::store(
350  po::command_line_parser(argc, argv).options(desc).run(),
351  vm);
352 
     // --help and --version are answered before po::notify() is called.
353  if (vm.count("help"))
354  {
355  std::cout << desc << std::endl;
356  return EXIT_SUCCESS;
357  }
358 
359  if (vm.count("version"))
360  {
361  std::cout << dataqueue::version::v() << std::endl;
362  return EXIT_SUCCESS;
363  }
364 
365  po::notify(vm);
366 
     // count() yields an integer; any non-zero value converts to true.
367  opto = vm.count("optical");
368 
369  acou = vm.count("acoustic");
370 
     // At least one mode is mandatory; the throw is handled by the
     // std::runtime_error handler below.
371  if (! (acou || opto))
372  {
373  throw std::runtime_error("FATAL: no mode specified. Use -o, -a or both. See the help.");
374  }
375  }
     // Both handlers print the error followed by the usage text and fail.
376  catch (const po::error& e)
377  {
378  std::cerr << "DataQueue: Error: " << e.what() << '\n'
379  << desc << std::endl;
380  return EXIT_FAILURE;
381  }
382  catch (const std::runtime_error& e)
383  {
384  std::cerr << "DataQueue: Error: " << e.what() << '\n'
385  << desc << std::endl;
386  return EXIT_FAILURE;
387  }
388 
389  // Call to singleton in a thread-safe environment
390 
392 
393  // Don't delete this object. It will be handled by the DataQueue class.
     // NOTE(review): the three lines below are the tail of a statement whose
     // beginning is missing from this listing; `log`, used afterwards, is
     // presumably declared there.
396  new JLOGGER::JControlHostLogger(logger),
397  client_name,
398  debug);
399 
400  // Thread-safe logger for the threads different from main
401  initLogger(*log);
402 
403  KM3NETDAQ::DataQueue dqueue(client_name, server, log, debug, acou, opto);
404 
     // setClockInterval() takes microseconds, so this is 30 seconds.
405  dqueue.setClockInterval(30*1000*1000);
406 
407  dqueue.enter();
408  dqueue.run();
409 }
boost::scoped_ptr< boost::thread > m_adfi_th
Definition: DataQueue.cc:61
void initLogger(JLOGGER::JMessageLoggerThreadSafe const &second)
static const JNET::JTag RC_DQUEUE_OPT
Definition: JDAQTags.hh:54
int main(int argc, char *argv[])
Definition: Main.cc:15
JDAQStateMachine::ev_configure_event ev_configure
virtual void actionInput(int length, const char *buffer)
This method is called at ev_input.
Definition: DataQueue.cc:280
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition: DataQueue.cc:270
bool const m_opto
Definition: DataQueue.cc:44
static const JNET::JTag RC_DQUEUE
Definition: JDAQTags.hh:52
int run_number
Definition: JDAQCHSM.chsm:156
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
bool const m_acou
Definition: DataQueue.cc:43
boost::scoped_ptr< DFInterface > m_adfi
Definition: DataQueue.cc:55
void setClockInterval(const long long int interval_us)
Set interval time.
Definition: JDAQClient.hh:164
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition: DataQueue.cc:83
std::string name
Definition: JDAQCHSM.chsm:154
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:492
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:800
virtual void actionContinue(int length, const char *buffer)
Definition: DataQueue.cc:265
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:661
static Counter & get()
Definition: log.hh:53
virtual void actionExit()
Definition: DataQueue.cc:84
virtual void actionStop(int length, const char *buffer)
Definition: DataQueue.cc:227
boost::scoped_ptr< FrameFarm > m_afarm
Definition: DataQueue.cc:49
boost::scoped_ptr< boost::thread > m_odfi_th
Definition: DataQueue.cc:62
boost::scoped_ptr< RecipientsHandler > m_orecipients
Definition: DataQueue.cc:52
#define LOG_NOTICE
Definition: log.hh:112
boost::property_tree::ptree parse(std::string str)
Definition: configure.hh:24
int getDetectorID() const
Get detector identifier.
Definition: JDAQCHSM.chsm:89
then usage $script[port]< option > nPossible options
virtual void actionStart(int length, const char *buffer)
Definition: DataQueue.cc:197
static boost::atomic< unsigned int > n_obj
Definition: clb_datagram.hh:35
const JNET::JTag & clientTag() const
Definition: DataQueue.cc:288
virtual void actionInit(int length, const char *buffer)
Definition: DataQueue.cc:86
Auxiliary class for method select.
void store(const std::string &file_name, const JDetector &detector)
Store detector to output file.
virtual void actionQuit(int length, const char *buffer)
Definition: DataQueue.cc:179
virtual void actionPause(int length, const char *buffer)
Definition: DataQueue.cc:260
boost::scoped_ptr< boost::thread > m_aff_th
Definition: DataQueue.cc:59
int debug
debug level
Definition: JSirene.cc:63
virtual void actionConfigure(int length, const char *buffer)
Definition: DataQueue.cc:89
boost::scoped_ptr< FrameFarm > m_ofarm
Definition: DataQueue.cc:50
void setLogLevel(int level)
Definition: jlog.cc:43
virtual void actionInput(int length, const char *buffer)
This method is called at ev_input.
Definition: JDAQClient.hh:606
boost::scoped_ptr< RecipientsHandler > m_arecipients
Definition: DataQueue.cc:53
General purpose message reporting.
int getRunNumber() const
Get run number.
Definition: JDAQCHSM.chsm:100
static InBufferCollector & getCollector()
Control unit client base class.
Definition: JDAQClient.hh:272
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:363
boost::scoped_ptr< DFInterface > m_odfi
Definition: DataQueue.cc:56
static const JNET::JTag RC_DQUEUE_ACS
Definition: JDAQTags.hh:53
JMessageLogger logger
message logger
Definition: JDAQClient.hh:801
Fixed parameters and ControlHost tags for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
virtual void actionReset(int length, const char *buffer)
Definition: DataQueue.cc:87
void reset()
Definition: log.hh:73
int detector_id
Definition: JDAQCHSM.chsm:155
static void monitor()
Definition: DataQueue.cc:32
ControlHost tag.
Definition: JTag.hh:38
data_type v[N+1][M+1]
Definition: JPolint.hh:740
Thread-safe message logger.
DataQueue(const std::string &name, const std::string &server, JLogger *logger, const int level, bool acou, bool opto)
Definition: DataQueue.cc:66
boost::scoped_ptr< DataInputInterface > m_input
Definition: DataQueue.cc:47
boost::scoped_ptr< boost::thread > m_off_th
Definition: DataQueue.cc:60
static boost::atomic< unsigned int > n_obj
virtual void actionSelect(const JFileDescriptorMask &)
Action method following last select call.
Definition: DataQueue.cc:275
JLevel_t getLevel()
Get debug level.