Jpp  19.1.0-rc.1
the software that should make you happy
JSupernovaProcessor.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <iomanip>
3 #include <queue>
4 #include <string>
5 
6 #include "nlohmann/json.hpp"
7 
9 #include "JDAQ/JDAQEventIO.hh"
11 
12 #include "JDAQ/JDAQTags.hh"
13 
14 #include "JDetector/JDetector.hh"
16 
18 
19 #include "Jeep/JParser.hh"
20 #include "Jeep/JMessage.hh"
21 
23 
25 
26 #include "JSupport/JSupport.hh"
27 
28 #include "JSupernova.hh"
29 
30 #include "JSupernovaProcessor.hh"
31 
33 
34 /**
35  * \file
36  *
37  * Online supernova monitor
38  * \author mlincett,selhedri,iagoos
39  */
40 int main(int argc, char* argv[]) {
41  using namespace std;
42  using namespace JPP;
43  using namespace KM3NETDAQ;
44 
45  string controlhost;
46  string ligier;
47  string outputFileName;
48  bool testMode = false;
49  int nTestEntries;
50  JTag outputTag;
51  JTimeval timeout_us;
52  int numberOfTimeouts;
53  int debug;
54  int queueLength;
55  int windowLength;
56  int statPrintInterval_s;
57 
58  string detectorFile;
59  string dummyDetectorFile;
60  JROOTClassSelector selector;
61  string summaryFile;
62 
63  int TMax_ns;
64  int preTriggerThreshold;
65  JRange<int> M, M_BDT;
66  double TVeto_ns;
67  JDAQHitToTSelector totSelector_ns;
68 
69  try {
70 
71  JParser<> zap("Supernova realtime monitor");
72 
73  zap['H'] = make_field(controlhost, "CH server (input)") = "localhost";
74  zap['L'] = make_field(ligier, "Ligier server (output)") = "";
75  zap['T'] = make_field(outputTag, "Output tag for the json messages") = JTag("SNT_part");
76  zap['O'] = make_field(testMode, "Test mode if true (no output to Ligier)");
77  zap['N'] = make_field(nTestEntries, "Number of test entries to write") = 1;
78  zap['o'] = make_field(outputFileName, "Output json file name if run in test mode") = "";
79  zap['t'] = make_field(timeout_us) = 1e6;
80  zap['n'] = make_field(numberOfTimeouts) = 1e3;
81  zap['a'] = make_field(detectorFile);
82  zap['C'] = make_field(selector) = getROOTClassSelection<JDAQTimesliceTypes_t>();
83  zap['Q'] = make_field(queueLength, "number of timeslices of trigger queue") = 100;
84  zap['W'] = make_field(windowLength, "number of timeslices of trigger sliding window") = 5;
85  zap['T'] = make_field(TMax_ns, "coincidence time window [ns]") = 10;
86  zap['M'] = make_field(M, "multiplicity range for SN coincidences") = JRange<int>(6,10);
87  zap['S'] = make_field(preTriggerThreshold, "muon veto multiplicity threshold") = 4;
88  zap['V'] = make_field(TVeto_ns, "muon veto time interval") = 1000;
89  zap['s'] = make_field(summaryFile, "summary output file") = "";
90  zap['P'] = make_field(statPrintInterval_s, "statistics & file print interval [s]") = 30;
91  zap['d'] = make_field(debug) = 1;
92  zap['b'] = make_field(dummyDetectorFile) = "";
93  zap['B'] = make_field(M_BDT, "multiplicity range for SN coincidences (with BDT)") = JRange<int>(5,10);
94  zap['m'] = make_field(totSelector_ns) = JDAQHitToTSelector(4, 255);
95 
96  zap(argc, argv);
97 
98  }
99 
100  catch(const exception &error) {
101  FATAL(error.what() << endl);
102  }
103 
104  if (queueLength < windowLength) {
105  FATAL("Length of the trigger window must be smaller than the queue.");
106  }
107 
108 
110 
111  using namespace JSUPERNOVA;
112 
113  // -------------------------------
114  // load detector and build routers
115  // -------------------------------
116 
118  JDetector dummyDetector;
119 
120  try {
121  load(detectorFile, detector);
122  }
123  catch(const JException& error) {
124  FATAL(error);
125  }
126 
127  try {
128  load(dummyDetectorFile, dummyDetector);
129  }
130  catch(const JException& error) {
131  dummyDetector = JDetector();
132  }
133 
134  const JDAQHitRouter hitRouter(detector);
135 
136  const JModuleRouter& moduleRouter = hitRouter;
137 
138  const int DETID = detector.getID();
139 
140  const int detectorSize = detector.size();
141 
142  JSNFilterM F_M1(M, 1);
143 
144  // -------------------------------
145  // initialize processing queue
146  // -------------------------------
147 
148  typedef JTriggerSN trigger_type;
149 
150  typedef priority_queue<trigger_type, vector<trigger_type>, greater<trigger_type> > queue_type;
151 
152  typedef deque<trigger_type> window_type;
153 
154 
155 
156  typedef map<int, map<int, double> > rates_type; // (frameindex, DOMID) -> DOM rate [kHz]
157  typedef map<int, int> npmt_type ; // (frameindex) -> # active PMTs
158  typedef map<int, JVetoSet> veto_type ; // (frameindex) -> vetoes
159 
160  queue_type trgQueue;
161  window_type trgWindow;
162  rates_type rates;
163  npmt_type pmts;
164  veto_type veto;
165 
166  JTriggerSNStats stats(detectorSize);
167 
168  long int counter_live_ts = 0;
169  long int counter_lost_ts = 0;
170 
171  double frameTime_s = getFrameTime() / 1.0e9;
172 
173  // Initialize output file name and number of entries for testing
174  ofstream outputFile;
175  int nWrittenEntries = 0;
176  if (testMode){
177  outputFile.open(outputFileName);
178  }
179 
180  // -------------------------------
181  // main processing
182  // -------------------------------
183 
184  try {
185 
186  // setup input
187 
188  typedef JDAQTimesliceSN data_type;
189  typedef JDAQSummaryslice summary_type;
190  typedef JDAQEvent event_type;
191 
192  JControlHostObjectIterator<data_type> in(controlhost, timeout_us, true);
193 
194  // timeout for the asynchronous reading of summary and event data
195  // needs to be smaller than the timeslice duration
196  const double asyncTimeout_us = 1000.0;
197 
198  JControlHostObjectIterator<summary_type> sm(controlhost, asyncTimeout_us, true);
199  JControlHostObjectIterator<event_type> ev(controlhost, asyncTimeout_us, true);
200 
202 
203  if (!testMode && ligier != "") {
204  out.reset(new JControlHost(ligier));
205  out->MyId(argv[0]); // pid
206  }
207 
208  // setup state
209 
210  int RUN = 0;
211 
212  for (int i = 0; i != numberOfTimeouts; ) {
213 
214  if (in.hasNext()) {
215 
216  data_type* timeslice = in.next();
217 
218  DEBUG(timeslice->getDAQHeader() << endl);
219 
220  int timesliceSize = timeslice->size();
221 
222  // --------------------------------
223  // run number initialise and update
224  // --------------------------------
225 
226  const int r = timeslice->getRunNumber();
227 
228  if (r != RUN) {
229 
230  if (RUN != 0) {
231 
232  NOTICE("RUN CHANGE" << endl);
233 
234  while (trgQueue.size() > 0) { trgQueue.pop(); }
235 
236  trgWindow.clear();
237 
238  rates.clear();
239 
240  pmts.clear();
241 
242  veto.clear();
243 
244  }
245 
246  RUN = r;
247 
248  }
249 
250  // ------------------------------
251  // process pending summary slices
252  // ------------------------------
253 
254  while ( sm.hasNext() ) {
255 
256  JDAQSummaryslice* summary = sm.next();
257 
258  DEBUG("SUM " << summary->getDAQHeader() << endl);
259 
260  int frame_index = summary->getFrameIndex();
261 
262  for (JDAQSummaryslice::const_iterator summary_frame = summary->begin();
263  summary_frame != summary->end();
264  ++summary_frame) {
265 
266  int DOMID = summary_frame->getModuleID();
267 
268  for (int ipmt = 0 ; ipmt < NUMBER_OF_PMTS ; ipmt++) {
269  rates[frame_index][DOMID] += summary_frame->getRate(ipmt, 1.0/1000);
270  }
271 
272  pmts[frame_index] += summary_frame->countActiveChannels();
273  }
274 
275  }
276 
277  DEBUG("LOADING EVENTS" << endl);
278 
279  while ( ev.hasNext() ) {
280 
281  JDAQEvent* event = ev.next();
282 
283  DEBUG("EVT " << event->getDAQHeader() << endl);
284 
285  int frame_index = event->getFrameIndex();
286 
287  veto[frame_index].push_back(JVeto(*event, hitRouter));
288 
289  }
290 
291  // -----------------
292  // process timeslice
293  // -----------------
294 
295  JDataSN preTrigger(TMax_ns, preTriggerThreshold);
296 
297  preTrigger(timeslice, moduleRouter, totSelector_ns, dummyDetector);
298 
299  JTriggerSN trigger(TVeto_ns);
300 
301  trigger(preTrigger);
302 
303  trgQueue.push(trigger);
304 
305  //----------------
306  // compute trigger
307  //----------------
308 
309  vector<double> observables;
310  vector<int> multiplicities;
311 
312  if (trgQueue.size() >= (unsigned) queueLength) {
313 
314  while (trgWindow.size() <= (unsigned) windowLength) {
315 
316  trigger_type pending = trgQueue.top();
317 
318  if ( trgWindow.size() == 0 || pending > trgWindow.back() ) {
319 
320  trgWindow.push_back( pending );
321 
322  counter_live_ts++;
323 
324  } else {
325  // latecoming (out of order) timeslice
326  counter_lost_ts++;
327  }
328 
329  trgQueue.pop();
330 
331  }
332 
333  // build triggered modules
334 
335  int trg_cc_counts = 0;
336  int trg_cc_modules = 0;
337  set<int> cc_modules;
338 
339  int trg_ev_counts = 0;
340  int trg_ev_modules = 0;
341  set<int> ev_modules;
342 
343  // loop over the trigger window and count the triggers
344  for (int its = 0; its < windowLength; its++) {
345 
346  const int frame_index = trgWindow[its].frameIndex;
347 
348  JVetoSet vetoSet;
349  if (veto.count(frame_index)) {
350  vetoSet = veto.at(frame_index);
351  }
352 
353  JSNFilterMV F_MV(M, vetoSet);
354  JSNFilterMV F_MV_BDT(M_BDT, vetoSet);
355 
356  set<int> cc_vec = trgWindow[its].getModules(F_M1);
357  set<int> ev_vec = trgWindow[its].getModules(F_MV);
358 
359  cc_modules.insert(cc_vec.begin(), cc_vec.end());
360  ev_modules.insert(ev_vec.begin(), ev_vec.end());
361 
362  trg_cc_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_M1);
363  trg_ev_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_MV);
364 
365  // Write table of observables
366  for (auto &trg: trgWindow[its]){
367  auto sn_candidate = trg.getPeak();
368  if (F_MV_BDT(sn_candidate)){
369  multiplicities.push_back(sn_candidate.multiplicity);
370  observables.push_back(sn_candidate.total_ToT);
371  observables.push_back(sn_candidate.deltaT);
372  observables.push_back(sn_candidate.mean_dir_norm);
373  observables.push_back(sn_candidate.mean_dir_ctheta);
374  }
375  }
376  }
377 
378  trg_cc_modules = cc_modules.size();
379  trg_ev_modules = ev_modules.size();
380 
381  // trigger window slide of one element
382 
383  int currentFrame = trgWindow[0].frameIndex;
384  JDAQUTCExtended currentTime = trgWindow[0].timeUTC;
385 
386  trgWindow.pop_front();
387 
388  // calculate trigger
389 
390  ++stats[trg_cc_counts];
391 
392  // calculate active modules
393 
394  int activeModules = -1;
395  double detectorRate = 0.0;
396 
397  if (!rates.empty() &&
398  rates.count(currentFrame)) {
399 
400  activeModules = 0;
401 
402  for (map<int, double>::const_iterator p = rates.at(currentFrame).begin();
403  p != rates.at(currentFrame).end(); p++ ) {
404 
405  detectorRate += p->second;
406 
407  activeModules += (p->second > 0);
408 
409  }
410  } else {
411 
412  activeModules = timesliceSize;
413 
414  }
415 
416  // build summary message
417 
418  json jd;
419 
420  jd[detid_name] = DETID;
421  jd[active_doms_name] = activeModules;
422  jd[detector_rate_name] = int(detectorRate / 1000.0);
423  jd[run_number_name] = RUN;
424  jd[frame_index_name] = currentFrame;
425  jd[daq_time_name] = to_string(currentTime);
426  jd[trigger_name][cc_name][c_name] = trg_cc_counts;
427  jd[trigger_name][cc_name][m_name] = trg_cc_modules;
428  jd[trigger_name][ev_name][c_name] = trg_ev_counts;
429  jd[trigger_name][ev_name][m_name] = trg_ev_modules;
430  jd[active_pmts_name] = pmts[currentFrame];
431  jd[observables_name] = observables;
432  jd[multiplicities_name] = multiplicities;
433 
434  string msg = jd.dump();
435 
436  DEBUG(msg << endl);
437 
438  if (testMode){
439  if (nWrittenEntries < nTestEntries){
440  // write summary information to output file (first nTestEntries)
441  outputFile << msg << endl;
442  nWrittenEntries++;
443  } else {
444  // When test entries written, close output file and exit loop
445  outputFile.close();
446  break;
447  }
448  } else {
449  // send summary information to output ligier
450 
451  if (out != NULL && trg_ev_counts > 0) {
452  out->PutFullString(outputTag, msg);
453  }
454  }
455 
456  // print stats
457 
458  if ( (counter_live_ts % ((int)(statPrintInterval_s / frameTime_s)) == 0 ) ) {
459 
460  double livetime = counter_live_ts * frameTime_s;
461 
462  stats.setLiveTime(livetime);
463 
464  NOTICE(endl);
465  NOTICE(stats.toString());
466  NOTICE("=> discarded out-of-order timeslices = " << counter_lost_ts << endl);
467 
468  if (summaryFile != "") {
469  ofstream of(summaryFile.c_str());
470  of << stats.toSummaryFile();
471  of.close();
472  }
473 
474  }
475 
476  } else {
477  NOTICE("Filling trigger queue: " << trgQueue.size() << "/" << queueLength << '\r');
478  }
479 
480  } else {
481 
482  NOTICE("timeout " << setw(3) << i << endl);
483 
484  ++i;
485  }
486  }
487  }
488 
489  catch(const JSocketException& error) {
490  ERROR(error.what() << endl);
491  }
492 
493 }
Fixed parameters and ControlHost tags for KM3NeT DAQ.
string outputFile
Data structure for detector geometry and calibration.
General purpose messaging.
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62
#define ERROR(A)
Definition: JMessage.hh:66
#define NOTICE(A)
Definition: JMessage.hh:64
#define FATAL(A)
Definition: JMessage.hh:67
int debug
debug level
Definition: JSirene.cc:69
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2158
int main(int argc, char *argv[])
nlohmann::json json
static const std::string active_pmts_name
static const std::string cc_name
static const std::string daq_time_name
static const std::string detid_name
static const std::string c_name
static const std::string m_name
static const std::string run_number_name
static const std::string observables_name
static const std::string detector_rate_name
static const std::string ev_name
static const std::string trigger_name
static const std::string active_doms_name
static const std::string frame_index_name
static const std::string multiplicities_name
ROOT TTree parameter settings of various packages.
Simple wrapper around JModuleRouter class for direct addressing of PMT data in detector data structur...
Detector data structure.
Definition: JDetector.hh:96
Router for direct addressing of module data in detector data structure.
virtual const pointer_type & next() override
Get next element.
virtual bool hasNext() override
Check availability of next element.
General exception.
Definition: JException.hh:24
The template JSinglePointer class can be used to hold a pointer to an object.
Exception for socket.
Definition: JException.hh:468
virtual void reset() override
Reset pointer.
Definition: JStorage.hh:42
Auxiliary class for time values.
Definition: JTimeval.hh:29
Object iteration through ControlHost.
ControlHost class.
ControlHost tag.
Definition: JTag.hh:38
Utility class to parse command line options.
Definition: JParser.hh:1714
Auxiliary class to build the supernova trigger dataset.
Definition: JSupernova.hh:203
SN filter based on veto window.
Definition: JSupernova.hh:441
SN filter based on multiplicity selection optional suppression of multi-module coincidences WARNING: ...
Definition: JSupernova.hh:406
SN trigger statistics, the information is stored in the form of a count as a function of the trigger ...
Definition: JSupernova.hh:617
string toString()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:638
void setLiveTime(const double lt)
Definition: JSupernova.hh:630
string toSummaryFile()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:671
Auxiliary class to apply the supernova trigger to SN data.
Definition: JSupernova.hh:468
Auxiliary class to manage a set of vetoes.
Definition: JSupernova.hh:334
Auxiliary class to define a veto time window on a set of optical modules.
Definition: JSupernova.hh:148
int getFrameIndex() const
Get frame index.
const JDAQHeader & getDAQHeader() const
Get DAQ header.
Definition: JDAQHeader.hh:49
Data structure for UTC time.
void load(const std::string &file_name, JDetector &detector)
Load detector from input file.
@ debug_t
debug
Definition: JMessage.hh:29
std::string to_string(const T &value)
Convert value to string.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
std::vector< event_type > data_type
Definition: JPerth.cc:81
data_type r[M+1]
Definition: JPolint.hh:868
KM3NeT DAQ data structures and auxiliaries.
Definition: DataQueue.cc:39
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
void setDAQLongprint(const bool option)
Set DAQ print option.
Definition: JDAQPrint.hh:28
static const int NUMBER_OF_PMTS
Total number of PMTs in module.
Definition: JDAQ.hh:26
Definition: JSTDTypes.hh:14
Detector file.
Definition: JHead.hh:227
Normalisation of MUPAGE events.
Definition: JHead.hh:835
Data structure for measured coincidence rates of all pairs of PMTs in optical module.
Definition: JFitK40.hh:103
Auxiliary data structure to store data and fit in memory.
Definition: JPerth.cc:76
Auxiliary class to select ROOT class based on class name.
Auxiliary class to select DAQ hits based on time-over-treshold value.
Timeslice data structure for SN data.