Jpp 19.3.0-rc.1
the software that should make you happy
Loading...
Searching...
No Matches
JLigier.cc File Reference

Quicksilver Messenger Service - a ControlHost server. More...

#include <iostream>
#include <iomanip>
#include <string>
#include <sstream>
#include <deque>
#include <set>
#include "Jeep/JParser.hh"
#include "Jeep/JMessage.hh"
#include "JLang/JMemory.hh"
#include "JLang/JSharedCounter.hh"
#include "JNet/JTCPSocket.hh"
#include "JNet/JSocketChannel.hh"
#include "JNet/JServerSocket.hh"
#include "JNet/JSelect.hh"
#include "JNet/JControlHost.hh"
#include "JSystem/JSystemToolkit.hh"
#include "JMath/JConstants.hh"

Go to the source code of this file.

Classes

class  JNET::JMemory_t< T >
 
class  JNET::JDispatch
 Data structure of a ControlHost message. More...
 
class  JNET::JClient
 ControlHost client manager. More...
 
class  JNET::JClientList
 List of ControlHost client managers. More...
 

Namespaces

namespace  JNET
 

Typedefs

typedef JPrefix JNET::JPrefix_t
 
typedef JSocketInputChannel< JPrefix_t > JNET::JSocketInputChannel_t
 

Functions

int JNET::getSizeOfPacket (const JPrefix_t &prefix)
 Get total size of internet packet.
 
void JNET::setSizeOfPacket (const int size, JPrefix_t &prefix)
 Set total size of internet packet.
 
std::ostream & JNET::operator<< (std::ostream &out, const JDispatch &message)
 Print message.
 
std::ostream & JNET::operator<< (std::ostream &out, const JSocket &socket)
 Print socket.
 
std::ostream & JNET::operator<< (std::ostream &out, const JSocketStatus &status)
 Print socket status.
 
std::ostream & JNET::operator<< (std::ostream &out, const JSocketInputBuffer &buffer)
 Print socket input buffer.
 
int main (int argc, char *argv[])
 

Detailed Description

Quicksilver Messenger Service - a ControlHost server.

Author
mdejong

Definition in file JLigier.cc.

Function Documentation

◆ main()

int main ( int argc,
char * argv[] )

Definition at line 579 of file JLigier.cc.

580{
581 using namespace std;
582 using namespace JPP;
583
584 int port;
585 int backlog;
586 int timeout_us;
587 int buffer_size;
588 int debug;
589
590 try {
591
592 JParser<> zap("ControlHost server.");
593
594 zap['P'] = make_field(port) = DISPATCH_PORT;
595 zap['q'] = make_field(backlog) = 1024;
596 zap['T'] = make_field(timeout_us) = 5;
597 zap['s'] = make_field(buffer_size) = JSocket::getDefaultBufferSize();
598 zap['Q'] = make_field(JClient::QUEUE_LIMIT) = 100;
599 zap['M'] = make_field(JDispatch::MEMORY_LIMIT) = (JSYSTEM::getRAM() >> 1);
600 zap['d'] = make_field(debug) = 0;
601
602 zap(argc, argv);
603 }
604 catch(const exception &error) {
605 FATAL(error.what() << endl);
606 }
607
608
609 JServerSocket server(port, backlog);
610 JSelect select;
611 JClientList clientList;
612
613
614 DEBUG("Port " << setw(10) << port << endl);
615 DEBUG("Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
616 DEBUG("Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);
617
618
619 for ( ; ; ) {
620
621 select.reset();
622 select.setReaderMask(server);
623
624 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
625
626 if (!client->in.isReady()) {
627 select.setReaderMask(*client);
628 }
629
630 if (client->out.isReset()) {
631
632 if (!client->queue.empty()) {
633
634 DEBUG("Client" << *client << ".set" << client->queue.front() << endl);
635
636 client->out.set(client->queue.front());
637 client->decrementRequest();
638
639 select.setWriterMask(*client);
640 }
641
642 } else if (client->out.isBusy()) {
643
644 select.setWriterMask(*client);
645 }
646 }
647
648 int nfds = 0;
649
650 try {
651 nfds = select(timeout_us);
652 }
653 catch(const exception& error) {
654 ERROR("" << error.what() << endl);
655 }
656
657 if (nfds > 0) {
658
659 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
660
661 try {
662
663 if (select.hasReaderMask(*client)) {
664
665 try {
666 client->in.read();
667 }
668 catch(const exception& error) {
669
670 ERROR("Remove (3) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);
671
672 if (client->getNickname() != "") {
673 clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
674 }
675
676 client->shutdown();
677
678 client = clientList.erase(client);
679
680 continue;
681 }
682
683 DEBUG("Client" << *client << ".read" << static_cast<const JSocketInputBuffer&>(client->in) << endl);
684 }
685
686 if (client->in.isReady()) {
687
688 DEBUG("Message" << *client << ' ' << client->in.prefix.c_str() << ' ' << client->in.size() << endl);
689
690 bool special = JControlHost::maybe_special(client->in.prefix);
691
692 if (special) {
693
694 client->in.seekg(sizeof(JPrefix_t)); // skip prefix
695
696 if (client->in.prefix.getTag() == DISPTAG_Subscribe) {
697
698 client->setSubscription(string(client->in.getRemainingData(), client->in.getRemainingSize()));
699
700 } else if (client->in.prefix.getTag() == DISPTAG_MyId) {
701
702 client->setNickname(string(client->in.getRemainingData(), client->in.getRemainingSize()));
703
704 clientList.add(JDispatch(DISPTAG_Born, client->getNickname()));
705
706 } else if (client->in.prefix.getTag() == DISPTAG_Gime) {
707
708 client->incrementRequest();
709
710 } else if (client->in.prefix.getTag() == DISPTAG_Always) {
711
712 client->setRequestAll();
713
714 } else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {
715
716 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
717 string buffer;
718
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer += " " + i->getHostname();
722 }
723 }
724
725 JControlHost socket(*client);
726
727 socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
728
729 DEBUG("Remove (1) client" << *client << endl);
730
731 client->shutdown();
732
733 client = clientList.erase(client);
734
735 continue; // skip any action
736
737 } else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {
738
739 client->shutdown();
740
741 client = clientList.erase(client);
742
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
744
745 int total = 0;
746
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
749 }
750
751 cout << "client[" << i->getFileDescriptor() << "] " << i->getNickname() << endl;
752 cout << "tag - all:";
753 for (std::set<JTag>::const_iterator tag = i->getSubscriptionAll().begin(); tag != i->getSubscriptionAll().end(); ++tag) {
754 cout << ' ' << *tag;
755 }
756 cout << endl;
757 cout << "tag - any:";
758 for (std::set<JTag>::const_iterator tag = i->getSubscriptionAny().begin(); tag != i->getSubscriptionAny().end(); ++tag) {
759 cout << ' ' << *tag;
760 }
761 cout << endl;
762 cout << "queue " << i->queue.size() << ' ' << total << "B" << endl;
763 }
764
765 continue; // skip any action
766
767 } else if (client->in.prefix.getTag() == DISPTAG_Debug) {
768
769 istringstream is(string(client->in.getRemainingData(), client->in.getRemainingSize()));
770
771 is >> debug;
772
773 } else {
774
775 special = false; // not a reserved tag.
776 }
777 }
778
779 if (!special) {
780
781 clientList.add(JDispatch(client->in.prefix, client->in.data()));
782
783 if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
784
785 WARNING("Memory " << setw(10) << JDispatch::MEMORY_TOTAL << " > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
786
787 clientList.drop();
788 }
789 }
790
791 client->in.reset();
792 }
793
794 if (select.hasWriterMask(*client)) {
795
796 client->out.write();
797
798 DEBUG("Client" << *client << ".write" << static_cast<const JSocketStatus&>(client->out) << endl);
799
800 if (client->out.isReady()) {
801 client->out.reset();
802 client->queue.pop_front();
803 }
804 }
805
806 ++client;
807 }
808 catch(const exception& error) {
809
810 DEBUG("Remove (2) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);
811
812 if (client->getNickname() != "") {
813 clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
814 }
815
816 client->shutdown();
817
818 client = clientList.erase(client);
819 }
820 }
821
822 if (select.hasReaderMask(server)) {
823
824 JTCPSocket socket(server.getFileDescriptor());
825
826 socket.setSendBufferSize (buffer_size);
827 socket.setReceiveBufferSize(buffer_size);
828
829 socket.setReuseAddress(true);
830 socket.setKeepAlive (true);
831 socket.setNonBlocking (true);
832
833 DEBUG("New client" << socket << endl);
834
835 clientList.push_back(JClient(socket));
836 }
837 }
838 }
839}
#define DEBUG(A)
Message macros.
Definition JMessage.hh:62
#define ERROR(A)
Definition JMessage.hh:66
#define FATAL(A)
Definition JMessage.hh:67
int debug
debug level
Definition JSirene.cc:72
#define WARNING(A)
Definition JMessage.hh:65
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition JParser.hh:2142
List of ControlHost client managers.
Definition JLigier.cc:479
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
Definition JLigier.cc:494
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
Definition JLigier.cc:505
ControlHost client manager.
Definition JLigier.cc:245
ControlHost class.
Data structure of a ControlHost message.
Definition JLigier.cc:67
ControlHost prefix.
Definition JPrefix.hh:33
Wrapper class for select call.
Definition JSelect.hh:37
TCP server socket.
Auxiliary class for non-blocking socket I/O.
Auxiliary class for non-blocking socket I/O.
TCP socket.
Definition JTCPSocket.hh:30
Utility class to parse command line options.
Definition JParser.hh:1698
static const size_t buffer_size
bool select(const Trk &trk, const Evt &evt)
Event selection.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.