Jpp 19.3.0-rc.1
the software that should make you happy
Loading...
Searching...
No Matches
JMultiThreadedReconstruction.hh
Go to the documentation of this file.
1#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
2#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
3
4#include <future>
5#include <mutex>
6#include <thread>
7#include <vector>
8#include <queue>
9#include <map>
10
12
13#include "JDAQ/JDAQEvaluator.hh"
15
17
18
19/**
20 * \author mdejong
21 */
22
23namespace JRECONSTRUCTION {}
24namespace JPP { using namespace JRECONSTRUCTION; }
25
26namespace JRECONSTRUCTION {
27
28 /**
29 * Thread pool for event-by-event reconstruction.
30 */
31 template<class JFit_t>
33 {
34 public:
35 typedef typename JFit_t::input_type input_type;
37
38
39 /**
40 * Constructor.
41 *
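42 * \param fit fit function object; each worker thread operates on its own copy
43 * \param writer writer to which the fit results are put
44 * \param ns number of worker threads
45 * \param backlog maximal number of input items held in the queue before enqueue() blocks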
46 */
49 const size_t ns,
50 const size_t backlog = std::numeric_limits<size_t>::max()) :
52 stop(false),
54 {
55 using namespace std;
56 using namespace JPP;
57
58 for (size_t id = 0; id < ns; ++id) {
59 output[id] = 0; // register a zero counter per thread, so that is_ready() takes every thread into account
60 }
61
62 for (size_t id = 0; id < ns; ++id) {
63
64 thread worker([this, fit, id]() { // fit and id are captured by value; this pool must outlive the thread
65
66 input_type data;
67
68 for (JFit_t f1(fit); ; ) { // thread-local copy of the fit; loop ends only via the return below
69
70 {
71 unique_lock<mutex> lock(in);
72
73 cv.wait(lock, [this]() { return stop || !input.empty(); }); // sleep until there is input or the pool is stopped
74
75 if (stop && input.empty()) { // terminate only after the remaining input has been drained
76 return;
77 }
78
79 swap(data, input.front()); // take the oldest input item without copying it
80
81 input.pop();
82 }
83
84 cw.notify_one(); // one slot was freed in the input queue; wake a producer blocked on the backlog in enqueue()
85
86 output_type evt(id, data.getDAQEventHeader(), f1(data)); // the fit is evaluated while holding neither mutex
87
88 {
89 unique_lock<mutex> lock(out);
90
91 output.push(evt); // evt is moved from by push() and not used afterwards
92 }
93 }
94 });
95
96 workers.emplace_back(std::move(worker)); // keep the thread for joining in the destructor (workers is declared outside this listing)
97 }
98 }
99
100
101 /**
102 * Destructor.
103 */
105 {
106 using namespace std;
107
108 {
109 unique_lock<mutex> lock(in);
110
111 stop = true; // set under the input mutex so that it is consistent with the predicate tested in cv.wait()
112 }
113
114 cv.notify_all(); // wake all idle workers; each one returns once the input queue is empty
115
116 for (auto& worker : workers) {
117 worker.join();
118 }
119
120 // write remaining output (all workers have been joined, so output is accessed here without locking out and without the is_ready() check)
121
122 while (!output.empty()) {
123
124 writer.put(output.top());
125
126 output.pop();
127 }
128 }
129
130
131 /**
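132 * Add data to the input queue and write the results that are ready for output.
133 *
134 * \param data data (moved into the queue; the argument is left in a moved-from state)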
135 */
136 void enqueue(input_type& data)
137 {
138 using namespace std;
139
140 {
141 unique_lock<mutex> lock(in);
142
143 cw.wait(lock, [this]() { return stop || input.size() < backlog; }); // block the caller while the input queue holds backlog or more items
144
145 if (stop) {
146 throw runtime_error("The thread pool has been stopped."); // NOTE(review): std::runtime_error is declared in <stdexcept>, which is not among the includes visible in this listing - confirm
147 }
148
149 input.emplace(std::move(data)); // data is moved from; the caller should not rely on its contents afterwards
150 }
151
152 cv.notify_one(); // wake one worker waiting for input
153
154 {
155 unique_lock<mutex> lock(out); // workers cannot push results while the ready results are being written
156
157 while (output.is_ready()) { // write only as long as every thread has at least one result pending
158
159 writer.put(output.top());
160
161 output.pop();
162 }
163 }
164 }
165
166 private:
167 /**
168 * Output data type.
169 */
170 struct output_type :
171 public JDAQEventHeader,
172 public JEvt
173 {
174 /**
175 * Default constructor.
176 */
178 {}
179
180
181 /**
182 * Constructor.
183 *
184 * \param id identifier of the thread that produced the result
185 * \param header DAQ event header (copied into the base class)
186 * \param out result values (copied into the base class)
187 */
188 output_type(const size_t id,
189 const JDAQEventHeader& header,
190 const JEvt& out) :
191 JDAQEventHeader(header),
192 JEvt(out),
193 id(id)
194 {}
195
196
197 /**
198 * Less-than operator for priority queue.
199 *
200 * \param first first event
201 * \param second second event
202 * \return true if first event later than second event
203 */
204 friend inline bool operator<(const output_type& first, const output_type& second)
205 { // NOTE(review): body not visible in this listing; if it matches the comment above, std::priority_queue keeps the earliest event on top - confirm
207 }
208
209 size_t id; //!< thread identifier, used by the output queue for its per-thread counters
210 };
211
212
213 /**
214 * Type definition of output queue.
215 */
216 typedef std::priority_queue<output_type, std::vector<output_type> > queue_type;
217
218
219 /**
220 * Auxiliary data structure to maintain time order of events for writing.
221 */
223 public queue_type
224 {
225 /**
226 * Get queue counter.
227 *
228 * \param id thread identifier
229 * \return counter
230 */
231 size_t& operator[](const size_t id)
232 {
233 return queue[id]; // std::map::operator[] inserts a zero counter when id is not yet present
234 }
235
236
237 /**
238 * Push object in queue.
239 *
240 * \param object object
241 */
242 void push(output_type& object)
243 {
244 queue[object.id] += 1; // count the entry for its thread; id is read before object is moved from
245
246 static_cast<queue_type&>(*this).emplace(std::move(object)); // call the base class explicitly, since this push() hides it; object is left moved-from
247 }
248
249
250 /**
251 * Pop first element from queue.
252 */
253 void pop()
254 {
255 const output_type& object = this->top(); // the queue must not be empty; callers test empty() or is_ready() first
256
257 queue[object.id] -= 1; // update the per-thread counter while the reference to the top element is still valid
258
259 static_cast<queue_type&>(*this).pop(); // call the base class explicitly, since this pop() hides it
260 }
261
262
263 /**
264 * Check readiness.
265 *
266 * \return true if at least one thread is registered and every registered thread has one or more entries in the queue; else false
267 */
268 bool is_ready() const
269 {
270 for (const auto& i : queue) { // i.first is the thread identifier, i.second its number of pending entries
271 if (i.second == 0) {
272 return false; // some thread has nothing pending
273 }
274 }
275
276 return !queue.empty(); // without any registered thread the queue is never ready
277 }
278
279 private:
280 std::map<size_t, size_t> queue; //!< number of entries in queue per thread
281
283
286 std::queue <input_type> input;
287 std::mutex in;
288 std::mutex out;
289 std::condition_variable cv;
290 std::condition_variable cw;
291 bool stop;
292 size_t backlog;
293 };
294}
295
296#endif
297
Data structure for set of track fit results.
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Thread pool for event-by-event reconstruction.
JMultiThreadedReconstruction(const JFit_t &fit, writer_type &writer, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
JRECONSTRUCTION::JMultiThreadedReconstruction::queue_type_t output
std::priority_queue< output_type, std::vector< output_type > > queue_type
Type definition of output queue.
const JDAQEventHeader & getDAQEventHeader() const
Get DAQ event header.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
Model fits to data.
static const JDAQEvaluator getDAQValue
Function object for evaluation of DAQ objects.
output_type(const size_t id, const JDAQEventHeader &header, const JEvt &out)
Constructor.
friend bool operator<(const output_type &first, const output_type &second)
Less-than operator for priority queue.
Auxiliary data structure to maintain time order of events for writing.
std::map< size_t, size_t > queue
number of entries in queue per thread