Jpp  16.0.0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
saDataQueue.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <vector>
3 #include <string>
4 
5 #include <boost/thread.hpp>
6 #include <boost/ref.hpp>
7 #include <boost/lexical_cast.hpp>
8 #include <boost/program_options.hpp>
9 
10 /**
11  * \author cpellegrino
12  */
13 
14 namespace po = boost::program_options;
15 
18 
20 
21 #include "debug_abrt.hh"
22 #include "log.hh"
23 #include "version.hpp"
24 
25 #include <sys/select.h>
26 #include <unistd.h>
27 
/**
 * Wait until standard input (file descriptor 0) is reported readable by
 * select() or until the given timeout elapses.
 *
 * \param  tv  maximum time to wait; taken by value, so any modification
 *             made by select() only affects the local copy
 * \return the (positive) value returned by select(), or 0 when select()
 *         timed out or failed
 */
static int wait_cin_for(timeval tv)
{
  fd_set readable;
  FD_ZERO(&readable);
  FD_SET(STDIN_FILENO, &readable);

  // Only descriptor 0 is watched, hence nfds = 0 + 1.
  const int ready = select(STDIN_FILENO + 1, &readable, 0, 0, &tv);

  if (ready > 0)
  {
    return ready;
  }

  // Timeout (0) and error (-1) are both reported as "nothing to read".
  return 0;
}
37 
// Sentinel used as the initial value of the port options: 65537 does not fit
// in 16 bits, so it cannot collide with a real port number.
static const unsigned int no_port = 65537;
39 
40 int main(int argc, char* argv[])
41 {
42  __debug_abort_on_wrong_size_<CLBCommonHeader>(40);
43  __debug_abort_on_wrong_size_<DAQCommonHeader>(56);
44  __debug_abort_on_wrong_size_<UTCTime>(8);
45 
46  unsigned int ts_duration = 100;
47  int run_number = -1;
48  int detector_id = 0;
49  std::vector<std::string> opto_recipients;
50  std::string acou_recipient;
51  std::string roy_server;
52  unsigned int acoustic_port = no_port;
53  unsigned int optical_port = no_port;
54 
55  std::string file_prefix("dump_file"), file_postfix(".dqd");
56  std::string roy_setup;
57 
58  std::size_t dump_size = 1024 * 1024 * 1024; // 1GB
59 
60  po::options_description desc("Options");
61  desc.add_options()
62  ("help,h", "Print this help and exit.")
63  ("version,v", "Print the version and exit.")
64  ("optical,o",
65  po::value<unsigned int>(&optical_port),
66  "Set the port to listen for optical data.")
67  ("acoustic,a",
68  po::value<unsigned int>(&acoustic_port),
69  "Set the port to listen for acoustic data.")
70 
71  ("timeslice,t",
72  po::value<unsigned int>(&ts_duration)->required(),
73  "Set the value of the time slice duration in milliseconds.")
74 
75  ("maxdumpsize",
76  po::value<std::size_t>(&dump_size)->default_value(dump_size),
77  "Set the maximum size of the dump file.")
78 
79  ("prefix",
80  po::value<std::string>(&file_prefix)->default_value(file_prefix),
81  "Set the dump file name prefix.")
82 
83  ("postfix",
84  po::value<std::string>(&file_postfix)->default_value(file_postfix),
85  "Set the dump file name postfix.")
86 
87  /*("royweb,r",
88  po::value<std::string>(&roy_server)->implicit_value(
89  "hitrate_:localhost:9999"),
90  "Sends the monitoring hit rates to the specified ROyWeb \
91 server. The syntax is tag_prefix:server_ip:server_port.")*/
92 
93  ("optical-recipients",
94  po::value<std::vector<std::string> >(&opto_recipients)->multitoken(),
95  "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.")
96 
97  ("acoustic-recipient",
98  po::value<std::string>(&acou_recipient),
99  "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.")
100 
101  ("run-number,r",
102  po::value<int>(&run_number)->default_value(run_number),
103  "Set the run-number. If it is set, data not belonging to the specified run will be discarded.")
104  ("detector-id,i",
105  po::value<int>(&detector_id)->default_value(detector_id),
106  "Set the detector id.");
107 
108  bool acou = false;
109  bool opto = false;
110 // bool uses_roy = false;
111 
112 
113  try
114  {
115  po::variables_map vm;
116  po::store(
117  po::command_line_parser(argc, argv).options(desc).run(),
118  vm);
119 
120  if (vm.count("help"))
121  {
122  std::cout << desc << std::endl;
123  return EXIT_SUCCESS;
124  }
125 
126  if (vm.count("version"))
127  {
128  std::cout << dataqueue::version::v() << std::endl;
129  return EXIT_SUCCESS;
130  }
131 
132  po::notify(vm);
133 
134  opto = vm.count("optical");
135 
136  acou = vm.count("acoustic");
137 
138  if (! (acou || opto))
139  {
140  throw std::runtime_error("FATAL: Both acoustic and optical port missing.");
141  }
142 
143  if (acou && !vm.count("acoustic-recipient"))
144  {
145  throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified.");
146  }
147 
148  if (opto && !vm.count("optical-recipients"))
149  {
150  throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified.");
151  }
152 
153  /*if (vm.count("royweb"))
154  {
155  if (!moni)
156  {
157  throw std::runtime_error("you can use ROyWeb only with the \
158 monitoring channel");
159  }
160 
161  uses_roy = true;
162  std::replace(roy_setup.begin(), roy_setup.end(), ':', ' ');
163  std::istringstream ss(roy_setup);
164  int param_count = 0;
165  if (ss >> tagprefix)
166  {
167  ++param_count;
168  }
169  if (ss >> roy_server)
170  {
171  ++param_count;
172  }
173  if (ss >> roy_port)
174  {
175  ++param_count;
176  }
177 
178  if (param_count != 3)
179  {
180  throw std::runtime_error("you must specify all the parameters \
181 or accept all the default one to use with ROyWeb.");
182  }
183  }*/
184  }
185  catch (const po::error& e)
186  {
187  std::cerr << "DataQueue: Error: " << e.what() << '\n'
188  << desc << std::endl;
189  return EXIT_FAILURE;
190  }
191  catch (const std::runtime_error& e)
192  {
193  std::cerr << "DataQueue: Error: " << e.what() << '\n'
194  << desc << std::endl;
195  return EXIT_FAILURE;
196  }
197 
198  // Call to singleton in a thread-safe environment
199 
201 
202  DFInterface* aDFI = 0;
203  DFInterface* oDFI = 0;
204  boost::thread* acou_thread = 0;
205  boost::thread* opto_thread = 0;
206 
207  RecipientsHandler acouRecipients(10);
208  RecipientsHandler optoRecipients(10);
209 
210  if (opto)
211  {
212  for (std::vector<std::string>::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it)
213  {
214  optoRecipients.add(*it);
215  }
216  }
217 
218  if (acou)
219  {
220  acouRecipients.add(acou_recipient);
221  }
222 
223  FrameFarm* aFarm = 0;
224  FrameFarm* oFarm = 0;
225 
226  boost::thread* farm_threads[2] = {0, 0};
227 
228  DataInputInterface doms_interface(0);
229 
230  if (acou)
231  {
232  std::cout << "Acoustics on\n";
233  aFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_a", file_postfix);
234  aFarm->runNumber(run_number);
235  aFarm->detectorId(detector_id);
236 
237  farm_threads[1] = new boost::thread(boost::ref(*aFarm));
238  aDFI = new DFInterface(*aFarm, acouRecipients);
239  acou_thread = new boost::thread(boost::ref(*aDFI));
240  doms_interface.add_channel(acoustic_port, *aFarm);
241  doms_interface.add_worker();
242  }
243 
244  if (opto)
245  {
246  std::cout << "Optics on\n";
247  oFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_o", file_postfix);
248  oFarm->runNumber(run_number);
249  oFarm->detectorId(detector_id);
250 
251  farm_threads[0] = new boost::thread(boost::ref(*oFarm));
252  oDFI = new DFInterface(*oFarm, optoRecipients);
253  opto_thread = new boost::thread(boost::ref(*oDFI));
254  doms_interface.add_channel(optical_port, *oFarm);
255  doms_interface.add_worker();
256  }
257 
258  doms_interface.start();
259 
260  // Wait for an external stop
261 
262  std::cout << "Hit \'q\' and press [Return] to exit\n";
263 
264  char ch = 0;
265 
266  do
267  {
268  const timeval timeout = {10, 0};
269  if (wait_cin_for(timeout))
270  {
271  std::cin >> ch;
272  fflush(stdin);
273  }
274 
275  std::cout << "Number of pframes: " << PuzzledFrame::n_obj << '\n'
276  << "Number of datagrams: " << CLBDataGram::n_obj << '\n'
277  << Log::Counter::get();
278  } while (ch != 'q');
279 
280  std::cout << "Closing DataQueue\n";
281 
282  // Stopping input;
283 
284  doms_interface.stop();
285 
286  std::cout << "DOMs interface closed\n";
287 
288  // Stopping internal data management system
289 
290  if (oFarm)
291  oFarm->stop();
292 
293  if (aFarm)
294  aFarm->stop();
295 
296  std::cout << "Farms closed\n";
297 
298  // Stopping sending data
299 
300  if (acoustic_port != no_port)
301  {
302  aDFI->stop();
303  acou_thread->join();
304  delete aDFI;
305  delete acou_thread;
306  }
307 
308  if (optical_port != no_port)
309  {
310  oDFI->stop();
311  opto_thread->join();
312  delete oDFI;
313  delete opto_thread;
314  }
315 
316  std::cout << "DataFilter Interfaces stopped\n";
317 
318  if (oFarm)
319  farm_threads[0]->join();
320 
321  if (aFarm)
322  farm_threads[1]->join();
323 
324  std::cout << "Farms returned\n";
325 
326  delete aFarm;
327  delete oFarm;
328  delete farm_threads[0];
329  delete farm_threads[1];
330 
331  std::cout << "Bye bye\n";
332 }
void add_channel(unsigned short port, FrameFarm &farm)
int main(int argc, char *argv[])
Definition: Main.cc:15
int runNumber() const
Definition: frame_farm.hh:87
unsigned int detectorId(unsigned int detector_id)
Definition: frame_farm.hh:97
bool add(const std::string &id)
static Counter & get()
Definition: log.hh:53
void stop()
Definition: frame_farm.hh:72
static int wait_cin_for(timeval tv)
Definition: saDataQueue.cc:28
static const unsigned int no_port
Definition: saDataQueue.cc:38
then usage $script[port]< option > nPossible options
static boost::atomic< unsigned int > n_obj
Definition: clb_datagram.hh:35
void store(const std::string &file_name, const JDetector &detector)
Store detector to output file.
void stop()
Definition: DFInterface.hh:32
static InBufferCollector & getCollector()
data_type v[N+1][M+1]
Definition: JPolint.hh:756
static boost::atomic< unsigned int > n_obj