580{
  583 
  584  int   port;
  585  int   backlog;
  586  int   timeout_us;
  589 
  590  try {
  591 
  593 
  598    zap[
'Q'] = 
make_field(JClient::QUEUE_LIMIT)    =    100;
 
  601 
  602    zap(argc, argv);
  603  }
  604  catch(const exception &error) {
  605    FATAL(error.what() << endl);
 
  606  }
  607 
  608 
  612 
  613 
  614  DEBUG(
"Port         " << setw(10) << port                    << endl);
 
  615  DEBUG(
"Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
 
  616  DEBUG(
"Queue  limit " << setw(10) << JClient::QUEUE_LIMIT    << endl);
 
  617 
  618 
  619  for ( ; ; ) {
  620 
  622    select.setReaderMask(server);
 
  623 
  624    for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
  625 
  626      if (!client->in.isReady()) {
  627        select.setReaderMask(*client);
 
  628      }
  629 
  630      if (client->out.isReset()) {
  631 
  632        if (!client->queue.empty()) {
  633        
  634          DEBUG(
"Client" << *client << 
".set" << client->queue.front() << endl);
 
  635        
  636          client->out.set(client->queue.front());
  637          client->decrementRequest();
  638        
  639          select.setWriterMask(*client);
 
  640        }
  641 
  642      } else if (client->out.isBusy()) {
  643 
  644        select.setWriterMask(*client);
 
  645      }
  646    }
  647 
  648    int nfds = 0;
  649 
  650    try {
  651      nfds = 
select(timeout_us);
 
  652    }
  653    catch(const exception& error) {
  654      ERROR(
"" << error.what() << endl);
 
  655    }
  656 
  657    if (nfds > 0) {
  658 
  659      for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
  660        
  661        try {
  662 
  663          if (
select.hasReaderMask(*client)) {
 
  664 
  665            try {
  666              client->in.read();
  667            }
  668            catch(const exception& error) {
  669 
  670              ERROR(
"Remove (3) client" << *client << 
"<" << client->getNickname() << 
">: " << error.what() << endl);
 
  671 
  672              if (client->getNickname() != "") {
  673                clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
 
  674              }
  675 
  676              client->shutdown();
  677 
  678              client = clientList.erase(client);
  679              
  680              continue;
  681            }
  682              
  684          }
  685                
  686          if (client->in.isReady()) {
  687 
  688            DEBUG(
"Message" << *client << 
' ' << client->in.prefix.c_str() << 
' ' << client->in.size() << endl);
 
  689 
  690            bool special = JControlHost::maybe_special(client->in.prefix);
  691 
  692            if (special) {
  693 
  695          
  696              if        (client->in.prefix.getTag() == DISPTAG_Subscribe) {
  697 
  698                client->setSubscription(string(client->in.getRemainingData(), client->in.getRemainingSize()));
  699                
  700              } else if (client->in.prefix.getTag() == DISPTAG_MyId) {
  701                
  702                client->setNickname(string(client->in.getRemainingData(), client->in.getRemainingSize()));
  703                
  704                clientList.
add(
JDispatch(DISPTAG_Born, client->getNickname()));
 
  705                
  706              } else if (client->in.prefix.getTag() == DISPTAG_Gime) {
  707                
  708                client->incrementRequest();
  709                
  710              } else if (client->in.prefix.getTag() == DISPTAG_Always) {
  711                
  712                client->setRequestAll();
  713                
  714              } else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {
  715                
  716                string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
  717                string buffer;
  718                
  719                for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
  720                  if (i->getNickname() == nick_name) {
  721                    buffer += " " + i->getHostname();
  722                  }
  723                }
  724                
  726                
  727                socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
  728                
  729                DEBUG(
"Remove (1) client" << *client << endl);
 
  730                
  731                client->shutdown();
  732                
  733                client = clientList.erase(client);
  734                
  735                continue;         
  736 
  737              } else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {
  738 
  739                client->shutdown();
  740                
  741                client = clientList.erase(client);
  742 
  743                for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
  744 
  745                  int total = 0;
  746 
  747                  for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
  748                    total += message->size();
  749                  }
  750 
  751                  cout << "client[" << i->getFileDescriptor() << "] " << i->getNickname() << endl;
  752                  cout << "tag - all:";
  754                    cout << ' ' << *tag;
  755                  }
  756                  cout << endl;
  757                  cout << "tag - any:";
  759                    cout << ' ' << *tag;
  760                  }
  761                  cout << endl;
  762                  cout << "queue " << i->queue.size() << ' ' << total << "B" << endl;
  763                }
  764                
  765                continue;         
  766 
  767              } else if (client->in.prefix.getTag() == DISPTAG_Debug) {
  768 
  769                istringstream is(string(client->in.getRemainingData(), client->in.getRemainingSize()));
  770 
  772 
  773              } else {
  774 
  775                special = false;  
  776              }
  777            } 
  778            
  779            if (!special) {
  780 
  781              clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
 
  782 
  783              if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
  784 
  785                WARNING(
"Memory " << setw(10) << JDispatch::MEMORY_TOTAL << 
" > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
 
  786 
  788              }
  789            }
  790            
  791            client->in.reset();
  792          }
  793          
  794          if (
select.hasWriterMask(*client)) {
 
  795              
  796            client->out.write();
  797            
  798            DEBUG(
"Client" << *client << 
".write" << 
static_cast<const JSocketStatus&
>(client->out) << endl);
 
  799            
  800            if (client->out.isReady()) {
  801              client->out.reset();
  802              client->queue.pop_front();
  803            }
  804          }
  805 
  806          ++client;
  807        }
  808        catch(const exception& error) {
  809 
  810          DEBUG(
"Remove (2) client" << *client << 
"<" << client->getNickname() << 
">: " << error.what() << endl);
 
  811          
  812          if (client->getNickname() != "") {
  813            clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
 
  814          }
  815 
  816          client->shutdown();
  817 
  818          client = clientList.erase(client);
  819        }
  820      }
  821 
  822      if (
select.hasReaderMask(server)) {
 
  823 
  824        JTCPSocket socket(server.getFileDescriptor());
 
  825        
  828        
  829        socket.setReuseAddress(true);
  830        socket.setKeepAlive   (true);
  831        socket.setNonBlocking (true);
  832 
  833        DEBUG(
"New client" << socket << endl);
 
  834 
  835        clientList.push_back(
JClient(socket));
 
  836      } 
  837    }
  838  }
  839}
#define DEBUG(A)
Message macros.
 
#define make_field(A,...)
Macro to convert a parameter to a JParserTemplateElement object.
 
List of ControlHost client managers.
 
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
 
void drop()
Drop from the client queues all messages for which the client does not have the 'all' subscription.
 
ControlHost client manager.
 
Data structure of a ControlHost message.
 
Wrapper class for select call.
 
Auxiliary class for non-blocking socket I/O.
 
Auxiliary class for non-blocking socket I/O.
 
Utility class to parse command line options.
 
static const size_t buffer_size
 
bool select(const Trk &trk, const Evt &evt)
Event selection.
 
This namespace includes all other namespaces (except KM3NETDAQ, KM3NET and ANTARES).
 
unsigned long long int getRAM()
Get RAM of this CPU.