41{
   45 
   46  unsigned int ts_duration = 100;
   47  int run_number = -1;
   48  int detector_id = 0;
   50  std::string acou_recipient;
   51  std::string roy_server;
   52  unsigned int acoustic_port = 
no_port;
 
   53  unsigned int optical_port = 
no_port;
 
   54 
   55  std::string file_prefix("dump_file"), file_postfix(".dqd");
   56  std::string roy_setup;
   57 
   58  std::size_t dump_size = 1024 * 1024 * 1024; 
   59 
   60  po::options_description desc("Options");
   61  desc.add_options()
   62      ("help,h",     "Print this help and exit.")
   63      ("version,v", "Print the version and exit.")
   64      ("optical,o",
   65          po::value<unsigned int>(&optical_port),
   66          "Set the port to listen for optical data.")
   67      ("acoustic,a",
   68          po::value<unsigned int>(&acoustic_port),
   69          "Set the port to listen for acoustic data.")
   70 
   71      ("timeslice,t",
   72          po::value<unsigned int>(&ts_duration)->required(),
   73          "Set the value of the time slice duration in milliseconds.")
   74 
   75      ("maxdumpsize",
   76          po::value<std::size_t>(&dump_size)->default_value(dump_size),
   77          "Set the maximum size of the dump file.")
   78 
   79      ("prefix",
   80          po::value<std::string>(&file_prefix)->default_value(file_prefix),
   81          "Set the dump file name prefix.")
   82 
   83      ("postfix",
   84          po::value<std::string>(&file_postfix)->default_value(file_postfix),
   85          "Set the dump file name postfix.")
   86 
   87      
   88
   89
   90
   91
   92 
   93      ("optical-recipients",
   94          po::value<std::vector<std::string> >(&opto_recipients)->multitoken(),
   95          "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.")
   96 
   97      ("acoustic-recipient",
   98          po::value<std::string>(&acou_recipient),
   99          "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.")
  100 
  101      ("run-number,r",
  102          po::value<int>(&run_number)->default_value(run_number),
  103          "Set the run-number. If it is set, data not belonging to the specified run will be discarded.")
  104      ("detector-id,i",
  105          po::value<int>(&detector_id)->default_value(detector_id),
  106          "Set the detector id.");
  107 
  108  bool acou = false;
  109  bool opto = false;
  110
  111 
  112 
  113  try
  114  {
  115    po::variables_map vm;
  116    po::store(
  117        po::command_line_parser(argc, argv).options(desc).run(),
  118        vm);
  119 
  120    if (vm.count("help"))
  121    {
  122      std::cout << desc << std::endl;
  123      return EXIT_SUCCESS;
  124    }
  125 
  126    if (vm.count("version"))
  127    {
  128      std::cout << dataqueue::version::v() << std::endl;
  129      return EXIT_SUCCESS;
  130    }
  131 
  132    po::notify(vm);
  133 
  134    opto = vm.count("optical");
  135 
  136    acou = vm.count("acoustic");
  137 
  138    if (! (acou || opto))
  139    {
  140      throw std::runtime_error("FATAL: Both acoustic and optical port missing.");
  141    }
  142 
  143    if (acou && !vm.count("acoustic-recipient"))
  144    {
  145      throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified.");
  146    }
  147 
  148    if (opto && !vm.count("optical-recipients"))
  149    {
  150      throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified.");
  151    }
  152 
  153    
  154
  155
  156
  157
  158
  159
  160 
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177 
  178
  179
  180
  181
  182
  183
  184  }
  185  catch (const po::error& e)
  186  {
  187    std::cerr << "DataQueue: Error: " << e.what() << '\n'
  188              << desc << std::endl;
  189    return EXIT_FAILURE;
  190  }
  191  catch (const std::runtime_error& e)
  192  {
  193    std::cerr << "DataQueue: Error: " << e.what() << '\n'
  194              << desc << std::endl;
  195    return EXIT_FAILURE;
  196  }
  197 
  198  
  199 
  201 
  204  boost::thread* acou_thread = 0;
  205  boost::thread* opto_thread = 0;
  206 
  209 
  210  if (opto)
  211  {
  212    for (std::vector<std::string>::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it)
  213    {
  214      optoRecipients.add(*it);
  215    }
  216  }
  217 
  218  if (acou)
  219  {
  220    acouRecipients.add(acou_recipient);
  221  }
  222 
  225 
  226  boost::thread* farm_threads[2] = {0, 0};
  227 
  229 
  230  if (acou)
  231  {
  232    std::cout << "Acoustics on\n";
  233    aFarm = 
new FrameFarm(ts_duration, 0, dump_size, file_prefix + 
"_a", file_postfix);
 
  236 
  237    farm_threads[1] = new boost::thread(boost::ref(*aFarm));
  239    acou_thread = new boost::thread(boost::ref(*aDFI));
  240    doms_interface.add_channel(acoustic_port, *aFarm);
  241    doms_interface.add_worker();
  242  }
  243 
  244  if (opto)
  245  {
  246    std::cout << "Optics on\n";
  247    oFarm = 
new FrameFarm(ts_duration, 0, dump_size, file_prefix + 
"_o", file_postfix);
 
  250 
  251    farm_threads[0] = new boost::thread(boost::ref(*oFarm));
  253    opto_thread = new boost::thread(boost::ref(*oDFI));
  254    doms_interface.add_channel(optical_port, *oFarm);
  255    doms_interface.add_worker();
  256  }
  257 
  258  doms_interface.start();
  259 
  260  
  261 
  262  std::cout << "Hit \'q\' and press [Return] to exit\n";
  263 
  264  char ch = 0;
  265 
  266  do
  267  {
  268    const timeval timeout = {10, 0};
  270    {
  271      std::cin >> ch;
  272      fflush(stdin);
  273    }
  274 
  278  } while (ch != 'q');
  279 
  280  std::cout << "Closing DataQueue\n";
  281 
  282  
  283 
  284  doms_interface.stop();
  285 
  286  std::cout << "DOMs interface closed\n";
  287 
  288  
  289 
  290  if (oFarm)
  292 
  293  if (aFarm)
  295 
  296  std::cout << "Farms closed\n";
  297 
  298  
  299 
  301  {
  303    acou_thread->join();
  304    delete aDFI;
  305    delete acou_thread;
  306  }
  307 
  309  {
  311    opto_thread->join();
  312    delete oDFI;
  313    delete opto_thread;
  314  }
  315 
  316  std::cout << "DataFilter Interfaces stopped\n";
  317 
  318  if (oFarm)
  319    farm_threads[0]->join();
  320 
  321  if (aFarm)
  322    farm_threads[1]->join();
  323 
  324  std::cout << "Farms returned\n";
  325 
  326  delete aFarm;
  327  delete oFarm;
  328  delete farm_threads[0];
  329  delete farm_threads[1];
  330 
  331  std::cout << "Bye bye\n";
  332}
static boost::atomic< unsigned int > n_obj
 
unsigned int detectorId(unsigned int detector_id)
 
static InBufferCollector & getCollector()
 
static boost::atomic< unsigned int > n_obj
 
void __debug_abort_on_wrong_size_(size_t size)
 
static const unsigned int no_port
 
static int wait_cin_for(timeval tv)