598 zap[
'Q'] =
make_field(JClient::QUEUE_LIMIT) = 100;
604 catch(
const exception &error) {
605 FATAL(error.what() << endl);
614 DEBUG(
"Port " << setw(10) << port << endl);
615 DEBUG(
"Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
616 DEBUG(
"Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);
622 select.setReaderMask(server);
624 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
626 if (!client->in.isReady()) {
627 select.setReaderMask(*client);
630 if (client->out.isReset()) {
632 if (!client->queue.empty()) {
634 DEBUG(
"Client" << *client <<
".set" << client->queue.front() << endl);
636 client->out.set(client->queue.front());
637 client->decrementRequest();
639 select.setWriterMask(*client);
642 }
else if (client->out.isBusy()) {
644 select.setWriterMask(*client);
651 nfds =
select(timeout_us);
653 catch(
const exception& error) {
654 ERROR(
"" << error.what() << endl);
659 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
663 if (
select.hasReaderMask(*client)) {
668 catch(
const exception& error) {
670 ERROR(
"Remove (3) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
672 if (client->getNickname() !=
"") {
678 client = clientList.erase(client);
686 if (client->in.isReady()) {
688 DEBUG(
"Message" << *client <<
' ' << client->in.prefix.c_str() <<
' ' << client->in.size() << endl);
690 bool special = JControlHost::maybe_special(client->in.prefix);
698 client->setSubscription(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
700 }
else if (client->in.prefix.getTag() ==
DISPTAG_MyId) {
702 client->setNickname(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
706 }
else if (client->in.prefix.getTag() ==
DISPTAG_Gime) {
708 client->incrementRequest();
712 client->setRequestAll();
716 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer +=
" " + i->getHostname();
727 socket.PutFullString(
DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
729 DEBUG(
"Remove (1) client" << *client << endl);
733 client = clientList.erase(client);
741 client = clientList.erase(client);
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
751 cout <<
"client[" << i->getFileDescriptor() <<
"] " << i->getNickname() << endl;
752 cout <<
"tag - all:";
757 cout <<
"tag - any:";
762 cout <<
"queue " << i->queue.size() <<
' ' << total <<
"B" << endl;
769 istringstream is(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
781 clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
783 if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
785 WARNING(
"Memory " << setw(10) << JDispatch::MEMORY_TOTAL <<
" > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
794 if (
select.hasWriterMask(*client)) {
798 DEBUG(
"Client" << *client <<
".write" <<
static_cast<const JSocketStatus&
>(client->out) << endl);
800 if (client->out.isReady()) {
802 client->queue.pop_front();
808 catch(
const exception& error) {
810 DEBUG(
"Remove (2) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
812 if (client->getNickname() !=
"") {
818 client = clientList.erase(client);
822 if (
select.hasReaderMask(server)) {
824 JTCPSocket socket(server.getFileDescriptor());
829 socket.setReuseAddress(
true);
830 socket.setKeepAlive (
true);
831 socket.setNonBlocking (
true);
833 DEBUG(
"New client" << socket << endl);
835 clientList.push_back(
JClient(socket));
#define DEBUG(A)
Message macros.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
List of ControlHost client managers.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
ControlHost client manager.
Data structure of a ControlHost message.
Wrapper class for select call.
Auxiliary class for non-blocking socket I/O.
Auxiliary class for non-blocking socket I/O.
Utility class to parse command line options.
static const size_t buffer_size
bool select(const Trk &trk, const Evt &evt)
Event selection.
static const JTag DISPTAG_Always("_Always")
static const JTag DISPTAG_Gime("_Gime")
static const int DISPATCH_PORT
Default ControlHost port.
static const JTag DISPTAG_WhereIs("_WhereIs")
static const JTag DISPTAG_Debug("_Debug")
static const JTag DISPTAG_MyId("_MyId")
static const JTag DISPTAG_Subscribe("_Subscri")
Special ControlHost tags.
static const JTag DISPTAG_Born("Born")
static const JTag DISPTAG_ShowStat("_ShowSta")
static const JTag DISPTAG_Died("Died")
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.