580{
583
584 int port;
585 int backlog;
586 int timeout_us;
589
590 try {
591
593
598 zap[
'Q'] =
make_field(JClient::QUEUE_LIMIT) = 100;
601
602 zap(argc, argv);
603 }
604 catch(const exception &error) {
605 FATAL(error.what() << endl);
606 }
607
608
612
613
614 DEBUG(
"Port " << setw(10) << port << endl);
615 DEBUG(
"Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
616 DEBUG(
"Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);
617
618
619 for ( ; ; ) {
620
622 select.setReaderMask(server);
623
624 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
625
626 if (!client->in.isReady()) {
627 select.setReaderMask(*client);
628 }
629
630 if (client->out.isReset()) {
631
632 if (!client->queue.empty()) {
633
634 DEBUG(
"Client" << *client <<
".set" << client->queue.front() << endl);
635
636 client->out.set(client->queue.front());
637 client->decrementRequest();
638
639 select.setWriterMask(*client);
640 }
641
642 } else if (client->out.isBusy()) {
643
644 select.setWriterMask(*client);
645 }
646 }
647
648 int nfds = 0;
649
650 try {
651 nfds =
select(timeout_us);
652 }
653 catch(const exception& error) {
654 ERROR(
"" << error.what() << endl);
655 }
656
657 if (nfds > 0) {
658
659 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
660
661 try {
662
663 if (
select.hasReaderMask(*client)) {
664
665 try {
666 client->in.read();
667 }
668 catch(const exception& error) {
669
670 ERROR(
"Remove (3) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
671
672 if (client->getNickname() != "") {
673 clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
674 }
675
676 client->shutdown();
677
678 client = clientList.erase(client);
679
680 continue;
681 }
682
684 }
685
686 if (client->in.isReady()) {
687
688 DEBUG(
"Message" << *client <<
' ' << client->in.prefix.c_str() <<
' ' << client->in.size() << endl);
689
690 bool special = JControlHost::maybe_special(client->in.prefix);
691
692 if (special) {
693
695
696 if (client->in.prefix.getTag() == DISPTAG_Subscribe) {
697
698 client->setSubscription(string(client->in.getRemainingData(), client->in.getRemainingSize()));
699
700 } else if (client->in.prefix.getTag() == DISPTAG_MyId) {
701
702 client->setNickname(string(client->in.getRemainingData(), client->in.getRemainingSize()));
703
704 clientList.
add(
JDispatch(DISPTAG_Born, client->getNickname()));
705
706 } else if (client->in.prefix.getTag() == DISPTAG_Gime) {
707
708 client->incrementRequest();
709
710 } else if (client->in.prefix.getTag() == DISPTAG_Always) {
711
712 client->setRequestAll();
713
714 } else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {
715
716 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
717 string buffer;
718
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer += " " + i->getHostname();
722 }
723 }
724
726
727 socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
728
729 DEBUG(
"Remove (1) client" << *client << endl);
730
731 client->shutdown();
732
733 client = clientList.erase(client);
734
735 continue;
736
737 } else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {
738
739 client->shutdown();
740
741 client = clientList.erase(client);
742
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
744
745 int total = 0;
746
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
749 }
750
751 cout << "client[" << i->getFileDescriptor() << "] " << i->getNickname() << endl;
752 cout << "tag - all:";
754 cout << ' ' << *tag;
755 }
756 cout << endl;
757 cout << "tag - any:";
759 cout << ' ' << *tag;
760 }
761 cout << endl;
762 cout << "queue " << i->queue.size() << ' ' << total << "B" << endl;
763 }
764
765 continue;
766
767 } else if (client->in.prefix.getTag() == DISPTAG_Debug) {
768
769 istringstream is(string(client->in.getRemainingData(), client->in.getRemainingSize()));
770
772
773 } else {
774
775 special = false;
776 }
777 }
778
779 if (!special) {
780
781 clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
782
783 if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
784
785 WARNING(
"Memory " << setw(10) << JDispatch::MEMORY_TOTAL <<
" > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
786
788 }
789 }
790
791 client->in.reset();
792 }
793
794 if (
select.hasWriterMask(*client)) {
795
796 client->out.write();
797
798 DEBUG(
"Client" << *client <<
".write" <<
static_cast<const JSocketStatus&
>(client->out) << endl);
799
800 if (client->out.isReady()) {
801 client->out.reset();
802 client->queue.pop_front();
803 }
804 }
805
806 ++client;
807 }
808 catch(const exception& error) {
809
810 DEBUG(
"Remove (2) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
811
812 if (client->getNickname() != "") {
813 clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
814 }
815
816 client->shutdown();
817
818 client = clientList.erase(client);
819 }
820 }
821
822 if (
select.hasReaderMask(server)) {
823
824 JTCPSocket socket(server.getFileDescriptor());
825
828
829 socket.setReuseAddress(true);
830 socket.setKeepAlive (true);
831 socket.setNonBlocking (true);
832
833 DEBUG(
"New client" << socket << endl);
834
835 clientList.push_back(
JClient(socket));
836 }
837 }
838 }
839}
#define DEBUG(A)
Message macros.
#define make_field(A,...)
Macro to convert a parameter to a JParserTemplateElement object.
List of ControlHost client managers.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages from client queues for which the client does not have the 'all' subscription.
ControlHost client manager.
Data structure of a ControlHost message.
Wrapper class for select call.
Auxiliary class for non-blocking socket I/O.
Auxiliary class for non-blocking socket I/O.
Utility class to parse command line options.
static const size_t buffer_size
bool select(const Trk &trk, const Evt &evt)
Event selection.
This namespace includes all other namespaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.