Jpp - the software that should make you happy
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
recipient.hh
Go to the documentation of this file.
1 #ifndef RECIPIENT_HH
2 #define RECIPIENT_HH
3 
4 #include <boost/asio.hpp>
5 #include <FrameFactory/frame.hh>
6 #include <boost/circular_buffer.hpp>
7 #include "log.hh"
8 
9 /**
10  * \author cpellegrino
11  */
12 
13 typedef boost::circular_buffer<Frame> CircularBuffer;
14 
15 class Recipient
16 {
17  boost::asio::ip::tcp::socket m_sock;
18  boost::asio::ip::tcp::endpoint m_endpoint;
23  const long heartbeat_interval=5000; //time to wait if the links with the filters are temporarly unavailable
24  public:
25 
26  Recipient(boost::asio::io_service& service,
27  const boost::asio::ip::tcp::endpoint& endpoint,
28  size_t circbuff_size)
29  :
30  m_sock(service),
31  m_endpoint(endpoint),
32  m_cbuffer(circbuff_size),
33  m_connected(false)
34  {
35  LOG_NOTICE << "Trying to connect to " << m_endpoint;
36  connect();
37  if (!m_connected) {
38  LOG_ERROR << "Connection to " << m_endpoint << " failed";
39  link_active=false;
40  }else
41  link_active=true;
42  heartbeat_c=0;
43  }
44 
45  void sock_reset()
46  {
47  stop();
48  connect();
49  }
50 
51  bool sendIfPossible(const Frame& data)
52  {
53 
54  //if Recipient receives a EAGAIN message, it stops sending tcp frames for the heartbeat interval
55  //otherwise the RAM usage could increase out of control
56  if (! link_active){
57  m_cbuffer.push_back(data);
58  heartbeat_c++;
60  link_active=true;
61  heartbeat_c=0;
62  }
63  return false;
64  }
65  if (! m_connected)
66  {
67  sock_reset();
68 
69  if (! m_connected)
70  {
71  m_cbuffer.push_back(data);
72  return false;
73  }
74  }
75 
76  while (m_cbuffer.size())
77  {
78  if (send(m_cbuffer.front()))
79  {
80  m_cbuffer.pop_front();
81  }
82  else
83  {
84  break;
85  }
86  }
87  if (! send(data))
88  {
89  m_cbuffer.push_back(data);
90  }
91 
92  return m_connected;
93  }
94 
96  {
97  stop();
98  }
99 
100  friend class RecipientsHandler;
101 
102  private:
103 
104  void connect()
105  {
106  boost::system::error_code ec;
107  m_sock.connect(m_endpoint, ec);
108  m_connected = !ec;
109 
110  if (m_connected) {
111  boost::asio::socket_base::send_buffer_size option(67108864);
112  m_sock.set_option(option);
113  boost::asio::socket_base::non_blocking_io command(true);
114  m_sock.io_control(command);
115  LOG_NOTICE << "Connection to " << m_endpoint << " succeeded";
116  boost::system::error_code ec;
117  m_sock.shutdown(boost::asio::ip::tcp::socket::shutdown_receive, ec);
118  }
119  }
120 
121  void stop()
122  {
123  boost::system::error_code ec;
124  m_sock.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
125  m_sock.close(ec);
126  }
127 /**
128  * Send data
129  *
130  * \param data the Frame to send.
131  * \return true if OK; else false
132  */
133  bool send(const Frame& data)
134  {
135  boost::system::error_code ec;
136  ssize_t nsent = boost::asio::write(m_sock, boost::asio::buffer(data.data(), data.getFrameLength()), ec);
137  m_connected = !ec;
138 
139  if (ec) {
140  if(ec.value()==11 && nsent<100){ //errno 11 = EAGAIN or EWOULDBLOCK
141  link_active=false;
142  LOG_ERROR << "The link with " << m_endpoint <<
143  " gives EAGAIN: stop sending frame for the heartbeat interval in order to prevent strong RAM usage";
144  }else if(ec.value()!=11)
145  LOG_ERROR << "Error transmitting data to " << m_endpoint << ": " << ec;
146 
147  }
148  return m_connected;
149  }
150 };
151 
152 #endif // RECIPIENT_HH
boost::circular_buffer< Frame > CircularBuffer
Definition: recipient.hh:13
void stop()
Definition: recipient.hh:121
Recipient(boost::asio::io_service &service, const boost::asio::ip::tcp::endpoint &endpoint, size_t circbuff_size)
Definition: recipient.hh:26
CircularBuffer m_cbuffer
Definition: recipient.hh:19
boost::asio::ip::tcp::socket m_sock
Definition: recipient.hh:17
#define LOG_NOTICE
Definition: log.hh:112
void connect()
Definition: recipient.hh:104
const long heartbeat_interval
Definition: recipient.hh:23
bool send(const Frame &data)
Send data.
Definition: recipient.hh:133
#define LOG_ERROR
Definition: log.hh:111
bool link_active
Definition: recipient.hh:21
long heartbeat_c
Definition: recipient.hh:22
bool sendIfPossible(const Frame &data)
Definition: recipient.hh:51
boost::asio::ip::tcp::endpoint m_endpoint
Definition: recipient.hh:18
~Recipient()
Definition: recipient.hh:95
unsigned int getFrameLength() const
Definition: frame.hh:34
bool write(const Vec &v, std::ostream &os)
Write a Vec(tor) to a stream.
Definition: io_ascii.hh:154
Template Frame for ARS data.
Definition: frame.hh:12
void sock_reset()
Definition: recipient.hh:45
bool m_connected
Definition: recipient.hh:20