Jpp test-rotations-new
the software that should make you happy
Loading...
Searching...
No Matches
saDataQueue.cc
Go to the documentation of this file.
1#include <iostream>
2#include <vector>
3#include <string>
4
5#include <boost/thread.hpp>
6#include <boost/ref.hpp>
7#include <boost/lexical_cast.hpp>
8#include <boost/program_options.hpp>
9
10/**
11 * \author cpellegrino
12 */
13
14namespace po = boost::program_options;
15
18
20
21#include "debug_abrt.hh"
22#include "log.hh"
23#include "version.hpp"
24
25#include <sys/select.h>
26#include <unistd.h>
27
28static int wait_cin_for(timeval tv)
29{
30 fd_set set;
31 FD_ZERO(&set);
32 FD_SET(0, &set);
33 const int val = select(1, &set, 0, 0, &tv);
34
35 return val <= 0 ? 0 : val;
36}
37
38const static unsigned int no_port = 65537;
39
40int main(int argc, char* argv[])
41{
45
46 unsigned int ts_duration = 100;
47 int run_number = -1;
48 int detector_id = 0;
49 std::vector<std::string> opto_recipients;
50 std::string acou_recipient;
51 std::string roy_server;
52 unsigned int acoustic_port = no_port;
53 unsigned int optical_port = no_port;
54
55 std::string file_prefix("dump_file"), file_postfix(".dqd");
56 std::string roy_setup;
57
58 std::size_t dump_size = 1024 * 1024 * 1024; // 1GB
59
60 po::options_description desc("Options");
61 desc.add_options()
62 ("help,h", "Print this help and exit.")
63 ("version,v", "Print the version and exit.")
64 ("optical,o",
65 po::value<unsigned int>(&optical_port),
66 "Set the port to listen for optical data.")
67 ("acoustic,a",
68 po::value<unsigned int>(&acoustic_port),
69 "Set the port to listen for acoustic data.")
70
71 ("timeslice,t",
72 po::value<unsigned int>(&ts_duration)->required(),
73 "Set the value of the time slice duration in milliseconds.")
74
75 ("maxdumpsize",
76 po::value<std::size_t>(&dump_size)->default_value(dump_size),
77 "Set the maximum size of the dump file.")
78
79 ("prefix",
80 po::value<std::string>(&file_prefix)->default_value(file_prefix),
81 "Set the dump file name prefix.")
82
83 ("postfix",
84 po::value<std::string>(&file_postfix)->default_value(file_postfix),
85 "Set the dump file name postfix.")
86
87 /*("royweb,r",
88 po::value<std::string>(&roy_server)->implicit_value(
89 "hitrate_:localhost:9999"),
90 "Sends the monitoring hit rates to the specified ROyWeb \
91server. The syntax is tag_prefix:server_ip:server_port.")*/
92
93 ("optical-recipients",
94 po::value<std::vector<std::string> >(&opto_recipients)->multitoken(),
95 "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.")
96
97 ("acoustic-recipient",
98 po::value<std::string>(&acou_recipient),
99 "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.")
100
101 ("run-number,r",
102 po::value<int>(&run_number)->default_value(run_number),
103 "Set the run-number. If it is set, data not belonging to the specified run will be discarded.")
104 ("detector-id,i",
105 po::value<int>(&detector_id)->default_value(detector_id),
106 "Set the detector id.");
107
108 bool acou = false;
109 bool opto = false;
110// bool uses_roy = false;
111
112
113 try
114 {
115 po::variables_map vm;
116 po::store(
117 po::command_line_parser(argc, argv).options(desc).run(),
118 vm);
119
120 if (vm.count("help"))
121 {
122 std::cout << desc << std::endl;
123 return EXIT_SUCCESS;
124 }
125
126 if (vm.count("version"))
127 {
128 std::cout << dataqueue::version::v() << std::endl;
129 return EXIT_SUCCESS;
130 }
131
132 po::notify(vm);
133
134 opto = vm.count("optical");
135
136 acou = vm.count("acoustic");
137
138 if (! (acou || opto))
139 {
140 throw std::runtime_error("FATAL: Both acoustic and optical port missing.");
141 }
142
143 if (acou && !vm.count("acoustic-recipient"))
144 {
145 throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified.");
146 }
147
148 if (opto && !vm.count("optical-recipients"))
149 {
150 throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified.");
151 }
152
153 /*if (vm.count("royweb"))
154 {
155 if (!moni)
156 {
157 throw std::runtime_error("you can use ROyWeb only with the \
158monitoring channel");
159 }
160
161 uses_roy = true;
162 std::replace(roy_setup.begin(), roy_setup.end(), ':', ' ');
163 std::istringstream ss(roy_setup);
164 int param_count = 0;
165 if (ss >> tagprefix)
166 {
167 ++param_count;
168 }
169 if (ss >> roy_server)
170 {
171 ++param_count;
172 }
173 if (ss >> roy_port)
174 {
175 ++param_count;
176 }
177
178 if (param_count != 3)
179 {
180 throw std::runtime_error("you must specify all the parameters \
181or accept all the default one to use with ROyWeb.");
182 }
183 }*/
184 }
185 catch (const po::error& e)
186 {
187 std::cerr << "DataQueue: Error: " << e.what() << '\n'
188 << desc << std::endl;
189 return EXIT_FAILURE;
190 }
191 catch (const std::runtime_error& e)
192 {
193 std::cerr << "DataQueue: Error: " << e.what() << '\n'
194 << desc << std::endl;
195 return EXIT_FAILURE;
196 }
197
198 // Call to singleton in a thread-safe environment
199
201
202 DFInterface* aDFI = 0;
203 DFInterface* oDFI = 0;
204 boost::thread* acou_thread = 0;
205 boost::thread* opto_thread = 0;
206
207 RecipientsHandler acouRecipients(10);
208 RecipientsHandler optoRecipients(10);
209
210 if (opto)
211 {
212 for (std::vector<std::string>::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it)
213 {
214 optoRecipients.add(*it);
215 }
216 }
217
218 if (acou)
219 {
220 acouRecipients.add(acou_recipient);
221 }
222
223 FrameFarm* aFarm = 0;
224 FrameFarm* oFarm = 0;
225
226 boost::thread* farm_threads[2] = {0, 0};
227
228 DataInputInterface doms_interface(0);
229
230 if (acou)
231 {
232 std::cout << "Acoustics on\n";
233 aFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_a", file_postfix);
234 aFarm->runNumber(run_number);
235 aFarm->detectorId(detector_id);
236
237 farm_threads[1] = new boost::thread(boost::ref(*aFarm));
238 aDFI = new DFInterface(*aFarm, acouRecipients);
239 acou_thread = new boost::thread(boost::ref(*aDFI));
240 doms_interface.add_channel(acoustic_port, *aFarm);
241 doms_interface.add_worker();
242 }
243
244 if (opto)
245 {
246 std::cout << "Optics on\n";
247 oFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_o", file_postfix);
248 oFarm->runNumber(run_number);
249 oFarm->detectorId(detector_id);
250
251 farm_threads[0] = new boost::thread(boost::ref(*oFarm));
252 oDFI = new DFInterface(*oFarm, optoRecipients);
253 opto_thread = new boost::thread(boost::ref(*oDFI));
254 doms_interface.add_channel(optical_port, *oFarm);
255 doms_interface.add_worker();
256 }
257
258 doms_interface.start();
259
260 // Wait for an external stop
261
262 std::cout << "Hit \'q\' and press [Return] to exit\n";
263
264 char ch = 0;
265
266 do
267 {
268 const timeval timeout = {10, 0};
269 if (wait_cin_for(timeout))
270 {
271 std::cin >> ch;
272 fflush(stdin);
273 }
274
275 std::cout << "Number of pframes: " << PuzzledFrame::n_obj << '\n'
276 << "Number of datagrams: " << CLBDataGram::n_obj << '\n'
278 } while (ch != 'q');
279
280 std::cout << "Closing DataQueue\n";
281
282 // Stopping input;
283
284 doms_interface.stop();
285
286 std::cout << "DOMs interface closed\n";
287
288 // Stopping internal data management system
289
290 if (oFarm)
291 oFarm->stop();
292
293 if (aFarm)
294 aFarm->stop();
295
296 std::cout << "Farms closed\n";
297
298 // Stopping sending data
299
300 if (acoustic_port != no_port)
301 {
302 aDFI->stop();
303 acou_thread->join();
304 delete aDFI;
305 delete acou_thread;
306 }
307
308 if (optical_port != no_port)
309 {
310 oDFI->stop();
311 opto_thread->join();
312 delete oDFI;
313 delete opto_thread;
314 }
315
316 std::cout << "DataFilter Interfaces stopped\n";
317
318 if (oFarm)
319 farm_threads[0]->join();
320
321 if (aFarm)
322 farm_threads[1]->join();
323
324 std::cout << "Farms returned\n";
325
326 delete aFarm;
327 delete oFarm;
328 delete farm_threads[0];
329 delete farm_threads[1];
330
331 std::cout << "Bye bye\n";
332}
static boost::atomic< unsigned int > n_obj
void add_channel(unsigned short port, FrameFarm &farm)
void stop()
Definition frame_farm.hh:72
int runNumber() const
Definition frame_farm.hh:87
unsigned int detectorId(unsigned int detector_id)
Definition frame_farm.hh:97
static InBufferCollector & getCollector()
static Counter & get()
Definition log.hh:53
static boost::atomic< unsigned int > n_obj
bool add(const std::string &id)
void __debug_abort_on_wrong_size_(size_t size)
Definition debug_abrt.hh:11
int main(int argc, char *argv[])
static const unsigned int no_port
static int wait_cin_for(timeval tv)