1#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
2#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
31 template<
class JFit_t>
50 const size_t backlog = std::numeric_limits<size_t>::max()) :
58 for (
size_t id = 0;
id < ns; ++id) {
62 for (
size_t id = 0;
id < ns; ++id) {
64 thread worker([
this, fit,
id]() {
68 for (JFit_t f1(fit); ; ) {
71 unique_lock<mutex> lock(
in);
73 cv.wait(lock, [
this]() {
return stop || !
input.empty(); });
79 swap(data,
input.front());
86 output_type evt(
id, data.getDAQEventHeader(), f1(data));
89 unique_lock<mutex> lock(
out);
96 workers.emplace_back(std::move(worker));
109 unique_lock<mutex> lock(
in);
141 unique_lock<mutex> lock(
in);
146 throw runtime_error(
"The thread pool has been stopped.");
149 input.emplace(std::move(data));
155 unique_lock<mutex> lock(
out);
216 typedef std::priority_queue<output_type, std::vector<output_type> >
queue_type;
244 queue[
object.id] += 1;
246 static_cast<queue_type&
>(*this).emplace(std::move(
object));
257 queue[
object.id] -= 1;
270 for (
const auto& i :
queue) {
276 return !
queue.empty();
289 std::condition_variable
cv;
290 std::condition_variable
cw;
Data structure for set of track fit results.
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Thread pool for event-by-event reconstruction.
std::vector< std::thread > workers
std::condition_variable cw
JLANG::JObjectOutput< JEvt > writer_type
JMultiThreadedReconstruction(const JFit_t &fit, writer_type &writer, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
std::queue< input_type > input
JFit_t::input_type input_type
~JMultiThreadedReconstruction()
Destructor.
void enqueue(input_type &data)
Add data in queue.
JRECONSTRUCTION::JMultiThreadedReconstruction::queue_type_t output
std::priority_queue< output_type, std::vector< output_type > > queue_type
Type definition of output queue.
std::condition_variable cv
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
static const JDAQEvaluator getDAQValue
Function object for evaluation of DAQ objects.
output_type(const size_t id, const JDAQEventHeader &header, const JEvt &out)
Constructor.
output_type()
Default constructor.
friend bool operator<(const output_type &first, const output_type &second)
Less-than operator for priority queue.
Auxiliary data structure to maintain time order of events for writing.
std::map< size_t, size_t > queue
number of entries in queue per thread
size_t & operator[](const size_t id)
Get queue counter.
void pop()
Pop first element from queue.
void push(output_type &object)
Push object in queue.
bool is_ready() const
Check readiness.