Jpp  master_rocky-40-g5f0272dcd
the software that should make you happy
JFremantle_t.hh
Go to the documentation of this file.
1 #ifndef __JACOUSTICS__JFREMANTLE_T__
2 #define __JACOUSTICS__JFREMANTLE_T__
3 
4 #include <string>
5 #include <type_traits>
6 #include <functional>
7 #include <future>
8 #include <mutex>
9 #include <thread>
10 #include <vector>
11 #include <queue>
12 #include <memory>
13 #include <limits>
14 
15 #include "JLang/JObjectOutput.hh"
16 
17 #include "JMath/JQuantile_t.hh"
18 
19 #include "JAcoustics/JHit.hh"
20 #include "JAcoustics/JGlobalfit.hh"
21 #include "JAcoustics/JSuperEvt.hh"
23 
24 
25 namespace JACOUSTICS {}
26 namespace JPP { using namespace JACOUSTICS; }
27 
28 namespace JACOUSTICS {
29 
30  /**
31  * Thread pool for global fits.
32  */
33  class JFremantle {
34  public:
35 
38 
39 
40  /**
41  * Constructor.
42  *
43  * \param geometry detector geometry
44  * \param velocity sound velocity
45  * \param parameters parameters
46  * \param ns number of threads
47  * \param backlog backlog
48  */
49  JFremantle(const JGeometry& geometry,
50  const JSoundVelocity& velocity,
51  const JFitParameters& parameters,
52  const size_t ns,
53  const size_t backlog = std::numeric_limits<size_t>::max()) :
54  stop(false),
56  {
57  using namespace std;
58  using namespace JPP;
59 
60  Q.reset();
61 
62  for (size_t i = 0; i < ns; ++i) {
63 
64  thread worker([this, geometry, velocity, parameters]() {
65 
67 
68  for (JGlobalfit katoomba(geometry, velocity, parameters); ; ) {
69 
70  {
71  unique_lock<mutex> lock(in);
72 
73  cv.wait(lock, [this]() { return stop || !input.empty(); });
74 
75  if (stop && input.empty()) {
76  return;
77  }
78 
79  data.swap(input.front());
80 
81  input.pop();
82  }
83 
84  cw.notify_one();
85 
86  const auto result = katoomba(data.begin(), data.end());
87 
88  if (result.chi2 / result.ndf <= katoomba.parameters.chi2perNDF) {
89 
90  {
91  unique_lock<mutex> lock(out);
92 
93  Q.put(result.chi2 / result.ndf);
94 
95  if (JFremantle::output != NULL) {
97  result.getTimeRange(),
98  data .size(),
99  result.size(),
100  result.value.getN(),
101  result.ndf,
102  result.chi2),
103  result.value,
104  result.begin,
105  JFremantle::squash ? result.begin : result.end));
106  }
107  }
108  }
109  }
110  });
111 
112  workers.emplace_back(std::move(worker));
113  }
114  }
115 
116 
117  /**
118  * Destructor.
119  */
121  {
122  using namespace std;
123 
124  {
125  unique_lock<mutex> lock(in);
126 
127  stop = true;
128  }
129 
130  cv.notify_all();
131 
132  for (auto& worker : workers) {
133  worker.join();
134  }
135  }
136 
137 
138  /**
139  * Queue data.
140  *
141  * \param data data
142  */
144  {
145  using namespace std;
146 
147  {
148  unique_lock<mutex> lock(in);
149 
150  cw.wait(lock, [this]() { return stop || input.size() <= backlog; });
151 
152  if (stop) {
153  throw runtime_error("The thread pool has been stopped.");
154  }
155 
156  input.emplace(std::move(data));
157  }
158 
159  cv.notify_one();
160  }
161 
162  static int detid; //!< detector identifier
163  static JMATH::JQuantile_t Q; //!< chi2/NDF
164  static bool squash; //!< squash transmissions in output
165  static output_type* output; //!< optional output
166 
167  private:
169  std::queue <input_type> input;
170  std::mutex in;
171  std::mutex out;
172  std::condition_variable cv;
173  std::condition_variable cw;
174  bool stop;
175  size_t backlog;
176  };
177 }
178 
179 #endif
Acoustic hit.
Fit function of acoustic model.
Acoustic super event fit toolkit.
Acoustic event fit.
Thread pool for global fits.
Definition: JFremantle_t.hh:33
static output_type * output
optional output
std::condition_variable cw
JLANG::JObjectOutput< JSuperEvt > output_type
Definition: JFremantle_t.hh:37
void enqueue(input_type &data)
Queue data.
std::vector< std::thread > workers
std::vector< JHit > input_type
Definition: JFremantle_t.hh:36
static JMATH::JQuantile_t Q
chi2/NDF
std::condition_variable cv
static int detid
detector identifier
JFremantle(const JGeometry &geometry, const JSoundVelocity &velocity, const JFitParameters &parameters, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
Definition: JFremantle_t.hh:49
~JFremantle()
Destructor.
static bool squash
squash transmissions in output
std::queue< input_type > input
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Auxiliary classes and methods for acoustic position calibration.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
std::vector< size_t > ns
Definition: JSTDTypes.hh:14
Global fit of prameterised detector geometry to acoustics data.
Definition: JGlobalfit.hh:29
Acoustic event header.
Implementation for depth dependend velocity of sound.
Auxiliary data structure to convert model to super event.
Auxiliary data structure for average.
Definition: JQuantile_t.hh:25
void put(const double x)
Put value.
Definition: JQuantile_t.hh:49
void reset()
Reset.
Definition: JQuantile_t.hh:38