Jpp  17.2.1-pre0
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JFremantle.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <set>
6 #include <map>
7 #include <deque>
8 #include <algorithm>
9 
10 #include <type_traits>
11 #include <functional>
12 #include <future>
13 #include <mutex>
14 #include <thread>
15 #include <vector>
16 #include <queue>
17 
18 #include "TROOT.h"
19 #include "TFile.h"
20 
21 #include "JLang/JPredicate.hh"
22 #include "JLang/JComparator.hh"
23 #include "JLang/JComparison.hh"
24 
25 #include "JDetector/JDetector.hh"
27 #include "JDetector/JTripod.hh"
29 #include "JDetector/JModule.hh"
30 #include "JDetector/JHydrophone.hh"
31 
32 #include "JTools/JHashMap.hh"
33 #include "JTools/JRange.hh"
34 
37 #include "JSupport/JMeta.hh"
38 
40 #include "JAcoustics/JEmitter.hh"
42 #include "JAcoustics/JHit.hh"
44 #include "JAcoustics/JKatoomba.hh"
45 #include "JAcoustics/JEvent.hh"
46 #include "JAcoustics/JEvt.hh"
48 #include "JAcoustics/JSupport.hh"
50 
51 #include "Jeep/JContainer.hh"
52 #include "Jeep/JParser.hh"
53 #include "Jeep/JMessage.hh"
54 
55 
56 namespace {
57 
58  using namespace std;
59  using namespace JPP;
60 
61  typedef JHit<JPDFGauss> hit_type;
62  typedef vector<hit_type> data_type;
63 
64  /**
65  * Thread pool for global fits.
66  */
67  class JFremantle {
68  public:
69  /**
70  * Constructor.
71  *
72  * \param oid detector identifier
73  * \param katoomba global fit
74  * \param output output
75  * \param N number of threads
76  */
77  JFremantle(const std::string& oid,
78  const JKatoomba_t& katoomba,
79  JObjectOutput<JEvt>& output,
80  const size_t N) :
81  stop(false),
82  oid(oid),
83  output(output)
84  {
85  using namespace std;
86 
87  for (size_t i = 0; i < N; ++i) {
88 
89  thread worker([this, katoomba]() {
90 
91  data_type data;
92 
93  for (JKatoomba_t fremantle(katoomba); ; ) {
94 
95  {
96  unique_lock<mutex> lock(in);
97 
98  cv.wait(lock, [this]() { return stop || !input.empty(); });
99 
100  if (stop && input.empty()) {
101  return;
102  }
103 
104  data.swap(input.front());
105 
106  input.pop();
107  }
108 
109  const auto result = fremantle(data.begin(), data.end());
110 
111  if (result.chi2 / result.ndf <= fremantle.parameters.chi2perNDF) {
112 
114 
115  for (data_type::const_iterator i = result.begin; i != result.end; ++i) {
116  range.include(i->getValue());
117  }
118 
119  const JEvt evt = getEvt(JHead(this->oid,
120  range.getLowerLimit(),
121  range.getUpperLimit(),
122  data.size(),
123  result.value.getN(),
124  result.ndf,
125  result.chi2),
126  result.value);
127 
128  {
129  unique_lock<mutex> lock(out);
130 
131  this->output.put(evt);
132  }
133  }
134  }
135  });
136 
137  workers.emplace_back(move(worker));
138  }
139  }
140 
141 
142  /**
143  * Destructor.
144  */
145  ~JFremantle()
146  {
147  using namespace std;
148 
149  {
150  unique_lock<mutex> lock(in);
151 
152  stop = true;
153  }
154 
155  cv.notify_all();
156 
157  for (auto& worker : workers) {
158  worker.join();
159  }
160  }
161 
162 
163  /**
164  * Get number of pending data.
165  *
166  * \return number of pending data
167  */
168  size_t backlog()
169  {
170  using namespace std;
171 
172  {
173  unique_lock<mutex> lock(in);
174 
175  return input.size();
176  }
177  }
178 
179 
180  /**
181  * Queue data.
182  *
183  * \param data data
184  */
185  void enqueue(data_type& data)
186  {
187  using namespace std;
188 
189  {
190  unique_lock<mutex> lock(in);
191 
192  if (stop) {
193  throw runtime_error("The thread pool has been stopped.");
194  }
195 
196  input.emplace(move(data));
197  }
198 
199  cv.notify_one();
200  }
201 
202  private:
203  vector<thread> workers;
204  queue <data_type> input;
205  mutex in;
206  mutex out;
207  condition_variable cv;
208  bool stop;
209  std::string oid;
210  JObjectOutput<JEvt>& output;
211  };
212 }
213 
214 
215 /**
216  * \file
217  *
218  * Application to make a global fit of the detector geometry to acoustic data.\n
219  * \author mdejong
220  */
221 int main(int argc, char **argv)
222 {
223  using namespace std;
224  using namespace JPP;
225 
226  typedef JContainer< vector<JTripod> > tripods_container;
227  typedef JContainer< vector<JTransmitter> > transmitters_container;
228  typedef JContainer< vector<JHydrophone> > hydrophones_container;
229  typedef JContainer< set<JTransmission_t> > disable_container;
230 
233  string detectorFile;
234  JLimit_t& numberOfEvents = inputFile.getLimit();
235  JSoundVelocity V = getSoundVelocity; // default sound velocity
236  tripods_container tripods; // tripods
237  transmitters_container transmitters; // transmitters
238  hydrophones_container hydrophones; // hydrophones
239  JFitParameters parameters; // fit parameters
240  bool unify; // unify weighing of pings
241  disable_container disable; // disable tansmissions
242  size_t jobs; // number of parallel jobs
243  int sleep_us; // sleep time [us]
244  double Tmax_s; // deadtime [s]
245  int debug;
246 
247  try {
248 
249  JParser<> zap("Application to fit position calibration model to acoustic data.");
250 
251  zap['f'] = make_field(inputFile, "output of JAcousticEventBuilder[.sh]");
252  zap['o'] = make_field(outputFile);
253  zap['n'] = make_field(numberOfEvents) = JLimit::max();
254  zap['a'] = make_field(detectorFile);
256  zap['V'] = make_field(V, "sound velocity") = JPARSER::initialised();
257  zap['T'] = make_field(tripods, "tripod data");
258  zap['Y'] = make_field(transmitters, "transmitter data") = JPARSER::initialised();
259  zap['H'] = make_field(hydrophones, "hydrophone data") = JPARSER::initialised();
260  zap['M'] = make_field(getMechanics, "mechanics data") = JPARSER::initialised();
261  zap['u'] = make_field(unify, "unify weighing of pings");
262  zap['!'] = make_field(disable, "disable transmission") = JPARSER::initialised();
263  zap['N'] = make_field(jobs, "number of parallel jobs") = 1;
264  zap['s'] = make_field(sleep_us, "sleep time [us]") = 100;
265  zap['D'] = make_field(Tmax_s, "deadtime [s]") = 100.0e-3;
266  zap['d'] = make_field(debug) = 1;
267 
268  zap(argc, argv);
269  }
270  catch(const exception &error) {
271  FATAL(error.what() << endl);
272  }
273 
274  ROOT::EnableThreadSafety();
275 
276  cout.tie(&cerr);
277 
279 
280  try {
281  load(detectorFile, detector);
282  }
283  catch(const JException& error) {
284  FATAL(error);
285  }
286 
287  JHashMap<int, JLocation> receivers;
288  JHashMap<int, JEmitter> emitters;
289 
290  for (JDetector::const_iterator i = detector.begin(); i != detector.end(); ++i) {
291  receivers[i->getID()] = i->getLocation();
292  }
293 
294  for (tripods_container::const_iterator i = tripods.begin(); i != tripods.end(); ++i) {
295  emitters[i->getID()] = JEmitter(i->getID(),
296  i->getUTMPosition() - detector.getUTMPosition());
297  }
298 
299  for (transmitters_container::const_iterator i = transmitters.begin(); i != transmitters.end(); ++i) {
300  try {
301  emitters[i->getID()] = JEmitter(i->getID(),
302  i->getPosition() + detector.getModule(i->getLocation()).getPosition());
303  }
304  catch(const exception&) {
305  continue; // if no module available, discard transmitter
306  }
307  }
308 
309  V.set(detector.getUTMZ()); // sound velocity at detector depth
310 
311  JGeometry geometry(detector, hydrophones);
312  JKatoomba_t katoomba(geometry, V, parameters);
313 
316 
319 
320  DEBUG(geometry);
321 
322  string oid; // detector identifier
323 
324  { // sort input files
325  map<double, string> zmap;
326 
327  for (const string& file_name : inputFile) {
328 
329  STATUS(file_name << '\r'); DEBUG(endl);
330 
331  for (JMultipleFileScanner<JEvent> in(file_name, 1); in.hasNext(); ) {
332 
333  const JEvent* evt = in.next();
334 
335  if (oid == "")
336  oid = evt->getOID();
337  else if (oid != evt->getOID()) // consistency check
338  FATAL("Invalid detector identifier " << evt->getOID() << " != " << oid << endl);
339 
340  if (!evt->empty()) {
341  zmap[evt->begin()->getToE()] = file_name;
342  }
343  }
344  }
345  STATUS(endl);
346 
347  inputFile.clear();
348 
349  for (map<double, string>::const_iterator i = zmap.begin(); i != zmap.end(); ++i) {
350  inputFile.push_back(i->second);
351  }
352  }
353 
354  outputFile.open();
355 
356  outputFile.put(JMeta(argc, argv));
357  outputFile.put(parameters);
358 
359  try {
360 
361  JFremantle fremantle(oid, katoomba, outputFile, jobs);
362 
363  typedef deque<JEvent> buffer_type;
364 
365  for (buffer_type zbuf; inputFile.hasNext(); ) {
366 
367  STATUS(inputFile.getFilename() << '\r'); DEBUG(endl);
368 
369  // read one file at a time
370 
371  for (const string file_name = inputFile.getFilename(); inputFile.hasNext() && file_name == inputFile.getFilename(); ) {
372 
373  const JEvent* evt = inputFile.next();
374 
375  if (emitters.has(evt->getID())) {
376  zbuf.push_back(*evt);
377  }
378  }
379 
380  sort(zbuf.begin(), zbuf.end()); // sort according first time-of-emission
381 
382  for (buffer_type::iterator p = zbuf.begin(), q; p != zbuf.end(); p = q) {
383 
384  for (q = p; ++q != zbuf.end() && q->begin()->getToE() <= p->rbegin()->getToE() + parameters.Tmax_s; ) {}
385 
386  if (q == zbuf.end()) {
387 
388  if (inputFile.hasNext()) {
389 
390  zbuf.erase(zbuf.begin(), p); // remove processed data and continue reading
391 
392  break;
393  }
394  }
395 
396  // empty overlapping events
397 
398  for (buffer_type::iterator __q = p, __p = __q++; __p != q && __q != q; __p = __q++) {
399 
400  if (__q->begin()->getToE() < __p->rbegin()->getToE() + Tmax_s) {
401 
402  __p->clear(); // clear first
403 
404  for (__p = __q++; __p != q && __q != q; __p = __q++) {
405 
406  if (__q->begin()->getToE() < __p->rbegin()->getToE() + Tmax_s)
407  __p->clear();
408  else
409  break;
410  }
411 
412  __p->clear(); // clear last
413  }
414  }
415 
416  if (getNumberOfEmitters(p,q) >= parameters.Nmin) {
417 
418  map<int, int> numberOfPings;
419 
420  for (buffer_type::const_iterator i = p; i != q; ++i) {
421  numberOfPings[i->getID()] += 1;
422  }
423 
424  for (map<int, int>::const_iterator i = numberOfPings.begin(); i != numberOfPings.end(); ++i) {
425  DEBUG("Number of pings " << setw(2) << i->first << ' ' << setw(3) << i->second << endl);
426  }
427 
428  int minimum_number_of_pings = numeric_limits<int>::max();
429 
430  for (map<int, int>::const_iterator i = numberOfPings.begin(); i != numberOfPings.end(); ++i) {
431  minimum_number_of_pings = min(minimum_number_of_pings, i->second);
432  }
433 
434  set<int> buffer;
435  data_type data;
436 
437  for (buffer_type::iterator evt = p; evt != q; ++evt) {
438 
439  sort(evt->begin(), evt->end(), compare);
440 
441  JEvent::iterator __end = unique(evt->begin(), evt->end(), make_comparator(&JTransmission::getID, JComparison::eq()));
442 
443  const JEmitter& emitter = emitters[evt->getID()];
444  const double signal = (unify ? (double) minimum_number_of_pings / (double) numberOfPings[evt->getID()] : 1.0);
445 
446  for (JEvent::const_iterator i = evt->begin(); i != __end; ++i) {
447 
448  if (disable.count(JTransmission_t(evt->getID(), i->getID())) == 0 &&
449  disable.count(JTransmission_t(-1, i->getID())) == 0) {
450 
451  if (receivers.has(i->getID()) && geometry.hasLocation(receivers[i->getID()]) && i->getQ() >= parameters.Qmin) {
452 
453  data.push_back(hit_type(emitter,
454  distance(zbuf.begin(),evt),
455  receivers[i->getID()],
456  JPDFGauss(i->getToA(), parameters.sigma_s, signal, parameters.background)));
457 
458  buffer.insert(evt->getID());
459  }
460  }
461  }
462  }
463 
464  if (buffer.size() >= parameters.Nmin) {
465 
466  while (fremantle.backlog() > jobs) {
467  this_thread::sleep_for(chrono::microseconds(sleep_us));
468  }
469 
470  fremantle.enqueue(data);
471  }
472  }
473  }
474  }
475  STATUS(endl);
476  }
477  catch(const exception& error) {
478  FATAL("main " << error.what());
479  }
480 
481  JMultipleFileScanner<JMeta> io(inputFile);
482 
483  io >> outputFile;
484 
485  outputFile.close();
486 }
Worker class for fit function of acoustic model.
Definition: JKatoomba.hh:869
Auxiliary class for ROOT I/O of application specific meta data.
Definition: JMeta.hh:70
Object writing to file.
Utility class to parse command line options.
Definition: JParser.hh:1517
Acoustic hit.
General exception.
Definition: JException.hh:23
int main(int argc, char *argv[])
Definition: Main.cc:15
JComparator< JResult_t T::*, JComparison::lt > make_comparator(JResult_t T::*member)
Helper method to create comparator between values of data member.
Definition: JComparator.hh:185
Sound velocity.
static JDetectorMechanics getMechanics
Function object to get string mechanics.
Definition: JMechanics.hh:243
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
General purpose class for hash map of unique keys.
Definition: JHashMap.hh:72
General purpose class for hash map of unique elements.
#define STATUS(A)
Definition: JMessage.hh:63
ROOT TTree parameter settings.
Detector data structure.
Definition: JDetector.hh:89
Recording of objects on file according a format that follows from the file name extension.
then JShowerPostfit f $INPUT_FILE o $OUTPUT_FILE N
JEvt getEvt(const JHead &header, const JModel &model)
Get event.
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Acoustic event.
then fatal Number of tripods
Definition: JFootprint.sh:45
Empty structure for specification of parser element that is initialised (i.e. does not require input)...
Definition: JParser.hh:83
V(JDAQEvent-JTriggerReprocessor)*1.0/(JDAQEvent+1.0e-10)
string outputFile
Acoustic emitter.
Data structure for detector geometry and calibration.
Acoustics hit.
Data structure for hydrophone.
size_t getNumberOfEmitters(T __begin, T __end)
Get number of emitters.
then usage $script[port]< option > nPossible stop
Acoustic fit parameters.
Acoustic event fit.
Auxiliary class for defining the range of iterations of objects.
Definition: JLimit.hh:41
static const JSoundVelocity getSoundVelocity(1541.0,-17.0e-3,-2000.0)
Function object for velocity of sound.
Detector file.
Definition: JHead.hh:226
Acoustic event fit.
Data structure for transmitter.
Acoustic emitter.
Definition: JEmitter.hh:27
Acoustics toolkit.
Auxiliary wrapper for I/O of container with optional comment (see JComment).
Definition: JContainer.hh:39
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1993
const std::string & getOID() const
Get detector identifier.
return result
Definition: JPolint.hh:764
ROOT I/O of application specific meta data.
JPosition3D getPosition(const Vec &pos)
Get position.
then awk string
static struct JACOUSTICS::@4 compare
Auxiliary data structure to sort transmissions.
General purpose messaging.
Monte Carlo run header.
Definition: JHead.hh:1167
Implementation for depth dependend velocity of sound.
#define FATAL(A)
Definition: JMessage.hh:67
Scanning of objects from multiple files according a format that follows from the extension of each fi...
z range($ZMAX-$ZMIN)< $MINIMAL_DZ." fi fi typeset -Z 4 STRING typeset -Z 2 FLOOR JPlot1D -f $
void load(const std::string &file_name, JDetector &detector)
Load detector from input file.
Auxiliary class to define a range between two values.
General purpose class for object reading from a list of file names.
Utility class to parse command line options.
Acoustic transmission identifier.
bool hasLocation(const JLocation &location) const
Check if this detector has given location.
Definition: JGeometry.hh:546
Acoustic event.
Template interface of object output for single data type.
Custom probability density function of time-of-arrival.
const JLimit & getLimit() const
Get limit.
Definition: JLimit.hh:73
Fit functions of acoustic model.
virtual bool put(const T &object)=0
Object output.
do set_variable DETECTOR_TXT $WORKDIR detector
int getID() const
Get identifier.
Data structure for tripod.
Acoustic event fit.
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Acoustic transmission identifier.
Template definition of fit function of acoustic model.
Definition: JKatoomba.hh:115
Container I/O.
int debug
debug level
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62
Data structure for optical module.