Jpp  18.0.0-rc.3
the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JLigier.cc
Go to the documentation of this file.
1 #include <iostream>
2 #include <iomanip>
3 #include <string>
4 #include <sstream>
5 #include <deque>
6 #include <set>
7 
8 #include "Jeep/JParser.hh"
9 #include "Jeep/JMessage.hh"
10 #include "JLang/JMemory.hh"
11 #include "JLang/JSharedCounter.hh"
12 #include "JNet/JTCPSocket.hh"
13 #include "JNet/JSocketChannel.hh"
14 #include "JNet/JServerSocket.hh"
15 #include "JNet/JSelect.hh"
16 #include "JNet/JControlHost.hh"
18 #include "JMath/JConstants.hh"
19 
20 
21 namespace JNET {
22 
23 
24  using namespace JPP;
25 
26 
27  template<class T> class JMemory_t : public JMalloc<T> {}; // Memory manager
28 
29  typedef JPrefix JPrefix_t;
31 
32 
33  /**
34  * Get total size of internet packet, i.e.\ the size of the prefix plus the size stored in the prefix.
35  *
36  * \param prefix prefix
37  * \return number of bytes
38  */
39  inline int getSizeOfPacket(const JPrefix_t& prefix)
40  {
41  return static_cast<int>(sizeof(JPrefix_t) + prefix.getSize());
42  }
43 
44 
45  /**
46  * Set total size of internet packet; the size stored in the prefix excludes the prefix itself.
47  *
48  * \param size total number of bytes, including the prefix
49  * \param prefix prefix in which the size is stored
50  */
51  inline void setSizeOfPacket(const int size, JPrefix_t& prefix)
52  {
53  prefix.setSize(size - sizeof(prefix));
54  }
55 
56 
57  /**
58  * Data structure of a ControlHost message.
59  * This data structure consists of a copy of the ControlHost prefix and
60  * an array of bytes (including the ControlHost prefix).
61  * A new array of bytes is created by the appropriate constructor.
62  * The allocated memory is released upon destruction of the last object of this class.
63  */
64  class JDispatch :
65  public JPrefix_t,
66  public JSharedCounter
67  {
68  public:
69 
71 
72  static long long int MEMORY_TOTAL; //!< Total size of data [Bytes]
73  static long long int MEMORY_LIMIT; //!< Limit size of data [Bytes]
74 
75  /**
76  * Default constructor.
77  */
79  JPrefix_t(),
81  buffer(NULL)
82  {}
83 
84 
85  /**
86  * Constructor.
87  * Note that the input data should contain a copy of the prefix.
88  *
89  * \param prefix prefix
90  * \param data data
91  */
92  JDispatch(const JPrefix_t& prefix,
93  const char* data) :
94  JPrefix_t(prefix),
96  {
97  create();
98 
99  memcpy(this->buffer, data, size());
100  }
101 
102 
103  /**
104  * Constructor.
105  * Note that the given message is appended to a copy of the prefix.
106  *
107  * \param tag tag
108  * \param message message
109  */
110  JDispatch(const JTag& tag,
111  const std::string& message) :
112  JPrefix_t(tag, message.size()),
114  {
115  create();
116 
117  memcpy(this->buffer, static_cast<const JPrefix_t*>(this), sizeof(JPrefix_t));
118 
119  memcpy(this->buffer + sizeof(JPrefix_t), message.data(), message.size());
120  }
121 
122 
123  /**
124  * Copy constructor.
125  *
126  * \param message message
127  */
128  JDispatch(const JDispatch& message)
129  {
130  static_cast<JPrefix_t&>(*this) = message;
131 
132  buffer = message.buffer;
133 
134  attach(message);
135  }
136 
137 
138  /**
139  * Destructor.
140  */
142  {
143  if (detach()) {
144  release();
145  }
146  }
147 
148 
149  /**
150  * Assignment operator.
151  *
152  * \param message message
153  * \return this JDispatch
154  */
155  JDispatch& operator=(const JDispatch& message)
156  {
157  if (buffer != message.buffer) {
158 
159  if (detach()) {
160  release();
161  }
162 
163  buffer = message.buffer;
164 
165  attach(message);
166  }
167 
168  return *this;
169  }
170 
171 
172  /**
173  * Type conversion operator.
174  *
175  * \return socket output buffer
176  */
177  operator JSocketOutputBuffer () const
178  {
179  return JSocketOutputBuffer(this->data(), this->size());
180  }
181 
182 
183  /**
184  * Get size.
185  *
186  * \return number of bytes
187  */
188  int size() const
189  {
190  return getSizeOfPacket(static_cast<const JPrefix_t&>(*this));
191  }
192 
193 
194  /**
195  * Get data.
196  *
197  * \return pointer to data
198  */
199  const char* data() const
200  {
201  return buffer;
202  }
203 
204 
205  protected:
206  /**
207  * Allocate memory.
208  */
209  void create()
210  {
212 
213  if (buffer != NULL) {
214 
216 
217  MEMORY_TOTAL += size();
218 
219  } else {
220 
221  throw JMallocException("Not enough space in memory.");
222  }
223  }
224 
225 
226  /**
227  * Release memory.
228  */
229  void release()
230  {
232 
233  MEMORY_TOTAL -= size();
234  }
235 
236  char* buffer;
237  };
238 
239 
240  /**
241  * ControlHost client manager.
    *
    * A client is a TCP socket with a reader for incoming messages, a non-blocking writer and
    * a queue for outgoing messages, together with its subscriptions and its pending requests.
242  */
243  class JClient :
244  public JTCPSocket
245  {
246  public:
247 
248 
249  static unsigned int QUEUE_LIMIT; //!< Maximum number of messages in queue
250 
251 
252  /**
253  * Default constructor.
254  */
    // NOTE(review): listing line 255 is missing; according to the cross-reference at the end of
    // this listing, JClient() is defined at JLigier.cc:255.
256  JTCPSocket(),
257  in (*this),
258  out(*this),
259  requestAll(false),
260  requestCounter(0)
261  {}
262 
263 
264  /**
265  * Constructor.
266  *
267  * \param socket socket
268  */
269  JClient(const JTCPSocket& socket) :
270  JTCPSocket(socket),
271  in (*this),
272  out(*this),
273  requestAll(false),
274  requestCounter(0)
275  {}
276 
277 
278  /**
279  * Get nick name.
280  *
281  * \return nick name
282  */
283  const std::string& getNickname() const
284  {
285  return nick_name;
286  }
287 
288 
289  /**
290  * Set nick name.
291  *
292  * \param nick_name nick name
293  */
    // NOTE(review): listing line 294 is missing; the cross-reference at the end of this listing gives
    // "void setNickname(const std::string &nick_name)" for JLigier.cc:294.
295  {
296  this->nick_name = nick_name;
297  }
298 
299 
300  /**
301  * Check request.
    * A request can be honoured if all messages are requested or if the request counter is not zero.
302  *
303  * \return true if request can be honoured; else false
304  */
305  bool checkRequest() const
306  {
307  return requestAll || requestCounter != 0;
308  }
309 
310 
311  /**
312  * Increment request by one.
313  */
    // NOTE(review): listing line 314 is missing; cross-reference: "void incrementRequest()" at JLigier.cc:314.
315  {
316  ++requestCounter;
317  }
318 
319 
320  /**
321  * Decrement request by one.
322  */
    // NOTE(review): listing line 323 is missing; cross-reference: "void decrementRequest()" at JLigier.cc:323.
324  {
325  --requestCounter;
326  }
327 
328 
329  /**
330  * Request all messages, i.e.\ checkRequest() returns true from now on.
331  */
    // NOTE(review): listing line 332 is missing; cross-reference: "void setRequestAll()" at JLigier.cc:332.
333  {
334  requestAll = true;
335  }
336 
337 
338  /**
339  * Get subscription of type 'all'.
340  *
341  * \return subscription
342  */
    // NOTE(review): listing line 343 is missing; cross-reference:
    // "const std::set< JTag > & getSubscriptionAll() const" at JLigier.cc:343.
344  {
345  return subscriptionAll;
346  }
347 
348 
349  /**
350  * Get subscription of type 'any'.
351  *
352  * \return subscription
353  */
    // NOTE(review): listing line 354 is missing; cross-reference:
    // "const std::set< JTag > & getSubscriptionAny() const" at JLigier.cc:354.
355  {
356  return subscriptionAny;
357  }
358 
359 
360  /**
361  * Set subscription.
    * Both subscriptions are cleared first.
    * The input is then read as a sequence of a single character followed by a tag;
    * the character selects the subscription to which the tag is added.
    * An entry with any other character is skipped.
    * If a ControlHost exception is caught, the tags inserted up to that point are kept.
362  *
363  * \param subscription subscription
364  * \return true if OK; else false
365  */
366  bool setSubscription(const std::string& subscription)
367  {
368  using namespace std;
369 
370  subscriptionAll.clear();
371  subscriptionAny.clear();
372 
373  try {
374 
375  char c;
376  JTag tag;
377 
378  for (istringstream is(subscription); is >> c >> tag; ) {
379  if (c == SUBSCRIBE_ALL) subscriptionAll.insert(tag);
380  else if (c == SUBSCRIBE_ANY) subscriptionAny.insert(tag);
381  //else if (c == SUBSCRIBE_SHARED_MEMORY) subscriptionAny.insert(tag);
382  }
383  }
384  catch(const JControlHostException& error) {
385  return false;
386  }
387 
388  return true;
389  }
390 
391 
392  /**
393  * Check subscription of type 'all' for given prefix.
394  *
395  * \param prefix prefix
396  * \return true if subscription valid; else false
397  */
398  bool checkSubscriptionAll(const JPrefix_t& prefix) const
399  {
400  return subscriptionAll.find(prefix) != subscriptionAll.end();
401  }
402 
403 
404  /**
405  * Check subscription of type 'any' for given prefix.
    * In addition to the subscription itself, there should be a request that can be honoured and
    * the number of messages in the queue should be less than QUEUE_LIMIT.
406  *
407  * \param prefix prefix
408  * \return true if subscription valid; else false
409  */
410  bool checkSubscriptionAny(const JPrefix_t& prefix) const
411  {
412  return subscriptionAny.find(prefix) != subscriptionAny.end() && checkRequest() && queue.size() < QUEUE_LIMIT;
413  }
414 
415 
416  /**
417  * Check subscription for given prefix, i.e.\ either type 'all' or type 'any'.
418  *
419  * \param prefix prefix
420  * \return true if subscription valid; else false
421  */
422  bool checkSubscription(const JPrefix_t& prefix) const
423  {
424  return checkSubscriptionAll(prefix) || checkSubscriptionAny(prefix);
425  }
426 
427 
428  /**
429  * Add message to the queue of this client depending on its subscription.
430  * Note that adding a message may result in dropping (other) messages.
    * This happens when the number of messages in the queue exceeds QUEUE_LIMIT.
431  *
432  * \param message message
433  */
434  void add(const JDispatch& message)
435  {
436  if (checkSubscription(message)) {
437 
438  queue.push_back(message);
439 
440  if (queue.size() > QUEUE_LIMIT) {
441  drop();
442  }
443  }
444  }
445 
446 
447  /**
448  * Drop all messages for which the client has not the 'all' subscription.
    * The message at the front of the queue is kept as long as the writer is busy.
449  */
450  void drop()
451  {
452  for (std::deque<JDispatch>::iterator i = queue.begin(); i != queue.end(); ) {
453  if (!checkSubscriptionAll(*i) && (i != queue.begin() || !out.isBusy()))
454  i = queue.erase(i);
455  else
456  ++i;
457  }
458  }
459 
460 
461  JSocketInputChannel_t in; //!< reader for incoming messages
462  JSocketNonblockingWriter out; //!< writer for outgoing messages
463  std::deque<JDispatch> queue; //!< queue for outgoing messages
464 
465  protected:
    // NOTE(review): listing lines 466-470 are missing. The cross-reference at the end of this listing gives
    // std::set< JTag > subscriptionAll (466), std::set< JTag > subscriptionAny (467), std::string nick_name (468),
    // bool requestAll (469) and int requestCounter (470).
471  };
472 
473 
474  /**
475  * List of ControlHost client managers.
    *
    * The add() and drop() operations are forwarded to every client in the list.
476  */
477  class JClientList :
478  public std::vector<JClient>
479  {
480  public:
481  /**
482  * Default constructor.
483  */
    // NOTE(review): listing line 484 is missing; according to the cross-reference at the end of
    // this listing, JClientList() is defined at JLigier.cc:484.
485  std::vector<JClient>()
486  {}
487 
488 
489  /**
490  * Add message to client queues depending on subscription of each client.
    * Each client decides itself whether the message is queued (see JClient::add()).
491  *
492  * \param message message
493  */
494  void add(const JDispatch& message)
495  {
496  for (iterator i = this->begin(); i != this->end(); ++i) {
497  i->add(message);
498  }
499  }
500 
501 
502  /**
503  * Drop all messages from client queues for which the client has not the 'all' subscription.
504  */
505  void drop()
506  {
507  for (iterator i = this->begin(); i != this->end(); ++i) {
508  i->drop();
509  }
510  }
511  };
512 
513 
514  /**
515  * Print tag and total size of message, formatted as "(<tag>,<size>)".
516  *
517  * \param out output stream
518  * \param message message
519  * \return output stream
520  */
521  inline std::ostream& operator<<(std::ostream& out, const JDispatch& message)
522  {
523  out << "(" << message.getTag() << "," << message.size() << ")";

    return out;
524  }
525 
526 
527  /**
528  * Print file descriptor of socket, formatted as "[<file descriptor>]".
529  *
530  * \param out output stream
531  * \param socket socket
532  * \return output stream
533  */
534  inline std::ostream& operator<<(std::ostream& out, const JSocket& socket)
535  {
536  out << "[" << socket.getFileDescriptor() << "]";

    return out;
537  }
538 
539 
540  /**
541  * Print ready status and counter of socket status, formatted as "(<ready>,<counter>)".
542  *
543  * \param out output stream
544  * \param status socket status
545  * \return output stream
546  */
547  inline std::ostream& operator<<(std::ostream& out, const JSocketStatus& status)
548  {
549  out << "(" << status.isReady() << "," << status.getCounter() << ")";

    return out;
550  }
551 
552 
553  /**
554  * Print ready status, counter and size of socket input buffer, formatted as "(<ready>,<counter>,<size>)".
555  *
556  * \param out output stream
557  * \param buffer socket buffer
558  * \return output stream
559  */
560  inline std::ostream& operator<<(std::ostream& out, const JSocketInputBuffer& buffer)
561  {
562  out << "(" << buffer.isReady() << "," << buffer.getCounter() << "," << buffer.getSize() << ")";

    return out;
563  }
564 
565 
566  long long int JDispatch::MEMORY_TOTAL = 0; //!< Total memory allocation.
567  long long int JDispatch::MEMORY_LIMIT; //!< Limit memory allocation.
568 
569  unsigned int JClient::QUEUE_LIMIT; //!< queue size limit
570 }
571 
572 
573 /**
574  * \file
575  *
576  * Quicksilver Messenger Service - a ControlHost server.
    *
    * The server accepts TCP connections and dispatches each incoming message
    * to the queues of the connected clients according to their subscriptions.
    * Messages with one of the reserved tags handled below are processed by the server itself.
    *
    * Command line options (with default values):
    *   -P  port number (DISPATCH_PORT)
    *   -q  backlog of the server socket (1024)
    *   -T  timeout passed to select (1)
    *   -s  send and receive buffer size of each client socket (GIGABYTE)
    *   -Q  maximum number of messages in the queue of a client (100)
    *   -M  limit of the total size of the message data (half of the value of JSYSTEM::getRAM())
    *   -d  debug level (0)
    *
577  * \author mdejong
578  */
579 int main(int argc, char* argv[])
580 {
581  using namespace std;
582  using namespace JPP;
583 
584  int port;
585  int backlog;
586  int timeout_us;
587  int buffer_size;
588  int debug;
589 
590  try {
591 
592  JParser<> zap("ControlHost server.");
593 
594  zap['P'] = make_field(port) = DISPATCH_PORT;
595  zap['q'] = make_field(backlog) = 1024;
596  zap['T'] = make_field(timeout_us) = 1;
597  zap['s'] = make_field(buffer_size) = GIGABYTE;
598  zap['Q'] = make_field(JClient::QUEUE_LIMIT) = 100;
599  zap['M'] = make_field(JDispatch::MEMORY_LIMIT) = (JSYSTEM::getRAM() >> 1);
600  zap['d'] = make_field(debug) = 0;
601 
602  zap(argc, argv);
603  }
604  catch(const exception &error) {
605  FATAL(error.what() << endl);
606  }
607 
608 
609  JServerSocket server(port, backlog);
610  JSelect select;
611  JClientList clientList;
612 
613 
614  DEBUG("Port " << setw(10) << port << endl);
615  DEBUG("Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
616  DEBUG("Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);
617 
618 
    // Main loop: (1) set up the select masks, (2) wait for activity,
    // (3) serve the clients and (4) accept a new client, if any.
619  for ( ; ; ) {
620 
621  select.reset();
622  select.setReaderMask(server);
623 
624  for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {
625 
    // wait for input as long as the incoming message is not complete
626  if (!client->in.isReady()) {
627  select.setReaderMask(*client);
628  }
629 
630  if (client->out.isReset()) {
631 
    // writer is idle: start writing the message at the front of the queue;
    // the message stays in the queue until it has been written completely
632  if (!client->queue.empty()) {
633 
634  DEBUG("Client" << *client << ".set" << client->queue.front() << endl);
635 
636  client->out.set(client->queue.front());
637  client->decrementRequest();
638 
639  select.setWriterMask(*client);
640  }
641 
642  } else if (client->out.isBusy()) {
643 
    // continue writing the current message
644  select.setWriterMask(*client);
645  }
646  }
647 
648  if (select(timeout_us) > 0) {
649 
    // the iterator is advanced at the end of the try block or
    // set to the return value of erase() when the client is removed
650  for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {
651 
652  try {
653 
654  if (select.hasReaderMask(*client)) {
655 
656  try {
657  client->in.read();
658  }
659  catch(const exception& error) {
660 
661  ERROR("Remove (3) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);
662 
    // announce the removal of a client that has identified itself
663  if (client->getNickname() != "") {
664  clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
665  }
666 
667  client->shutdown();
668 
669  client = clientList.erase(client);
670 
671  continue;
672  }
673 
674  DEBUG("Client" << *client << ".read" << static_cast<const JSocketInputBuffer&>(client->in) << endl);
675  }
676 
677  if (client->in.isReady()) {
678 
679  DEBUG("Message" << *client << ' ' << client->in.prefix.c_str() << ' ' << client->in.size() << endl);
680 
681  bool special = JControlHost::maybe_special(client->in.prefix);
682 
683  if (special) {
684 
685  client->in.seekg(sizeof(JPrefix_t)); // skip prefix
686 
687  if (client->in.prefix.getTag() == DISPTAG_Subscribe) {
688 
689  client->setSubscription(string(client->in.getRemainingData(), client->in.getRemainingSize()));
690 
691  } else if (client->in.prefix.getTag() == DISPTAG_MyId) {
692 
    // the client identifies itself: store its nick name and announce it
693  client->setNickname(string(client->in.getRemainingData(), client->in.getRemainingSize()));
694 
695  clientList.add(JDispatch(DISPTAG_Born, client->getNickname()));
696 
697  } else if (client->in.prefix.getTag() == DISPTAG_Gime) {
698 
699  client->incrementRequest();
700 
701  } else if (client->in.prefix.getTag() == DISPTAG_Always) {
702 
703  client->setRequestAll();
704 
705  } else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {
706 
    // reply with the host names of all clients with the given nick name, separated by a space;
    // the requesting client is removed afterwards
707  string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
708  string buffer;
709 
710  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
711  if (i->getNickname() == nick_name) {
712  buffer += " " + i->getHostname();
713  }
714  }
715 
716  JControlHost socket(*client);
717 
    // strip the leading space, if any
718  socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));
719 
720  DEBUG("Remove (1) client" << *client << endl);
721 
722  client->shutdown();
723 
724  client = clientList.erase(client);
725 
726  continue; // skip any action
727 
728  } else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {
729 
    // the requesting client is removed first;
    // the status of the remaining clients is then printed to the standard output of the server
730  client->shutdown();
731 
732  client = clientList.erase(client);
733 
734  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
735 
736  int total = 0;
737 
738  for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
739  total += message->size();
740  }
741 
742  cout << "client[" << i->getFileDescriptor() << "] " << i->getNickname() << endl;
743  cout << "tag - all:";
744  for (std::set<JTag>::const_iterator tag = i->getSubscriptionAll().begin(); tag != i->getSubscriptionAll().end(); ++tag) {
745  cout << ' ' << *tag;
746  }
747  cout << endl;
748  cout << "tag - any:";
749  for (std::set<JTag>::const_iterator tag = i->getSubscriptionAny().begin(); tag != i->getSubscriptionAny().end(); ++tag) {
750  cout << ' ' << *tag;
751  }
752  cout << endl;
753  cout << "queue " << i->queue.size() << ' ' << total << "B" << endl;
754  }
755 
756  continue; // skip any action
757 
758  } else if (client->in.prefix.getTag() == DISPTAG_Debug) {
759 
    // change the debug level of the server
760  istringstream is(string(client->in.getRemainingData(), client->in.getRemainingSize()));
761 
762  is >> debug;
763 
764  } else {
765 
766  special = false; // not a reserved tag.
767  }
768  }
769 
770  if (!special) {
771 
    // regular message: dispatch it to the clients
772  clientList.add(JDispatch(client->in.prefix, client->in.data()));
773 
    // NOTE(review): listing line 774 is missing; given the warning and the closing brace below,
    // presumably a test whether JDispatch::MEMORY_TOTAL exceeds JDispatch::MEMORY_LIMIT -- confirm.
775 
776  WARNING("Memory " << setw(10) << JDispatch::MEMORY_TOTAL << " > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
777 
778  clientList.drop();
779  }
780  }
781 
    // prepare the reader for the next message
782  client->in.reset();
783  }
784 
785  if (select.hasWriterMask(*client)) {
786 
787  client->out.write();
788 
789  DEBUG("Client" << *client << ".write" << static_cast<const JSocketStatus&>(client->out) << endl);
790 
    // message written completely: remove it from the queue
791  if (client->out.isReady()) {
792  client->out.reset();
793  client->queue.pop_front();
794  }
795  }
796 
797  ++client;
798  }
799  catch(const exception& error) {
800 
    // any other error while serving this client: announce its removal, if identified, and remove it
801  DEBUG("Remove (2) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);
802 
803  if (client->getNickname() != "") {
804  clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
805  }
806 
807  client->shutdown();
808 
809  client = clientList.erase(client);
810  }
811  }
812 
    // new connection on the server socket
813  if (select.hasReaderMask(server)) {
814 
815  JTCPSocket socket;
816 
817  socket.accept(server.getFileDescriptor());
818 
819  socket.setSendBufferSize (buffer_size);
820  socket.setReceiveBufferSize(buffer_size);
821 
822  socket.setKeepAlive (true);
823  socket.setNonBlocking(true);
824 
825  DEBUG("New client" << socket << endl);
826 
827  clientList.push_back(JClient(socket));
828  }
829  }
830  }
831 }
JSocketNonblockingWriter out
writer for outgoing messages
Definition: JLigier.cc:462
ControlHost prefix.
Definition: JPrefix.hh:31
bool setSubscription(const std::string &subscription)
Set subscription.
Definition: JLigier.cc:366
Utility class to parse command line options.
Definition: JParser.hh:1514
static const JTag DISPTAG_Subscribe("_Subscri")
Special ControlHost tags.
std::set< JTag > subscriptionAll
Definition: JLigier.cc:466
int getSizeOfPacket(const KM3NETDAQ::JDAQAbstractPreamble &preamble)
Get size of packet.
Definition: JDataFilter.cc:79
static const int DISPATCH_PORT
Default ControlHost port.
Definition: JHostname.hh:24
JDispatch & operator=(const JDispatch &message)
Assignment operator.
Definition: JLigier.cc:155
static const JTag DISPTAG_Died("Died")
Shared counter.
int main(int argc, char *argv[])
Definition: Main.cc:15
Data structure of a ControlHost message.
Definition: JLigier.cc:64
void setSize(const long long int length)
Set size.
Definition: JPrefix.hh:73
JSocketBuffer< const char > JSocketOutputBuffer
void setRequestAll()
Set no request.
Definition: JLigier.cc:332
ControlHost client manager.
Definition: JLigier.cc:243
std::deque< JDispatch > queue
queue for outgoing messages
Definition: JLigier.cc:463
void release()
Release memory.
Definition: JLigier.cc:229
Auxiliary class for non-blocking socket I/O.
List of ControlHost client managers.
Definition: JLigier.cc:477
std::set< JTag > subscriptionAny
Definition: JLigier.cc:467
static const JTag DISPTAG_MyId("_MyId")
void attach(const JSharedCounter &object)
Attach this counter to given shared counter object.
bool requestAll
Definition: JLigier.cc:469
void setNickname(const std::string &nick_name)
Set nick name.
Definition: JLigier.cc:294
static const JTag DISPTAG_WhereIs("_WhereIs")
then usage $script< detector file >< detectorfile > nIf the range of floors is the first detector file is aligned to the second before the comparison nIn this
bool checkRequest() const
Check request.
Definition: JLigier.cc:305
bool checkSubscription(const JPrefix_t &prefix) const
Check subscription for given prefix.
Definition: JLigier.cc:422
Socket class.
Definition: JSocket.hh:36
JClient()
Default constructor.
Definition: JLigier.cc:255
const std::string & getNickname() const
Get nick name.
Definition: JLigier.cc:283
void decrementRequest()
Decrement request by one.
Definition: JLigier.cc:323
bool checkSubscriptionAll(const JPrefix_t &prefix) const
Check subscription for given prefix.
Definition: JLigier.cc:398
is
Definition: JDAQCHSM.chsm:167
static long long int MEMORY_LIMIT
Limit size of data [Bytes].
Definition: JLigier.cc:73
int getSize() const
Get size.
Definition: JPrefix.hh:62
JClientList()
Default constructor.
Definition: JLigier.cc:484
JPrefix JPrefix_t
Definition: JLigier.cc:29
static void release(T *p)
Release memory.
Definition: JMemory.hh:112
bool checkSubscriptionAny(const JPrefix_t &prefix) const
Check subscription for given prefix.
Definition: JLigier.cc:410
int requestCounter
Definition: JLigier.cc:470
int getFileDescriptor() const
Get file descriptor.
JSocketInputChannel_t in
reader for incoming messages
Definition: JLigier.cc:461
int getCounter() const
Get number of I/O attempts.
std::ostream & operator<<(std::ostream &out, const JDispatch &message)
Print message.
Definition: JLigier.cc:521
Non-blocking socket writer.
const std::set< JTag > & getSubscriptionAll() const
Get subscription.
Definition: JLigier.cc:343
then echo Test string reversed by client(hit< return > to continue)." $DIR/JProcess -c "$DIR/JEcho-r" -C fi if (( 1 ))
void incrementRequest()
Increment request by one.
Definition: JLigier.cc:314
Mathematical constants.
#define make_field(A,...)
macro to convert parameter to JParserTemplateElement object
Definition: JParser.hh:1989
bool isReady() const
Check ready status.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
Definition: JLigier.cc:494
Memory management class for create/release policy based on malloc()/free().
Definition: JMemory.hh:82
Exception for failure of memory allocation.
Definition: JException.hh:288
static const size_t buffer_size
JDispatch()
Default constructor.
Definition: JLigier.cc:78
#define ERROR(A)
Definition: JMessage.hh:66
then awk string
void drop()
Drop all messages for which the client has not the 'all' subscription.
Definition: JLigier.cc:450
JDispatch(const JPrefix_t &prefix, const char *data)
Constructor.
Definition: JLigier.cc:92
JDispatch(const JDispatch &message)
Copy constructor.
Definition: JLigier.cc:128
void create()
Allocate memory.
Definition: JLigier.cc:209
void drop()
Drop all messages from client queues for which the client has not the 'all' subscription.
Definition: JLigier.cc:505
Exception for ControlHost.
Definition: JException.hh:468
const std::set< JTag > & getSubscriptionAny() const
Get subscription.
Definition: JLigier.cc:354
Socket input channel.
const JTag & getTag() const
Get tag.
Definition: JTag.hh:86
static bool maybe_special(const JTag &tag)
Check special ControlHost tags.
General purpose messaging.
void initialise()
Initialise counter.
#define FATAL(A)
Definition: JMessage.hh:67
static const JTag DISPTAG_Born("Born")
TCP socket.
Definition: JTCPSocket.hh:25
$WORKDIR ev_configure_dqsimulator txt echo process $DQ_SIMULATOR $i $SOURCE_HOST[$index] csh c(setenv ROOTSYS $ROOTSYS &&source $JPP_DIR/setenv.csh $JPP_DIR &&($DQ_SIMULATOR\-u\$NAME\$\-H\$SERVER\$\-M\$LOGGER\$\-d $DEBUG</dev/null > &/dev/null &))'
static long long int MEMORY_TOTAL
Total size of data [Bytes].
Definition: JLigier.cc:72
JMalloc< char > JMemory_t
Definition: JLigier.cc:70
static unsigned int QUEUE_LIMIT
Maximum number of messages in queue.
Definition: JLigier.cc:249
static const long long int GIGABYTE
Number of bytes in a giga-byte.
const char * data() const
Get data.
Definition: JLigier.cc:199
Utility class to parse command line options.
void add(const JDispatch &message)
Add message to client queues depending on subscription of each client.
Definition: JLigier.cc:434
unsigned long long int getRAM()
Get RAM of this CPU.
static const JTag DISPTAG_Gime("_Gime")
int getSize() const
Get size of pending data.
System auxiliaries.
~JDispatch()
Destructor.
Definition: JLigier.cc:141
char * buffer
Definition: JLigier.cc:236
JSocketInputChannel< JPrefix_t > JSocketInputChannel_t
Definition: JLigier.cc:30
void setSizeOfPacket(const int size, JPrefix_t &prefix)
Set total size of internet packet.
Definition: JLigier.cc:51
bool detach()
Detach.
ControlHost tag.
Definition: JTag.hh:38
static const JTag DISPTAG_Debug("_Debug")
then echo WARNING
Definition: JTuneHV.sh:91
static const JTag DISPTAG_Always("_Always")
Base class for memory management.
Auxiliary class for non-blocking socket I/O.
static T * create()
Create object in memory.
Definition: JMemory.hh:89
bool isBusy() const
Check busy status.
std::string nick_name
Definition: JLigier.cc:468
int size() const
Get size.
Definition: JLigier.cc:188
static const JTag DISPTAG_ShowStat("_ShowSta")
int debug
debug level
JDispatch(const JTag &tag, const std::string &message)
Constructor.
Definition: JLigier.cc:110
JClient(const JTCPSocket &socket)
Constructor.
Definition: JLigier.cc:269
#define DEBUG(A)
Message macros.
Definition: JMessage.hh:62