Jpp 20.0.0-rc.2
the software that should make you happy
Loading...
Searching...
No Matches
JMultiThreadedReconstruction.hh
Go to the documentation of this file.
1#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
2#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
3
4#include <future>
5#include <mutex>
6#include <thread>
7#include <vector>
8#include <queue>
9#include <map>
10
12
13#include "JDAQ/JDAQEvaluator.hh"
15
17
18
19/**
20 * \author mdejong
21 */
22
23namespace JRECONSTRUCTION {}
24namespace JPP { using namespace JRECONSTRUCTION; }
25
26namespace JRECONSTRUCTION {
27
28 /**
29 * Thread pool for event-by-event reconstruction.
30 */
31 template<class JFit_t>
33 {
34 public:
35 typedef typename JFit_t::input_type input_type;
37
38
39 /**
40 * Constructor.
41 *
42 * \param fit fit
43 * \param writer writer
44 * \param ns number of threads
45 * \param backlog backlog
46 */
49 const size_t ns,
50 const size_t backlog = std::numeric_limits<size_t>::max()) :
52 stop(false),
54 {
55 using namespace std;
56 using namespace JPP;
57
58 for (size_t id = 0; id < ns; ++id) {
59 output[id] = 0;
60 }
61
62 for (size_t id = 0; id < ns; ++id) {
63
64 thread worker([this, fit, id]() {
65
66 input_type data;
67
68 for (JFit_t f1(fit); ; ) {
69
70 {
71 unique_lock<mutex> lock(in);
72
73 cv.wait(lock, [this]() { return stop || !input.empty(); });
74
75 if (stop && input.empty()) {
76 return;
77 }
78
79 swap(data, input.front());
80
81 input.pop();
82 }
83
84 cw.notify_one();
85
86 output_type evt(id, data.getDAQEventHeader(), f1(data));
87
88 {
89 unique_lock<mutex> lock(out);
90
91 output.push(evt);
92 }
93 }
94 });
95
96 workers.emplace_back(std::move(worker));
97 }
98 }
99
100
101 /**
102 * Destructor.
103 */
105 {
106 using namespace std;
107
108 {
109 unique_lock<mutex> lock(in);
110
111 stop = true;
112 }
113
114 cv.notify_all();
115
116 for (auto& worker : workers) {
117 worker.join();
118 }
119
120 // write remaining output
121
122 while (!output.empty()) {
123
124 writer.put(output.top());
125
126 output.pop();
127 }
128 }
129
130
131 /**
132 * Add data in queue.
133 *
134 * \param data data
135 */
136 void enqueue(input_type& data)
137 {
138 using namespace std;
139
140 {
141 unique_lock<mutex> lock(in);
142
143 cw.wait(lock, [this]() { return stop || input.size() < backlog; });
144
145 if (stop) {
146 throw runtime_error("The thread pool has been stopped.");
147 }
148
149 input.emplace(std::move(data));
150 }
151
152 cv.notify_one();
153
154 flush();
155 }
156
157
158 /**
159 * Flush output.
160 */
161 void flush()
162 {
163 using namespace std;
164
165 unique_lock<mutex> lock(out);
166
167 while (output.is_ready()) {
168
169 writer.put(output.top());
170
171 output.pop();
172 }
173 }
174
175 private:
176 /**
177 * Output data type.
178 */
179 struct output_type :
180 public JDAQEventHeader,
181 public JEvt
182 {
183 /**
184 * Default constructor.
185 */
187 {}
188
189
190 /**
191 * Constructor.
192 *
193 * \param id thread identifier
194 * \param header header
195 * \param out result values
196 */
197 output_type(const size_t id,
198 const JDAQEventHeader& header,
199 const JEvt& out) :
200 JDAQEventHeader(header),
201 JEvt(out),
202 id(id)
203 {}
204
205
206 /**
207 * Less-than operator for priority queue.
208 *
209 * \param first first event
210 * \param second second event
211 * \return true if first event later than second event
212 */
213 friend inline bool operator<(const output_type& first, const output_type& second)
214 {
216 }
217
218 size_t id;
219 };
220
221
222 /**
223 * Type definition of output queue.
224 */
225 typedef std::priority_queue<output_type, std::vector<output_type> > queue_type;
226
227
228 /**
229 * Auxiliary data structure to maintain time order of events for writing.
230 */
232 public queue_type
233 {
234 /**
235 * Get queue counter.
236 *
237 * \param id thread identifier
238 * \return counter
239 */
240 size_t& operator[](const size_t id)
241 {
242 return queue[id];
243 }
244
245
246 /**
247 * Push object in queue.
248 *
249 * \param object object
250 */
251 void push(output_type& object)
252 {
253 queue[object.id] += 1;
254
255 static_cast<queue_type&>(*this).emplace(std::move(object));
256 }
257
258
259 /**
260 * Pop first element from queue.
261 */
262 void pop()
263 {
264 const output_type& object = this->top();
265
266 queue[object.id] -= 1;
267
268 static_cast<queue_type&>(*this).pop();
269 }
270
271
272 /**
273 * Check readiness.
274 *
275 * \return true if the number of entries in the queue from each thread is not equal to zero; else false
276 */
277 bool is_ready() const
278 {
279 for (const auto& i : queue) {
280 if (i.second == 0) {
281 return false;
282 }
283 }
284
285 return !queue.empty();
286 }
287
288 private:
289 std::map<size_t, size_t> queue; //!< number of entries in queue per thread
290
292
295 std::queue <input_type> input;
296 std::mutex in;
297 std::mutex out;
298 std::condition_variable cv;
299 std::condition_variable cw;
300 bool stop;
301 size_t backlog;
302 };
303}
304
305#endif
306
Data structure for set of track fit results.
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Thread pool for event-by-event reconstruction.
JMultiThreadedReconstruction(const JFit_t &fit, writer_type &writer, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
JRECONSTRUCTION::JMultiThreadedReconstruction::queue_type_t output
std::priority_queue< output_type, std::vector< output_type > > queue_type
Type definition of output queue.
const JDAQEventHeader & getDAQEventHeader() const
Get DAQ event header.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
static const JDAQEvaluator getDAQValue
Function object for evaluation of DAQ objects.
output_type(const size_t id, const JDAQEventHeader &header, const JEvt &out)
Constructor.
friend bool operator<(const output_type &first, const output_type &second)
Less-than operator for priority queue.
Auxiliary data structure to maintain time order of events for writing.
std::map< size_t, size_t > queue
number of entries in queue per thread