Jpp 19.3.0
the software that should make you happy
Loading...
Searching...
No Matches
AcousticDataFilter.cc
Go to the documentation of this file.
2#include "Jeep/JMessage.hh"
3#include "parser.h"
// Debug tracing support.
// With DEBUG_ACOUSTICDATAFILTER defined, DEBUG_MSG(str,lev) streams one line to DBG_MSG
// (local timestamp, the address of the calling object, then str), but only when lev,
// cast to uint8_t, is >= AcousticDataFilter::DEBUG_LEVEL. The expansion uses `this`,
// so the macro can only be used inside non-static member functions.
// NOTE(review): DBG_MSG is not defined in the visible lines; the listing skips one line
// right before the #define below, so it may be introduced there -- confirm.
4#ifdef DEBUG_ACOUSTICDATAFILTER
5#include <stdlib.h>
6#include <iostream>
7#include <boost/date_time/posix_time/posix_time.hpp>
9#define DEBUG_MSG(str,lev) do { if (((uint8_t)lev)>=AcousticDataFilter::DEBUG_LEVEL) { DBG_MSG << boost::posix_time::microsec_clock::local_time() << " ACOUSTICDATAFILTER(" << (void*)this << "): "<< str << '\n'; } }while( false )
// Without DEBUG_ACOUSTICDATAFILTER the macro expands to an empty statement, so its
// arguments are never evaluated.
10#else
11#define DEBUG_MSG(str,lev) do { } while ( false )
12#endif
13
14/**
15 * \author fsimeone
16 */
17
/**
 * Constructor.
 *
 * Forwards name, server, logger and level to the KM3NETDAQ::JDAQClient base class,
 * binds the acceptor to server_io_service_ and the strand to trigger_io_service_,
 * stores the listening address/port and the wisdom/output settings, clears the
 * exit_ and running_ flags and finally calls Start().
 *
 * \param  address   address handed to the TCP resolver in Start()
 * \param  port      port handed to the TCP resolver in Start()
 * \param  name      forwarded to JDAQClient
 * \param  server    forwarded to JDAQClient
 * \param  wisdom    stored in wisdom_file_
 * \param  output    stored in toa_file_path_
 * \param  raw       not used in the visible lines (see note in the body)
 * \param  logger    forwarded to JDAQClient
 * \param  level     forwarded to JDAQClient
 */
18AcousticDataFilter::AcousticDataFilter( const std::string& address,\
19 const std::string& port,\
20 const std::string& name,\
21 const std::string& server,\
22 const std::string& wisdom,\
23 const std::string& output,\
24 const std::string& raw,\
25 JLOGGER::JLogger* logger,\
26 const int level): KM3NETDAQ::JDAQClient(name,server,logger,level),\
27 server_acceptor_(server_io_service_),
28 strand_(trigger_io_service_)
29{
30 DEBUG_MSG("Constructor",0);
31
// NOTE(review): the listing skips a line here (and another one just before Start());
// whatever they do -- possibly the only use of `raw` -- is not visible. Confirm against
// the real source before editing this body.
33
34 address_=address;
35 port_=port;
36 exit_=false;
37 running_=false;
38 wisdom_file_=wisdom;
39 toa_file_path_=output;
// Start() opens the listening socket and launches the worker threads, so the object
// is already serving when the constructor returns.
41 Start();
42}
43
45{
// NOTE(review): the signature line is missing from this listing; judging by the log
// text this is presumably the destructor body -- confirm.
// All teardown is delegated to Stop().
46 DEBUG_MSG("Destructor",0);
47 Stop();
48}
49
51{
// NOTE(review): the signature line is missing from this listing; the constructor calls
// Start() and the log text below matches, so this is presumably Start() -- confirm.
52 DEBUG_MSG("Start",2);
// One work object per io_service; both are released again in the teardown code
// (delete server_work_ / delete trigger_work_).
53 server_work_ = new boost::asio::io_service::work(server_io_service_);
54 trigger_work_ = new boost::asio::io_service::work(trigger_io_service_);
55
// Resolve address_/port_ and use the first endpoint returned by the resolver.
56 boost::asio::ip::tcp::resolver l_resolver(server_io_service_);
57 boost::asio::ip::tcp::resolver::query l_query(address_,port_);
58 boost::asio::ip::tcp::endpoint l_endpoint = *l_resolver.resolve(l_query);
// Open, enable address reuse, bind and listen on that endpoint.
59 server_acceptor_.open(l_endpoint.protocol());
60 server_acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
61 server_acceptor_.bind(l_endpoint);
62 server_acceptor_.listen();
63
// Launch SERVER_THREAD_POOL threads running server_io_service_ and
// TRIGGER_THREAD_POOL threads running trigger_io_service_.
64 for (std::size_t i=0; i<SERVER_THREAD_POOL; ++i) { server_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&server_io_service_)); }
65 for (std::size_t i=0; i<TRIGGER_THREAD_POOL; ++i) { trigger_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&trigger_io_service_)); }
// Queue the first accept through the strand.
66 strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
67}
68
70{
// NOTE(review): the signature line is missing from this listing; the log text suggests
// this is Stop() -- confirm.
71 DEBUG_MSG("Stop",2);
72
// Server side: close the acceptor, release the work object, disconnect every link,
// then stop the io_service, join its threads and reset it.
73 server_acceptor_.close();
74 delete server_work_;
75
// NOTE(review): the declaration of the iterator `i` is on a line the listing skips.
77 for (i=links_.begin(); i!=links_.end(); ++i){ (*i)->Disconnect(); }
78 server_io_service_.stop();
79 server_threads_.join_all();
80 server_io_service_.reset();
81
// Trigger side: release the work object, join the threads and reset the io_service.
// NOTE(review): the listing skips a line between the delete and the join; its content
// is not visible here.
// NOTE(review): neither work pointer is reassigned after delete in the visible lines,
// so calling this code twice without the pointers being re-created in between would
// delete them twice -- verify call sites.
82 delete trigger_work_;
84 trigger_threads_.join_all();
85 trigger_io_service_.reset();
86
// Drop links, DOMs and probes.
87 links_.clear();
88 doms_.clear();
89 probes_.clear();
90
// Close the TOA output file if it is open.
91 if (toa_file_.is_open()) toa_file_.close();
92}
93
95{
// NOTE(review): the signature line and the line that creates `l_link` are missing from
// this listing; the log text and the boost::bind calls elsewhere in the file suggest
// this is Start_accept() -- confirm.
96 DEBUG_MSG("Start accept",2);
// The link is stored in links_ before the accept completes.
98 links_.push_back(l_link);
// Accept asynchronously into the link's socket; Accept_completed() receives the link
// and the resulting error code.
99 server_acceptor_.async_accept(l_link->Socket(), boost::bind(&AcousticDataFilter::Accept_completed, this, l_link, boost::asio::placeholders::error));
100}
101
102void AcousticDataFilter::Accept_completed(pLink link, const boost::system::error_code& error)
103{
104 DEBUG_MSG("Accept completed",2);
105
106 if (!error)
107 {
108 link->Start_reading();
109 strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
110 }
111}
112
113void AcousticDataFilter::actionInit(int length, const char* buffer)
114{
115 DEBUG_MSG("SM:Init",2);
116 FILE* pfile;
117 pfile=fopen(wisdom_file_.c_str(),"r");
118 if (pfile!=NULL)
119 {
120#ifdef TOAFINDER_USE_FLOAT
121 if (fftwf_import_wisdom_from_file(pfile)==1)
122#else
123 if (fftw_import_wisdom_from_file(pfile)==1)
124#endif
125 {
126 DEBUG_MSG("SM:Init wisdom imported",2);
127 }
128 fclose(pfile);
129 }
130}
131
132void AcousticDataFilter::actionReset(int length, const char* buffer)
133{
134 DEBUG_MSG("SM:Reset",2);
135 FILE* pfile;
136 pfile=fopen(wisdom_file_.c_str(),"w");
137 if (pfile!=NULL)
138 {
139DEBUG_MSG("SM:Reset wisdom saved",2);
140#ifdef TOAFINDER_USE_FLOAT
141 fftwf_export_wisdom_to_file(pfile);
142#else
143 fftw_export_wisdom_to_file(pfile);
144#endif
145 fclose(pfile);
146 }
147 links_.clear();
148 doms_.clear();
149 probes_.clear();
150 if (toa_file_.is_open()) toa_file_.close();
151}
152
153void AcousticDataFilter::actionConfigure(int length, const char* buffer)
154{
155 DEBUG_MSG("SM:Configure",2);
156 Parser dummy(std::string(buffer, length),';');
157 DEBUG_MSG("SM:Cerco",2);
158
159 std::string l_debug_file_path;
160 std::string l_window;
161 std::string l_overlap;
162 std::vector<std::string> l_toa_path;
163 std::vector<std::string> l_dbg_path;
165 std::vector<std::string> l_dom_configuration;
166
167 l_window = dummy.Find("ADF_analysis_window_size", 0);
168 l_overlap = dummy.Find("ADF_analysis_window_overlap", 0);
169 l_probes = dummy.Find("ADF_waveform");
170 l_dom_configuration = dummy.Find("ADF_DOM_configuration");
171
172 l_toa_path=dummy.Find("ADF_TOA_path");
173 if (l_toa_path.size()!=0) toa_file_path_=l_toa_path[0];
174 l_dbg_path=dummy.Find("ADF_DBG_path");
175 if (l_dbg_path.size()!=0) debug_file_path_=l_dbg_path[0];
176
177 DEBUG_MSG("SM:Configure. win="<<l_window,2);
178 DEBUG_MSG("SM:Configure. ovr="<<l_overlap,2);
179 DEBUG_MSG("SM:Configure. TOA_path="<<toa_file_path_,2);
180 DEBUG_MSG("SM:Configure. DBG_path="<<debug_file_path_,2);
181
182 DEBUG_MSG("SM:Configure. n dom="<<l_dom_configuration.size(),2);
183 DEBUG_MSG("SM:Configure. n pro="<<l_probes.size(),2);
184
185 for (auto i=0; i< l_probes.size(); i++) {
186 DEBUG_MSG("SM:Configure. l_probes[" << i << "]=" << l_probes[i] << "\n",2);
187 }
188
189 int probe_errors = Create_probes(l_probes,atoi(l_window.c_str()));
190 if (probe_errors) {
191 ERROR(probe_errors << " errors encountered when reading probes from configuration\n");
192 }
193
194 int dom_errors = Create_DOMs(l_dom_configuration,debug_file_path_,atoi(l_window.c_str()),atoi(l_overlap.c_str()));
195
196 if (dom_errors) {
197 ERROR(dom_errors << " errors encountered when reading doms from configuration\n");
198 }
199
200}
201
202void AcousticDataFilter::actionStart(int length, const char* buffer)
203{
204 DEBUG_MSG("SM start",2);
205 running_=true;
206
207}
208
210{
211 DEBUG_MSG("Create probes",1);
212
213 int nerrors{0};
214
215 for(const auto& probeString: probes)
216 {
217 auto waveform = parseWaveformSamples<real_type>(probeString);
218 if (waveform==std::nullopt) {
219 ++nerrors;
220 ERROR("could not parse probe line " << probeString << "\n");
221 continue;
222 }
223 DEBUG_MSG("creating probe id "<<waveform->id,1);
224 pProbe l_probe = boost::make_shared<KM3::TOALIB::CTOAFinder::probeSignalT>(*(new KM3::TOALIB::CTOAFinder::probeSignalT(window)));
225 DEBUG_MSG("copying data",1);
226 l_probe->copyFromBuffer(waveform->samples.data(),waveform->samples.size());
227 DEBUG_MSG("insert probe id"<<waveform->id,1);
228 probes_.insert(probe_entry(waveform->id,l_probe));
229 }
230 DEBUG_MSG("Probes creation finished",1);
231 return nerrors;
232}
233
234int AcousticDataFilter::Create_DOMs(std::vector<std::string> dom_configuration,std::string& debug_file_path,std::size_t window,std::size_t overlap)
235{
236 DEBUG_MSG("Create DOM",1);
237
238 int nerrors{0};
239
240 for (const auto& line : dom_configuration) {
241 auto config = parseDOMConfiguration(line);
242
243 if (config==std::nullopt) {
244 ++nerrors;
245 ERROR("could not parse DOM configuration line " << line << "\n");
246 continue;
247 }
248
249 const auto& [ dom_id, probe_configs, write_flag ] = config.value();
250
251 DEBUG_MSG("DOM id="<<dom_id,1);
252
253 pDOM l_dom = boost::make_shared<KM3NeT_DOM>(*(new KM3NeT_DOM(trigger_io_service_,
254 boost::bind(&AcousticDataFilter::Send_toa_async, this, _1),
255 debug_file_path,
256 window,
257 overlap)));
258 doms_.insert(dom_entry(dom_id,l_dom));
259
260 for (const auto& probe_config: probe_configs) {
261 DEBUG_MSG("adding probe "<<probe_config.id<<" with threshold "<<probe_config.threshold<<" to DOM "<<dom_id,1);
262 auto known_probe = probes_.find(probe_config.id);
263 if (known_probe== probes_.end()) {
264 ERROR("probe id " << probe_config.id << " not found in the internal map ! Not adding it to our list of probes !" << "\n");
265 continue;
266 }
267 l_dom->Add_probe(known_probe->second,probe_config.id,probe_config.threshold);
268 }
269 DEBUG_MSG("setting test id="<<write_flag,1);
270 l_dom->Set_test(write_flag);
271 }
272 DEBUG_MSG("DOMs creation finished",1);
273 return nerrors;
274}
275
277{
// NOTE(review): the signature line is missing from this listing; the body uses a
// ToA_Packet named `packet`, so this is presumably Send_toa or Send_toa_async -- confirm
// which one.
// Packets are dropped while running_ is false.
278 if (!running_) return;
279
280 DEBUG_MSG("Sent toa from ID="<<packet.Header.DOM_Identifier,3);
281
// The output file is opened on first use.
282 if (!toa_file_.is_open()) Open_toa_file();
283
// Once the write position exceeds MAX_TOA_SIZE the current file is closed.
284 if (toa_file_.tellp()>MAX_TOA_SIZE)
285 {
286 DEBUG_MSG("Toa file closed",1);
287 toa_file_.close();
// NOTE(review): the listing skips a line here; in the visible code nothing reopens the
// file, in which case the write below would be skipped for this packet -- confirm.
289 }
290
// The packet is written as raw bytes, sizeof(ToA_Packet) of them.
291 if (toa_file_.is_open()) toa_file_.write((char*)&packet,sizeof(ToA_Packet));
292
293}
294
296{
// NOTE(review): the signature line is missing from this listing; the log text and the
// call elsewhere in the file suggest this is Open_toa_file() -- confirm.
// File name: toa_file_path_ + "TOA_" + <value of std::time(0)> + ".bin".
// toa_file_path_ is used as a plain prefix; no path separator is inserted here.
297 std::string l_file=toa_file_path_;
298 std::time_t seconds = std::time(0);
299 l_file+="TOA_"+std::to_string((long long unsigned int)seconds)+".bin";
// Opened in binary mode; the result of open() is not checked here.
300 toa_file_.open(l_file.c_str(),std::ofstream::binary);
301 DEBUG_MSG("Toa file opened at "<<seconds,1);
302}
303
304void AcousticDataFilter::actionPause(int length, const char* buffer)
305{
306 DEBUG_MSG("SM: pause",2);
307 running_=false;
308}
309
310void AcousticDataFilter::actionContinue(int length, const char* buffer)
311{
312 DEBUG_MSG("SM: continue",2);
313 running_=true;
314}
315
316void AcousticDataFilter::actionStop(int length, const char* buffer)
317{
318 DEBUG_MSG("SM:Stop",2);
319 links_.clear();
320 doms_.clear();
321 probes_.clear();
322 if (toa_file_.is_open()) toa_file_.close();
323
324
325}
326
327void AcousticDataFilter::actionQuit(int length, const char* buffer)
328{
329 DEBUG_MSG("SM:Quit",2);
330 links_.clear();
331 doms_.clear();
332 probes_.clear();
333 if (toa_file_.is_open()) toa_file_.close();
334
335
336}
337
#define DEBUG_MSG(str, lev)
#define SERVER_THREAD_POOL
std::pair< int, pProbe > probe_entry
#define MAX_TOA_SIZE
#define TRIGGER_THREAD_POOL
std::pair< std::size_t, pDOM > dom_entry
General purpose messaging.
#define ERROR(A)
Definition JMessage.hh:66
boost::shared_ptr< KM3::TOALIB::CTOAFinder::probeSignalT > pProbe
boost::shared_ptr< KM3NeT_DOM > pDOM
Definition KM3NeT_DOM.h:65
int Create_DOMs(std::vector< std::string > doms, std::string &debug_file_path, std::size_t window, std::size_t overlap)
boost::asio::io_service::strand strand_
boost::asio::ip::tcp::acceptor server_acceptor_
void Send_toa(ToA_Packet packet)
virtual void actionQuit(int length, const char *buffer)
boost::atomic< bool > exit_
boost::atomic< bool > running_
boost::asio::io_service::work * trigger_work_
boost::thread_group trigger_threads_
virtual void actionContinue(int length, const char *buffer)
int Create_probes(std::vector< std::string > probes, std::size_t window)
virtual void actionConfigure(int length, const char *buffer)
std::vector< pLink > links_
virtual void actionReset(int length, const char *buffer)
std::map< std::size_t, pDOM > doms_
boost::asio::io_service::work * server_work_
virtual void actionStop(int length, const char *buffer)
boost::asio::io_service server_io_service_
boost::asio::io_service trigger_io_service_
const JNET::JTag & clientTag() const
boost::thread_group server_threads_
virtual void actionInit(int length, const char *buffer)
AcousticDataFilter(const std::string &address, const std::string &port, const std::string &name, const std::string &server, const std::string &wisdom, const std::string &output, const std::string &raw, JLOGGER::JLogger *logger, const int level)
void Accept_completed(pLink link, const boost::system::error_code &error)
void Send_toa_async(ToA_Packet packet)
virtual void actionPause(int length, const char *buffer)
virtual void actionStart(int length, const char *buffer)
std::map< std::size_t, pProbe > probes_
JDAQStateMachine::ev_configure_event ev_configure
Interface for logging messages.
Definition JLogger.hh:22
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
std::vector< std::string > Find(const std::string &tag)
Definition parser.cc:23
uint32_t dom_id(frame_idx_t idx)
KM3NeT DAQ data structures and auxiliaries.
Definition DataQueue.cc:39
static const JNET::JTag RC_CMD
Definition JDAQTags.hh:66
std::optional< DOMConfig > parseDOMConfiguration(const std::string &dom_configuration_line)
Parse a line that should contain the ADF configuration for a single DOM.
Definition parser.cc:102
std::optional< Waveform< T > > parseWaveformSamples(const std::string &adf_waveform_line)
Definition parser.cc:46
DAQ_Common_Header Header