Jpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
AcousticDataFilter.cpp
Go to the documentation of this file.
1 #include "AcousticDataFilter.h"
2 
3 #ifdef DEBUG_ACOUSTICDATAFILTER
4 #include <stdlib.h>
5 #include <iostream>
6 #include <boost/date_time/posix_time/posix_time.hpp>
7 #include <Tools/KM3NeT_Debug.h>
8 #define DEBUG_MSG(str,lev) do { if (((uint8_t)lev)>=AcousticDataFilter::DEBUG_LEVEL) { DBG_MSG << boost::posix_time::microsec_clock::local_time() << " ACOUSTICDATAFILTER(" << (void*)this << "): "<< str << '\n'; } }while( false )
9 #else
10 #define DEBUG_MSG(str,lev) do { } while ( false )
11 #endif
12 
13 /**
14  * \author fsimeone
15  */
16 
17 class Parser
18 {
19 public:
20  Parser(std::string d,char t)
21  {
22  std::istringstream iss(d);
23  std::string token;
24 
25  while (std::getline(iss, token, t))
26  {
28  size_t pos = token.find('=');
29  std::string key=token.substr(0, pos);
30  std::string value=token.substr(pos+1);
33  mmap.insert(std::pair<std::string,std::string>(key,value));
34  }
35  }
36 
37  std::vector<std::string> Find(std::string tag)
38  {
41  i = mmap.equal_range(tag);
42  for (std::multimap<std::string,std::string>::iterator it=i.first; it!=i.second; ++it)
43  {
44  r.push_back(it->second);
45  }
46  return r;
47  }
48 
49 private:
51 };
52 
53 AcousticDataFilter::AcousticDataFilter( const std::string& address,\
54  const std::string& port,\
55  const std::string& name,\
56  const std::string& server,\
57  const std::string& wisdom,\
58  const std::string& output,\
59  const std::string& raw,\
60  JLOGGER::JLogger* logger,\
61  const int level): KM3NETDAQ::JDAQClient(name,server,logger,level),\
62  strand_(trigger_io_service_),\
63  server_acceptor_(server_io_service_)
64 {
65  DEBUG_MSG("Constructor",0);
66 
67  replaceEvent(KM3NETDAQ::RC_CMD, clientTag(), ev_configure);
68 
69  address_=address;
70  port_=port;
71  exit_=false;
72  running_=false;
73  wisdom_file_=wisdom;
74  toa_file_path_=output;
75  debug_file_path_=raw;
76 
77 }
78 
80 {
81  DEBUG_MSG("Destructor",0);
82  Stop();
83 }
84 
86 {
87  DEBUG_MSG("Start",2);
88  server_work_ = new boost::asio::io_service::work(server_io_service_);
89  trigger_work_ = new boost::asio::io_service::work(trigger_io_service_);
90 
91  boost::asio::ip::tcp::resolver l_resolver(server_io_service_);
92  boost::asio::ip::tcp::resolver::query l_query(address_,port_);
93  boost::asio::ip::tcp::endpoint l_endpoint = *l_resolver.resolve(l_query);
94  server_acceptor_.open(l_endpoint.protocol());
95  server_acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
96  server_acceptor_.bind(l_endpoint);
97  server_acceptor_.listen();
98 
99  for (std::size_t i=0; i<SERVER_THREAD_POOL; ++i) { server_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&server_io_service_)); }
100  for (std::size_t i=0; i<TRIGGER_THREAD_POOL; ++i) { trigger_threads_.create_thread(boost::bind(&boost::asio::io_service::run,&trigger_io_service_)); }
101 }
102 
104 {
105  DEBUG_MSG("Stop",2);
106 
107  server_acceptor_.close();
108  delete server_work_;
109 
111  for (i=links_.begin(); i!=links_.end(); ++i){ (*i)->Disconnect(); }
112  server_io_service_.stop();
113  server_threads_.join_all();
114  server_io_service_.reset();
115 
116  delete trigger_work_;
117  trigger_io_service_.stop();
118  trigger_threads_.join_all();
119  trigger_io_service_.reset();
120 
121  links_.clear();
122  doms_.clear();
123  probes_.clear();
124 
125  if (toa_file_.is_open()) toa_file_.close();
126 }
127 
129 {
130  DEBUG_MSG("Start accept",2);
132  links_.push_back(l_link);
133  server_acceptor_.async_accept(l_link->Socket(), boost::bind(&AcousticDataFilter::Accept_completed, this, l_link, boost::asio::placeholders::error));
134 }
135 
136 void AcousticDataFilter::Accept_completed(pLink link, const boost::system::error_code& error)
137 {
138  DEBUG_MSG("Accept completed",2);
139 
140  if (!error)
141  {
142  link->Start_reading();
143  strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
144  }
145 }
146 
147 void AcousticDataFilter::actionInit(int length, const char* buffer)
148 {
149  DEBUG_MSG("SM:Init",2);
150  FILE* pfile;
151  pfile=fopen(wisdom_file_.c_str(),"r");
152  if (pfile!=NULL)
153  {
154  if (fftwf_import_wisdom_from_file(pfile)==1)
155  {
156  DEBUG_MSG("SM:Init wisdom imported",2);
157  }
158  fclose(pfile);
159  }
160 }
161 
162 void AcousticDataFilter::actionReset(int length, const char* buffer)
163 {
164  DEBUG_MSG("SM:Reset",2);
165  FILE* pfile;
166  pfile=fopen(wisdom_file_.c_str(),"w");
167  if (pfile!=NULL)
168  {
169 DEBUG_MSG("SM:Reset wisdom saved",2);
170  fftwf_export_wisdom_to_file(pfile);
171  fclose(pfile);
172  }
173 }
174 
175 void AcousticDataFilter::actionConfigure(int length, const char* buffer)
176 {
177 
178  DEBUG_MSG("SM:Configure",2);
179  Parser dummy(std::string(buffer, length),';');
180  DEBUG_MSG("SM:Cerco",2);
181 
182  std::string l_debug_file_path;
183  std::string l_window;
184  std::string l_overlap;
185  std::vector<std::string> l_toa_path;
186  std::vector<std::string> l_dbg_path;
187  std::vector<std::string> l_probes;
188  std::vector<std::string> l_dom_configuration;
189 
190  l_window=dummy.Find("ADF_analysis_window_size")[0];
191  l_overlap=dummy.Find("ADF_analysis_window_overlap")[0];
192  l_probes=dummy.Find("ADF_waveform");
193  l_dom_configuration=dummy.Find("ADF_DOM_configuration");
194 
195  l_toa_path=dummy.Find("ADF_TOA_path");
196  if (l_toa_path.size()!=0) toa_file_path_=l_toa_path[0];
197  l_dbg_path=dummy.Find("ADF_DBG_path");
198  if (l_dbg_path.size()!=0) debug_file_path_=l_dbg_path[0];
199 
200  DEBUG_MSG("SM:Configure. win="<<l_window,2);
201  DEBUG_MSG("SM:Configure. ovr="<<l_overlap,2);
202  DEBUG_MSG("SM:Configure. TOA_path="<<toa_file_path_,2);
203  DEBUG_MSG("SM:Configure. DBG_path="<<debug_file_path_,2);
204 
205  DEBUG_MSG("SM:Configure. n dom="<<l_dom_configuration.size(),2);
206  DEBUG_MSG("SM:Configure. n pro="<<l_probes.size(),2);
207 
208  Create_probes(l_probes,atoi(l_window.c_str()));
209  Create_DOMs(l_dom_configuration,debug_file_path_,atoi(l_window.c_str()),atoi(l_overlap.c_str()));
210 
211  Start();
212 
213 }
214 
215 void AcousticDataFilter::actionStart(int length, const char* buffer)
216 {
217  DEBUG_MSG("SM start",2);
218  running_=true;
219  strand_.post(boost::bind(&AcousticDataFilter::Start_accept,this));
220 }
221 
223 {
224  DEBUG_MSG("Create probes",1);
225 
226  for(int t=0;t<probes.size();++t)
227  {
228  std::istringstream l_iss(probes.at(t));
229  std::string l_line;
230  uint32_t l_id;
231  uint32_t l_samples;
232  real_type* l_buffer;
233 
234  std::getline(l_iss, l_line,' ');
235  l_id=atoi(l_line.c_str());
236  DEBUG_MSG("id probe="<<l_id,1);
237  std::getline(l_iss, l_line,' ');
238  l_samples=atoi(l_line.c_str());
239  DEBUG_MSG("n samples="<<l_samples,1);
240  l_buffer=(real_type*)malloc(l_samples*sizeof(real_type));
241  for (uint32_t i=0; i<l_samples; ++i)
242  {
243  std::getline(l_iss, l_line,' ');
244  real_type l_dummy=atof(l_line.c_str());
245  DEBUG_MSG("sample="<<l_dummy,0);
246  l_buffer[i]=l_dummy;
247  }
248  DEBUG_MSG("creating probes",1);
249  pProbe l_probe = boost::make_shared<KM3::TOALIB::CTOAFinder::probeSignalT>(*(new KM3::TOALIB::CTOAFinder::probeSignalT(window)));
250  DEBUG_MSG("copying data",1);
251  l_probe->copyFromBuffer(l_buffer,l_samples);
252  DEBUG_MSG("insert probe",1);
253  probes_.insert(probe_entry(l_id,l_probe));
254  }
255  DEBUG_MSG("Probes creation finished",1);
256 }
257 
258 void AcousticDataFilter::Create_DOMs(std::vector<std::string> dom_configuration,std::string& debug_file_path,std::size_t window,std::size_t overlap)
259 {
260  DEBUG_MSG("Create DOM",1);
261 
262  for(int t=0;t<dom_configuration.size();++t)
263  {
264  std::istringstream l_iss(dom_configuration.at(t));
265  std::string l_line;
266  uint32_t l_dom_id;
267  uint32_t l_probe_id;
268  uint32_t l_threshold;
269  uint32_t l_waveforms;
270  uint32_t l_test;
271 
272  std::getline(l_iss, l_line,' ');
273  l_dom_id=atoi(l_line.c_str());
274  DEBUG_MSG("DOM id="<<l_dom_id,1);
275  pDOM l_dom = boost::make_shared<KM3NeT_DOM>(*(new KM3NeT_DOM( trigger_io_service_,\
276  boost::bind(&AcousticDataFilter::Send_toa_async, this, _1),\
277  debug_file_path,\
278  window,\
279  overlap)));
280  doms_.insert(dom_entry(l_dom_id,l_dom));
281  std::getline(l_iss, l_line,' ');
282  l_waveforms=atoi(l_line.c_str());
283  DEBUG_MSG("n waveforms="<<l_waveforms,1);
284  for (uint32_t i=0; i<l_waveforms; ++i)
285  {
286  std::getline(l_iss, l_line,' ');
287  l_probe_id=atoi(l_line.c_str());
288  std::getline(l_iss, l_line,' ');
289  l_threshold=atoi(l_line.c_str());
290  DEBUG_MSG("adding probe "<<l_probe_id<<" with threshold "<<l_threshold<<" to DOM "<<l_dom_id,1);
291  l_dom->Add_probe(probes_.find(l_probe_id)->second,l_probe_id,l_threshold);
292  }
293  std::getline(l_iss, l_line,' ');
294  l_test=atoi(l_line.c_str());
295  DEBUG_MSG("setting test id="<<l_test,1);
296  l_dom->Set_test(l_test);
297  }
298  DEBUG_MSG("DOMs creation finished",1);
299 }
300 
302 {
303  if (!running_) return;
304 
305  DEBUG_MSG("Sent toa from ID="<<packet.Header.DOM_Identifier,3);
306 
307  if (!toa_file_.is_open()) Open_toa_file();
308 
309  if (toa_file_.tellp()>MAX_TOA_SIZE)
310  {
311  DEBUG_MSG("Toa file closed",1);
312  toa_file_.close();
313  Open_toa_file();
314  }
315 
316  if (toa_file_.is_open()) toa_file_.write((char*)&packet,sizeof(ToA_Packet));
317 
318 }
319 
321 {
322  std::string l_file=toa_file_path_;
323  std::time_t seconds = std::time(0);
324  l_file+="TOA_"+std::to_string((long long unsigned int)seconds)+".bin";
325  toa_file_.open(l_file.c_str(),std::ofstream::binary);
326  DEBUG_MSG("Toa file opened at "<<seconds,1);
327 }
328 
329 void AcousticDataFilter::actionPause(int length, const char* buffer)
330 {
331  DEBUG_MSG("SM: pause",2);
332  running_=false;
333 }
334 
335 void AcousticDataFilter::actionContinue(int length, const char* buffer)
336 {
337  DEBUG_MSG("SM: continue",2);
338  running_=true;
339 }
340 
341 void AcousticDataFilter::actionStop(int length, const char* buffer)
342 {
343  DEBUG_MSG("SM:Stop",2);
344  Stop();
345 }
346 
347 void AcousticDataFilter::actionQuit(int length, const char* buffer)
348 {
349  DEBUG_MSG("SM:Quit",2);
350  Stop();
351 }
352 
virtual void actionReset(int length, const char *buffer)
boost::atomic< bool > exit_
void Send_toa_async(ToA_Packet packet)
virtual void actionInit(int length, const char *buffer)
boost::asio::io_service trigger_io_service_
std::map< std::size_t, pProbe > probes_
boost::asio::io_service::strand strand_
void Create_probes(std::vector< std::string > probes, std::size_t window)
boost::asio::io_service::work * server_work_
boost::thread_group trigger_threads_
Interface for logging messages.
Definition: JLogger.hh:22
std::string trim(const std::string &buffer)
Trim string.
Definition: JLangToolkit.hh:79
virtual void actionQuit(int length, const char *buffer)
virtual void actionConfigure(int length, const char *buffer)
boost::shared_ptr< KM3NeT_DOM > pDOM
Definition: KM3NeT_DOM.h:65
Parser(std::string d, char t)
const JNET::JTag & clientTag() const
std::vector< pLink > links_
std::istream & getline(std::istream &in, JString &object)
Read string from input stream until end of line.
Definition: JString.hh:468
virtual void actionStop(int length, const char *buffer)
std::multimap< std::string, std::string > mmap
boost::atomic< bool > running_
#define MAX_TOA_SIZE
AcousticDataFilter(const std::string &address, const std::string &port, const std::string &name, const std::string &server, const std::string &wisdom, const std::string &output, const std::string &raw, JLOGGER::JLogger *logger, const int level)
std::map< std::size_t, pDOM > doms_
void Accept_completed(pLink link, const boost::system::error_code &error)
#define TRIGGER_THREAD_POOL
boost::thread_group server_threads_
boost::asio::ip::tcp::acceptor server_acceptor_
virtual void actionStart(int length, const char *buffer)
boost::asio::io_service server_io_service_
std::string to_string(const T &value)
Convert value to string.
void Send_toa(ToA_Packet packet)
std::string debug_file_path_
#define SERVER_THREAD_POOL
virtual void actionContinue(int length, const char *buffer)
void replaceEvent(const JNET::JTag &oldTag, const JNET::JTag &newTag, JDAQEvent_t &event)
Replace tag of given event in event table.
Definition: JDAQClient.hh:294
boost::asio::io_service::work * trigger_work_
static const JNET::JTag RC_CMD
Definition: JDAQTags.hh:44
void Create_DOMs(std::vector< std::string > doms, std::string &debug_file_path, std::size_t window, std::size_t overlap)
boost::shared_ptr< KM3::TOALIB::CTOAFinder::probeSignalT > pProbe
DAQ_Common_Header Header
std::vector< std::string > Find(std::string tag)
std::pair< std::size_t, pProbe > probe_entry
#define DEBUG_MSG(str, lev)
std::pair< std::size_t, pDOM > dom_entry
virtual void actionPause(int length, const char *buffer)