41{
   45 
   46  unsigned int ts_duration = 100;
   47  int run_number = -1;
   48  int detector_id = 0;
   50  std::string acou_recipient;
   51  std::string roy_server;
   52  unsigned int acoustic_port = 
no_port;
 
   53  unsigned int optical_port = 
no_port;
 
   54 
   55  std::string file_prefix("dump_file"), file_postfix(".dqd");
   56  std::string roy_setup;
   57 
   58  std::size_t dump_size = 1024 * 1024 * 1024; 
   59 
   60  po::options_description desc("Options");
   61  desc.add_options()
   62    ("help,h",     "Print this help and exit.")
   63    ("version,v", "Print the version and exit.")
   64    ("optical,o",
   65     po::value<unsigned int>(&optical_port),
   66     "Set the port to listen for optical data.")
   67    ("acoustic,a",
   68     po::value<unsigned int>(&acoustic_port),
   69     "Set the port to listen for acoustic data.")
   70 
   71    ("timeslice,t",
   72     po::value<unsigned int>(&ts_duration)->required(),
   73     "Set the value of the time slice duration in milliseconds.")
   74 
   75    ("maxdumpsize",
   76     po::value<std::size_t>(&dump_size)->default_value(dump_size),
   77     "Set the maximum size of the dump file.")
   78 
   79    ("prefix",
   80     po::value<std::string>(&file_prefix)->default_value(file_prefix),
   81     "Set the dump file name prefix.")
   82 
   83    ("postfix",
   84     po::value<std::string>(&file_postfix)->default_value(file_postfix),
   85     "Set the dump file name postfix.")
   86 
   87    
   88
   89
   90
   91
   92 
   93    ("optical-recipients",
   94     po::value<std::vector<std::string> >(&opto_recipients)->multitoken(),
   95     "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.")
   96 
   97    ("acoustic-recipient",
   98     po::value<std::string>(&acou_recipient),
   99     "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.")
  100 
  101    ("run-number,r",
  102     po::value<int>(&run_number)->default_value(run_number),
  103     "Set the run-number. If it is set, data not belonging to the specified run will be discarded.")
  104    ("detector-id,i",
  105     po::value<int>(&detector_id)->default_value(detector_id),
  106     "Set the detector id.");
  107 
  108  bool acou = false;
  109  bool opto = false;
  110  
  111 
  112 
  113  try
  114    {
  115      po::variables_map vm;
  116      po::store(
  117                po::command_line_parser(argc, argv).options(desc).run(),
  118                vm);
  119 
  120      if (vm.count("help"))
  121        {
  122          std::cout << desc << std::endl;
  123          return EXIT_SUCCESS;
  124        }
  125 
  126      if (vm.count("version"))
  127        {
  128          std::cout << dataqueue::version::v() << std::endl;
  129          return EXIT_SUCCESS;
  130        }
  131 
  132      po::notify(vm);
  133 
  134      opto = vm.count("optical");
  135 
  136      acou = vm.count("acoustic");
  137 
  138      if (! (acou || opto))
  139        {
  140          throw std::runtime_error("FATAL: Both acoustic and optical port missing.");
  141        }
  142 
  143      if (acou && !vm.count("acoustic-recipient"))
  144        {
  145          throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified.");
  146        }
  147 
  148      if (opto && !vm.count("optical-recipients"))
  149        {
  150          throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified.");
  151        }
  152 
  153      
  154
  155
  156
  157
  158
  159
  160 
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177 
  178
  179
  180
  181
  182
  183
  184    }
  185  catch (const po::error& e)
  186    {
  187      std::cerr << "DataQueue: Error: " << e.what() << '\n'
  188                << desc << std::endl;
  189      return EXIT_FAILURE;
  190    }
  191  catch (const std::runtime_error& e)
  192    {
  193      std::cerr << "DataQueue: Error: " << e.what() << '\n'
  194                << desc << std::endl;
  195      return EXIT_FAILURE;
  196    }
  197 
  198  
  199 
  201 
  204  boost::thread* acou_thread = 0;
  205  boost::thread* opto_thread = 0;
  206 
  209 
  210  if (opto)
  211    {
  212      for (std::vector<std::string>::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it)
  213        {
  214          optoRecipients.add(*it);
  215        }
  216    }
  217 
  218  if (acou)
  219    {
  220      acouRecipients.add(acou_recipient);
  221    }
  222 
  225 
  226  boost::thread* farm_threads[2] = {0, 0};
  227 
  229 
  230  if (acou)
  231    {
  232      std::cout << "Acoustics on\n";
  233      aFarm = 
new FrameFarm(ts_duration, 0, dump_size, file_prefix + 
"_a", file_postfix);
 
  236 
  237      farm_threads[1] = new boost::thread(boost::ref(*aFarm));
  239      acou_thread = new boost::thread(boost::ref(*aDFI));
  240      doms_interface.add_channel(acoustic_port, *aFarm);
  241      doms_interface.add_worker();
  242    }
  243 
  244  if (opto)
  245    {
  246      std::cout << "Optics on\n";
  247      oFarm = 
new FrameFarm(ts_duration, 0, dump_size, file_prefix + 
"_o", file_postfix);
 
  250 
  251      farm_threads[0] = new boost::thread(boost::ref(*oFarm));
  253      opto_thread = new boost::thread(boost::ref(*oDFI));
  254      doms_interface.add_channel(optical_port, *oFarm);
  255      doms_interface.add_worker();
  256    }
  257 
  258  doms_interface.start();
  259 
  260  
  261 
  262  std::cout << "Hit \'q\' and press [Return] to exit\n";
  263 
  264  char ch = 0;
  265 
  266  do
  267    {
  268      const timeval timeout = {10, 0};
  270        {
  271          std::cin >> ch;
  272          fflush(stdin);
  273        }
  274 
  278    } while (ch != 'q');
  279 
  280  std::cout << "Closing DataQueue\n";
  281 
  282  
  283 
  284  doms_interface.stop();
  285 
  286  std::cout << "DOMs interface closed\n";
  287 
  288  
  289 
  290  if (oFarm)
  292 
  293  if (aFarm)
  295 
  296  std::cout << "Farms closed\n";
  297 
  298  
  299 
  301    {
  303      acou_thread->join();
  304      delete aDFI;
  305      delete acou_thread;
  306    }
  307 
  309    {
  311      opto_thread->join();
  312      delete oDFI;
  313      delete opto_thread;
  314    }
  315 
  316  std::cout << "DataFilter Interfaces stopped\n";
  317 
  318  if (oFarm)
  319    farm_threads[0]->join();
  320 
  321  if (aFarm)
  322    farm_threads[1]->join();
  323 
  324  std::cout << "Farms returned\n";
  325 
  326  delete aFarm;
  327  delete oFarm;
  328  delete farm_threads[0];
  329  delete farm_threads[1];
  330 
  331  std::cout << "Bye bye\n";
  332}
static boost::atomic< unsigned int > n_obj
 
unsigned int detectorId(unsigned int detector_id)
 
static InBufferCollector & getCollector()
 
static boost::atomic< unsigned int > n_obj
 
void __debug_abort_on_wrong_size_(size_t size)
 
static const unsigned int no_port
 
static int wait_cin_for(timeval tv)