Jpp  18.6.0-rc.1
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JSupernovaProcessor.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <iomanip>
3 #include <queue>
4 #include <string>
5 
6 #include "nlohmann/json.hpp"
7 
9 #include "JDAQ/JDAQEventIO.hh"
11 
12 #include "JDAQ/JDAQTags.hh"
13 
14 #include "JDetector/JDetector.hh"
16 
18 
19 #include "Jeep/JParser.hh"
20 #include "Jeep/JMessage.hh"
21 
23 
25 
26 #include "JSupport/JSupport.hh"
27 
28 #include "JSupernova.hh"
29 
30 #include "JSupernovaProcessor.hh"
31 
33 
34 /**
35  * \file
36  *
37  * Online supernova monitor
38  * \author mlincett,selhedri,iagoos
39  */
40 int main(int argc, char* argv[]) {
41  using namespace std;
42  using namespace JPP;
43  using namespace KM3NETDAQ;
44 
45  string controlhost;
46  string ligier;
47  string outputFileName;
48  bool testMode = false;
49  int nTestEntries;
50  JTag outputTag;
51  JTimeval timeout_us;
52  int numberOfTimeouts;
53  int debug;
54  int queueLength;
55  int windowLength;
56  int statPrintInterval_s;
57 
58  string detectorFile;
59  string dummyDetectorFile;
60  JROOTClassSelector selector;
61  string summaryFile;
62 
63  int TMax_ns;
64  int preTriggerThreshold;
65  JRange<int> M, M_BDT;
66  double TVeto_ns;
67  JDAQHitToTSelector totSelector_ns;
68 
69  try {
70 
71  JParser<> zap("Supernova realtime monitor");
72 
73  zap['H'] = make_field(controlhost, "CH server (input)") = "localhost";
74  zap['L'] = make_field(ligier, "Ligier server (output)") = "";
75  zap['T'] = make_field(outputTag, "Output tag for the json messages") = JTag("SNT_part");
76  zap['O'] = make_field(testMode, "Test mode if true (no output to Ligier)");
77  zap['N'] = make_field(nTestEntries, "Number of test entries to write") = 1;
78  zap['o'] = make_field(outputFileName, "Output json file name if run in test mode") = "";
79  zap['t'] = make_field(timeout_us) = 1e6;
80  zap['n'] = make_field(numberOfTimeouts) = 1e3;
81  zap['a'] = make_field(detectorFile);
82  zap['C'] = make_field(selector) = getROOTClassSelection<JDAQTimesliceTypes_t>();
83  zap['Q'] = make_field(queueLength, "number of timeslices of trigger queue") = 100;
84  zap['W'] = make_field(windowLength, "number of timeslices of trigger sliding window") = 5;
85  zap['T'] = make_field(TMax_ns, "coincidence time window [ns]") = 10;
86  zap['M'] = make_field(M, "multiplicity range for SN coincidences") = JRange<int>(6,10);
87  zap['S'] = make_field(preTriggerThreshold, "muon veto multiplicity threshold") = 4;
88  zap['V'] = make_field(TVeto_ns, "muon veto time interval") = 1000;
89  zap['s'] = make_field(summaryFile, "summary output file") = "";
90  zap['P'] = make_field(statPrintInterval_s, "statistics & file print interval [s]") = 30;
91  zap['d'] = make_field(debug) = 1;
92  zap['b'] = make_field(dummyDetectorFile) = "";
93  zap['B'] = make_field(M_BDT, "multiplicity range for SN coincidences (with BDT)") = JRange<int>(5,10);
94  zap['m'] = make_field(totSelector_ns) = JDAQHitToTSelector(4, 255);
95 
96  zap(argc, argv);
97 
98  }
99 
100  catch(const exception &error) {
101  FATAL(error.what() << endl);
102  }
103 
104  if (queueLength < windowLength) {
105  FATAL("Length of the trigger window must be smaller than the queue.");
106  }
107 
108 
110 
111  using namespace JSUPERNOVA;
112 
113  // -------------------------------
114  // load detector and build routers
115  // -------------------------------
116 
118  JDetector dummyDetector;
119 
120  try {
121  load(detectorFile, detector);
122  }
123  catch(const JException& error) {
124  FATAL(error);
125  }
126 
127  try {
128  load(dummyDetectorFile, dummyDetector);
129  }
130  catch(const JException& error) {
131  dummyDetector = JDetector();
132  }
133 
134  const JDAQHitRouter hitRouter(detector);
135 
136  const JModuleRouter& moduleRouter = hitRouter;
137 
138  const int DETID = detector.getID();
139 
140  const int detectorSize = detector.size();
141 
142  JSNFilterM F_M1(M, 1);
143 
144  // -------------------------------
145  // initialize processing queue
146  // -------------------------------
147 
148  typedef JTriggerSN trigger_type;
149 
150  typedef priority_queue<trigger_type, vector<trigger_type>, greater<trigger_type> > queue_type;
151 
152  typedef deque<trigger_type> window_type;
153 
154 
155 
156  typedef map<int, map<int, double> > rates_type; // (frameindex, DOMID) -> DOM rate [kHz]
157  typedef map<int, int> npmt_type ; // (frameindex) -> # active PMTs
158  typedef map<int, JVetoSet> veto_type ; // (frameindex) -> vetoes
159 
160  queue_type trgQueue;
161  window_type trgWindow;
162  rates_type rates;
163  npmt_type pmts;
164  veto_type veto;
165 
166  JTriggerSNStats stats(detectorSize);
167 
168  long int counter_live_ts = 0;
169  long int counter_lost_ts = 0;
170 
171  double frameTime_s = getFrameTime() / 1.0e9;
172 
173  // Initialize output file name and number of entries for testing
174  ofstream outputFile;
175  int nWrittenEntries = 0;
176  if (testMode){
177  outputFile.open(outputFileName);
178  }
179 
180  // -------------------------------
181  // main processing
182  // -------------------------------
183 
184  try {
185 
186  // setup input
187 
188  typedef JDAQTimesliceSN data_type;
189  typedef JDAQSummaryslice summary_type;
190  typedef JDAQEvent event_type;
191 
192  JControlHostObjectIterator<data_type> in(controlhost, timeout_us, true);
193 
194  // timeout for the asynchronous reading of summary and event data
195  // needs to be smaller than the timeslice duration
196  const double asyncTimeout_us = 1000.0;
197 
198  JControlHostObjectIterator<summary_type> sm(controlhost, asyncTimeout_us, true);
199  JControlHostObjectIterator<event_type> ev(controlhost, asyncTimeout_us, true);
200 
202 
203  if (!testMode && ligier != "") {
204  out.reset(new JControlHost(ligier));
205  out->MyId(argv[0]); // pid
206  }
207 
208  // setup state
209 
210  int RUN = 0;
211 
212  for (int i = 0; i != numberOfTimeouts; ) {
213 
214  if (in.hasNext()) {
215 
216  data_type* timeslice = in.next();
217 
218  DEBUG(timeslice->getDAQHeader() << endl);
219 
220  int timesliceSize = timeslice->size();
221 
222  // --------------------------------
223  // run number initialise and update
224  // --------------------------------
225 
226  const int r = timeslice->getRunNumber();
227 
228  if (r != RUN) {
229 
230  if (RUN != 0) {
231 
232  NOTICE("RUN CHANGE" << endl);
233 
234  while (trgQueue.size() > 0) { trgQueue.pop(); }
235 
236  trgWindow.clear();
237 
238  rates.clear();
239 
240  pmts.clear();
241 
242  veto.clear();
243 
244  }
245 
246  RUN = r;
247 
248  }
249 
250  // ------------------------------
251  // process pending summary slices
252  // ------------------------------
253 
254  while ( sm.hasNext() ) {
255 
256  JDAQSummaryslice* summary = sm.next();
257 
258  DEBUG("SUM " << summary->getDAQHeader() << endl);
259 
260  int frame_index = summary->getFrameIndex();
261 
262  for (JDAQSummaryslice::const_iterator summary_frame = summary->begin();
263  summary_frame != summary->end();
264  ++summary_frame) {
265 
266  int DOMID = summary_frame->getModuleID();
267 
268  for (int ipmt = 0 ; ipmt < NUMBER_OF_PMTS ; ipmt++) {
269  rates[frame_index][DOMID] += summary_frame->getRate(ipmt, 1.0/1000);
270  }
271 
272  pmts[frame_index] += summary_frame->countActiveChannels();
273  }
274 
275  }
276 
277  DEBUG("LOADING EVENTS" << endl);
278 
279  while ( ev.hasNext() ) {
280 
281  JDAQEvent* event = ev.next();
282 
283  DEBUG("EVT " << event->getDAQHeader() << endl);
284 
285  int frame_index = event->getFrameIndex();
286 
287  veto[frame_index].push_back(JVeto(*event, hitRouter));
288 
289  }
290 
291  // -----------------
292  // process timeslice
293  // -----------------
294 
295  JDataSN preTrigger(TMax_ns, preTriggerThreshold);
296 
297  preTrigger(timeslice, moduleRouter, totSelector_ns, dummyDetector);
298 
299  JTriggerSN trigger(TVeto_ns);
300 
301  trigger(preTrigger);
302 
303  trgQueue.push(trigger);
304 
305  //----------------
306  // compute trigger
307  //----------------
308 
309  vector<double> observables;
310  vector<int> multiplicities;
311 
312  if (trgQueue.size() >= (unsigned) queueLength) {
313 
314  while (trgWindow.size() <= (unsigned) windowLength) {
315 
316  trigger_type pending = trgQueue.top();
317 
318  if ( trgWindow.size() == 0 || pending > trgWindow.back() ) {
319 
320  trgWindow.push_back( pending );
321 
322  counter_live_ts++;
323 
324  } else {
325  // latecoming (out of order) timeslice
326  counter_lost_ts++;
327  }
328 
329  trgQueue.pop();
330 
331  }
332 
333  // build triggered modules
334 
335  int trg_cc_counts = 0;
336  int trg_cc_modules = 0;
337  set<int> cc_modules;
338 
339  int trg_ev_counts = 0;
340  int trg_ev_modules = 0;
341  set<int> ev_modules;
342 
343  // loop over the trigger window and count the triggers
344  for (int its = 0; its < windowLength; its++) {
345 
346  const int frame_index = trgWindow[its].frameIndex;
347 
348  JVetoSet vetoSet;
349  if (veto.count(frame_index)) {
350  vetoSet = veto.at(frame_index);
351  }
352 
353  JSNFilterMV F_MV(M, vetoSet);
354  JSNFilterMV F_MV_BDT(M_BDT, vetoSet);
355 
356  set<int> cc_vec = trgWindow[its].getModules(F_M1);
357  set<int> ev_vec = trgWindow[its].getModules(F_MV);
358 
359  cc_modules.insert(cc_vec.begin(), cc_vec.end());
360  ev_modules.insert(ev_vec.begin(), ev_vec.end());
361 
362  trg_cc_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_M1);
363  trg_ev_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_MV);
364 
365  // Write table of observables
366  for (auto &trg: trgWindow[its]){
367  auto sn_candidate = trg.getPeak();
368  if (F_MV_BDT(sn_candidate)){
369  multiplicities.push_back(sn_candidate.multiplicity);
370  observables.push_back(sn_candidate.total_ToT);
371  observables.push_back(sn_candidate.deltaT);
372  observables.push_back(sn_candidate.mean_dir_norm);
373  observables.push_back(sn_candidate.mean_dir_ctheta);
374  }
375  }
376  }
377 
378  trg_cc_modules = cc_modules.size();
379  trg_ev_modules = ev_modules.size();
380 
381  // trigger window slide of one element
382 
383  int currentFrame = trgWindow[0].frameIndex;
384  JDAQUTCExtended currentTime = trgWindow[0].timeUTC;
385 
386  trgWindow.pop_front();
387 
388  // calculate trigger
389 
390  ++stats[trg_cc_counts];
391 
392  // calculate active modules
393 
394  int activeModules = -1;
395  double detectorRate = 0.0;
396 
397  if (!rates.empty() &&
398  rates.count(currentFrame)) {
399 
400  activeModules = 0;
401 
402  for (map<int, double>::const_iterator p = rates.at(currentFrame).begin();
403  p != rates.at(currentFrame).end(); p++ ) {
404 
405  detectorRate += p->second;
406 
407  activeModules += (p->second > 0);
408 
409  }
410  } else {
411 
412  activeModules = timesliceSize;
413 
414  }
415 
416  // build summary message
417 
418  json jd;
419 
420  jd[detid_name] = DETID;
421  jd[active_doms_name] = activeModules;
422  jd[detector_rate_name] = int(detectorRate / 1000.0);
423  jd[run_number_name] = RUN;
424  jd[frame_index_name] = currentFrame;
425  jd[daq_time_name] = to_string(currentTime);
426  jd[trigger_name][cc_name][c_name] = trg_cc_counts;
427  jd[trigger_name][cc_name][m_name] = trg_cc_modules;
428  jd[trigger_name][ev_name][c_name] = trg_ev_counts;
429  jd[trigger_name][ev_name][m_name] = trg_ev_modules;
430  jd[active_pmts_name] = pmts[currentFrame];
431  jd[observables_name] = observables;
432  jd[multiplicities_name] = multiplicities;
433 
434  string msg = jd.dump();
435 
436  DEBUG(msg << endl);
437 
438  if (testMode){
439  if (nWrittenEntries < nTestEntries){
440  // write summary information to output file (first nTestEntries)
441  outputFile << msg << endl;
442  nWrittenEntries++;
443  } else {
444  // When test entries written, close output file and exit loop
445  outputFile.close();
446  break;
447  }
448  } else {
449  // send summary information to output ligier
450 
451  if (out != NULL && trg_ev_counts > 0) {
452  out->PutFullString(outputTag, msg);
453  }
454  }
455 
456  // print stats
457 
458  if ( (counter_live_ts % ((int)(statPrintInterval_s / frameTime_s)) == 0 ) ) {
459 
460  double livetime = counter_live_ts * frameTime_s;
461 
462  stats.setLiveTime(livetime);
463 
464  NOTICE(endl);
465  NOTICE(stats.toString());
466  NOTICE("=> discarded out-of-order timeslices = " << counter_lost_ts << endl);
467 
468  if (summaryFile != "") {
469  ofstream of(summaryFile.c_str());
470  of << stats.toSummaryFile();
471  of.close();
472  }
473 
474  }
475 
476  } else {
477  NOTICE("Filling trigger queue: " << trgQueue.size() << "/" << queueLength << '\r');
478  }
479 
480  } else {
481 
482  NOTICE("timeout " << setw(3) << i << endl);
483 
484  ++i;
485  }
486  }
487  }
488 
489  catch(const JSocketException& error) {
490  ERROR(error.what() << endl);
491  }
492 
493 }
Utility class to parse command line options.
Definition: JParser.hh:1711
General exception.
Definition: JException.hh:24
static const std::string run_number_name
debug
Definition: JMessage.hh:29
int main(int argc, char *argv[])
Definition: Main.cc:15
ROOT TTree parameter settings of various packages.
int MyId(const std::string &nick_name)
Identify.
std::vector< event_type > data_type
Definition: JPerth.cc:81
static const std::string multiplicities_name
static const std::string ev_name
ControlHost class.
Auxiliary class to define a veto time window on a set of optical modules.
Definition: JSupernova.hh:148
virtual bool hasNext() override
Check availability of next element.
Detector data structure.
Definition: JDetector.hh:89
virtual const pointer_type & next() override
Get next element.
Auxiliary class to select ROOT class based on class name.
SN filter based on veto window.
Definition: JSupernova.hh:441
Router for direct addressing of module data in detector data structure.
virtual void reset() override
Reset pointer.
Definition: JStorage.hh:42
data_type r[M+1]
Definition: JPolint.hh:868
string outputFile
Data structure for UTC time.
static const std::string frame_index_name
Data structure for detector geometry and calibration.
static const std::string daq_time_name
static const std::string active_doms_name
Simple wrapper around JModuleRouter class for direct addressing of PMT data in detector data structur...
int getFrameIndex() const
Get frame index.
Auxiliary data structure to store data and fit in memory.
Definition: JPerth.cc:76
string toSummaryFile()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:671
Auxiliary class for time values.
Definition: JTimeval.hh:26
static const std::string detid_name
void setDAQLongprint(const bool option)
Set DAQ print option.
Definition: JDAQPrint.hh:28
The template JSinglePointer class can be used to hold a pointer to an object.
then set_variable DETID
Definition: JEditTuneHV.sh:63
Detector file.
Definition: JHead.hh:226
static const std::string observables_name
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:2158
string toString()
put statistics into printable form outputs trigger level - rate - error
Definition: JSupernova.hh:638
SN trigger statistics, the information is stored in the form of a count as a function of the trigger ...
Definition: JSupernova.hh:617
double getFrameTime()
Get frame time duration.
Definition: JDAQClock.hh:162
then rm i $OUTPUT_FILE fi let RUN
Auxiliary class to apply the supernova trigger to SN data.
Definition: JSupernova.hh:468
#define NOTICE(A)
Definition: JMessage.hh:64
#define ERROR(A)
Definition: JMessage.hh:66
SN filter based on multiplicity selection optional suppression of multi-module coincidences WARNING: ...
Definition: JSupernova.hh:406
Timeslice data structure for SN data.
static const std::string trigger_name
General purpose messaging.
#define FATAL(A)
Definition: JMessage.hh:67
static const std::string m_name
Object iteration through ControlHost.
static const std::string detector_rate_name
Auxiliary class to build the supernova trigger dataset.
Definition: JSupernova.hh:202
virtual const char * what() const override
Get error message.
Definition: JException.hh:64
Normalisation of MUPAGE events.
Definition: JHead.hh:835
void load(const std::string &file_name, JDetector &detector)
Load detector from input file.
then fatal The output file must have the wildcard in the e g root fi eval JPrintDetector a $DETECTOR O IDENTIFIER eval JPrintDetector a $DETECTOR O SUMMARY JAcoustics sh $DETECTOR_ID source JAcousticsToolkit sh CHECK_EXIT_CODE typeset A EMITTERS get_tripods $WORKDIR tripod txt EMITTERS get_transmitters $WORKDIR transmitter txt EMITTERS for EMITTER in
Definition: JCanberra.sh:48
Utility class to parse command line options.
const JDAQHeader & getDAQHeader() const
Get DAQ header.
Definition: JDAQHeader.hh:49
std::string to_string(const T &value)
Convert value to string.
Exception for socket.
Definition: JException.hh:466
Auxiliary class to select DAQ hits based on time-over-treshold value.
Fixed parameters and ControlHost tags for KM3NeT DAQ.
void setLiveTime(const double lt)
Definition: JSupernova.hh:630
ControlHost tag.
Definition: JTag.hh:38
do set_variable DETECTOR_TXT $WORKDIR detector
static const int NUMBER_OF_PMTS
Total number of PMTs in module.
Definition: JDAQ.hh:26
static const std::string c_name
static const std::string active_pmts_name
static const std::string cc_name
int PutFullString(const JTag &tag, const std::string &buffer)
Send string.
int debug
debug level
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62
nlohmann::json json
Auxiliary class to manage a set of vetoes.
Definition: JSupernova.hh:334