111 const std::string& message) :
117 memcpy(this->
buffer,
static_cast<const JPrefix_t*
>(
this),
sizeof(
JPrefix_t));
119 memcpy(this->
buffer +
sizeof(JPrefix_t), message.data(), message.size());
130 static_cast<JPrefix_t&
>(*this) = message;
378 for (istringstream is(subscription); is >> c >> tag; ) {
438 queue.push_back(message);
452 for (std::deque<JDispatch>::iterator i =
queue.begin(); i !=
queue.end(); ) {
496 for (
iterator i = this->begin(); i != this->end(); ++i) {
507 for (
iterator i = this->begin(); i != this->end(); ++i) {
523 return out <<
"(" << message.
getTag() <<
"," << message.
size() <<
")";
579int main(
int argc,
char* argv[])
598 zap[
'Q'] =
make_field(JClient::QUEUE_LIMIT) = 100;
604 catch(
const exception &error) {
605 FATAL(error.what() << endl);
614 DEBUG(
"Port " << setw(10) << port << endl);
615 DEBUG(
"Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
616 DEBUG(
"Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);
624 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
626 if (!client->in.isReady()) {
630 if (client->out.isReset()) {
632 if (!client->queue.empty()) {
634 DEBUG(
"Client" << *client <<
".set" << client->queue.front() << endl);
636 client->out.set(client->queue.front());
637 client->decrementRequest();
642 }
else if (client->out.isBusy()) {
651 nfds = select(timeout_us);
653 catch(
const exception& error) {
654 ERROR(
"" << error.what() << endl);
659 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
668 catch(
const exception& error) {
670 ERROR(
"Remove (3) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
672 if (client->getNickname() !=
"") {
673 clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
678 client = clientList.erase(client);
686 if (client->in.isReady()) {
688 DEBUG(
"Message" << *client <<
' ' << client->in.prefix.c_str() <<
' ' << client->in.size() << endl);
690 bool special = JControlHost::maybe_special(client->in.prefix);
696 if (client->in.prefix.getTag() == DISPTAG_Subscribe) {
698 client->setSubscription(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
700 }
else if (client->in.prefix.getTag() == DISPTAG_MyId) {
702 client->setNickname(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
704 clientList.
add(
JDispatch(DISPTAG_Born, client->getNickname()));
706 }
else if (client->in.prefix.getTag() == DISPTAG_Gime) {
708 client->incrementRequest();
710 }
else if (client->in.prefix.getTag() == DISPTAG_Always) {
712 client->setRequestAll();
714 }
else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {
716 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer +=
" " + i->getHostname();
727 socket.
PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
729 DEBUG(
"Remove (1) client" << *client << endl);
733 client = clientList.erase(client);
737 }
else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {
741 client = clientList.erase(client);
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
751 cout <<
"client[" << i->getFileDescriptor() <<
"] " << i->getNickname() << endl;
752 cout <<
"tag - all:";
757 cout <<
"tag - any:";
762 cout <<
"queue " << i->queue.size() <<
' ' << total <<
"B" << endl;
767 }
else if (client->in.prefix.getTag() == DISPTAG_Debug) {
769 istringstream is(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
781 clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
783 if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {
785 WARNING(
"Memory " << setw(10) << JDispatch::MEMORY_TOTAL <<
" > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
798 DEBUG(
"Client" << *client <<
".write" <<
static_cast<const JSocketStatus&
>(client->out) << endl);
800 if (client->out.isReady()) {
802 client->queue.pop_front();
808 catch(
const exception& error) {
810 DEBUG(
"Remove (2) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
812 if (client->getNickname() !=
"") {
813 clientList.
add(
JDispatch(DISPTAG_Died, client->getNickname()));
818 client = clientList.erase(client);
833 DEBUG(
"New client" << socket << endl);
835 clientList.push_back(
JClient(socket));
int main(int argc, char *argv[])
Base class for memory management.
General purpose messaging.
#define DEBUG(A)
Message macros.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
int getFileDescriptor() const
Get file descriptor.
Exception for ControlHost.
Exception for failure of memory allocation.
Memory management class for create/release policy based on malloc()/free().
static JClass_t * create()
Create object in memory.
static void release(JClass_t *p)
Release memory.
void initialise()
Initialise counter.
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
List of ControlHost client managers.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages from client queues for which the client does not have the 'all' subscription.
JClientList()
Default constructor.
ControlHost client manager.
std::deque< JDispatch > queue
queue for outgoing messages
void setRequestAll()
Set request all.
const std::set< JTag > & getSubscriptionAny() const
Get subscription.
const std::string & getNickname() const
Get nick name.
std::set< JTag > subscriptionAll
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
JSocketNonblockingWriter out
writer for outgoing messages
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages for which the client does not have the 'all' subscription.
bool checkRequest() const
Check request.
JClient()
Default constructor.
std::set< JTag > subscriptionAny
JClient(const JTCPSocket &socket)
Constructor.
void setNickname(const std::string &nick_name)
Set nick name.
void incrementRequest()
Increment request by one.
const std::set< JTag > & getSubscriptionAll() const
Get subscription.
void decrementRequest()
Decrement request by one.
bool setSubscription(const std::string &subscription)
Set subscription.
JSocketInputChannel_t in
reader for incoming messages
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
int PutFullString(const JTag &tag, const std::string &buffer)
Send string.
Data structure of a ControlHost message.
JDispatch(const JDispatch &message)
Copy constructor.
int size() const
Get size.
JMalloc< char > JMemory_t
void create()
Allocate memory.
void release()
Release memory.
static long long int MEMORY_TOTAL
Total size of data [Bytes].
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
JDispatch(const JTag &tag, const std::string &message)
Constructor.
JDispatch()
Default constructor.
JDispatch & operator=(const JDispatch &message)
Assignment operator.
const char * data() const
Get data.
int getSize() const
Get size.
void setSize(const long long int length)
Set size.
void setReaderMask(const JAbstractFile &file)
Set reader mask.
bool hasReaderMask(const JAbstractFile &file) const
Has reader file.
void setWriterMask(const JAbstractFile &file)
Set writer mask.
bool hasWriterMask(const JAbstractFile &file) const
Has writer file.
Wrapper class for select call.
Auxiliary class for non-blocking socket I/O.
int getSize() const
Get size of pending data.
Non-blocking socket writer.
Auxiliary class for non-blocking socket I/O.
int getCounter() const
Get number of I/O attempts.
bool isReady() const
Check ready status.
bool isBusy() const
Check busy status.
void setReceiveBufferSize(const int size)
Set receive buffer size.
void setReuseAddress(const bool on)
Set reuse address.
void setKeepAlive(const bool on)
Set keep alive of socket.
void setSendBufferSize(const int size)
Set send buffer size.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
const JTag & getTag() const
Get tag.
Utility class to parse command line options.
static const size_t buffer_size
std::ostream & operator<<(std::ostream &out, const morphology_type &object)
Write morphology to output stream.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
JSocketBuffer< const char > JSocketOutputBuffer
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.