Jpp test-rotations-old
the software that should make you happy
Loading...
Searching...
No Matches
DataQueue.cc
Go to the documentation of this file.
1#include <iostream>
2#include <string>
3#include <cassert>
4
8#include <JDAQ/JDAQTags.hh>
9
13
14#include "debug_abrt.hh"
15#include "log.hh"
16
17#include <boost/program_options.hpp>
18/**
19 * \author cpellegrino
20 */
21
22namespace po = boost::program_options;
23#include <boost/foreach.hpp>
24#include <boost/scoped_ptr.hpp>
25#include "configure.hh"
26#include "version.hpp"
27
// Forward declaration only: the definition is not in this file (this page's
// cross-reference places it in jlog.cc). Called from DataQueue::actionInput
// with the level read from the client's logger.
29void setLogLevel(int level);
30
/**
 * Write one line through the LOG_NOTICE macro reporting the current values of
 * PuzzledFrame::n_obj ("# pframes") and CLBDataGram::n_obj ("# dgrams").
 *
 * File-local helper (static linkage); invoked from DataQueue::actionRunning
 * and DataQueue::actionSelect.
 *
 * NOTE(review): listing line 36 is missing from this extract, so the tail of
 * the streamed expression and its terminating ';' are not visible here.
 * Whatever follows the final " - " separator must be confirmed against the
 * original DataQueue.cc.
 */
31static
32void monitor()
33{
34 LOG_NOTICE << "# pframes: " << PuzzledFrame::n_obj
35 << " - # dgrams: " << CLBDataGram::n_obj << " - "
37}
38
39namespace KM3NETDAQ {
40
/**
 * Run-control client built on JDAQClient.
 *
 * The class owns one DataInputInterface plus two parallel sets of objects,
 * distinguished by the 'a' / 'o' infix in the member names: a FrameFarm,
 * a RecipientsHandler, a DFInterface and the boost::thread objects that run
 * the FrameFarm and the DFInterface. The 'a' set is only used when m_acou is
 * true and the 'o' set only when m_opto is true.
 *
 * Object lifetime as visible in this listing:
 *  - actionConfigure creates the input, the farms and the recipients;
 *  - actionStart launches the threads and starts the input;
 *  - actionStop stops and joins everything, then calls actionQuit;
 *  - actionQuit releases every owned object except the thread holders.
 *
 * NOTE(review): this listing has gaps (listing lines 68, 77, 148, 152, 160
 * and 205 are absent). Each gap is flagged where it occurs below.
 */
41class DataQueue : public JDAQClient
42{
 // Mode flags fixed at construction; they gate every acoustic ('a') and
 // optical ('o') code path in this class.
43 bool const m_acou;
44 bool const m_opto;
45
46 // Objects
47 boost::scoped_ptr<DataInputInterface> m_input;
48
49 boost::scoped_ptr<FrameFarm> m_afarm;
50 boost::scoped_ptr<FrameFarm> m_ofarm;
51
52 boost::scoped_ptr<RecipientsHandler> m_orecipients;
53 boost::scoped_ptr<RecipientsHandler> m_arecipients;
54
 // NOTE(review): no visible line assigns m_adfi or m_odfi, yet actionStart
 // and actionStop dereference them. They are presumably created on the
 // missing listing lines 148 and 160 of actionConfigure; confirm.
55 boost::scoped_ptr<DFInterface> m_adfi;
56 boost::scoped_ptr<DFInterface> m_odfi;
57
58 // Threads
 // *_ff_th run the FrameFarm objects, *_dfi_th run the DFInterface objects
 // (see actionStart).
59 boost::scoped_ptr<boost::thread> m_aff_th;
60 boost::scoped_ptr<boost::thread> m_off_th;
61 boost::scoped_ptr<boost::thread> m_adfi_th;
62 boost::scoped_ptr<boost::thread> m_odfi_th;
63
64 public:
65
 /**
  * Forward name, server, logger and level to the JDAQClient base and record
  * the two mode flags.
  *
  * \param name    forwarded to JDAQClient
  * \param server  forwarded to JDAQClient
  * \param level   forwarded to JDAQClient
  * \param acou    stored in m_acou
  * \param opto    stored in m_opto
  *
  * NOTE(review): listing line 68 is missing. This page's cross-reference for
  * the constructor shows a `JLogger *logger` parameter between `server` and
  * `level`, which matches the `logger` passed to the base class below.
  * NOTE(review): listing line 77 (the only line of the body) is also
  * missing; its content cannot be determined from this extract.
  */
66 DataQueue(const std::string& name,
67 const std::string& server,
69 const int level,
70 bool acou,
71 bool opto)
72 :
73 JDAQClient(name, server, logger, level),
74 m_acou(acou),
75 m_opto(opto)
76 {
78 }
79
80 /**
81 * Interface methods for actions corresponding to state transitions.
82 */
 // Enter, Exit, Init and Reset are intentionally empty overrides.
83 virtual void actionEnter() {}
84 virtual void actionExit() {}
85
86 virtual void actionInit(int length, const char* buffer) {}
87 virtual void actionReset(int length, const char* buffer) {}
88
 /**
  * Build the processing chain from the configuration text in buffer.
  *
  * The first `length` bytes of `buffer` are handed to detail::parse, and the
  * resulting property tree is queried for these keys:
  * "timeslice_duration", "run_start_time", "max_dump_size",
  * "dump_file_prefix", "dump_file_postfix", plus "acou_ports" and
  * "acou_recipient" when m_acou is set, and "opto_ports" and
  * "opto_recipients" when m_opto is set.
  *
  * \param length  number of bytes of buffer to parse
  * \param buffer  configuration text
  */
89 virtual void actionConfigure(int length, const char* buffer)
90 {
91 // parse the configuration
92 boost::property_tree::ptree const conf = detail::parse(
93 std::string(buffer, length));
94
95 // initialize some global flavor-independent variables
96 unsigned int const delta_ts = conf.get<unsigned int>(
97 "timeslice_duration");
98
99 uint64_t const run_start_time = conf.get<uint64_t>("run_start_time");
100
101 std::size_t const max_dump_size = conf.get<std::size_t>("max_dump_size");
102 std::string const prefix = conf.get<std::string>("dump_file_prefix");
103 std::string const postfix = conf.get<std::string>("dump_file_postfix");
104
 // Port lists stay empty for a disabled mode, so they add nothing to
 // n_channels below.
105 std::vector<int> acou_ports;
106 std::vector<int> opto_ports;
107
108 if (m_acou) {
109 acou_ports = detail::vectorize<int>(conf.get<std::string>("acou_ports"));
110 }
111
112 if (m_opto) {
113 opto_ports = detail::vectorize<int>(conf.get<std::string>("opto_ports"));
114 }
115
 // The sum of two size_t values is narrowed to int here.
116 int const n_channels = acou_ports.size() + opto_ports.size();
117
118 // create DataInputInterface
119 m_input.reset(new DataInputInterface(n_channels));
120
121 // create the FrameFarms
 // Both farms get the same settings; only the prefix differs
 // ("_a" for acoustic, "_o" for optical).
122 if (m_acou) {
123 m_afarm.reset(
124 new FrameFarm(
125 delta_ts,
126 run_start_time,
127 max_dump_size,
128 prefix + "_a",
129 postfix));
130 }
131
132 if (m_opto) {
133 m_ofarm.reset(
134 new FrameFarm(
135 delta_ts,
136 run_start_time,
137 max_dump_size,
138 prefix + "_o",
139 postfix));
140 }
141
142 // create the RecipientsHandlers and the DFInterfaces
 // NOTE(review): the meaning of the constructor argument 10 is not visible
 // in this file.
143 if (m_acou) {
144 std::string const acou_recipient = conf.get<std::string>("acou_recipient");
145
146 m_arecipients.reset(new RecipientsHandler(10));
147 m_arecipients->add(acou_recipient);
 // NOTE(review): listing line 148 is missing here (presumably the creation
 // of m_adfi, per the comment on listing line 142); confirm.
149 }
150
151 if (m_opto) {
 // NOTE(review): listing line 152 is missing; it must hold the start of the
 // declaration of `opto_recipients`, which the next line completes.
153 conf.get<std::string>("opto_recipients"));
154
155 m_orecipients.reset(new RecipientsHandler(10));
156 BOOST_FOREACH(std::string s, opto_recipients)
157 {
158 m_orecipients->add(s);
159 }
 // NOTE(review): listing line 160 is missing here (presumably the creation
 // of m_odfi); confirm.
161 }
162
163 // add the channels to DataInputInterface
 // Each port is registered together with the farm of its own mode.
164 if (m_acou) {
165 BOOST_FOREACH(int port, acou_ports)
166 {
167 m_input->add_channel(port, *m_afarm);
168 }
169 }
170
171 if (m_opto) {
172 BOOST_FOREACH(int port, opto_ports)
173 {
174 m_input->add_channel(port, *m_ofarm);
175 }
176 }
177 }
178
 /**
  * Release every object created by actionConfigure.
  *
  * The no-argument scoped_ptr::reset() destroys the owned object. The order
  * is DFInterfaces, RecipientsHandlers, FrameFarms, then the
  * DataInputInterface. The thread holders are not touched here; actionStop
  * resets them after calling this method.
  *
  * Neither parameter is used.
  */
179 virtual void actionQuit(int length, const char* buffer)
180 {
181 // reset the DataFilter interfaces
182 m_odfi.reset();
183 m_adfi.reset();
184
185 // reset the RecipientsHandlers
186 m_orecipients.reset();
187 m_arecipients.reset();
188
189 // reset the FrameFarms
190 m_ofarm.reset();
191 m_afarm.reset();
192
193 // reset the DataInputInterface
194 m_input.reset();
195 }
196
 /**
  * Start data taking: pass the run number and detector id to each enabled
  * farm, launch the worker threads and start the input.
  *
  * Dereferences m_afarm / m_ofarm, m_adfi / m_odfi and m_input without a
  * null check, so the corresponding objects must already exist.
  *
  * Neither parameter is used in the visible code.
  */
197 virtual void actionStart(int length, const char* buffer)
198 {
199 int const run_number = getRunNumber();
200
201 int const detector_id = getDetectorID();
202
 // assert is compiled out when NDEBUG is defined, so this check is
 // debug-build only.
203 assert(run_number >= 0);
204
 // NOTE(review): listing line 205 is missing from this extract.
206
207 // launch the FrameFarm threads
 // boost::ref makes each thread run the owned object itself, not a copy.
208 if (m_acou) {
209 m_afarm->runNumber(run_number);
210 m_afarm->detectorId(detector_id);
211 m_aff_th.reset(new boost::thread(boost::ref(*m_afarm)));
212 }
213 if (m_opto) {
214 m_ofarm->runNumber(run_number);
215 m_ofarm->detectorId(detector_id);
216 m_off_th.reset(new boost::thread(boost::ref(*m_ofarm)));
217 }
218
219 // launch the DFInterface threads
220 if (m_acou) m_adfi_th.reset(new boost::thread(boost::ref(*m_adfi)));
221 if (m_opto) m_odfi_th.reset(new boost::thread(boost::ref(*m_odfi)));
222
223 // launch the DataInputInterface
224 m_input->start();
225 }
226
 /**
  * Stop data taking and tear the processing chain down.
  *
  * Sequence: stop the input, stop the farms, stop the DFInterfaces, join the
  * DFInterface threads, join the FrameFarm threads, call actionQuit to
  * destroy the worker objects, and finally destroy the (already joined)
  * thread objects. The worker objects are only destroyed after their threads
  * have been joined.
  *
  * \param length  forwarded unchanged to actionQuit
  * \param buffer  forwarded unchanged to actionQuit
  */
227 virtual void actionStop(int length, const char* buffer)
228 {
229 // stop the DataInputInterface
230 m_input->stop();
231
232 // stop the FrameFarms
233 if (m_opto) m_ofarm->stop();
234 if (m_acou) m_afarm->stop();
235
236 // stop the DataFilter interfaces
237 if (m_opto) m_odfi->stop();
238 if (m_acou) m_adfi->stop();
239
240 // join the DataFilter interfaces
241 if (m_opto) m_odfi_th->join();
242 if (m_acou) m_adfi_th->join();
243
244 // join the FrameFarms
245 if (m_opto) m_off_th->join();
246 if (m_acou) m_aff_th->join();
247
248 // call reset to go in stand-by
 // (the call made is actionQuit, which releases input, farms, recipients
 // and DFInterfaces)
249 actionQuit(length, buffer);
250
251 // reset the DataFilter interfaces threads
252 m_odfi_th.reset();
253 m_adfi_th.reset();
254
255 // reset the FrameFarms threads
256 m_off_th.reset();
257 m_aff_th.reset();
258 }
259
 // Pause: forwards to DataInputInterface::pause(); parameters unused.
260 virtual void actionPause (int length, const char* buffer)
261 {
262 m_input->pause();
263 }
264
 // Continue: forwards to DataInputInterface::cont(); parameters unused.
265 virtual void actionContinue (int length, const char* buffer)
266 {
267 m_input->cont();
268 }
269
 // Emits the monitor() log line. Per this page's cross-reference text, the
 // base class calls this repeatedly while the client is in state Running.
270 virtual void actionRunning()
271 {
272 monitor();
273 }
274
 // Emits the monitor() log line; the descriptor mask is ignored. Per this
 // page's cross-reference text, this is the action following the last
 // select call.
275 virtual void actionSelect(const JFileDescriptorMask& /*mask*/)
276 {
277 monitor();
278 }
279
 /**
  * Delegate to JDAQClient::actionInput, then pass the logger's current level
  * to setLogLevel().
  *
  * \param length  forwarded to the base-class implementation
  * \param buffer  forwarded to the base-class implementation
  */
280 virtual void actionInput(int length, const char* buffer)
281 {
282 // this to preserve future compatibility
283 this->JDAQClient::actionInput(length, buffer);
284
285 setLogLevel(this->logger.getLevel());
286 }
287
 /**
  * Select the tag that corresponds to the configured mode.
  *
  * \return RC_DQUEUE when both modes are set, RC_DQUEUE_ACS for acoustic
  *         only, RC_DQUEUE_OPT for optical only, and a default-constructed
  *         JTag when neither is set.
  *
  * The fallback tag is a function-local static because the function returns
  * by const reference and therefore needs an object that outlives the call.
  */
288 const JNET::JTag & clientTag() const
289 {
290 static const JNET::JTag tag;
291
292 if (m_acou && m_opto)
293 {
294 return RC_DQUEUE;
295 }
296 else if (m_acou)
297 {
298 return RC_DQUEUE_ACS;
299 }
300 else if (m_opto)
301 {
302 return RC_DQUEUE_OPT;
303 } else
304 {
305 return tag;
306 }
307
308 //assert(!"No DQ mode set.");
309 }
310};
311
312} // ns KM3NETDAQ
313
/**
 * Program entry point: parse the command line, set up logging, then create
 * and run a KM3NETDAQ::DataQueue client.
 *
 * Options (boost::program_options):
 *   -h/--help, -v/--version, -o/--optical, -a/--acoustic,
 *   -H (server address, default "localhost"),
 *   -M (logger address, default "localhost"),
 *   -u (client name, default "DataQueue"),
 *   -d (debug level, default 0).
 *
 * \return EXIT_SUCCESS after printing help or version, EXIT_FAILURE when
 *         option parsing fails or neither -o nor -a is given. On the normal
 *         path control falls off the end of main after dqueue.run() returns.
 *
 * NOTE(review): listing lines 316-318, 391 and 394-395 are missing from this
 * extract; each gap is flagged where it occurs below.
 */
314int main(int argc, char* argv[])
315{
 // NOTE(review): listing lines 316-318 are missing here.
319
 // Defaults; also used as the default_value of the matching options.
320 std::string server("localhost");
321 std::string logger("localhost");
322 std::string client_name("DataQueue");
323 int debug = 0;
324
325 po::options_description desc("Options");
326 desc.add_options()
327 ("help,h", "Print this help and exit.")
328 ("version,v", "Print the version and exit.")
329 ("optical,o", "Set the optical mode.")
330 ("acoustic,a", "Set the acoustic mode.")
 // The names below start with a comma: these options only have a short
 // form (-H, -M, -u, -d).
331 (",H",
332 po::value<std::string>(&server)->default_value(server),
333 "Set the address of the SM server.")
334 (",M",
335 po::value<std::string>(&logger)->default_value(logger),
336 "Set the address of the logger server.")
337 (",u",
338 po::value<std::string>(&client_name)->default_value(client_name),
339 "Set the address of the client name.")
340 (",d",
341 po::value<int>(&debug)->default_value(debug),
342 "Set the debug level.");
343
344 bool acou = false, opto = false;
345
346 try
347 {
348 po::variables_map vm;
349 po::store(
350 po::command_line_parser(argc, argv).options(desc).run(),
351 vm);
352
 // help and version are handled before po::notify(), i.e. before the
 // bound variables (server, logger, client_name, debug) are written.
353 if (vm.count("help"))
354 {
355 std::cout << desc << std::endl;
356 return EXIT_SUCCESS;
357 }
358
359 if (vm.count("version"))
360 {
361 std::cout << dataqueue::version::v() << std::endl;
362 return EXIT_SUCCESS;
363 }
364
365 po::notify(vm);
366
 // count() is converted to bool: true when the flag was given.
367 opto = vm.count("optical");
368
369 acou = vm.count("acoustic");
370
 // At least one mode is mandatory; the exception is handled by the
 // std::runtime_error handler below.
371 if (! (acou || opto))
372 {
373 throw std::runtime_error("FATAL: no mode specified. Use -o, -a or both. See the help.");
374 }
375 }
 // Both handlers print the message followed by the option summary and
 // exit with EXIT_FAILURE.
376 catch (const po::error& e)
377 {
378 std::cerr << "DataQueue: Error: " << e.what() << '\n'
379 << desc << std::endl;
380 return EXIT_FAILURE;
381 }
382 catch (const std::runtime_error& e)
383 {
384 std::cerr << "DataQueue: Error: " << e.what() << '\n'
385 << desc << std::endl;
386 return EXIT_FAILURE;
387 }
388
389 // Call to singleton in a thread-safe environment
390
 // NOTE(review): listing line 391 (the singleton call announced above) is
 // missing from this extract.
392
393 // Don't delete this object. It will be handled by the DataQueue class.
 // NOTE(review): listing lines 394-395 are missing; they must declare `log`
 // (used below) and open the expression that the next three lines finish.
396 new JLOGGER::JControlHostLogger(logger),
397 client_name,
398 debug);
399
400 // Thread-safe logger for the threads different from main
401 initLogger(*log);
402
403 KM3NETDAQ::DataQueue dqueue(client_name, server, log, debug, acou, opto);
404
 // The cross-reference on this page names the parameter interval_us, so
 // this value is 30 * 10^6 microseconds.
405 dqueue.setClockInterval(30*1000*1000);
406
407 dqueue.enter();
408 dqueue.run();
409}
void setLogLevel(int level)
Definition jlog.cc:43
int main(int argc, char *argv[])
Definition DataQueue.cc:314
static void monitor()
Definition DataQueue.cc:32
void initLogger(JLOGGER::JMessageLoggerThreadSafe const &second)
Definition jlog.cc:38
Fixed parameters and ControlHost tags for KM3NeT DAQ.
General purpose message reporting.
int debug
debug level
Definition JSirene.cc:72
static boost::atomic< unsigned int > n_obj
static InBufferCollector & getCollector()
int getDetectorID() const
Get detector identifier.
Definition JDAQCHSM.hh:100
int getRunNumber() const
Get run number.
Definition JDAQCHSM.hh:111
int run_number
Definition JDAQCHSM.hh:167
int detector_id
Definition JDAQCHSM.hh:166
std::string name
Definition JDAQCHSM.hh:165
JDAQStateMachine::ev_configure_event ev_configure
Auxiliary class for method select.
Message logging based on ControlHost.
Interface for logging messages.
Definition JLogger.hh:22
Thread-safe message logger.
JLevel_t getLevel()
Get debug level.
ControlHost tag.
Definition JTag.hh:38
virtual void actionExit()
Definition DataQueue.cc:84
DataQueue(const std::string &name, const std::string &server, JLogger *logger, const int level, bool acou, bool opto)
Definition DataQueue.cc:66
boost::scoped_ptr< FrameFarm > m_afarm
Definition DataQueue.cc:49
boost::scoped_ptr< DFInterface > m_odfi
Definition DataQueue.cc:56
boost::scoped_ptr< RecipientsHandler > m_orecipients
Definition DataQueue.cc:52
boost::scoped_ptr< boost::thread > m_adfi_th
Definition DataQueue.cc:61
virtual void actionRunning()
This method is repeatedly called when this client machine is in state Running and the clock interval ...
Definition DataQueue.cc:270
virtual void actionContinue(int length, const char *buffer)
Definition DataQueue.cc:265
const JNET::JTag & clientTag() const
Definition DataQueue.cc:288
virtual void actionPause(int length, const char *buffer)
Definition DataQueue.cc:260
virtual void actionConfigure(int length, const char *buffer)
Definition DataQueue.cc:89
boost::scoped_ptr< boost::thread > m_odfi_th
Definition DataQueue.cc:62
virtual void actionQuit(int length, const char *buffer)
Definition DataQueue.cc:179
virtual void actionReset(int length, const char *buffer)
Definition DataQueue.cc:87
virtual void actionEnter()
Interface methods for actions corresponding to state transitions.
Definition DataQueue.cc:83
boost::scoped_ptr< DataInputInterface > m_input
Definition DataQueue.cc:47
virtual void actionSelect(const JFileDescriptorMask &)
Action method following last select call.
Definition DataQueue.cc:275
boost::scoped_ptr< FrameFarm > m_ofarm
Definition DataQueue.cc:50
virtual void actionInit(int length, const char *buffer)
Definition DataQueue.cc:86
virtual void actionInput(int length, const char *buffer)
This method is called at ev_input.
Definition DataQueue.cc:280
boost::scoped_ptr< boost::thread > m_aff_th
Definition DataQueue.cc:59
boost::scoped_ptr< boost::thread > m_off_th
Definition DataQueue.cc:60
virtual void actionStart(int length, const char *buffer)
Definition DataQueue.cc:197
boost::scoped_ptr< DFInterface > m_adfi
Definition DataQueue.cc:55
boost::scoped_ptr< RecipientsHandler > m_arecipients
Definition DataQueue.cc:53
virtual void actionStop(int length, const char *buffer)
Definition DataQueue.cc:227
Control unit client base class.
JSharedPointer< JControlHost > server
message server
virtual void actionInput(int length, const char *buffer) override
This method is called at ev_input.
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
void run()
Run as run control client following command messages via JNET::JControlHost.
JMessageLogger logger
message logger
virtual bool enter(const JArgs &args)
Enter the state machine.
void reset()
Definition log.hh:73
static Counter & get()
Definition log.hh:53
static boost::atomic< unsigned int > n_obj
void __debug_abort_on_wrong_size_(size_t size)
Definition debug_abrt.hh:11
#define LOG_NOTICE
Definition log.hh:112
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
static const JNET::JTag RC_DQUEUE_OPT
Definition JDAQTags.hh:76
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
static const JNET::JTag RC_DQUEUE
Definition JDAQTags.hh:74
static const JNET::JTag RC_DQUEUE_ACS
Definition JDAQTags.hh:75
std::vector< T > vectorize(const std::string &str)
Definition configure.hh:60
boost::property_tree::ptree parse(std::string str)
Definition configure.hh:24
void setClockInterval(const long long int interval_us)
Set interval time.