111               const std::string& message) :
 
  117       memcpy(this->
buffer, 
static_cast<const JPrefix_t*
>(
this), 
sizeof(
JPrefix_t));
 
  119       memcpy(this->
buffer + 
sizeof(JPrefix_t), message.data(), message.size());
 
  130       static_cast<JPrefix_t&
>(*this) = message;
 
  378         for (istringstream is(subscription); is >> c >> tag; ) {
 
  438         queue.push_back(message);
 
  452       for (std::deque<JDispatch>::iterator i = 
queue.begin(); i != 
queue.end(); ) {
 
  496       for (
iterator i = this->begin(); i != this->end(); ++i) {
 
  507       for (
iterator i = this->begin(); i != this->end(); ++i) {
 
  523     return out << 
"(" << message.
getTag() << 
"," << message.
size() << 
")";
 
  579 int main(
int argc, 
char* argv[])
 
  604   catch(
const exception &error) {
 
  605     FATAL(error.what() << endl);
 
  614   DEBUG(
"Port         " << setw(10) << port                    << endl);
 
  622     select.setReaderMask(server);
 
  624     for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
 
  626       if (!client->in.isReady()) {
 
  627         select.setReaderMask(*client);
 
  630       if (client->out.isReset()) {
 
  632         if (!client->queue.empty()) {
 
  634           DEBUG(
"Client" << *client << 
".set" << client->queue.front() << endl);
 
  636           client->out.set(client->queue.front());
 
  637           client->decrementRequest();
 
  639           select.setWriterMask(*client);
 
  642       } 
else if (client->out.isBusy()) {
 
  644         select.setWriterMask(*client);
 
  651       nfds = 
select(timeout_us);
 
  653     catch(
const exception& error) {
 
  654       ERROR(
"" << error.what() << endl);
 
  659       for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
 
  663           if (
select.hasReaderMask(*client)) {
 
  668             catch(
const exception& error) {
 
  670               ERROR(
"Remove (3) client" << *client << 
"<" << client->getNickname() << 
">: " << error.what() << endl);
 
  672               if (client->getNickname() != 
"") {
 
  678               client = clientList.erase(client);
 
  686           if (client->in.isReady()) {
 
  688             DEBUG(
"Message" << *client << 
' ' << client->in.prefix.c_str() << 
' ' << client->in.size() << endl);
 
  698                 client->setSubscription(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
 
  700               } 
else if (client->in.prefix.getTag() == 
DISPTAG_MyId) {
 
  702                 client->setNickname(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
 
  706               } 
else if (client->in.prefix.getTag() == 
DISPTAG_Gime) {
 
  708                 client->incrementRequest();
 
  712                 client->setRequestAll();
 
  716                 string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
 
  719                 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
 
  720                   if (i->getNickname() == nick_name) {
 
  721                     buffer += 
" " + i->getHostname();
 
  729                 DEBUG(
"Remove (1) client" << *client << endl);
 
  733                 client = clientList.erase(client);
 
  741                 client = clientList.erase(client);
 
  743                 for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
 
  747                   for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
 
  748                     total += message->size();
 
  751                   cout << 
"client[" << i->getFileDescriptor() << 
"] " << i->getNickname() << endl;
 
  752                   cout << 
"tag - all:";
 
  757                   cout << 
"tag - any:";
 
  762                   cout << 
"queue " << i->queue.size() << 
' ' << total << 
"B" << endl;
 
  769                 istringstream is(
string(client->in.getRemainingData(), client->in.getRemainingSize()));
 
  781               clientList.
add(
JDispatch(client->in.prefix, client->in.data()));
 
  794           if (
select.hasWriterMask(*client)) {
 
  798             DEBUG(
"Client" << *client << 
".write" << 
static_cast<const JSocketStatus&
>(client->out) << endl);
 
  800             if (client->out.isReady()) {
 
  802               client->queue.pop_front();
 
  808         catch(
const exception& error) {
 
  810           DEBUG(
"Remove (2) client" << *client << 
"<" << client->getNickname() << 
">: " << error.what() << endl);
 
  812           if (client->getNickname() != 
"") {
 
  818           client = clientList.erase(client);
 
  822       if (
select.hasReaderMask(server)) {
 
  833         DEBUG(
"New client" << socket << endl);
 
  835         clientList.push_back(
JClient(socket));
 
int main(int argc, char *argv[])
 
Base class for memory management.
 
General purpose messaging.
 
#define DEBUG(A)
Message macros.
 
Utility class to parse command line options.
 
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
 
int getFileDescriptor() const
Get file descriptor.
 
Exception for ControlHost.
 
Exception for failure of memory allocation.
 
Memory management class for create/release policy based on malloc()/free().
 
static void release(T *p)
Release memory.
 
static T * create()
Create object in memory.
 
void initialise()
Initialise counter.
 
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
 
List of ControlHost client managers.
 
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
 
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
 
JClientList()
Default constructor.
 
ControlHost client manager.
 
std::deque< JDispatch > queue
queue for outgoing messages
 
void setRequestAll()
Set no request.
 
const std::set< JTag > & getSubscriptionAll() const
Get subscription.
 
const std::set< JTag > & getSubscriptionAny() const
Get subscription.
 
std::set< JTag > subscriptionAll
 
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
 
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
 
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
 
JSocketNonblockingWriter out
writer for outgoing messages
 
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
 
void drop()
Drop all messages for which the client has not the 'all' subscription.
 
bool checkRequest() const
Check request.
 
JClient()
Default constructor.
 
std::set< JTag > subscriptionAny
 
JClient(const JTCPSocket &socket)
Constructor.
 
void setNickname(const std::string &nick_name)
Set nick name.
 
void incrementRequest()
Increment request by one.
 
void decrementRequest()
Decrement request by one.
 
bool setSubscription(const std::string &subscription)
Set subcription.
 
JSocketInputChannel_t in
reader for incoming messages
 
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
 
const std::string & getNickname() const
Get nick name.
 
int PutFullString(const JTag &tag, const std::string &buffer)
Send string.
 
static bool maybe_special(const JTag &tag)
Check special ControlHost tags.
 
Data structure of a ControlHost message.
 
const char * data() const
Get data.
 
JDispatch(const JDispatch &message)
Copy constructor.
 
int size() const
Get size.
 
void create()
Allocate memory.
 
JMalloc< char > JMemory_t
 
JDispatch & operator=(const JDispatch &message)
Assignment operator.
 
void release()
Release memory.
 
static long long int MEMORY_TOTAL
Total size of data [Bytes].
 
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
 
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
 
JDispatch(const JTag &tag, const std::string &message)
Constructor.
 
JDispatch()
Default constructor.
 
int getSize() const
Get size.
 
void setSize(const long long int length)
Set size.
 
Wrapper class for select call.
 
Auxiliary class for non-blocking socket I/O.
 
int getSize() const
Get size of pending data.
 
Non-blocking socket writer.
 
Auxiliary class for non-blocking socket I/O.
 
int getCounter() const
Get number of I/O attempts.
 
bool isReady() const
Check ready status.
 
bool isBusy() const
Check busy status.
 
void setReceiveBufferSize(const int size)
Set receive buffer size.
 
static const int getDefaultBufferSize()
Default socket buffer size to be used on this system.
 
void setReuseAddress(const bool on)
Set reuse address.
 
void setKeepAlive(const bool on)
Set keep alive of socket.
 
void setSendBufferSize(const int size)
Set send buffer size.
 
void setNonBlocking(const bool on)
Set non-blocking of I/O.
 
const JTag & getTag() const
Get tag.
 
Utility class to parse command line options.
 
static const size_t buffer_size
 
bool select(const Trk &trk, const Evt &evt)
Event selection.
 
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packeet.
 
static const JTag DISPTAG_Always("_Always")
 
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
 
static const JTag DISPTAG_Gime("_Gime")
 
static const int DISPATCH_PORT
Default ControlHost port.
 
static const JTag DISPTAG_WhereIs("_WhereIs")
 
std::ostream & operator<<(std::ostream &out, const JDispatch &message)
Print message.
 
static const JTag DISPTAG_Debug("_Debug")
 
static const JTag DISPTAG_MyId("_MyId")
 
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
 
static const JTag DISPTAG_Subscribe("_Subscri")
Special ControlHost tags.
 
static const JTag DISPTAG_Born("Born")
 
static const JTag DISPTAG_ShowStat("_ShowSta")
 
static const JTag DISPTAG_Died("Died")
 
JSocketBuffer< const char > JSocketOutputBuffer
 
This name space includes all other name spaces (except KM3NETDAQ, KM3NET and ANTARES).
 
unsigned long long int getRAM()
Get RAM of this CPU.