111 const std::string& message) :
117 memcpy(this->
buffer,
static_cast<const JPrefix_t*
>(
this),
sizeof(
JPrefix_t));
119 memcpy(this->
buffer +
sizeof(JPrefix_t), message.data(), message.size());
130 static_cast<JPrefix_t&
>(*this) = message;
378 for (istringstream is(subscription); is >> c >> tag; ) {
438 queue.push_back(message);
452 for (std::deque<JDispatch>::iterator i =
queue.begin(); i !=
queue.end(); ) {
496 for (
iterator i = this->begin(); i != this->end(); ++i) {
507 for (
iterator i = this->begin(); i != this->end(); ++i) {
523 return out <<
"(" << message.
getTag() <<
"," << message.
size() <<
")";
579 int main(
int argc,
char* argv[])
604 catch(
const exception &error) {
605 FATAL(error.what() << endl);
614 DEBUG(
"Port " << setw(10) << port << endl);
622 select.setReaderMask(server);
624 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
626 if (!client->in.isReady()) {
627 select.setReaderMask(*client);
630 if (client->out.isReset()) {
632 if (!client->queue.empty()) {
634 DEBUG(
"Client" << *client <<
".set" << client->queue.front() << endl);
636 client->out.set(client->queue.front());
637 client->decrementRequest();
639 select.setWriterMask(*client);
642 }
else if (client->out.isBusy()) {
644 select.setWriterMask(*client);
651 nfds =
select(timeout_us);
653 catch(
const exception& error) {
654 ERROR(
"" << error.what() << endl);
659 for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
663 if (
select.hasReaderMask(*client)) {
668 catch(
const exception& error) {
670 ERROR(
"Remove (3) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
672 if (client->getNickname() !=
"") {
678 client = clientList.erase(client);
686 if (client->in.isReady()) {
688 DEBUG(
"Message" << *client <<
' ' << client->in.prefix.c_str() <<
' ' << client->in.size() << endl);
698 client->setSubscription(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
700 }
else if (client->in.prefix.getTag() ==
DISPTAG_MyId) {
702 client->setNickname(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
706 }
else if (client->in.prefix.getTag() ==
DISPTAG_Gime) {
708 client->incrementRequest();
712 client->setRequestAll();
716 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
719 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
720 if (i->getNickname() == nick_name) {
721 buffer +=
" " + i->getHostname();
729 DEBUG(
"Remove (1) client" << *client << endl);
733 client = clientList.erase(client);
741 client = clientList.erase(client);
743 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
747 for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
748 total += message->size();
751 cout <<
"client[" << i->getFileDescriptor() <<
"] " << i->getNickname() << endl;
752 cout <<
"tag - all:";
757 cout <<
"tag - any:";
762 cout <<
"queue " << i->queue.size() <<
' ' << total <<
"B" << endl;
769 istringstream is(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
781 clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
794 if (
select.hasWriterMask(*client)) {
798 DEBUG(
"Client" << *client <<
".write" <<
static_cast<const JSocketStatus&
>(client->out) << endl);
800 if (client->out.isReady()) {
802 client->queue.pop_front();
808 catch(
const exception& error) {
810 DEBUG(
"Remove (2) client" << *client <<
"<" << client->getNickname() <<
">: " << error.what() << endl);
812 if (client->getNickname() !=
"") {
818 client = clientList.erase(client);
822 if (
select.hasReaderMask(server)) {
833 DEBUG(
"New client" << socket << endl);
835 clientList.push_back(
JClient(socket));
int main(int argc, char *argv[])
Base class for memory management.
General purpose messaging.
#define DEBUG(A)
Message macros.
Utility class to parse command line options.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
int getFileDescriptor() const
Get file descriptor.
Exception for ControlHost.
Exception for failure of memory allocation.
Memory management class for create/release policy based on malloc()/free().
static void release(T *p)
Release memory.
static T * create()
Create object in memory.
void initialise()
Initialise counter.
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
List of ControlHost client managers.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
JClientList()
Default constructor.
ControlHost client manager.
std::deque< JDispatch > queue
queue for outgoing messages
void setRequestAll()
Set no request.
const std::set< JTag > & getSubscriptionAll() const
Get subscription.
const std::set< JTag > & getSubscriptionAny() const
Get subscription.
std::set< JTag > subscriptionAll
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
JSocketNonblockingWriter out
writer for outgoing messages
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
void drop()
Drop all messages for which the client has not the 'all' subscription.
bool checkRequest() const
Check request.
JClient()
Default constructor.
std::set< JTag > subscriptionAny
JClient(const JTCPSocket &socket)
Constructor.
void setNickname(const std::string &nick_name)
Set nick name.
void incrementRequest()
Increment request by one.
void decrementRequest()
Decrement request by one.
bool setSubscription(const std::string &subscription)
Set subcription.
JSocketInputChannel_t in
reader for incoming messages
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
const std::string & getNickname() const
Get nick name.
int PutFullString(const JTag &tag, const std::string &buffer)
Send string.
static bool maybe_special(const JTag &tag)
Check special ControlHost tags.
Data structure of a ControlHost message.
const char * data() const
Get data.
JDispatch(const JDispatch &message)
Copy constructor.
int size() const
Get size.
void create()
Allocate memory.
JMalloc< char > JMemory_t
JDispatch & operator=(const JDispatch &message)
Assignment operator.
void release()
Release memory.
static long long int MEMORY_TOTAL
Total size of data [Bytes].
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
JDispatch(const JTag &tag, const std::string &message)
Constructor.
JDispatch()
Default constructor.
int getSize() const
Get size.
void setSize(const long long int length)
Set size.
Wrapper class for select call.
Auxiliary class for non-blocking socket I/O.
int getSize() const
Get size of pending data.
Non-blocking socket writer.
Auxiliary class for non-blocking socket I/O.
int getCounter() const
Get number of I/O attempts.
bool isReady() const
Check ready status.
bool isBusy() const
Check busy status.
void setReceiveBufferSize(const int size)
Set receive buffer size.
static const int getDefaultBufferSize()
Default socket buffer size to be used on this system.
void setReuseAddress(const bool on)
Set reuse address.
void setKeepAlive(const bool on)
Set keep alive of socket.
void setSendBufferSize(const int size)
Set send buffer size.
void setNonBlocking(const bool on)
Set non-blocking of I/O.
const JTag & getTag() const
Get tag.
Utility class to parse command line options.
static const size_t buffer_size
bool select(const Trk &trk, const Evt &evt)
Event selection.
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
static const JTag DISPTAG_Always("_Always")
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
static const JTag DISPTAG_Gime("_Gime")
static const int DISPATCH_PORT
Default ControlHost port.
static const JTag DISPTAG_WhereIs("_WhereIs")
std::ostream & operator<<(std::ostream &out, const JDispatch &message)
Print message.
static const JTag DISPTAG_Debug("_Debug")
static const JTag DISPTAG_MyId("_MyId")
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
static const JTag DISPTAG_Subscribe("_Subscri")
Special ControlHost tags.
static const JTag DISPTAG_Born("Born")
static const JTag DISPTAG_ShowStat("_ShowSta")
static const JTag DISPTAG_Died("Died")
JSocketBuffer< const char > JSocketOutputBuffer
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
unsigned long long int getRAM()
Get RAM of this CPU.