1#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
2#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
31 template<
class JFit_t>
50 const size_t backlog = std::numeric_limits<size_t>::max()) :
58 for (
size_t id = 0;
id < ns; ++id) {
62 for (
size_t id = 0;
id < ns; ++id) {
64 thread worker([
this, fit,
id]() {
68 for (JFit_t f1(fit); ; ) {
71 unique_lock<mutex> lock(
in);
73 cv.wait(lock, [
this]() {
return stop || !
input.empty(); });
79 swap(data,
input.front());
86 output_type evt(
id, data.getDAQEventHeader(), f1(data));
89 unique_lock<mutex> lock(
out);
96 workers.emplace_back(std::move(worker));
109 unique_lock<mutex> lock(
in);
141 unique_lock<mutex> lock(
in);
146 throw runtime_error(
"The thread pool has been stopped.");
149 input.emplace(std::move(data));
163 unique_lock<mutex> lock(
out);
223 typedef std::priority_queue<output_type, std::vector<output_type> >
queue_type;
251 queue[
object.id] += 1;
253 static_cast<queue_type&
>(*this).emplace(std::move(
object));
264 queue[
object.id] -= 1;
277 for (
const auto& i :
queue) {
283 return !
queue.empty();
296 std::condition_variable
cv;
297 std::condition_variable
cw;
Data structure for set of track fit results.
Template interface of object output for single data type.
virtual bool put(const T &object)=0
Object output.
Thread pool for event-by-event reconstruction.
std::vector< std::thread > workers
std::condition_variable cw
JLANG::JObjectOutput< JEvt > writer_type
JMultiThreadedReconstruction(const JFit_t &fit, writer_type &writer, const size_t ns, const size_t backlog=std::numeric_limits< size_t >::max())
Constructor.
std::queue< input_type > input
JFit_t::input_type input_type
~JMultiThreadedReconstruction()
Destructor.
void enqueue(input_type &data)
Add data in queue.
JRECONSTRUCTION::JMultiThreadedReconstruction::queue_type_t output
std::priority_queue< output_type, std::vector< output_type > > queue_type
Type definition of output queue.
std::condition_variable cv
void flush()
Flush output.
This namespace includes all other namespaces (except KM3NETDAQ, KM3NET and ANTARES).
static const JDAQEvaluator getDAQValue
Function object for evaluation of DAQ objects.
output_type(const size_t id, const JDAQEventHeader &header, const JEvt &out)
Constructor.
output_type()
Default constructor.
friend bool operator<(const output_type &first, const output_type &second)
Less-than operator for priority queue.
Auxiliary data structure to maintain time order of events for writing.
std::map< size_t, size_t > queue
Number of entries in queue per thread.
size_t & operator[](const size_t id)
Get queue counter.
void pop()
Pop first element from queue.
void push(output_type &object)
Push object in queue.
bool is_ready() const
Check readiness.