Jpp  18.0.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JFremantle.cc
Go to the documentation of this file.
1 #include <string>
2 #include <iostream>
3 #include <iomanip>
4 #include <vector>
5 #include <set>
6 #include <map>
7 #include <deque>
8 #include <algorithm>
9 #include <limits>
10 
11 #include <type_traits>
12 #include <functional>
13 #include <future>
14 #include <mutex>
15 #include <thread>
16 #include <vector>
17 #include <queue>
18 
19 #include "TROOT.h"
20 #include "TFile.h"
21 
22 #include "JLang/JPredicate.hh"
23 #include "JLang/JComparator.hh"
24 #include "JLang/JComparison.hh"
25 #include "JLang/JFileStream.hh"
26 
27 #include "JDetector/JDetector.hh"
29 #include "JDetector/JTripod.hh"
31 #include "JDetector/JModule.hh"
32 #include "JDetector/JHydrophone.hh"
33 
34 #include "JTools/JHashMap.hh"
35 #include "JTools/JRange.hh"
36 #include "JTools/JQuantile.hh"
37 
40 #include "JSupport/JMeta.hh"
41 
43 #include "JAcoustics/JEmitter.hh"
45 #include "JAcoustics/JHit.hh"
48 #include "JAcoustics/JEvent.hh"
49 #include "JAcoustics/JSuperEvt.hh"
51 #include "JAcoustics/JSupport.hh"
53 
54 #include "Jeep/JContainer.hh"
55 #include "Jeep/JParser.hh"
56 #include "Jeep/JMessage.hh"
57 
58 
59 namespace {
60 
61  using namespace std;
62  using namespace JPP;
63 
64  typedef vector<JHit> data_type;
65 
66  /**
67  * Thread pool for global fits.
68  */
69  class JFremantle {
70  public:
71  /**
72  * Constructor.
73  *
74  * \param katoomba global fit
75  * \param ns number of threads
76  */
77  JFremantle(const JKatoomba_t& katoomba, const size_t ns) :
78  stop(false)
79  {
80  using namespace std;
81 
82  for (size_t i = 0; i < ns; ++i) {
83 
84  thread worker([this, katoomba]() {
85 
86  data_type data;
87 
88  for (JKatoomba_t fremantle(katoomba); ; ) {
89 
90  {
91  unique_lock<mutex> lock(in);
92 
93  cv.wait(lock, [this]() { return stop || !input.empty(); });
94 
95  if (stop && input.empty()) {
96  return;
97  }
98 
99  data.swap(input.front());
100 
101  input.pop();
102  }
103 
104  const auto result = fremantle(data.begin(), data.end());
105 
106  if (result.chi2 / result.ndf <= fremantle.parameters.chi2perNDF) {
107 
108  {
109  unique_lock<mutex> lock(out);
110 
111  Q.put(result.chi2 / result.ndf);
112 
113  if (JFremantle::output != NULL) {
114  JFremantle::output->put(getSuperEvt(JHead(JFremantle::oid,
115  result.getTimeRange(),
116  data .size(),
117  result.size(),
118  result.value.getN(),
119  result.ndf,
120  result.chi2),
121  result.value,
122  result.begin,
123  result.end));
124  }
125  }
126  }
127  }
128  });
129 
130  workers.emplace_back(move(worker));
131  }
132  }
133 
134 
135  /**
136  * Destructor.
137  */
138  ~JFremantle()
139  {
140  using namespace std;
141 
142  {
143  unique_lock<mutex> lock(in);
144 
145  stop = true;
146  }
147 
148  cv.notify_all();
149 
150  for (auto& worker : workers) {
151  worker.join();
152  }
153  }
154 
155 
156  /**
157  * Get number of pending data.
158  *
159  * \return number of pending data
160  */
161  size_t backlog()
162  {
163  using namespace std;
164 
165  {
166  unique_lock<mutex> lock(in);
167 
168  return input.size();
169  }
170  }
171 
172 
173  /**
174  * Queue data.
175  *
176  * \param data data
177  */
178  void enqueue(data_type& data)
179  {
180  using namespace std;
181 
182  {
183  unique_lock<mutex> lock(in);
184 
185  if (stop) {
186  throw runtime_error("The thread pool has been stopped.");
187  }
188 
189  input.emplace(move(data));
190  }
191 
192  cv.notify_one();
193  }
194 
195  static std::string oid; //!< detector identifier
196  static JQuantile Q; //!< chi2/NDF
197  static JObjectOutput<JSuperEvt>* output; //!< optional output
198 
199  private:
200  vector<thread> workers;
201  queue <data_type> input;
202  mutex in;
203  mutex out;
204  condition_variable cv;
205  bool stop;
206  };
207 
208  std::string JFremantle::oid = "";
210  JObjectOutput<JSuperEvt>* JFremantle::output = NULL;
211 }
212 
213 
214 /**
215  * \file
216  *
217  * Application to make a global fit of the detector geometry to acoustic data.\n
218  * \author mdejong
219  */
220 int main(int argc, char **argv)
221 {
222  using namespace std;
223  using namespace JPP;
224 
225  typedef JContainer< vector<JTripod> > tripods_container;
226  typedef JContainer< vector<JTransmitter> > transmitters_container;
227  typedef JContainer< vector<JHydrophone> > hydrophones_container;
228  typedef JContainer< set<JTransmission_t> > disable_container;
229 
232  string detectorFile;
233  JLimit_t& numberOfEvents = inputFile.getLimit();
234  JSoundVelocity V = getSoundVelocity; // default sound velocity
235  tripods_container tripods; // tripods
236  transmitters_container transmitters; // transmitters
237  hydrophones_container hydrophones; // hydrophones
238  JFitParameters parameters; // fit parameters
239  bool unify; // unify weighing of pings
240  disable_container disable; // disable tansmissions
241  size_t jobs; // number of parallel jobs
242  double Tmax_s; // deadtime [s]
243  int debug;
244 
245  try {
246 
247  JParser<> zap("Application to fit position calibration model to acoustic data.");
248 
249  zap['f'] = make_field(inputFile, "output of JAcousticEventBuilder[.sh]");
250  zap['o'] = make_field(outputFile) = "";
251  zap['n'] = make_field(numberOfEvents) = JLimit::max();
252  zap['a'] = make_field(detectorFile);
254  zap['V'] = make_field(V, "sound velocity") = JPARSER::initialised();
255  zap['T'] = make_field(tripods, "tripod data");
256  zap['Y'] = make_field(transmitters, "transmitter data") = JPARSER::initialised();
257  zap['H'] = make_field(hydrophones, "hydrophone data") = JPARSER::initialised();
258  zap['M'] = make_field(getMechanics, "mechanics data") = JPARSER::initialised();
259  zap['u'] = make_field(unify, "unify weighing of pings");
260  zap['!'] = make_field(disable, "disable transmission") = JPARSER::initialised();
261  zap['N'] = make_field(jobs, "number of parallel jobs") = 1;
262  zap['D'] = make_field(Tmax_s, "deadtime [s]") = 100.0e-3;
263  zap['d'] = make_field(debug) = 1;
264 
265  zap(argc, argv);
266  }
267  catch(const exception &error) {
268  FATAL(error.what() << endl);
269  }
270 
271  ROOT::EnableThreadSafety();
272 
273  const int sleep_us = 100; // sleep time [us]
274 
276 
277  try {
278  load(detectorFile, detector);
279  }
280  catch(const JException& error) {
281  FATAL(error);
282  }
283 
284  JHashMap<int, JLocation> receivers;
285  JHashMap<int, JEmitter> emitters;
286 
287  for (JDetector::const_iterator i = detector.begin(); i != detector.end(); ++i) {
288  receivers[i->getID()] = i->getLocation();
289  }
290 
291  for (tripods_container::const_iterator i = tripods.begin(); i != tripods.end(); ++i) {
292  emitters[i->getID()] = JEmitter(i->getID(),
293  i->getUTMPosition() - detector.getUTMPosition());
294  }
295 
296  for (transmitters_container::const_iterator i = transmitters.begin(); i != transmitters.end(); ++i) {
297  try {
298  emitters[i->getID()] = JEmitter(i->getID(),
299  i->getPosition() + detector.getModule(i->getLocation()).getPosition());
300  }
301  catch(const exception&) {
302  continue; // if no module available, discard transmitter
303  }
304  }
305 
306  V.set(detector.getUTMZ()); // sound velocity at detector depth
307 
308  JGeometry geometry(detector, hydrophones);
309  JKatoomba_t katoomba(geometry, V, parameters);
310 
313 
314  { // sort input files
315  map<double, string> zmap;
316 
317  for (const string& file_name : inputFile) {
318 
319  STATUS(file_name << '\r'); DEBUG(endl);
320 
321  for (JMultipleFileScanner<JEvent> in(file_name, 1); in.hasNext(); ) {
322 
323  const JEvent* evt = in.next();
324 
325  if (JFremantle::oid == "")
326  JFremantle::oid = evt->getOID();
327  else if (JFremantle::oid != evt->getOID()) // consistency check
328  FATAL("Invalid detector identifier " << evt->getOID() << " != " << JFremantle::oid << endl);
329 
330  if (!evt->empty()) {
331  zmap[evt->begin()->getToE()] = file_name;
332  }
333  }
334  }
335  STATUS(endl);
336 
337  inputFile.clear();
338 
339  for (map<double, string>::const_iterator i = zmap.begin(); i != zmap.end(); ++i) {
340  inputFile.push_back(i->second);
341  }
342  }
343 
344  const JRange<double> unit(0.0, 1.0);
345 
346  if (outputFile.getFilename() != "" &&
347  outputFile.getFilename() != "--") {
348 
349  outputFile.open();
350 
351  JFremantle::output = &outputFile;
352  }
353  /*
354  outputFile.put(JMeta(argc, argv));
355  outputFile.put(parameters);
356  */
357 
358  try {
359 
360  JFremantle fremantle(katoomba, jobs);
361 
362  typedef deque<JEvent> buffer_type;
363 
364  for (buffer_type zbuf; inputFile.hasNext(); ) {
365 
366  STATUS(inputFile.getFilename() << '\r'); DEBUG(endl);
367 
368  // read one file at a time
369 
370  for (const string file_name = inputFile.getFilename(); inputFile.hasNext() && file_name == inputFile.getFilename(); ) {
371 
372  const JEvent* evt = inputFile.next();
373 
374  if (emitters.has(evt->getID())) {
375  zbuf.push_back(*evt);
376  }
377  }
378 
379  sort(zbuf.begin(), zbuf.end()); // sort according first time-of-emission
380 
381  for (buffer_type::iterator p = zbuf.begin(), q; p != zbuf.end(); p = q) {
382 
383  for (q = p; ++q != zbuf.end() && q->begin()->getToE() <= p->rbegin()->getToE() + parameters.Tmax_s; ) {}
384 
385  if (q == zbuf.end()) {
386 
387  if (inputFile.hasNext()) {
388 
389  zbuf.erase(zbuf.begin(), p); // remove processed data and continue reading
390 
391  break;
392  }
393  }
394 
395  overlap(p, q, Tmax_s); // empty overlapping events
396 
397  if (getNumberOfEmitters(p,q) >= parameters.Nmin) {
398 
399  map<int, int> numberOfPings;
400 
401  for (buffer_type::const_iterator i = p; i != q; ++i) {
402  numberOfPings[i->getID()] += 1;
403  }
404 
405  int minimum_number_of_pings = numeric_limits<int>::max();
406 
407  for (map<int, int>::const_iterator i = numberOfPings.begin(); i != numberOfPings.end(); ++i) {
408  minimum_number_of_pings = min(minimum_number_of_pings, i->second);
409  }
410 
411  set<int> buffer;
412  data_type data;
413 
414  for (buffer_type::iterator evt = p; evt != q; ++evt) {
415 
416  sort(evt->begin(), evt->end(), compare);
417 
418  JEvent::iterator __end = unique(evt->begin(), evt->end(), make_comparator(&JTransmission::getID, JComparison::eq()));
419 
420  const JEmitter& emitter = emitters[evt->getID()];
421  const double weight = (unify ? (double) minimum_number_of_pings / (double) numberOfPings[evt->getID()] : 1.0);
422 
423  for (JEvent::const_iterator i = evt->begin(); i != __end; ++i) {
424 
425  if (disable.count(JTransmission_t(evt->getID(), i->getID())) == 0 &&
426  disable.count(JTransmission_t(-1, i->getID())) == 0) {
427 
428  if (receivers.has(i->getID()) && geometry.hasLocation(receivers[i->getID()]) && i->getQ() >= parameters.Qmin * (unit(parameters.Qmin) ? i->getW() : 1.0)) {
429 
430  data.push_back(JHit(emitter,
431  distance(p,evt),
432  receivers[i->getID()],
433  i->getToA(),
434  parameters.sigma_s,
435  weight));
436 
437  buffer.insert(evt->getID());
438  }
439  }
440  }
441  }
442 
443  if (buffer.size() >= parameters.Nmin) {
444 
445  while (fremantle.backlog() > jobs) {
446  this_thread::sleep_for(chrono::microseconds(sleep_us));
447  }
448 
449  fremantle.enqueue(data);
450  }
451  }
452  }
453  }
454  STATUS(endl);
455  }
456  catch(const exception& error) {
457  FATAL("main " << error.what());
458  }
459  /*
460  JMultipleFileScanner<JMeta> io(inputFile);
461 
462  io >> outputFile;
463  */
464  outputFile.close();
465 
466  JFileOutputStream(3) << SCIENTIFIC(1,10) << JFremantle::Q.getMean(numeric_limits<float>::max()) << endl;
467 }
Worker class for complete fit procedure of acoustic model.
Definition: JKatoomba_t.hh:29
Object writing to file.
Utility class to parse command line options.
Definition: JParser.hh:1514
Acoustic hit.
General exception.
Definition: JException.hh:23
void overlap(T p, T q, const double Tmax_s)
Empty overlapping events.
Q(UTCMax_s-UTCMin_s)-livetime_s
int main(int argc, char *argv[])
Definition: Main.cc:15
JComparator< JResult_t T::*, JComparison::lt > make_comparator(JResult_t T::*member)
Helper method to create comparator between values of data member.
Sound velocity.
static JDetectorMechanics getMechanics
Function object to get string mechanics.
Definition: JMechanics.hh:243
std::vector< T >::difference_type distance(typename std::vector< T >::const_iterator first, typename PhysicsEvent::const_iterator< T > second)
Specialisation of STL distance.
General purpose class for hash map of unique keys.
Definition: JHashMap.hh:72
Auxiliary data structure for running average, standard deviation and quantiles.
Definition: JQuantile.hh:43
General purpose class for hash map of unique elements.
#define STATUS(A)
Definition: JMessage.hh:63
ROOT TTree parameter settings.
Detector data structure.
Definition: JDetector.hh:89
Recording of objects on file according a format that follows from the file name extension.
std::vector< size_t > ns
*fatal Wrong number of arguments esac JCookie sh typeset Z DETECTOR typeset Z SOURCE_RUN typeset Z TARGET_RUN set_variable PARAMETERS_FILE $WORKDIR parameters
Definition: diff-Tuna.sh:38
Acoustic event.
then fatal Number of tripods
Definition: JFootprint.sh:45
Empty structure for specification of parser element that is initialised (i.e. does not require input)...
Definition: JParser.hh:83
JSuperEvt getSuperEvt(const JHead &header, const JModel &model, T begin, T end)
Get super event.
V(JDAQEvent-JTriggerReprocessor)*1.0/(JDAQEvent+1.0e-10)
string outputFile
Acoustic emitter.
Data structure for detector geometry and calibration.
Acoustics hit.
Data structure for hydrophone.
size_t getNumberOfEmitters(T __begin, T __end)
Get number of emitters.
then usage $script[port]< option > nPossible stop
Acoustic super event fit toolkit.
Acoustic fit parameters.
Auxiliary class for defining the range of iterations of objects.
Definition: JLimit.hh:41
static const JSoundVelocity getSoundVelocity(1541.0,-17.0e-3,-2000.0)
Function object for velocity of sound.
Detector file.
Definition: JHead.hh:226
Data structure for transmitter.
Acoustic emitter.
Definition: JEmitter.hh:27
Acoustics toolkit.
Auxiliary wrapper for I/O of container with optional comment (see JComment).
Definition: JContainer.hh:39
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
const std::string & getOID() const
Get detector identifier.
Acoustic event fit.
Streaming of output.
Definition: JFileStream.hh:46
return result
Definition: JPolint.hh:764
ROOT I/O of application specific meta data.
JPosition3D getPosition(const Vec &pos)
Get position.
then awk string
static struct JACOUSTICS::@4 compare
Auxiliary data structure to sort transmissions.
General purpose messaging.
Monte Carlo run header.
Definition: JHead.hh:1221
Implementation for depth dependend velocity of sound.
#define FATAL(A)
Definition: JMessage.hh:67
Scanning of objects from multiple files according a format that follows from the extension of each fi...
void load(const std::string &file_name, JDetector &detector)
Load detector from input file.
Auxiliary class to define a range between two values.
General purpose class for object reading from a list of file names.
Utility class to parse command line options.
Acoustic transmission identifier.
Fit function of acoustic model.
bool hasLocation(const JLocation &location) const
Check if this detector has given location.
Definition: JGeometry.hh:561
Acoustic event.
Template interface of object output for single data type.
const JLimit & getLimit() const
Get limit.
Definition: JLimit.hh:84
do set_variable DETECTOR_TXT $WORKDIR detector
int getID() const
Get identifier.
Data structure for tripod.
then fatal Wrong number of arguments fi set_variable DETECTOR $argv[1] set_variable INPUT_FILE $argv[2] eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:46
Acoustic transmission identifier.
Auxiliary data structure for floating point format specification.
Definition: JManip.hh:484
Template definition of fit function of acoustic model.
Definition: JKatoomba.hh:105
Container I/O.
int debug
debug level
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62
Data structure for optical module.