Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DataQueue.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <string>
3 #include <cassert>
4 
8 #include <JDAQ/JDAQTags.hh>
9 
13 
14 #include "debug_abrt.hh"
15 #include "log.hh"
16 
17 #include <boost/program_options.hpp>
18 /**
19  * \author cpellegrino
20  */
21 
22 namespace po = boost::program_options;
23 #include <boost/foreach.hpp>
24 #include <boost/scoped_ptr.hpp>
25 #include "configure.hh"
26 #include "version.hpp"
27 
29 
30 static
31 void monitor()
32 {
33  LOG_NOTICE << "# pframes: " << PuzzledFrame::n_obj
34  << " - # dgrams: " << CLBDataGram::n_obj << " - "
35  << Log::Counter::get();
36 }
37 
38 namespace KM3NETDAQ {
39 
40 class DataQueue : public JDAQClient
41 {
42  bool const m_acou;
43  bool const m_opto;
44 
45  // Objects
46  boost::scoped_ptr<DataInputInterface> m_input;
47 
48  boost::scoped_ptr<FrameFarm> m_afarm;
49  boost::scoped_ptr<FrameFarm> m_ofarm;
50 
51  boost::scoped_ptr<RecipientsHandler> m_orecipients;
52  boost::scoped_ptr<RecipientsHandler> m_arecipients;
53 
54  boost::scoped_ptr<DFInterface> m_adfi;
55  boost::scoped_ptr<DFInterface> m_odfi;
56 
57  // Threads
58  boost::scoped_ptr<boost::thread> m_aff_th;
59  boost::scoped_ptr<boost::thread> m_off_th;
60  boost::scoped_ptr<boost::thread> m_adfi_th;
61  boost::scoped_ptr<boost::thread> m_odfi_th;
62 
63  public:
64 
65  DataQueue(const std::string& name,
66  const std::string& server,
67  JLogger* logger,
68  const int level,
69  bool acou,
70  bool opto)
71  :
72  JDAQClient(name, server, logger, level),
73  m_acou(acou),
74  m_opto(opto)
75  {
76  replaceEvent(RC_CMD, clientTag(), ev_configure);
77  }
78 
79  /**
80  * Interface methods for actions corresponding to state transitions.
81  */
82  virtual void actionEnter() {}
83  virtual void actionExit() {}
84 
85  virtual void actionInit(int length, const char* buffer) {}
86  virtual void actionReset(int length, const char* buffer) {}
87 
88  virtual void actionConfigure(int length, const char* buffer)
89  {
90  // parse the configuration
91  boost::property_tree::ptree const conf = detail::parse(
92  std::string(buffer, length));
93 
94  // initialize some global flavor-independent variables
95  unsigned int const delta_ts = conf.get<unsigned int>(
96  "timeslice_duration");
97 
98  uint64_t const run_start_time = conf.get<uint64_t>("run_start_time");
99 
100  std::size_t const max_dump_size = conf.get<std::size_t>("max_dump_size");
101  std::string const prefix = conf.get<std::string>("dump_file_prefix");
102  std::string const postfix = conf.get<std::string>("dump_file_postfix");
103 
104  std::vector<int> acou_ports;
105  std::vector<int> opto_ports;
106 
107  if (m_acou) {
108  acou_ports = detail::vectorize<int>(conf.get<std::string>("acou_ports"));
109  }
110 
111  if (m_opto) {
112  opto_ports = detail::vectorize<int>(conf.get<std::string>("opto_ports"));
113  }
114 
115  int const n_channels = acou_ports.size() + opto_ports.size();
116 
117  // create DataInputInterface
118  m_input.reset(new DataInputInterface(n_channels));
119 
120  // create the FrameFarms
121  if (m_acou) {
122  m_afarm.reset(
123  new FrameFarm(
124  delta_ts,
125  run_start_time,
126  max_dump_size,
127  prefix + "_a",
128  postfix));
129  }
130 
131  if (m_opto) {
132  m_ofarm.reset(
133  new FrameFarm(
134  delta_ts,
135  run_start_time,
136  max_dump_size,
137  prefix + "_o",
138  postfix));
139  }
140 
141  // create the RecipientsHandlers and the DFInterfaces
142  if (m_acou) {
143  std::string const acou_recipient = conf.get<std::string>("acou_recipient");
144 
145  m_arecipients.reset(new RecipientsHandler(10));
146  m_arecipients->add(acou_recipient);
147  m_adfi.reset(new DFInterface(*m_afarm, *m_arecipients));
148  }
149 
150  if (m_opto) {
151  std::vector<std::string> const opto_recipients = detail::vectorize<std::string>(
152  conf.get<std::string>("opto_recipients"));
153 
154  m_orecipients.reset(new RecipientsHandler(10));
155  BOOST_FOREACH(std::string s, opto_recipients)
156  {
157  m_orecipients->add(s);
158  }
159  m_odfi.reset(new DFInterface(*m_ofarm, *m_orecipients));
160  }
161 
162  // add the channels to DataInputInterface
163  if (m_acou) {
164  BOOST_FOREACH(int port, acou_ports)
165  {
166  m_input->add_channel(port, *m_afarm);
167  }
168  }
169 
170  if (m_opto) {
171  BOOST_FOREACH(int port, opto_ports)
172  {
173  m_input->add_channel(port, *m_ofarm);
174  }
175  }
176  }
177 
178  virtual void actionQuit(int length, const char* buffer)
179  {
180  // reset the DataFilter interfaces
181  m_odfi.reset();
182  m_adfi.reset();
183 
184  // reset the RecipientsHandlers
185  m_orecipients.reset();
186  m_arecipients.reset();
187 
188  // reset the FrameFarms
189  m_ofarm.reset();
190  m_afarm.reset();
191 
192  // reset the DataInputInterface
193  m_input.reset();
194  }
195 
196  virtual void actionStart(int length, const char* buffer)
197  {
198  int const run_number = getRunNumber();
199 
200  int const detector_id = getDetectorID();
201 
202  assert(run_number >= 0);
203 
205 
206  // launch the FrameFarm threads
207  if (m_acou) {
208  m_afarm->runNumber(run_number);
209  m_afarm->detectorId(detector_id);
210  m_aff_th.reset(new boost::thread(boost::ref(*m_afarm)));
211  }
212  if (m_opto) {
213  m_ofarm->runNumber(run_number);
214  m_ofarm->detectorId(detector_id);
215  m_off_th.reset(new boost::thread(boost::ref(*m_ofarm)));
216  }
217 
218  // launch the DFInterface threads
219  if (m_acou) m_adfi_th.reset(new boost::thread(boost::ref(*m_adfi)));
220  if (m_opto) m_odfi_th.reset(new boost::thread(boost::ref(*m_odfi)));
221 
222  // launch the DataInputInterface
223  m_input->start();
224  }
225 
226  virtual void actionStop(int length, const char* buffer)
227  {
228  // stop the DataInputInterface
229  m_input->stop();
230 
231  // stop the FrameFarms
232  if (m_opto) m_ofarm->stop();
233  if (m_acou) m_afarm->stop();
234 
235  // stop the DataFilter interfaces
236  if (m_opto) m_odfi->stop();
237  if (m_acou) m_adfi->stop();
238 
239  // join the DataFilter interfaces
240  if (m_opto) m_odfi_th->join();
241  if (m_acou) m_adfi_th->join();
242 
243  // join the FramFarms
244  if (m_opto) m_off_th->join();
245  if (m_acou) m_aff_th->join();
246 
247  // call reset to go in stand-by
248  actionQuit(length, buffer);
249 
250  // reset the DataFilter interfaces threads
251  m_odfi_th.reset();
252  m_adfi_th.reset();
253 
254  // reset the FrameFarms threads
255  m_off_th.reset();
256  m_aff_th.reset();
257  }
258 
259  virtual void actionPause (int length, const char* buffer)
260  {
261  m_input->pause();
262  }
263 
264  virtual void actionContinue (int length, const char* buffer)
265  {
266  m_input->cont();
267  }
268 
269  virtual void actionRunning()
270  {
271  monitor();
272  }
273 
274  virtual void actionSelect(const JFileDescriptorMask& /*mask*/)
275  {
276  monitor();
277  }
278 
279  const JNET::JTag & clientTag() const
280  {
281  if (m_acou && m_opto)
282  {
283  return RC_DQUEUE;
284  }
285  else if (m_acou)
286  {
287  return RC_DQUEUE_ACS;
288  }
289  else if (m_opto)
290  {
291  return RC_DQUEUE_OPT;
292  }
293 
294  assert(!"No DQ mode set.");
295  }
296 };
297 
298 } // ns KM3NETDAQ
299 
// Program entry point: parses the command line, requires at least one of the
// optical/acoustic modes, sets up logging and runs a KM3NETDAQ::DataQueue.
// Returns EXIT_SUCCESS after --help/--version and EXIT_FAILURE when option
// parsing fails or no mode was selected.
// NOTE(review): this listing is an extract; listing lines 377 and 380-381 are
// absent, so the singleton call and the declaration of `log` are not visible.
300 int main(int argc, char* argv[])
301 {
// Size checks on CLBCommonHeader (40), DAQCommonHeader (56) and UTCTime (8);
// presumably aborts on mismatch, as the helper name suggests - TODO confirm.
302  __debug_abort_on_wrong_size_<CLBCommonHeader>(40);
303  __debug_abort_on_wrong_size_<DAQCommonHeader>(56);
304  __debug_abort_on_wrong_size_<UTCTime>(8);
305 
// Defaults; they are also used as the default_value of the -H/-M/-u/-d
// options declared below.
306  std::string server("localhost");
307  std::string logger("localhost");
308  std::string client_name("DataQueue");
309  int debug = 0;
310 
// NOTE(review): the help text of -u reads "address of the client name";
// probably meant to be just the client name.
311  po::options_description desc("Options");
312  desc.add_options()
313  ("help,h", "Print this help and exit.")
314  ("version,v", "Print the version and exit.")
315  ("optical,o", "Set the optical mode.")
316  ("acoustic,a", "Set the acoustic mode.")
317  (",H",
318  po::value<std::string>(&server)->default_value(server),
319  "Set the address of the SM server.")
320  (",M",
321  po::value<std::string>(&logger)->default_value(logger),
322  "Set the address of the logger server.")
323  (",u",
324  po::value<std::string>(&client_name)->default_value(client_name),
325  "Set the address of the client name.")
326  (",d",
327  po::value<int>(&debug)->default_value(debug),
328  "Set the debug level.");
329 
// Mode flags, filled in from the parsed options inside the try block.
330  bool acou = false, opto = false;
331 
332  try
333  {
334  po::variables_map vm;
335  po::store(
336  po::command_line_parser(argc, argv).options(desc).run(),
337  vm);
338 
// --help and --version are handled before po::notify() is called.
339  if (vm.count("help"))
340  {
341  std::cout << desc << std::endl;
342  return EXIT_SUCCESS;
343  }
344 
345  if (vm.count("version"))
346  {
347  std::cout << dataqueue::version::v() << std::endl;
348  return EXIT_SUCCESS;
349  }
350 
351  po::notify(vm);
352 
// The occurrence counts are converted to bool: non-zero means the flag was given.
353  opto = vm.count("optical");
354 
355  acou = vm.count("acoustic");
356 
// At least one mode is required; the exception is caught by the
// std::runtime_error handler below.
357  if (! (acou || opto))
358  {
359  throw std::runtime_error("FATAL: no mode specified. Use -o, -a or both. See the help.");
360  }
361  }
// Both handlers print the error followed by the option summary and exit
// with EXIT_FAILURE.
362  catch (const po::error& e)
363  {
364  std::cerr << "DataQueue: Error: " << e.what() << '\n'
365  << desc << std::endl;
366  return EXIT_FAILURE;
367  }
368  catch (const std::runtime_error& e)
369  {
370  std::cerr << "DataQueue: Error: " << e.what() << '\n'
371  << desc << std::endl;
372  return EXIT_FAILURE;
373  }
374 
375  // Call to singleton in a thread-safe environment
376 
378 
// NOTE(review): the start of the statement that declares `log` is missing
// from this extract; only its trailing argument list is shown below.
379  // Don't delete this object. It will be handled by the DataQueue class.
382  new JLOGGER::JControlHostLogger(logger),
383  client_name,
384  debug);
385 
386  // Thread-safe logger for the threads different from main
387  initLogger(*log);
388 
389  KM3NETDAQ::DataQueue dqueue(client_name, server, log, debug, acou, opto);
390 
// 30*1000*1000 = 30,000,000; the setClockInterval() parameter is named
// interval_us, i.e. presumably a 30 second interval.
391  dqueue.setClockInterval(30*1000*1000);
392 
// Enter the state machine, then run as run control client.
393  dqueue.enter();
394  dqueue.run();
395 }
boost::scoped_ptr< boost::thread > m_adfi_th
Definition: DataQueue.cc:60
void initLogger(JLOGGER::JMessageLoggerThreadSafe const &second)
static const JNET::JTag RC_DQUEUE_OPT
Definition: JDAQTags.hh:53
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval time has elapsed.
Definition: DataQueue.cc:269
bool const m_opto
Definition: DataQueue.cc:43
static const JNET::JTag RC_DQUEUE
Definition: JDAQTags.hh:51
Interface for logging messages.
Definition: JLogger.hh:22
Message logging based on ControlHost.
void setClockInterval(const long long int interval_us)
Set interval time.
Definition: JDAQClient.hh:385
bool const m_acou
Definition: DataQueue.cc:42
boost::scoped_ptr< DFInterface > m_adfi
Definition: DataQueue.cc:54
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition: DataQueue.cc:82
JSharedPointer< JControlHost > server
message server
Definition: JDAQClient.hh:635
virtual void actionContinue(int length, const char *buffer)
Definition: DataQueue.cc:264
void run()
Run as run control client following command messages via JNET::JControlHost.
Definition: JDAQClient.hh:493
static Counter & get()
Definition: log.hh:53
virtual void actionExit()
Definition: DataQueue.cc:83
virtual void actionStop(int length, const char *buffer)
Definition: DataQueue.cc:226
boost::scoped_ptr< FrameFarm > m_afarm
Definition: DataQueue.cc:48
boost::scoped_ptr< boost::thread > m_odfi_th
Definition: DataQueue.cc:61
boost::scoped_ptr< RecipientsHandler > m_orecipients
Definition: DataQueue.cc:51
#define LOG_NOTICE
Definition: log.hh:112
boost::property_tree::ptree parse(std::string str)
Definition: configure.hh:24
virtual void actionStart(int length, const char *buffer)
Definition: DataQueue.cc:196
static boost::atomic< unsigned int > n_obj
Definition: clb_datagram.hh:35
const JNET::JTag & clientTag() const
Definition: DataQueue.cc:279
virtual void actionInit(int length, const char *buffer)
Definition: DataQueue.cc:85
Auxiliary class for method select.
virtual void actionQuit(int length, const char *buffer)
Definition: DataQueue.cc:178
virtual void actionPause(int length, const char *buffer)
Definition: DataQueue.cc:259
boost::scoped_ptr< boost::thread > m_aff_th
Definition: DataQueue.cc:58
int debug
debug level
Definition: JSirene.cc:59
virtual void actionConfigure(int length, const char *buffer)
Definition: DataQueue.cc:88
boost::scoped_ptr< FrameFarm > m_ofarm
Definition: DataQueue.cc:49
boost::scoped_ptr< RecipientsHandler > m_arecipients
Definition: DataQueue.cc:52
General purpose message reporting.
static InBufferCollector & getCollector()
Run control client base class.
Definition: JDAQClient.hh:88
virtual bool enter(const JArgs &args)
Enter the state machine.
Definition: JDAQClient.hh:143
void store(const JString &file_name, const JDetector &detector)
Store detector to output file.
boost::scoped_ptr< DFInterface > m_odfi
Definition: DataQueue.cc:55
static const JNET::JTag RC_DQUEUE_ACS
Definition: JDAQTags.hh:52
JMessageLogger logger
message logger
Definition: JDAQClient.hh:636
void replaceEvent(const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:294
Fixed parameters for KM3NeT DAQ.
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
virtual void actionReset(int length, const char *buffer)
Definition: DataQueue.cc:86
void reset()
Definition: log.hh:73
static void monitor()
Definition: DataQueue.cc:31
ControlHost tag.
Definition: JTag.hh:35
Thread-safe message logger.
DataQueue(const std::string &name, const std::string &server, JLogger *logger, const int level, bool acou, bool opto)
Definition: DataQueue.cc:65
boost::scoped_ptr< DataInputInterface > m_input
Definition: DataQueue.cc:46
boost::scoped_ptr< boost::thread > m_off_th
Definition: DataQueue.cc:59
static boost::atomic< unsigned int > n_obj
virtual void actionSelect(const JFileDescriptorMask &)
Action method following last select call.
Definition: DataQueue.cc:274
int main(int argc, char *argv[])
Definition: Main.cpp:15