Jpp master_rocky-44-g75b7c4f75
the software that should make you happy
Loading...
Searching...
No Matches
AcousticDataFilter.cc
Go to the documentation of this file.
2#ifdef DEBUG_ACOUSTICDATAFILTER
3#include <stdlib.h>
4#include <iostream>
5#include <boost/date_time/posix_time/posix_time.hpp>
7#define DEBUG_MSG(str,lev) do { if (((uint8_t)lev)>=AcousticDataFilter::DEBUG_LEVEL) { DBG_MSG << boost::posix_time::microsec_clock::local_time() << " ACOUSTICDATAFILTER(" << (void*)this << "): "<< str << '\n'; } }while( false )
8#else
9#define DEBUG_MSG(str,lev) do { } while ( false )
10#endif
11
12/**
13 * \author fsimeone
14 */
15
16class Parser
17{
18public:
19 Parser(std::string d,char t)
20 {
21 std::istringstream iss(d);
22 std::string token;
23
24 while (std::getline(iss, token, t))
25 {
26 boost::algorithm::trim(token);
27 size_t pos = token.find('=');
28 std::string key=token.substr(0, pos);
29 std::string value=token.substr(pos+1);
30 boost::algorithm::trim(key);
31 boost::algorithm::trim(value);
33 }
34 }
35
37 {
39 std::pair <std::multimap<std::string,std::string>::iterator, std::multimap<std::string,std::string>::iterator> i;
40 i = mmap.equal_range(tag);
41 for (std::multimap<std::string,std::string>::iterator it=i.first; it!=i.second; ++it)
42 {
43 r.push_back(it->second);
44 }
45
46 return r;
47 }
48
49 std::string Find(std::string tag, size_t index, std::string value = "")
50 {
51 std::vector<std::string> result = Find(tag);
52
53 if (result.size() > index)
54 return result[index];
55 else
56 return value;
57 }
58
59private:
61};
62
63AcousticDataFilter::AcousticDataFilter( const std::string& address,\
64 const std::string& port,\
65 const std::string& name,\
66 const std::string& server,\
67 const std::string& wisdom,\
68 const std::string& output,\
69 const std::string& raw,\
70 JLOGGER::JLogger* logger,\
71 const int level): KM3NETDAQ::JDAQClient(name,server,logger,level),\
72 server_acceptor_(server_io_service_),
73 strand_(trigger_io_service_)
74{
75 DEBUG_MSG("Constructor",0);
76
78
79 address_=address;
80 port_=port;
81 exit_=false;
82 running_=false;
83 wisdom_file_=wisdom;
84 toa_file_path_=output;
86 Start();
87}
88
90{
91 DEBUG_MSG("Destructor",0);
92 Stop();
93}
94
96{
97 DEBUG_MSG("Start",2);
98 server_work_ = new boost::asio::io_service::work(server_io_service_);
99 trigger_work_ = new boost::asio::io_service::work(trigger_io_service_);
100
101 boost::asio::ip::tcp::resolver l_resolver(server_io_service_);
102 boost::asio::ip::tcp::resolver::query l_query(address_,port_);
103 boost::asio::ip::tcp::endpoint l_endpoint = *l_resolver.resolve(l_query);
104 server_acceptor_.open(l_endpoint.protocol());
105 server_acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
106 server_acceptor_.bind(l_endpoint);
107 server_acceptor_.listen();
108
109 for (std::size_t i=0; i<SERVER_THREAD_POOL; ++i) { server_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&server_io_service_)); }
110 for (std::size_t i=0; i<TRIGGER_THREAD_POOL; ++i) { trigger_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&trigger_io_service_)); }
111 strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
112}
113
115{
116 DEBUG_MSG("Stop",2);
117
118 server_acceptor_.close();
119 delete server_work_;
120
122 for (i=links_.begin(); i!=links_.end(); ++i){ (*i)->Disconnect(); }
123 server_io_service_.stop();
124 server_threads_.join_all();
125 server_io_service_.reset();
126
127 delete trigger_work_;
128 trigger_io_service_.stop();
129 trigger_threads_.join_all();
130 trigger_io_service_.reset();
131
132 links_.clear();
133 doms_.clear();
134 probes_.clear();
135
136 if (toa_file_.is_open()) toa_file_.close();
137}
138
140{
141 DEBUG_MSG("Start accept",2);
143 links_.push_back(l_link);
144 server_acceptor_.async_accept(l_link->Socket(), boost::bind(&AcousticDataFilter::Accept_completed, this, l_link, boost::asio::placeholders::error));
145}
146
147void AcousticDataFilter::Accept_completed(pLink link, const boost::system::error_code& error)
148{
149 DEBUG_MSG("Accept completed",2);
150
151 if (!error)
152 {
153 link->Start_reading();
154 strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
155 }
156}
157
158void AcousticDataFilter::actionInit(int length, const char* buffer)
159{
160 DEBUG_MSG("SM:Init",2);
161 FILE* pfile;
162 pfile=fopen(wisdom_file_.c_str(),"r");
163 if (pfile!=NULL)
164 {
165#ifdef TOAFINDER_USE_FLOAT
166 if (fftwf_import_wisdom_from_file(pfile)==1)
167#else
168 if (fftw_import_wisdom_from_file(pfile)==1)
169#endif
170 {
171 DEBUG_MSG("SM:Init wisdom imported",2);
172 }
173 fclose(pfile);
174 }
175}
176
177void AcousticDataFilter::actionReset(int length, const char* buffer)
178{
179 DEBUG_MSG("SM:Reset",2);
180 FILE* pfile;
181 pfile=fopen(wisdom_file_.c_str(),"w");
182 if (pfile!=NULL)
183 {
184DEBUG_MSG("SM:Reset wisdom saved",2);
185#ifdef TOAFINDER_USE_FLOAT
186 fftwf_export_wisdom_to_file(pfile);
187#else
188 fftw_export_wisdom_to_file(pfile);
189#endif
190 fclose(pfile);
191 }
192 links_.clear();
193 doms_.clear();
194 probes_.clear();
195 if (toa_file_.is_open()) toa_file_.close();
196}
197
198void AcousticDataFilter::actionConfigure(int length, const char* buffer)
199{
200 DEBUG_MSG("SM:Configure",2);
201 Parser dummy(std::string(buffer, length),';');
202 DEBUG_MSG("SM:Cerco",2);
203
204 std::string l_debug_file_path;
205 std::string l_window;
206 std::string l_overlap;
207 std::vector<std::string> l_toa_path;
208 std::vector<std::string> l_dbg_path;
210 std::vector<std::string> l_dom_configuration;
211
212 l_window = dummy.Find("ADF_analysis_window_size", 0);
213 l_overlap = dummy.Find("ADF_analysis_window_overlap", 0);
214 l_probes = dummy.Find("ADF_waveform");
215 l_dom_configuration = dummy.Find("ADF_DOM_configuration");
216
217 l_toa_path=dummy.Find("ADF_TOA_path");
218 if (l_toa_path.size()!=0) toa_file_path_=l_toa_path[0];
219 l_dbg_path=dummy.Find("ADF_DBG_path");
220 if (l_dbg_path.size()!=0) debug_file_path_=l_dbg_path[0];
221
222 DEBUG_MSG("SM:Configure. win="<<l_window,2);
223 DEBUG_MSG("SM:Configure. ovr="<<l_overlap,2);
224 DEBUG_MSG("SM:Configure. TOA_path="<<toa_file_path_,2);
225 DEBUG_MSG("SM:Configure. DBG_path="<<debug_file_path_,2);
226
227 DEBUG_MSG("SM:Configure. n dom="<<l_dom_configuration.size(),2);
228 DEBUG_MSG("SM:Configure. n pro="<<l_probes.size(),2);
229
230 Create_probes(l_probes,atoi(l_window.c_str()));
231 Create_DOMs(l_dom_configuration,debug_file_path_,atoi(l_window.c_str()),atoi(l_overlap.c_str()));
232
233
234
235}
236
237void AcousticDataFilter::actionStart(int length, const char* buffer)
238{
239 DEBUG_MSG("SM start",2);
240 running_=true;
241
242}
243
245{
246 DEBUG_MSG("Create probes",1);
247
248 for(size_t t=0;t<probes.size();++t)
249 {
250 std::istringstream l_iss(probes.at(t));
251 std::string l_line;
252 uint32_t l_id;
253 uint32_t l_samples;
254 real_type* l_buffer;
255
256 std::getline(l_iss, l_line,' ');
257 l_id=atoi(l_line.c_str());
258 DEBUG_MSG("id probe="<<l_id,1);
259 std::getline(l_iss, l_line,' ');
260 l_samples=atoi(l_line.c_str());
261 DEBUG_MSG("n samples="<<l_samples,1);
262 l_buffer=(real_type*)malloc(l_samples*sizeof(real_type));
263 for (uint32_t i=0; i<l_samples; ++i)
264 {
265 std::getline(l_iss, l_line,' ');
266 real_type l_dummy=atof(l_line.c_str());
267 DEBUG_MSG("sample="<<l_dummy,0);
268 l_buffer[i]=l_dummy;
269 }
270 DEBUG_MSG("creating probes",1);
271 pProbe l_probe = boost::make_shared<KM3::TOALIB::CTOAFinder::probeSignalT>(*(new KM3::TOALIB::CTOAFinder::probeSignalT(window)));
272 DEBUG_MSG("copying data",1);
273 l_probe->copyFromBuffer(l_buffer,l_samples);
274 DEBUG_MSG("insert probe",1);
275 probes_.insert(probe_entry(l_id,l_probe));
276 }
277 DEBUG_MSG("Probes creation finished",1);
278}
279
280void AcousticDataFilter::Create_DOMs(std::vector<std::string> dom_configuration,std::string& debug_file_path,std::size_t window,std::size_t overlap)
281{
282 DEBUG_MSG("Create DOM",1);
283
284 for(size_t t=0;t<dom_configuration.size();++t)
285 {
286 std::istringstream l_iss(dom_configuration.at(t));
287 std::string l_line;
288 uint32_t l_dom_id;
289 uint32_t l_probe_id;
290 uint32_t l_threshold;
291 uint32_t l_waveforms;
292 uint32_t l_test;
293
294 std::getline(l_iss, l_line,' ');
295 l_dom_id=atoi(l_line.c_str());
296 DEBUG_MSG("DOM id="<<l_dom_id,1);
297 pDOM l_dom = boost::make_shared<KM3NeT_DOM>(*(new KM3NeT_DOM( trigger_io_service_,\
298 boost::bind(&AcousticDataFilter::Send_toa_async, this, _1),\
299 debug_file_path,\
300 window,\
301 overlap)));
302 doms_.insert(dom_entry(l_dom_id,l_dom));
303 std::getline(l_iss, l_line,' ');
304 l_waveforms=atoi(l_line.c_str());
305 DEBUG_MSG("n waveforms="<<l_waveforms,1);
306 for (uint32_t i=0; i<l_waveforms; ++i)
307 {
308 std::getline(l_iss, l_line,' ');
309 l_probe_id=atoi(l_line.c_str());
310 std::getline(l_iss, l_line,' ');
311 l_threshold=atoi(l_line.c_str());
312 DEBUG_MSG("adding probe "<<l_probe_id<<" with threshold "<<l_threshold<<" to DOM "<<l_dom_id,1);
313 l_dom->Add_probe(probes_.find(l_probe_id)->second,l_probe_id,l_threshold);
314 }
315 std::getline(l_iss, l_line,' ');
316 l_test=atoi(l_line.c_str());
317 DEBUG_MSG("setting test id="<<l_test,1);
318 l_dom->Set_test(l_test);
319 }
320 DEBUG_MSG("DOMs creation finished",1);
321}
322
324{
325 if (!running_) return;
326
327 DEBUG_MSG("Sent toa from ID="<<packet.Header.DOM_Identifier,3);
328
329 if (!toa_file_.is_open()) Open_toa_file();
330
331 if (toa_file_.tellp()>MAX_TOA_SIZE)
332 {
333 DEBUG_MSG("Toa file closed",1);
334 toa_file_.close();
336 }
337
338 if (toa_file_.is_open()) toa_file_.write((char*)&packet,sizeof(ToA_Packet));
339
340}
341
343{
344 std::string l_file=toa_file_path_;
345 std::time_t seconds = std::time(0);
346 l_file+="TOA_"+std::to_string((long long unsigned int)seconds)+".bin";
347 toa_file_.open(l_file.c_str(),std::ofstream::binary);
348 DEBUG_MSG("Toa file opened at "<<seconds,1);
349}
350
351void AcousticDataFilter::actionPause(int length, const char* buffer)
352{
353 DEBUG_MSG("SM: pause",2);
354 running_=false;
355}
356
357void AcousticDataFilter::actionContinue(int length, const char* buffer)
358{
359 DEBUG_MSG("SM: continue",2);
360 running_=true;
361}
362
363void AcousticDataFilter::actionStop(int length, const char* buffer)
364{
365 DEBUG_MSG("SM:Stop",2);
366 links_.clear();
367 doms_.clear();
368 probes_.clear();
369 if (toa_file_.is_open()) toa_file_.close();
370
371
372}
373
374void AcousticDataFilter::actionQuit(int length, const char* buffer)
375{
376 DEBUG_MSG("SM:Quit",2);
377 links_.clear();
378 doms_.clear();
379 probes_.clear();
380 if (toa_file_.is_open()) toa_file_.close();
381
382
383}
384
#define DEBUG_MSG(str, lev)
#define SERVER_THREAD_POOL
#define MAX_TOA_SIZE
std::pair< std::size_t, pProbe > probe_entry
#define TRIGGER_THREAD_POOL
std::pair< std::size_t, pDOM > dom_entry
boost::shared_ptr< KM3::TOALIB::CTOAFinder::probeSignalT > pProbe
boost::shared_ptr< KM3NeT_DOM > pDOM
Definition KM3NeT_DOM.h:65
boost::asio::io_service::strand strand_
boost::asio::ip::tcp::acceptor server_acceptor_
void Create_probes(std::vector< std::string > probes, std::size_t window)
void Send_toa(ToA_Packet packet)
virtual void actionQuit(int length, const char *buffer)
boost::atomic< bool > exit_
boost::atomic< bool > running_
boost::asio::io_service::work * trigger_work_
boost::thread_group trigger_threads_
virtual void actionContinue(int length, const char *buffer)
virtual void actionConfigure(int length, const char *buffer)
std::vector< pLink > links_
virtual void actionReset(int length, const char *buffer)
std::map< std::size_t, pDOM > doms_
boost::asio::io_service::work * server_work_
virtual void actionStop(int length, const char *buffer)
boost::asio::io_service server_io_service_
boost::asio::io_service trigger_io_service_
void Create_DOMs(std::vector< std::string > doms, std::string &debug_file_path, std::size_t window, std::size_t overlap)
const JNET::JTag & clientTag() const
boost::thread_group server_threads_
virtual void actionInit(int length, const char *buffer)
AcousticDataFilter(const std::string &address, const std::string &port, const std::string &name, const std::string &server, const std::string &wisdom, const std::string &output, const std::string &raw, JLOGGER::JLogger *logger, const int level)
void Accept_completed(pLink link, const boost::system::error_code &error)
void Send_toa_async(ToA_Packet packet)
virtual void actionPause(int length, const char *buffer)
virtual void actionStart(int length, const char *buffer)
std::map< std::size_t, pProbe > probes_
JDAQStateMachine::ev_configure_event ev_configure
Interface for logging messages.
Definition JLogger.hh:22
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
std::vector< std::string > Find(std::string tag)
std::string Find(std::string tag, size_t index, std::string value="")
std::multimap< std::string, std::string > mmap
Parser(std::string d, char t)
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
DAQ_Common_Header Header