Jpp test-rotations-old
the software that should make you happy
Loading...
Searching...
No Matches
JMultiThreadedReconstruction.hh
Go to the documentation of this file.
1#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
2#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
3
4#include <future>
5#include <mutex>
6#include <thread>
7#include <vector>
8#include <queue>
9#include <map>
10
12
13#include "JDAQ/JDAQEvaluator.hh"
15
17
18
19/**
20 * \author mdejong
21 */
22
23namespace JRECONSTRUCTION {}
24namespace JPP { using namespace JRECONSTRUCTION; }
25
26namespace JRECONSTRUCTION {
27
28 /**
29 * Thread pool for event-by-event reconstruction.
30 */
31 template<class JFit_t>
33 {
34 public:
35 typedef typename JFit_t::input_type input_type;
37
38
39 /**
40 * Constructor.
41 *
42 * \param fit fit
43 * \param writer writer
44 * \param ns number of threads
45 * \param backlog backlog
46 */
49 const size_t ns,
50 const size_t backlog = std::numeric_limits<size_t>::max()) :
52 stop(false),
54 {
55 using namespace std;
56 using namespace JPP;
57
58 for (size_t id = 0; id < ns; ++id) {
59 output[id] = 0;
60 }
61
62 for (size_t id = 0; id < ns; ++id) {
63
64 thread worker([this, fit, id]() {
65
66 input_type data;
67
68 for (JFit_t f1(fit); ; ) {
69
70 {
71 unique_lock<mutex> lock(in);
72
73 cv.wait(lock, [this]() { return stop || !input.empty(); });
74
75 if (stop && input.empty()) {
76 return;
77 }
78
79 swap(data, input.front());
80
81 input.pop();
82 }
83
84 cw.notify_one();
85
86 output_type evt(id, data.getDAQEventHeader(), f1(data));
87
88 {
89 unique_lock<mutex> lock(out);
90
91 output.push(evt);
92 }
93 }
94 });
95
96 workers.emplace_back(std::move(worker));
97 }
98 }
99
100
101 /**
102 * Destructor.
103 */
105 {
106 using namespace std;
107
108 {
109 unique_lock<mutex> lock(in);
110
111 stop = true;
112 }
113
114 cv.notify_all();
115
116 for (auto& worker : workers) {
117 worker.join();
118 }
119
120 // write remaining output
121
122 while (!output.empty()) {
123
124 writer.put(output.top());
125
126 output.pop();
127 }
128 }
129
130
131 /**
132 * Add data in queue.
133 *
134 * \param data data
135 */
136 void enqueue(input_type& data)
137 {
138 using namespace std;
139
140 {
141 unique_lock<mutex> lock(in);
142
143 cw.wait(lock, [this]() { return stop || input.size() < backlog; });
144
145 if (stop) {
146 throw runtime_error("The thread pool has been stopped.");
147 }
148
149 input.emplace(std::move(data));
150 }
151
152 cv.notify_one();
153
154 flush();
155 }
156
157
158 /**
159 * Flush output.
160 */
161 void flush()
162 {
163 unique_lock<mutex> lock(out);
164
165 while (output.is_ready()) {
166
167 writer.put(output.top());
168
169 output.pop();
170 }
171 }
172
173 private:
174 /**
175 * Output data type.
176 */
177 struct output_type :
178 public JDAQEventHeader,
179 public JEvt
180 {
181 /**
182 * Default constructor.
183 */
185 {}
186
187
188 /**
189 * Constructor.
190 *
191 * \param id thread identifier
192 * \param header header
193 * \param out result values
194 */
195 output_type(const size_t id,
196 const JDAQEventHeader& header,
197 const JEvt& out) :
198 JDAQEventHeader(header),
199 JEvt(out),
200 id(id)
201 {}
202
203
204 /**
205 * Less-than operator for priority queue.
206 *
207 * \param first first event
208 * \param second second event
209 * \return true if first event later then second event
210 */
211 friend inline bool operator<(const output_type& first, const output_type& second)
212 {
214 }
215
216 size_t id;
217 };
218
219
220 /**
221 * Type definition of output queue.
222 */
223 typedef std::priority_queue<output_type, std::vector<output_type> > queue_type;
224
225
226 /**
227 * Auxiliary data structure to maintain time order of events for writing.
228 */
230 public queue_type
231 {
232 /**
233 * Get queue counter.
234 *
235 * \param id thread identifier
236 * \return counter
237 */
238 size_t& operator[](const size_t id)
239 {
240 return queue[id];
241 }
242
243
244 /**
245 * Push object in queue.
246 *
247 * \param object object
248 */
249 void push(output_type& object)
250 {
251 queue[object.id] += 1;
252
253 static_cast<queue_type&>(*this).emplace(std::move(object));
254 }
255
256
257 /**
258 * Pop first element from queue.
259 */
260 void pop()
261 {
262 const output_type& object = this->top();
263
264 queue[object.id] -= 1;
265
266 static_cast<queue_type&>(*this).pop();
267 }
268
269
270 /**
271 * Check readiness.
272 *
273 * \return true if number of entries in queue from each thread not equal to zero; else false
274 */
275 bool is_ready() const
276 {
277 for (const auto& i : queue) {
278 if (i.second == 0) {
279 return false;
280 }
281 }
282
283 return !queue.empty();
284 }
285
286 private:
287 std::map<size_t, size_t> queue; //!< number of entries in queue per thread
288
290
293 std::queue <input_type> input;
294 std::mutex in;
295 std::mutex out;
296 std::condition_variable cv;
297 std::condition_variable cw;
298 bool stop;
299 size_t backlog;
300 };
301}
302
303#endif
304
Data structure for set of track fit results.
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Thread pool for event-by-event reconstruction.
JMultiThreadedReconstruction(const JFit_t &fit, writer_type &writer, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
JRECONSTRUCTION::JMultiThreadedReconstruction::queue_type_t output
std::priority_queue< output_type, std::vector< output_type > > queue_type
Type definition of output queue.
const JDAQEventHeader & getDAQEventHeader() const
Get DAQ event header.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
Model fits to data.
static const JDAQEvaluator getDAQValue
Function object for evaluation of DAQ objects.
output_type(const size_t id, const JDAQEventHeader &header, const JEvt &out)
Constructor.
friend bool operator<(const output_type &first, const output_type &second)
Less-than operator for priority queue.
Auxiliary data structure to maintain time order of events for writing.
std::map< size_t, size_t > queue
number of entries in queue per thread