Jpp
AcousticDataFilter.cpp
Go to the documentation of this file.
1 #include "AcousticDataFilter.h"
2 #ifdef DEBUG_ACOUSTICDATAFILTER
3 #include <stdlib.h>
4 #include <iostream>
5 #include <boost/date_time/posix_time/posix_time.hpp>
6 #include <Tools/KM3NeT_Debug.h>
7 #define DEBUG_MSG(str,lev) do { if (((uint8_t)lev)>=AcousticDataFilter::DEBUG_LEVEL) { DBG_MSG << boost::posix_time::microsec_clock::local_time() << " ACOUSTICDATAFILTER(" << (void*)this << "): "<< str << '\n'; } }while( false )
8 #else
9 #define DEBUG_MSG(str,lev) do { } while ( false )
10 #endif
11 
12 /**
13  * \author fsimeone
14  */
15 
/**
 * Minimal parser for a delimiter-separated list of "key=value" entries.
 *
 * Each entry is stored in a multimap, so a key may occur several times;
 * values of equal keys are returned in the order in which they were parsed.
 *
 * NOTE(review): the listing this class was taken from skips a few numbered
 * lines inside the parsing loop; confirm that no per-token processing
 * (e.g. whitespace trimming) has been lost.
 */
class Parser
{
public:
  /**
   * Split the input into entries and store them.
   *
   * \param  d   text to parse
   * \param  t   character separating the entries
   */
  Parser(std::string d,char t)
  {
    std::istringstream iss(d);
    std::string token;

    while (std::getline(iss, token, t))
    {
      const std::string::size_type pos = token.find('=');

      if (pos == std::string::npos)
      {
        // No '=' in this entry: keep the key and store an empty value
        // instead of relying on npos+1 wrapping around to zero.
        mmap.insert(std::pair<std::string,std::string>(token, std::string()));
      }
      else
      {
        mmap.insert(std::pair<std::string,std::string>(token.substr(0, pos), token.substr(pos + 1)));
      }
    }
  }

  /**
   * Get all values stored for the given key.
   *
   * \param  tag   key to look up
   * \return       values in parsing order; empty if the key is unknown
   */
  std::vector<std::string> Find(std::string tag) const
  {
    typedef std::multimap<std::string,std::string>::const_iterator const_iterator;

    std::vector<std::string> r;
    const std::pair<const_iterator, const_iterator> i = mmap.equal_range(tag);

    for (const_iterator it = i.first; it != i.second; ++it)
    {
      r.push_back(it->second);
    }

    return r;
  }

  /**
   * Get the index-th value stored for the given key.
   *
   * \param  tag     key to look up
   * \param  index   position among the values of this key
   * \param  value   fallback returned when fewer than index+1 values exist
   * \return         the selected value or the fallback
   */
  std::string Find(std::string tag, size_t index, std::string value = "") const
  {
    const std::vector<std::string> result = Find(tag);

    if (result.size() > index)
      return result[index];
    else
      return value;
  }

private:
  std::multimap<std::string,std::string> mmap;   //!< parsed entries, key -> value
};
62 
63 AcousticDataFilter::AcousticDataFilter( const std::string& address,\
64  const std::string& port,\
65  const std::string& name,\
66  const std::string& server,\
67  const std::string& wisdom,\
68  const std::string& output,\
69  const std::string& raw,\
70  JLOGGER::JLogger* logger,\
71  const int level): KM3NETDAQ::JDAQClient(name,server,logger,level),\
72  strand_(trigger_io_service_),\
73  server_acceptor_(server_io_service_)
74 {
75  DEBUG_MSG("Constructor",0);
76 
78 
79  address_=address;
80  port_=port;
81  exit_=false;
82  running_=false;
83  wisdom_file_=wisdom;
84  toa_file_path_=output;
85  debug_file_path_=raw;
86 
87 }
88 
90 {
91  DEBUG_MSG("Destructor",0);
92  Stop();
93 }
94 
96 {
97  DEBUG_MSG("Start",2);
98  server_work_ = new boost::asio::io_service::work(server_io_service_);
99  trigger_work_ = new boost::asio::io_service::work(trigger_io_service_);
100 
101  boost::asio::ip::tcp::resolver l_resolver(server_io_service_);
102  boost::asio::ip::tcp::resolver::query l_query(address_,port_);
103  boost::asio::ip::tcp::endpoint l_endpoint = *l_resolver.resolve(l_query);
104  server_acceptor_.open(l_endpoint.protocol());
105  server_acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
106  server_acceptor_.bind(l_endpoint);
107  server_acceptor_.listen();
108 
109  for (std::size_t i=0; i<SERVER_THREAD_POOL; ++i) { server_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&server_io_service_)); }
110  for (std::size_t i=0; i<TRIGGER_THREAD_POOL; ++i) { trigger_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&trigger_io_service_)); }
111 }
112 
114 {
115  DEBUG_MSG("Stop",2);
116 
117  server_acceptor_.close();
118  delete server_work_;
119 
121  for (i=links_.begin(); i!=links_.end(); ++i){ (*i)->Disconnect(); }
122  server_io_service_.stop();
123  server_threads_.join_all();
124  server_io_service_.reset();
125 
126  delete trigger_work_;
127  trigger_io_service_.stop();
128  trigger_threads_.join_all();
129  trigger_io_service_.reset();
130 
131  links_.clear();
132  doms_.clear();
133  probes_.clear();
134 
135  if (toa_file_.is_open()) toa_file_.close();
136 }
137 
139 {
// Queues one asynchronous accept on server_acceptor_.
// NOTE(review): l_link is not declared in the visible lines (the listing
// jumps from 140 to 142); presumably a new pLink is created there - confirm
// against the full source.
140  DEBUG_MSG("Start accept",2);
// The link is kept in links_, which Stop() walks to disconnect every link.
142  links_.push_back(l_link);
// Accept_completed() receives the same link together with the accept result.
143  server_acceptor_.async_accept(l_link->Socket(), boost::bind(&AcousticDataFilter::Accept_completed, this, l_link, boost::asio::placeholders::error));
144 }
145 
146 void AcousticDataFilter::Accept_completed(pLink link, const boost::system::error_code& error)
147 {
148  DEBUG_MSG("Accept completed",2);
149 
150  if (!error)
151  {
152  link->Start_reading();
153  strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
154  }
155 }
156 
157 void AcousticDataFilter::actionInit(int length, const char* buffer)
158 {
159  DEBUG_MSG("SM:Init",2);
160  FILE* pfile;
161  pfile=fopen(wisdom_file_.c_str(),"r");
162  if (pfile!=NULL)
163  {
164  if (fftwf_import_wisdom_from_file(pfile)==1)
165  {
166  DEBUG_MSG("SM:Init wisdom imported",2);
167  }
168  fclose(pfile);
169  }
170 }
171 
172 void AcousticDataFilter::actionReset(int length, const char* buffer)
173 {
174  DEBUG_MSG("SM:Reset",2);
175  FILE* pfile;
176  pfile=fopen(wisdom_file_.c_str(),"w");
177  if (pfile!=NULL)
178  {
179 DEBUG_MSG("SM:Reset wisdom saved",2);
180  fftwf_export_wisdom_to_file(pfile);
181  fclose(pfile);
182  }
183 }
184 
185 void AcousticDataFilter::actionConfigure(int length, const char* buffer)
186 {
187  DEBUG_MSG("SM:Configure",2);
188  Parser dummy(std::string(buffer, length),';');
189  DEBUG_MSG("SM:Cerco",2);
190 
191  std::string l_debug_file_path;
192  std::string l_window;
193  std::string l_overlap;
194  std::vector<std::string> l_toa_path;
195  std::vector<std::string> l_dbg_path;
196  std::vector<std::string> l_probes;
197  std::vector<std::string> l_dom_configuration;
198 
199  l_window = dummy.Find("ADF_analysis_window_size", 0);
200  l_overlap = dummy.Find("ADF_analysis_window_overlap", 0);
201  l_probes = dummy.Find("ADF_waveform");
202  l_dom_configuration = dummy.Find("ADF_DOM_configuration");
203 
204  l_toa_path=dummy.Find("ADF_TOA_path");
205  if (l_toa_path.size()!=0) toa_file_path_=l_toa_path[0];
206  l_dbg_path=dummy.Find("ADF_DBG_path");
207  if (l_dbg_path.size()!=0) debug_file_path_=l_dbg_path[0];
208 
209  DEBUG_MSG("SM:Configure. win="<<l_window,2);
210  DEBUG_MSG("SM:Configure. ovr="<<l_overlap,2);
211  DEBUG_MSG("SM:Configure. TOA_path="<<toa_file_path_,2);
212  DEBUG_MSG("SM:Configure. DBG_path="<<debug_file_path_,2);
213 
214  DEBUG_MSG("SM:Configure. n dom="<<l_dom_configuration.size(),2);
215  DEBUG_MSG("SM:Configure. n pro="<<l_probes.size(),2);
216 
217  Create_probes(l_probes,atoi(l_window.c_str()));
218  Create_DOMs(l_dom_configuration,debug_file_path_,atoi(l_window.c_str()),atoi(l_overlap.c_str()));
219 
220  Start();
221 
222 }
223 
224 void AcousticDataFilter::actionStart(int length, const char* buffer)
225 {
226  DEBUG_MSG("SM start",2);
227  running_=true;
228  strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
229 }
230 
232 {
233  DEBUG_MSG("Create probes",1);
234 
235  for(int t=0;t<probes.size();++t)
236  {
237  std::istringstream l_iss(probes.at(t));
238  std::string l_line;
239  uint32_t l_id;
240  uint32_t l_samples;
241  real_type* l_buffer;
242 
243  std::getline(l_iss, l_line,' ');
244  l_id=atoi(l_line.c_str());
245  DEBUG_MSG("id probe="<<l_id,1);
246  std::getline(l_iss, l_line,' ');
247  l_samples=atoi(l_line.c_str());
248  DEBUG_MSG("n samples="<<l_samples,1);
249  l_buffer=(real_type*)malloc(l_samples*sizeof(real_type));
250  for (uint32_t i=0; i<l_samples; ++i)
251  {
252  std::getline(l_iss, l_line,' ');
253  real_type l_dummy=atof(l_line.c_str());
254  DEBUG_MSG("sample="<<l_dummy,0);
255  l_buffer[i]=l_dummy;
256  }
257  DEBUG_MSG("creating probes",1);
258  pProbe l_probe = boost::make_shared<KM3::TOALIB::CTOAFinder::probeSignalT>(*(new KM3::TOALIB::CTOAFinder::probeSignalT(window)));
259  DEBUG_MSG("copying data",1);
260  l_probe->copyFromBuffer(l_buffer,l_samples);
261  DEBUG_MSG("insert probe",1);
262  probes_.insert(probe_entry(l_id,l_probe));
263  }
264  DEBUG_MSG("Probes creation finished",1);
265 }
266 
267 void AcousticDataFilter::Create_DOMs(std::vector<std::string> dom_configuration,std::string& debug_file_path,std::size_t window,std::size_t overlap)
268 {
269  DEBUG_MSG("Create DOM",1);
270 
271  for(int t=0;t<dom_configuration.size();++t)
272  {
273  std::istringstream l_iss(dom_configuration.at(t));
274  std::string l_line;
275  uint32_t l_dom_id;
276  uint32_t l_probe_id;
277  uint32_t l_threshold;
278  uint32_t l_waveforms;
279  uint32_t l_test;
280 
281  std::getline(l_iss, l_line,' ');
282  l_dom_id=atoi(l_line.c_str());
283  DEBUG_MSG("DOM id="<<l_dom_id,1);
284  pDOM l_dom = boost::make_shared<KM3NeT_DOM>(*(new KM3NeT_DOM( trigger_io_service_,\
285  boost::bind(&AcousticDataFilter::Send_toa_async, this, _1),\
286  debug_file_path,\
287  window,\
288  overlap)));
289  doms_.insert(dom_entry(l_dom_id,l_dom));
290  std::getline(l_iss, l_line,' ');
291  l_waveforms=atoi(l_line.c_str());
292  DEBUG_MSG("n waveforms="<<l_waveforms,1);
293  for (uint32_t i=0; i<l_waveforms; ++i)
294  {
295  std::getline(l_iss, l_line,' ');
296  l_probe_id=atoi(l_line.c_str());
297  std::getline(l_iss, l_line,' ');
298  l_threshold=atoi(l_line.c_str());
299  DEBUG_MSG("adding probe "<<l_probe_id<<" with threshold "<<l_threshold<<" to DOM "<<l_dom_id,1);
300  l_dom->Add_probe(probes_.find(l_probe_id)->second,l_probe_id,l_threshold);
301  }
302  std::getline(l_iss, l_line,' ');
303  l_test=atoi(l_line.c_str());
304  DEBUG_MSG("setting test id="<<l_test,1);
305  l_dom->Set_test(l_test);
306  }
307  DEBUG_MSG("DOMs creation finished",1);
308 }
309 
311 {
312  if (!running_) return;
313 
314  DEBUG_MSG("Sent toa from ID="<<packet.Header.DOM_Identifier,3);
315 
316  if (!toa_file_.is_open()) Open_toa_file();
317 
318  if (toa_file_.tellp()>MAX_TOA_SIZE)
319  {
320  DEBUG_MSG("Toa file closed",1);
321  toa_file_.close();
322  Open_toa_file();
323  }
324 
325  if (toa_file_.is_open()) toa_file_.write((char*)&packet,sizeof(ToA_Packet));
326 
327 }
328 
330 {
331  std::string l_file=toa_file_path_;
332  std::time_t seconds = std::time(0);
333  l_file+="TOA_"+std::to_string((long long unsigned int)seconds)+".bin";
334  toa_file_.open(l_file.c_str(),std::ofstream::binary);
335  DEBUG_MSG("Toa file opened at "<<seconds,1);
336 }
337 
338 void AcousticDataFilter::actionPause(int length, const char* buffer)
339 {
340  DEBUG_MSG("SM: pause",2);
341  running_=false;
342 }
343 
344 void AcousticDataFilter::actionContinue(int length, const char* buffer)
345 {
346  DEBUG_MSG("SM: continue",2);
347  running_=true;
348 }
349 
350 void AcousticDataFilter::actionStop(int length, const char* buffer)
351 {
352  DEBUG_MSG("SM:Stop",2);
353  Stop();
354 }
355 
356 void AcousticDataFilter::actionQuit(int length, const char* buffer)
357 {
358  DEBUG_MSG("SM:Quit",2);
359  Stop();
360 }
361 
JLOGGER::JLogger
Interface for logging messages.
Definition: JLogger.hh:22
AcousticDataFilter::actionPause
virtual void actionPause(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:338
AcousticDataFilter::clientTag
const JNET::JTag & clientTag() const
Definition: AcousticDataFilter.h:79
probe_entry
std::pair< std::size_t, pProbe > probe_entry
Definition: AcousticDataFilter.h:41
AcousticDataFilter::toa_file_path_
std::string toa_file_path_
Definition: AcousticDataFilter.h:109
AcousticDataFilter::Create_probes
void Create_probes(std::vector< std::string > probes, std::size_t window)
Definition: AcousticDataFilter.cpp:231
AcousticDataFilter::actionConfigure
virtual void actionConfigure(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:185
AcousticDataFilter::trigger_io_service_
boost::asio::io_service trigger_io_service_
Definition: AcousticDataFilter.h:104
AcousticDataFilter::toa_file_
std::ofstream toa_file_
Definition: AcousticDataFilter.h:110
AcousticDataFilter::exit_
boost::atomic< bool > exit_
Definition: AcousticDataFilter.h:90
AcousticDataFilter::actionStop
virtual void actionStop(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:350
KM3NeT_Debug.h
AcousticDataFilter::actionContinue
virtual void actionContinue(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:344
AcousticDataFilter::links_
std::vector< pLink > links_
Definition: AcousticDataFilter.h:97
Parser::Parser
Parser(std::string d, char t)
Definition: AcousticDataFilter.cpp:19
AcousticDataFilter::server_work_
boost::asio::io_service::work * server_work_
Definition: AcousticDataFilter.h:101
AcousticDataFilter::port_
std::string port_
Definition: AcousticDataFilter.h:93
ToA_Packet
Definition: Audio_Structures.h:85
std::vector< std::string >
AcousticDataFilter::server_threads_
boost::thread_group server_threads_
Definition: AcousticDataFilter.h:99
AcousticDataFilter::Send_toa
void Send_toa(ToA_Packet packet)
Definition: AcousticDataFilter.cpp:310
pDOM
boost::shared_ptr< KM3NeT_DOM > pDOM
Definition: KM3NeT_DOM.h:65
AcousticDataFilter::Accept_completed
void Accept_completed(pLink link, const boost::system::error_code &error)
Definition: AcousticDataFilter.cpp:146
AcousticDataFilter::doms_
std::map< std::size_t, pDOM > doms_
Definition: AcousticDataFilter.h:96
AcousticDataFilter::actionStart
virtual void actionStart(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:224
DAQ_Common_Header::DOM_Identifier
uint32_t DOM_Identifier
Definition: Audio_Structures.h:21
KM3NeT_DOM
Definition: KM3NeT_DOM.h:32
JTOOLS::overlap
bool overlap(const JRange< T, JComparator_t > &first, const JRange< T, JComparator_t > &second)
Test overlap between ranges.
Definition: JRange.hh:653
AcousticDataFilter::trigger_threads_
boost::thread_group trigger_threads_
Definition: AcousticDataFilter.h:103
Parser::mmap
std::multimap< std::string, std::string > mmap
Definition: AcousticDataFilter.cpp:60
KM3NETDAQ::JDAQClient::replaceEvent
void replaceEvent(const JTag &oldTag, const JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:492
AcousticDataFilter::server_io_service_
boost::asio::io_service server_io_service_
Definition: AcousticDataFilter.h:100
AcousticDataFilter::strand_
boost::asio::io_service::strand strand_
Definition: AcousticDataFilter.h:106
AcousticDataFilter::AcousticDataFilter
AcousticDataFilter(const std::string &address, const std::string &port, const std::string &name, const std::string &server, const std::string &wisdom, const std::string &output, const std::string &raw, JLOGGER::JLogger *logger, const int level)
Definition: AcousticDataFilter.cpp:63
JTOOLS::result
return result
Definition: JPolint.hh:695
pProbe
boost::shared_ptr< KM3::TOALIB::CTOAFinder::probeSignalT > pProbe
Definition: KM3NeT_Audio_Channel.h:21
AcousticDataFilter::Stop
void Stop()
Definition: AcousticDataFilter.cpp:113
AcousticDataFilter::actionInit
virtual void actionInit(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:157
SERVER_THREAD_POOL
#define SERVER_THREAD_POOL
Definition: AcousticDataFilter.h:32
AcousticDataFilter::~AcousticDataFilter
virtual ~AcousticDataFilter()
Definition: AcousticDataFilter.cpp:89
MAX_TOA_SIZE
#define MAX_TOA_SIZE
Definition: AcousticDataFilter.h:34
AcousticDataFilter::Send_toa_async
void Send_toa_async(ToA_Packet packet)
Definition: AcousticDataFilter.h:62
std::pair
Definition: JSTDTypes.hh:15
JLANG::to_string
std::string to_string(const T &value)
Convert value to string.
Definition: JLangToolkit.hh:192
ToA_Packet::Header
DAQ_Common_Header Header
Definition: Audio_Structures.h:87
AcousticDataFilter::trigger_work_
boost::asio::io_service::work * trigger_work_
Definition: AcousticDataFilter.h:105
AcousticDataFilter::address_
std::string address_
Definition: AcousticDataFilter.h:92
std::multimap
Definition: JSTDTypes.hh:17
AcousticDataFilter::debug_file_path_
std::string debug_file_path_
Definition: AcousticDataFilter.h:108
AcousticDataFilter::Create_DOMs
void Create_DOMs(std::vector< std::string > doms, std::string &debug_file_path, std::size_t window, std::size_t overlap)
Definition: AcousticDataFilter.cpp:267
AcousticDataFilter.h
AcousticDataFilter::running_
boost::atomic< bool > running_
Definition: AcousticDataFilter.h:91
Parser
Definition: AcousticDataFilter.cpp:16
AcousticDataFilter::actionQuit
virtual void actionQuit(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:356
JDAQStateMachine::ev_configure
JDAQStateMachine::ev_configure_event ev_configure
Parser::Find
std::vector< std::string > Find(std::string tag)
Definition: AcousticDataFilter.cpp:36
DEBUG_MSG
#define DEBUG_MSG(str, lev)
Definition: AcousticDataFilter.cpp:9
AcousticDataFilter::server_acceptor_
boost::asio::ip::tcp::acceptor server_acceptor_
Definition: AcousticDataFilter.h:102
TRIGGER_THREAD_POOL
#define TRIGGER_THREAD_POOL
Definition: AcousticDataFilter.h:33
KM3NETDAQ
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
AcousticDataFilter::probes_
std::map< std::size_t, pProbe > probes_
Definition: AcousticDataFilter.h:95
Parser::Find
std::string Find(std::string tag, size_t index, std::string value="")
Definition: AcousticDataFilter.cpp:49
AcousticDataFilter::actionReset
virtual void actionReset(int length, const char *buffer)
Definition: AcousticDataFilter.cpp:172
KM3NETDAQ::RC_CMD
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
dom_entry
std::pair< std::size_t, pDOM > dom_entry
Definition: AcousticDataFilter.h:42
AcousticDataFilter::Start
void Start()
Definition: AcousticDataFilter.cpp:95
JTOOLS::r
data_type r[M+1]
Definition: JPolint.hh:709
AcousticDataFilter::Start_accept
void Start_accept()
Definition: AcousticDataFilter.cpp:138
JLANG::trim
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
AcousticDataFilter::wisdom_file_
std::string wisdom_file_
Definition: AcousticDataFilter.h:111
JLANG::getline
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:468
AcousticDataFilter::Open_toa_file
void Open_toa_file()
Definition: AcousticDataFilter.cpp:329