Jpp  19.1.0
the software that should make you happy
JSupernovaProcessor.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <iomanip>
3 #include <queue>
4 #include <string>
5 
6 #include "nlohmann/json.hpp"
7 
9 #include "JDAQ/JDAQEventIO.hh"
11 #include "JDAQ/JDAQTags.hh"
12 #include "JDAQ/JDAQEvaluator.hh"
13 
14 #include "JDetector/JDetector.hh"
16 
18 #include "JNet/JControlHost.hh"
19 
20 #include "JLang/JObjectIterator.hh"
21 
22 #include "Jeep/JParser.hh"
23 #include "Jeep/JMessage.hh"
24 #include "Jeep/JProperties.hh"
25 
27 
29 
30 #include "JSupport/JSupport.hh"
31 #include "JSupport/JTreeScanner.hh"
32 
33 #include "JSupernova.hh"
34 
35 #include "JSupernovaProcessor.hh"
36 
38 
39 /**
40  * \file
41  *
42  * Online supernova processor
43  * \author mlincett,selhedri,iagoos,gvannoye
44  */
45 int main(int argc, char* argv[]) {
46  using namespace std;
47  using namespace JPP;
48  using namespace KM3NETDAQ;
49 
50  string inputLigier;
51  string inputFileName;
52  string outputLigier;
53  JTag outputTag;
54  string outputFileName;
55  int nEntries;
56  int debug;
57 
58  string detectorFile;
59  string domDetectorFile;
60 
61  JTimeval timeout_us = JTimeval(1e6); // timeout for input [us]
62  int numberOfTimeouts = 1e3; // number of timeouts before stop
63  int queueLength = 100; // number of timeslices of trigger queue
64  int windowLength = 5; // number of timeslices of trigger sliding window
65 
66  int TMax_ns = 10; // coincidence time window [ns]
67  int preTriggerThreshold = 4; // muon veto multiplicity threshold
68  JDAQHitToTSelector totSelector_ns = JDAQHitToTSelector(4, 255); // hit selector
69  double TVeto_ns = 1000; // muon veto time interval [ns]
70  JRange<int> M = JRange<int>(6, 10); // multiplicity range for SN coincidence level
71  JRange<int> M_BDT = JRange<int>(6, 10); // multiplicity range for SN coincidence level (with BDT)
72 
73  string summaryFile = ""; // summary output file
74  int statPrintInterval_s = 30; // statistics & file print interval [s]
75 
76  try {
77 
78  JProperties properties;
79  string properties_description = "\n";
80 
81  properties.setEndOfLine(";");
82 
83  properties.insert(gmake_property(timeout_us));
84  properties_description.append("\ttimeout_us: timeout for input [us]\n");
85  properties.insert(gmake_property(numberOfTimeouts));
86  properties_description.append("\tnumberOfTimeouts: number of timeouts before stop\n");
87  properties.insert(gmake_property(queueLength));
88  properties_description.append("\tqueueLength: number of timeslices of trigger queue\n");
89  properties.insert(gmake_property(windowLength));
90  properties_description.append("\twindowLength: number of timeslices of trigger sliding window\n");
91 
92  properties.insert(gmake_property(TMax_ns));
93  properties_description.append("\tTMax_ns: coincidence time window [ns]\n");
94  properties.insert(gmake_property(preTriggerThreshold));
95  properties_description.append("\tpreTriggerThreshold: muon veto multiplicity threshold\n");
96  properties.insert(gmake_property(totSelector_ns));
97  properties_description.append("\ttotSelector_ns: hit selector\n");
98  properties.insert(gmake_property(TVeto_ns));
99  properties_description.append("\tTVeto_ns: muon veto time interval [ns]\n");
100  properties.insert(gmake_property(M));
101  properties_description.append("\tM: multiplicity range for SN coincidence level\n");
102  properties.insert(gmake_property(M_BDT));
103  properties_description.append("\tM_BDT: multiplicity range for SN coincidence level (with BDT)\n");
104 
105  properties.insert(gmake_property(summaryFile));
106  properties_description.append("\tsummaryFile: summary output file\n");
107  properties.insert(gmake_property(statPrintInterval_s));
108  properties_description.append("\tstatPrintInterval_s: statistics & file print interval [s]");
109 
110  JParser<> zap("Supernova realtime processor");
111 
112  zap['H'] = make_field(inputLigier, "Input Ligier server") = "";
113  zap['f'] = make_field(inputFileName) = "";
114  zap['L'] = make_field(outputLigier, "Output Ligier server") = "";
115  zap['T'] = make_field(outputTag, "Output tag for the json messages") = JTag();
116  zap['o'] = make_field(outputFileName, "Output json file name (no output to Ligier)") = "";
117  zap['n'] = make_field(nEntries, "Number of entries to write") = -1;
118  zap['a'] = make_field(detectorFile);
119  zap['b'] = make_field(domDetectorFile) = "";
120  zap['d'] = make_field(debug) = 1;
121  zap['@'] = make_field(properties, properties_description) = JPARSER::initialised();
122 
123  zap(argc, argv);
124 
125  }
126 
127  catch(const exception &error) {
128  FATAL(error.what() << endl);
129  }
130 
131  if (queueLength <= windowLength) {
132  FATAL("Length of the trigger window must be smaller than the queue.");
133  }
134 
135 
137 
138  using namespace JSUPERNOVA;
139 
140  // -------------------------------
141  // load detector and build routers
142  // -------------------------------
143 
145  JDetector domDetector;
146 
147  try {
148  load(detectorFile, detector);
149  }
150  catch(const JException& error) {
151  FATAL(error);
152  }
153 
154  try {
155  load(domDetectorFile, domDetector);
156  }
157  catch(const JException& error) {
158  domDetector = JDetector();
159  }
160 
161  const JDAQHitRouter hitRouter(detector);
162 
163  const JModuleRouter& moduleRouter = hitRouter;
164 
165  const int DETID = detector.getID();
166 
167  const int detectorSize = detector.size();
168 
169  JSNFilterM F_M1(M, 1);
170 
171  // -------------------------------
172  // initialize processing queue
173  // -------------------------------
174 
175  typedef JTriggerSN trigger_type;
176 
177  typedef priority_queue<trigger_type, vector<trigger_type>, greater<trigger_type> > queue_type;
178 
179  typedef deque<trigger_type> window_type;
180 
181 
182 
183  typedef map<int, map<int, double> > rates_type; // (frameindex, DOMID) -> DOM rate [kHz]
184  typedef map<int, int> npmt_type ; // (frameindex) -> # active PMTs
185  typedef map<int, JVetoSet> veto_type ; // (frameindex) -> vetoes
186 
187  queue_type trgQueue;
188  window_type trgWindow;
189  rates_type rates;
190  npmt_type pmts;
191  veto_type veto;
192 
193  JTriggerSNStats stats(detectorSize);
194 
195  long int counter_live_ts = 0;
196  long int counter_lost_ts = 0;
197 
198  double frameTime_s = getFrameTime() / 1.0e9;
199 
200  // Initialize output file name and number of entries for testing
201  ofstream outputFile;
202  int nWrittenEntries = 0;
203  if (outputFileName != ""){
204  outputFile.open(outputFileName);
205  }
206 
207  // -------------------------------
208  // main processing
209  // -------------------------------
210 
211  try {
212 
213  // setup input
214 
215  typedef JDAQTimesliceSN data_type;
216  typedef JDAQSummaryslice summary_type;
217  typedef JDAQEvent event_type;
218 
219  JObjectIterator<data_type>* in = NULL;
221  JObjectIterator<event_type>* ev = NULL;
222 
223  if (inputFileName != "") {
224  in = new JTreeScanner<data_type, JDAQEvaluator>(inputFileName);
225  sm = new JTreeScanner<summary_type, JDAQEvaluator>(inputFileName);
226  ev = new JTreeScanner<event_type, JDAQEvaluator>(inputFileName);
227 
228  }
229  else if (inputLigier != "") {
230  in = new JControlHostObjectIterator<data_type>(inputLigier, timeout_us, true);
231 
232  // timeout for the asynchronous reading of summary and event data
233  // needs to be smaller than the timeslice duration
234  const double asyncTimeout_us = 1000.0;
235 
236  sm = new JControlHostObjectIterator<summary_type>(inputLigier, asyncTimeout_us, true);
237  ev = new JControlHostObjectIterator<event_type>(inputLigier, asyncTimeout_us, true);
238  }
239  else { FATAL("Need either a root file or a ligier as input!" << endl); }
240 
241  JControlHost* out = NULL;
242  if (outputLigier != "") {
243  out = new JControlHost(outputLigier);
244  }
245 
246  ofstream outputFile;
247  if (outputFileName != "") {
248  outputFile.open(outputFileName);
249  }
250 
251  // setup state
252 
253  int RUN = 0;
254  int timesliceSize = 0;
255 
256  for (int i = 0; i != numberOfTimeouts; ) {
257 
258  if (in->hasNext()) {
259 
260  data_type* timeslice = in->next();
261 
262  DEBUG(timeslice->getDAQHeader() << endl);
263 
264  timesliceSize = timeslice->size();
265 
266  // --------------------------------
267  // run number initialise and update
268  // --------------------------------
269 
270  const int r = timeslice->getRunNumber();
271 
272  if (r != RUN) {
273 
274  if (RUN != 0) {
275 
276  NOTICE("RUN CHANGE" << endl);
277 
278  while (trgQueue.size() > 0) { trgQueue.pop(); }
279 
280  trgWindow.clear();
281 
282  rates.clear();
283 
284  pmts.clear();
285 
286  veto.clear();
287 
288  }
289 
290  RUN = r;
291 
292  }
293 
294  // ------------------------------
295  // process pending summary slices
296  // ------------------------------
297 
298  while ( sm->hasNext() ) {
299 
300  JDAQSummaryslice* summary = sm->next();
301 
302  DEBUG("SUM " << summary->getDAQHeader() << endl);
303 
304  int frame_index = summary->getFrameIndex();
305 
306  for (JDAQSummaryslice::const_iterator summary_frame = summary->begin();
307  summary_frame != summary->end();
308  ++summary_frame) {
309 
310  int DOMID = summary_frame->getModuleID();
311 
312  for (int ipmt = 0 ; ipmt < NUMBER_OF_PMTS ; ipmt++) {
313  rates[frame_index][DOMID] += summary_frame->getRate(ipmt, 1.0/1000);
314  }
315 
316  pmts[frame_index] += summary_frame->countActiveChannels();
317  }
318 
319  }
320 
321  while ( ev->hasNext() ) {
322 
323  JDAQEvent* event = ev->next();
324 
325  DEBUG("EVT " << event->getDAQHeader() << endl);
326 
327  int frame_index = event->getFrameIndex();
328 
329  veto[frame_index].push_back(JVeto(*event, hitRouter));
330 
331  }
332 
333  // -----------------
334  // process timeslice
335  // -----------------
336 
337  JDataSN preTrigger(TMax_ns, preTriggerThreshold);
338 
339  preTrigger(timeslice, moduleRouter, totSelector_ns, domDetector);
340 
341  JTriggerSN trigger(TVeto_ns);
342 
343  trigger(preTrigger);
344 
345  trgQueue.push(trigger);
346 
347  //----------------
348  // compute trigger
349  //----------------
350 
351  if ( trgQueue.size() >= (unsigned) queueLength ) {
352 
353  while ( trgWindow.size() <= (unsigned) windowLength ) {
354 
355  trigger_type pending = trgQueue.top();
356 
357  if ( trgWindow.size() == 0 || pending > trgWindow.back() ) {
358 
359  trgWindow.push_back( pending );
360 
361  counter_live_ts++;
362 
363  }
364  else {
365  // latecoming (out of order) timeslice
366  counter_lost_ts++;
367  }
368 
369  trgQueue.pop();
370 
371  }
372  }
373  else {
374  NOTICE("Filling trigger queue: " << trgQueue.size() << "/" << queueLength << '\r');
375  }
376  }
377 
378  else if ( inputFileName != "" ) {
379  // if we are reading from a file and we don't have timeslices anymore, we are at the end of the file
380  // we can push the content of the queue to the trigger window to process it
381  if ( trgQueue.size() > 0 ) {
382  while ( trgQueue.size() > 0 ) {
383  trgWindow.push_back(trgQueue.top());
384  trgQueue.pop();
385  }
386  }
387  else if ( trgWindow.size() < (unsigned) windowLength ) {
388  i = numberOfTimeouts;
389  }
390  }
391  else {
392  NOTICE("timeout " << setw(3) << i << endl);
393 
394  ++i;
395  }
396 
397  if (trgWindow.size() >= (unsigned) windowLength) {
398 
399  // build triggered modules
400 
401  vector<double> observables;
402  vector<int> multiplicities;
403 
404  int trg_cc_counts = 0;
405  int trg_cc_modules = 0;
406  set<int> cc_modules;
407 
408  int trg_ev_counts = 0;
409  int trg_ev_modules = 0;
410  set<int> ev_modules;
411 
412  // loop over the trigger window and count the triggers
413  for (int its = 0; its < windowLength; its++) {
414 
415  const int frame_index = trgWindow[its].frameIndex;
416 
417  JVetoSet vetoSet;
418  if (veto.count(frame_index)) {
419  vetoSet = veto.at(frame_index);
420  }
421 
422  JSNFilterMV F_MV(M, vetoSet);
423  JSNFilterMV F_MV_BDT(M_BDT, vetoSet);
424 
425  set<int> cc_vec = trgWindow[its].getModules(F_M1);
426  set<int> ev_vec = trgWindow[its].getModules(F_MV);
427 
428  cc_modules.insert(cc_vec.begin(), cc_vec.end());
429  ev_modules.insert(ev_vec.begin(), ev_vec.end());
430 
431  trg_cc_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_M1);
432  trg_ev_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_MV);
433 
434  // Write table of observables
435  for (auto &trg: trgWindow[its]){
436  auto sn_candidate = trg.getPeak();
437  if (F_MV_BDT(sn_candidate)){
438  multiplicities.push_back(sn_candidate.multiplicity);
439  observables.push_back(sn_candidate.total_ToT);
440  observables.push_back(sn_candidate.deltaT);
441  observables.push_back(sn_candidate.mean_dir_norm);
442  observables.push_back(sn_candidate.mean_dir_ctheta);
443  }
444  }
445  }
446 
447  trg_cc_modules = cc_modules.size();
448  trg_ev_modules = ev_modules.size();
449 
450  // trigger window slide of one element
451 
452  int currentFrame = trgWindow[0].frameIndex;
453  JDAQUTCExtended currentTime = trgWindow[0].timeUTC;
454 
455  trgWindow.pop_front();
456 
457  // calculate trigger
458 
459  ++stats[trg_cc_counts];
460 
461  // calculate active modules
462 
463  int activeModules = -1;
464  double detectorRate = 0.0;
465 
466  if (!rates.empty() &&
467  rates.count(currentFrame)) {
468 
469  activeModules = 0;
470 
471  for (map<int, double>::const_iterator p = rates.at(currentFrame).begin();
472  p != rates.at(currentFrame).end(); p++ ) {
473 
474  detectorRate += p->second;
475 
476  activeModules += (p->second > 0);
477 
478  }
479  } else {
480 
481  activeModules = timesliceSize;
482 
483  }
484 
485  // build summary message
486 
487  json jd;
488 
489  jd[detid_name] = DETID;
490  jd[active_doms_name] = activeModules;
491  jd[detector_rate_name] = int(detectorRate / 1000.0);
492  jd[run_number_name] = RUN;
493  jd[frame_index_name] = currentFrame;
494  jd[daq_time_name] = to_string(currentTime);
495  jd[trigger_name][cc_name][c_name] = trg_cc_counts;
496  jd[trigger_name][cc_name][m_name] = trg_cc_modules;
497  jd[trigger_name][ev_name][c_name] = trg_ev_counts;
498  jd[trigger_name][ev_name][m_name] = trg_ev_modules;
499  jd[active_pmts_name] = pmts[currentFrame];
500  jd[observables_name] = observables;
501  jd[multiplicities_name] = multiplicities;
502 
503  string msg = jd.dump();
504 
505  DEBUG(msg << endl);
506 
507  if (outputFileName != ""){
508  if (nWrittenEntries < nEntries || nEntries == -1) {
509  // write summary information to output file (first nEntries)
510  outputFile << msg << endl;
511  nWrittenEntries++;
512  }
513  else {
514  // When entries written, close output file and exit loop
515  outputFile.close();
516  break;
517  }
518  }
519  else {
520  // send summary information to output ligier
521 
522  if (out != NULL) {
523  out->PutFullString(outputTag, msg);
524  }
525  }
526 
527  // print stats
528 
529  if ( (counter_live_ts % ((int)(statPrintInterval_s / frameTime_s)) == 0 ) ) {
530 
531  double livetime = counter_live_ts * frameTime_s;
532 
533  stats.setLiveTime(livetime);
534 
535  NOTICE(endl);
536  NOTICE(stats.toString());
537  NOTICE("=> discarded out-of-order timeslices = " << counter_lost_ts << endl);
538 
539  if (summaryFile != "") {
540  ofstream of(summaryFile.c_str());
541  of << stats.toSummaryFile();
542  of.close();
543  }
544 
545  }
546  }
547  }
548  }
549 
550  catch(const JSocketException& error) {
551  ERROR(error.what() << endl);
552  }
553 
554  if (outputFileName != "") { outputFile.close(); }
555 
556 }
Fixed parameters and ControlHost tags for KM3NeT DAQ.
string outputFile
Data structure for detector geometry and calibration.
General purpose messaging.
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62
#define ERROR(A)
Definition: JMessage.hh:66
#define NOTICE(A)
Definition: JMessage.hh:64
#define FATAL(A)
Definition: JMessage.hh:67
int debug
debug level
Definition: JSirene.cc:69
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2142
Utility class to parse parameter values.
#define gmake_property(A)
macros to convert (template) parameter to JPropertiesElement object
int main(int argc, char *argv[])
nlohmann::json json
static const std::string active_pmts_name
static const std::string cc_name
static const std::string daq_time_name
static const std::string detid_name
static const std::string c_name
static const std::string m_name
static const std::string run_number_name
static const std::string observables_name
static const std::string detector_rate_name
static const std::string ev_name
static const std::string trigger_name
static const std::string active_doms_name
static const std::string frame_index_name
static const std::string multiplicities_name
ROOT TTree parameter settings of various packages.
Simple wrapper around JModuleRouter class for direct addressing of PMT data in detector data structur...
Detector data structure.
Definition: JDetector.hh:96
Router for direct addressing of module data in detector data structure.
Utility class to parse parameter values.
Definition: JProperties.hh:501
void setEndOfLine(const std::string &eol)
Set end of line characters.
General exception.
Definition: JException.hh:24
Interface of object iteration for a single data type.
virtual const pointer_type & next()=0
Get next element.
virtual bool hasNext()=0
Check availability of next element.
Exception for socket.
Definition: JException.hh:468
Auxiliary class for time values.
Definition: JTimeval.hh:29
Object iteration through ControlHost.
ControlHost class.
int PutFullString(const JTag &tag, const std::string &buffer)
Send string.
ControlHost tag.
Definition: JTag.hh:38
Utility class to parse command line options.
Definition: JParser.hh:1698
Auxiliary class to build the supernova trigger dataset.
Definition: JSupernova.hh:203
SN filter based on veto window.
Definition: JSupernova.hh:441
SN filter based on multiplicity selection optional suppression of multi-module coincidences WARNING: ...
Definition: JSupernova.hh:406
SN trigger statistics, the information is stored in the form of a count as a function of the trigger ...
Definition: JSupernova.hh:617
string toString()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:638
void setLiveTime(const double lt)
Definition: JSupernova.hh:630
string toSummaryFile()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:671
Auxiliary class to apply the supernova trigger to SN data.
Definition: JSupernova.hh:468
Auxiliary class to manage a set of vetoes.
Definition: JSupernova.hh:334
Auxiliary class to define a veto time window on a set of optical modules.
Definition: JSupernova.hh:148
Template definition for direct access of elements in ROOT TChain.
int getFrameIndex() const
Get frame index.
const JDAQHeader & getDAQHeader() const
Get DAQ header.
Definition: JDAQHeader.hh:49
Data structure for UTC time.
void load(const std::string &file_name, JDetector &detector)
Load detector from input file.
@ debug_t
debug
Definition: JMessage.hh:29
std::string to_string(const T &value)
Convert value to string.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
std::vector< event_type > data_type
Definition: JPerth.cc:82
data_type r[M+1]
Definition: JPolint.hh:868
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
void setDAQLongprint(const bool option)
Set DAQ print option.
Definition: JDAQPrint.hh:28
static const int NUMBER_OF_PMTS
Total number of PMTs in module.
Definition: JDAQ.hh:26
Definition: JSTDTypes.hh:14
Detector file.
Definition: JHead.hh:227
Normalisation of MUPAGE events.
Definition: JHead.hh:835
Data structure for measured coincidence rates of all pairs of PMTs in optical module.
Definition: JFitK40.hh:103
Empty structure for specification of parser element that is initialised (i.e. does not require input)...
Definition: JParser.hh:68
Auxiliary data structure to store data and fit in memory.
Definition: JPerth.cc:77
Auxiliary class to select DAQ hits based on time-over-treshold value.
Timeslice data structure for SN data.