Go to the documentation of this file.
111 const std::string& message) :
117 memcpy(this->
buffer, static_cast<const JPrefix_t*>(
this),
sizeof(
JPrefix_t));
119 memcpy(this->
buffer +
sizeof(JPrefix_t), message.data(), message.size());
130 static_cast<JPrefix_t&>(*
this) = message;
355 for (istringstream is(subscription); is >> c >> tag; ) {
415 queue.push_back(message);
429 for (std::deque<JDispatch>::iterator i =
queue.begin(); i !=
queue.end(); ) {
473 for (
iterator i = this->begin(); i != this->end(); ++i) {
484 for (
iterator i = this->begin(); i != this->end(); ++i) {
500 return out <<
"(" << message.
getTag() <<
"," << message.
size() <<
")";
556 int main(
int argc,
char* argv[])
581 catch(
const exception &error) {
582 FATAL(error.what() << endl);
586 JServerSocket server(port, backlog);
588 JClientList clientList;
591 DEBUG(
"Port " << setw(10) << port << endl);
599 select.setReaderMask(server);
601 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
603 if (!client->in.isReady()) {
604 select.setReaderMask(*client);
607 if (client->out.isReset()) {
609 if (!client->queue.empty()) {
611 DEBUG(
"Client" << *client <<
".set" << client->queue.front() << endl);
613 client->out.set(client->queue.front());
614 client->decrementRequest();
616 select.setWriterMask(*client);
619 }
else if (client->out.isBusy()) {
621 select.setWriterMask(*client);
625 if (select(timeout_us) > 0) {
627 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
631 if (select.hasReaderMask(*client)) {
636 catch(
const exception& error) {
638 ERROR(
"Remove (3) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
640 if (client->getNickname() !=
"") {
641 clientList.add(JDispatch(
DISPTAG_Died, client->getNickname()));
646 client = clientList.erase(client);
651 DEBUG(
"Client" << *client <<
".read" << static_cast<const JSocketInputBuffer&>(client->in) << endl);
654 if (client->in.isReady()) {
656 DEBUG(
"Message" << *client <<
' ' << client->in.prefix.c_str() <<
' ' << client->in.size() << endl);
666 client->setSubscription(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
668 }
else if (client->in.prefix.getTag() ==
DISPTAG_MyId) {
670 client->setNickname(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
672 clientList.add(JDispatch(
DISPTAG_Born, client->getNickname()));
674 }
else if (client->in.prefix.getTag() ==
DISPTAG_Gime) {
676 client->incrementRequest();
680 client->setRequestAll();
684 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
687 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
688 if (i->getNickname() == nick_name) {
689 buffer +=
" " + i->getHostname();
693 JControlHost socket(*client);
695 socket.PutFullString(
DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
697 DEBUG(
"Remove (1) client" << *client << endl);
701 client = clientList.erase(client);
707 istringstream is(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
719 clientList.add(JDispatch(client->in.prefix, client->in.data()));
732 if (select.hasWriterMask(*client)) {
736 DEBUG(
"Client" << *client <<
".write" << static_cast<const JSocketStatus&>(client->out) << endl);
738 if (client->out.isReady()) {
740 client->queue.pop_front();
746 catch(
const exception& error) {
748 DEBUG(
"Remove (2) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
750 if (client->getNickname() !=
"") {
751 clientList.add(JDispatch(
DISPTAG_Died, client->getNickname()));
756 client = clientList.erase(client);
760 if (select.hasReaderMask(server)) {
764 socket.accept(server.getFileDescriptor());
769 socket.setKeepAlive (
true);
770 socket.setNonBlocking(
true);
772 DEBUG(
"New client" << socket << endl);
774 clientList.push_back(JClient(socket));
Auxiliary class for non-blocking socket I/O.
int getFileDescriptor() const
Get file descriptor.
static const JTag DISPTAG_Debug("_Debug")
void setNickname(const std::string &nick_name)
Set nick name.
static const JTag DISPTAG_Subscribe("_Subscri")
Special ControlHost tags.
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
void incrementRequest()
Increment request by one.
JClient(const JSocket &socket)
Constructor.
int main(int argc, char *argv[])
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
Exception for ControlHost.
static const JTag DISPTAG_WhereIs("_WhereIs")
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
const JTag & getTag() const
Get tag.
ControlHost client manager.
bool isReady() const
Check ready status.
Utility class to parse command line options.
JDispatch & operator=(const JDispatch &message)
Assignment operator.
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
Interprocess communication.
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
const std::string & getNickname() const
Get nick name.
Data structure of a ControlHost message.
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
void release()
Release memory.
List of ControlHost client managers.
std::set< JTag > subscriptionAll
static const JTag DISPTAG_Always("_Always")
void setRequestAll()
Set no request.
static bool maybe_special(const JTag &tag)
Check special ControlHost tags.
int getSize() const
Get size of pending data.
static const int DISPATCH_PORT
Default ControlHost port.
static T * create()
Create object in memory.
Exception for failure of memory allocation.
void decrementRequest()
Decrement request by one.
bool isBusy() const
Check busy status.
const static size_t buffer_size
static long long int MEMORY_TOTAL
Total size of data [Bytes].
JMalloc< char > JMemory_t
void initialise()
Initialise counter.
Auxiliary class for non-blocking socket I/O.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
static const JTag DISPTAG_Died("Died")
bool setSubscription(const std::string &subscription)
Set subscription.
bool checkRequest() const
Check request.
JDispatch()
Default constructor.
const char * data() const
Get data.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
JDispatch(const JTag &tag, const std::string &message)
Constructor.
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
JDispatch(const JDispatch &message)
Copy constructor.
void drop()
Drop all messages for which the client does not have the 'all' subscription.
JClient()
Default constructor.
#define DEBUG(A)
Message macros.
int getCounter() const
Get number of I/O attempts.
int size() const
Get size.
void drop()
Drop all messages from client queues for which the client does not have the 'all' subscription.
JClientList()
Default constructor.
std::ostream & operator<<(std::ostream &out, const JDispatch &message)
Print message.
std::deque< JDispatch > queue
queue for outgoing messages
JSocketInputChannel_t in
reader for incoming messages
JSocketNonblockingWriter out
writer for outgoing messages
JSocketBuffer< const char > JSocketOutputBuffer
unsigned long long int getRAM()
Get RAM of this CPU.
void setSize(const long long int length)
Set size.
static const JTag DISPTAG_Born("Born")
void create()
Allocate memory.
static void release(T *p)
Release memory.
Memory management class for create/release policy based on malloc()/free().
static const JTag DISPTAG_MyId("_MyId")
int getSize() const
Get size.
Non-blocking socket writer.
std::set< JTag > subscriptionAny
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
static const JTag DISPTAG_Gime("_Gime")