41{
45
46 unsigned int ts_duration = 100;
47 int run_number = -1;
48 int detector_id = 0;
50 std::string acou_recipient;
51 std::string roy_server;
52 unsigned int acoustic_port =
no_port;
53 unsigned int optical_port =
no_port;
54
55 std::string file_prefix("dump_file"), file_postfix(".dqd");
56 std::string roy_setup;
57
58 std::size_t dump_size = 1024 * 1024 * 1024;
59
60 po::options_description desc("Options");
61 desc.add_options()
62 ("help,h", "Print this help and exit.")
63 ("version,v", "Print the version and exit.")
64 ("optical,o",
65 po::value<unsigned int>(&optical_port),
66 "Set the port to listen for optical data.")
67 ("acoustic,a",
68 po::value<unsigned int>(&acoustic_port),
69 "Set the port to listen for acoustic data.")
70
71 ("timeslice,t",
72 po::value<unsigned int>(&ts_duration)->required(),
73 "Set the value of the time slice duration in milliseconds.")
74
75 ("maxdumpsize",
76 po::value<std::size_t>(&dump_size)->default_value(dump_size),
77 "Set the maximum size of the dump file.")
78
79 ("prefix",
80 po::value<std::string>(&file_prefix)->default_value(file_prefix),
81 "Set the dump file name prefix.")
82
83 ("postfix",
84 po::value<std::string>(&file_postfix)->default_value(file_postfix),
85 "Set the dump file name postfix.")
86
87
88
89
90
91
92
93 ("optical-recipients",
94 po::value<std::vector<std::string> >(&opto_recipients)->multitoken(),
95 "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.")
96
97 ("acoustic-recipient",
98 po::value<std::string>(&acou_recipient),
99 "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.")
100
101 ("run-number,r",
102 po::value<int>(&run_number)->default_value(run_number),
103 "Set the run-number. If it is set, data not belonging to the specified run will be discarded.")
104 ("detector-id,i",
105 po::value<int>(&detector_id)->default_value(detector_id),
106 "Set the detector id.");
107
108 bool acou = false;
109 bool opto = false;
110
111
112
113 try
114 {
115 po::variables_map vm;
116 po::store(
117 po::command_line_parser(argc, argv).options(desc).run(),
118 vm);
119
120 if (vm.count("help"))
121 {
122 std::cout << desc << std::endl;
123 return EXIT_SUCCESS;
124 }
125
126 if (vm.count("version"))
127 {
128 std::cout << dataqueue::version::v() << std::endl;
129 return EXIT_SUCCESS;
130 }
131
132 po::notify(vm);
133
134 opto = vm.count("optical");
135
136 acou = vm.count("acoustic");
137
138 if (! (acou || opto))
139 {
140 throw std::runtime_error("FATAL: Both acoustic and optical port missing.");
141 }
142
143 if (acou && !vm.count("acoustic-recipient"))
144 {
145 throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified.");
146 }
147
148 if (opto && !vm.count("optical-recipients"))
149 {
150 throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified.");
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 }
185 catch (const po::error& e)
186 {
187 std::cerr << "DataQueue: Error: " << e.what() << '\n'
188 << desc << std::endl;
189 return EXIT_FAILURE;
190 }
191 catch (const std::runtime_error& e)
192 {
193 std::cerr << "DataQueue: Error: " << e.what() << '\n'
194 << desc << std::endl;
195 return EXIT_FAILURE;
196 }
197
198
199
201
204 boost::thread* acou_thread = 0;
205 boost::thread* opto_thread = 0;
206
209
210 if (opto)
211 {
212 for (std::vector<std::string>::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it)
213 {
214 optoRecipients.add(*it);
215 }
216 }
217
218 if (acou)
219 {
220 acouRecipients.add(acou_recipient);
221 }
222
225
226 boost::thread* farm_threads[2] = {0, 0};
227
229
230 if (acou)
231 {
232 std::cout << "Acoustics on\n";
233 aFarm =
new FrameFarm(ts_duration, 0, dump_size, file_prefix +
"_a", file_postfix);
236
237 farm_threads[1] = new boost::thread(boost::ref(*aFarm));
239 acou_thread = new boost::thread(boost::ref(*aDFI));
240 doms_interface.add_channel(acoustic_port, *aFarm);
241 doms_interface.add_worker();
242 }
243
244 if (opto)
245 {
246 std::cout << "Optics on\n";
247 oFarm =
new FrameFarm(ts_duration, 0, dump_size, file_prefix +
"_o", file_postfix);
250
251 farm_threads[0] = new boost::thread(boost::ref(*oFarm));
253 opto_thread = new boost::thread(boost::ref(*oDFI));
254 doms_interface.add_channel(optical_port, *oFarm);
255 doms_interface.add_worker();
256 }
257
258 doms_interface.start();
259
260
261
262 std::cout << "Hit \'q\' and press [Return] to exit\n";
263
264 char ch = 0;
265
266 do
267 {
268 const timeval timeout = {10, 0};
270 {
271 std::cin >> ch;
272 fflush(stdin);
273 }
274
278 } while (ch != 'q');
279
280 std::cout << "Closing DataQueue\n";
281
282
283
284 doms_interface.stop();
285
286 std::cout << "DOMs interface closed\n";
287
288
289
290 if (oFarm)
292
293 if (aFarm)
295
296 std::cout << "Farms closed\n";
297
298
299
301 {
303 acou_thread->join();
304 delete aDFI;
305 delete acou_thread;
306 }
307
309 {
311 opto_thread->join();
312 delete oDFI;
313 delete opto_thread;
314 }
315
316 std::cout << "DataFilter Interfaces stopped\n";
317
318 if (oFarm)
319 farm_threads[0]->join();
320
321 if (aFarm)
322 farm_threads[1]->join();
323
324 std::cout << "Farms returned\n";
325
326 delete aFarm;
327 delete oFarm;
328 delete farm_threads[0];
329 delete farm_threads[1];
330
331 std::cout << "Bye bye\n";
332}
static boost::atomic< unsigned int > n_obj
unsigned int detectorId(unsigned int detector_id)
static InBufferCollector & getCollector()
static boost::atomic< unsigned int > n_obj
void __debug_abort_on_wrong_size_(size_t size)
static const unsigned int no_port
static int wait_cin_for(timeval tv)