[ASIO] Reestablish TCP connection after EOF

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[ASIO] Reestablish TCP connection after EOF

Boost - Users mailing list
I would like to make a c++ TCP client using boost::asio based on this
article:
https://www.gamedev.net/blogs/entry/2249317-a-guide-to-getting-started-with-boostasio/?pg=5

I have an existing server which behaviour is similar to a router
configuration. You can connect to it and as soon as you have a command promt
you can send a command to the server. It executes it and sends back the
result and a command promt. Then you can send the next command...

There is no problem with this part. My code is able to establish the
connection, send and recive data until my client gets EOF. It means the
server broke the connection. I would like my client to be able to
reestablish the connection and this part is not working. Very much would
appreciate your help.

*Here is my header:*
#pragma once
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/asio/io_context_strand.hpp>

class Hive;
class Connection;

class Connection : public boost::enable_shared_from_this< Connection >
{
    friend class Hive;

private:
    static const int m_reconnect_interval = 15000;
    boost::shared_ptr< Hive > m_hive;
    boost::asio::ip::tcp::socket m_socket;
    boost::asio::io_service::strand m_io_strand;
    boost::asio::deadline_timer m_keepalive_timer;
    boost::asio::deadline_timer m_reconnect_timer;
    boost::posix_time::ptime m_last_time;
    std::vector< uint8_t > m_recv_buffer;
    std::list< int32_t > m_pending_recvs;
    std::list< std::vector< uint8_t > > m_pending_sends;
    int32_t m_receive_buffer_size;
    int32_t m_keepalive_timer_interval;

protected:
    Connection( boost::shared_ptr< Hive > hive );
    virtual ~Connection();

private:
    Connection( const Connection & rhs );
    Connection & operator =( const Connection & rhs );
    void StartSend();
    void StartRecv( int32_t total_bytes );
    void StartKeepAliveTimer();
    void StartReconnectTimer();
    void StartError(const boost::system::error_code &error, std::string
caller);
    void DispatchSend( std::vector< uint8_t > buffer );
    void DispatchRecv( int32_t total_bytes );
    void DispatchTimer( const boost::system::error_code & error );
    void ReconnectTimer( const boost::system::error_code & error );
    void HandleConnect( const boost::system::error_code & error );
    void HandleSend( const boost::system::error_code & error, std::list<
std::vector< uint8_t > >::iterator itr );
    void HandleRecv( const boost::system::error_code & error, int32_t
actual_bytes );
    void HandleKeepAliveTimer(const boost::system::error_code &error);
    void HandleReconnectTimer( const boost::system::error_code & error );
    void Reconnect();

private:
    // Called when the connection has successfully connected to the local
host.
    virtual void OnAccept( const std::string & host, uint16_t port ) = 0;

    // Called when the connection has successfully connected to the remote
host.
    virtual void OnConnect( const std::string & host, uint16_t port ) = 0;

    // Called when data has been sent by the connection.
    virtual void OnSend( const std::vector< uint8_t > & buffer ) = 0;

    // Called when data has been received by the connection.
    virtual void OnRecv( std::vector< uint8_t > & buffer ) = 0;

    // Called on each timer event.
    virtual void OnTimer( const boost::posix_time::time_duration & delta ) =
0;

    // Called when an error is encountered.
    virtual void OnError(const std::string error) = 0;

public:

    // Starts an a/synchronous connect.
    void Connect(const std::string &host, const std::string &port);

    // Posts data to be sent to the connection.
    void Send(std::string message);

    // Posts a recv for the connection to process. If total_bytes is 0, then
    // as many bytes as possible up to GetReceiveBufferSize() will be
    // waited for. If Recv is not 0, then the connection will wait for
exactly
    // total_bytes before invoking OnRecv.
    void Recv( int32_t total_bytes = 0 );

    // Posts an asynchronous disconnect event for the object to process.
    void Disconnect();
};

class Hive : public boost::enable_shared_from_this< Hive >
{
private:
    boost::asio::io_service m_io_service;
    boost::shared_ptr< boost::asio::io_service::work > m_work_ptr;

private:
    Hive( const Hive & rhs );
    Hive & operator =( const Hive & rhs );

public:
    Hive();
    virtual ~Hive();

    // Returns the io_service of this object.
    boost::asio::io_service & GetService();

    // Polls the networking subsystem once from the current thread and
returns.
    void Poll();

    // Runs the networking system on the current thread. This function
blocks
    // until the networking system is stopped, so do not call on a single
    // threaded application with no other means of being able to call Stop
    // unless you code in such logic.
    void Run();

    // Stops the networking system. All work is finished and no more
    // networking interactions will be possible afterwards until Reset is
called.
    void Stop();

    // Restarts the networking system after Stop as been called. A new work
    // object is created ad the shutdown flag is cleared.
    void Reset();
};
*and here is my cpp:*

#include "network.h"
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
#include <boost/cstdint.hpp>
#include <iostream>

Hive::Hive()
        : m_work_ptr( new boost::asio::io_service::work( m_io_service ) )
{
}

Hive::~Hive()
{
}

boost::asio::io_service & Hive::GetService()
{
    return m_io_service;
}

void Hive::Poll()
{
    m_io_service.poll();
}

void Hive::Run()
{
    m_io_service.run();
}

void Hive::Stop()
{
    m_work_ptr.reset();
    m_io_service.run();
    m_io_service.stop();
}

void Hive::Reset()
{
    m_io_service.reset();
    m_work_ptr.reset( new boost::asio::io_service::work( m_io_service ) );
}


Connection::Connection( boost::shared_ptr< Hive > hive )
        : m_hive( hive ), m_socket( hive->GetService() ), m_io_strand(
hive->GetService() ), m_keepalive_timer( hive->GetService() ),
m_reconnect_timer(hive->GetService()), m_receive_buffer_size( 4096 ),
m_keepalive_timer_interval( 2000 )
{
}

Connection::~Connection()
{
}

void Connection::StartSend()
{
    if( !m_pending_sends.empty() )
        boost::asio::async_write( m_socket, boost::asio::buffer(
m_pending_sends.front() ),  boost::asio::bind_executor(m_io_strand,
boost::bind( &Connection::HandleSend, shared_from_this(),
boost::asio::placeholders::error, m_pending_sends.begin() ) ) );
}

void Connection::StartRecv( int32_t total_bytes )
{
    if( total_bytes > 0 )
    {
        m_recv_buffer.resize( total_bytes );
        boost::asio::async_read( m_socket, boost::asio::buffer(
m_recv_buffer ),  boost::asio::bind_executor(m_io_strand, boost::bind(
&Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
    }
    else
    {
        m_recv_buffer.resize( m_receive_buffer_size );
        m_socket.async_read_some( boost::asio::buffer( m_recv_buffer ),
boost::asio::bind_executor(m_io_strand, boost::bind(
&Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
    }
}

void Connection::StartKeepAliveTimer()
{
    std::cout << "[" << __FUNCTION__ << "] " << std::endl;
    m_last_time = boost::posix_time::microsec_clock::local_time();
    m_keepalive_timer.expires_from_now( boost::posix_time::milliseconds(
m_keepalive_timer_interval ) );
    m_keepalive_timer.async_wait(  boost::asio::bind_executor(m_io_strand,
boost::bind( &Connection::DispatchTimer, shared_from_this(), _1 ) ) );
}

void Connection::StartReconnectTimer()
{
    std::cout << "[" << __FUNCTION__ << "] " << std::endl;
    m_last_time = boost::posix_time::microsec_clock::local_time();
    m_reconnect_timer.expires_from_now( boost::posix_time::milliseconds(
m_reconnect_interval ) );
    m_reconnect_timer.async_wait(  boost::asio::bind_executor(m_io_strand,
boost::bind( &Connection::ReconnectTimer, shared_from_this(), _1 ) ) );
}

void Connection::StartError(const boost::system::error_code &error,
std::string caller)
{
    OnError(caller + " " + error.message() + "(" + error.category().name() +
":" + std::to_string(error.value()) + ")");
    boost::system::error_code ec;
    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
    m_socket.close(ec);
    m_keepalive_timer.cancel(ec);

    if (error == boost::asio::error::misc_errors::eof || error ==
boost::system::errc::timed_out)
        StartReconnectTimer();
}

void Connection::HandleConnect( const boost::system::error_code & error )
{
    if( error )
        StartError(error, __FUNCTION__);
    else
    {
        if( m_socket.is_open() )
        {
            OnConnect( m_socket.remote_endpoint().address().to_string(),
m_socket.remote_endpoint().port() );
            m_reconnect_timer.cancel();
            StartKeepAliveTimer();
        }
        else
            StartError(error, __FUNCTION__);
    }
}

void Connection::HandleSend( const boost::system::error_code & error,
std::list< std::vector< uint8_t > >::iterator itr )
{
    if( error )
        StartError(error, __FUNCTION__);
    else
    {
        OnSend( *itr );
        m_pending_sends.erase( itr );
        StartSend();
    }
}

void Connection::HandleRecv( const boost::system::error_code & error,
int32_t actual_bytes )
{
    if( error )
        StartError(error, __FUNCTION__);
    else
    {
        m_recv_buffer.resize( actual_bytes );
        OnRecv( m_recv_buffer );
        m_pending_recvs.pop_front();
        if( !m_pending_recvs.empty() )
            StartRecv( m_pending_recvs.front() );
    }
}

void Connection::HandleKeepAliveTimer(const boost::system::error_code
&error)
{
    if( error )
        StartError(error, __FUNCTION__);
    else
    {
        OnTimer( boost::posix_time::microsec_clock::local_time() -
m_last_time );
        StartKeepAliveTimer();
    }
}

void Connection::HandleReconnectTimer( const boost::system::error_code &
error )
{
    std::cout << "[" << __FUNCTION__ << "] Calling reconnect... " <<
std::endl;
    Reconnect();
}

void Connection::DispatchSend( std::vector< uint8_t > buffer )
{
    bool should_start_send = m_pending_sends.empty();
    m_pending_sends.push_back( buffer );
    if( should_start_send )
        StartSend();
}

void Connection::DispatchRecv( int32_t total_bytes )
{
    bool should_start_receive = m_pending_recvs.empty();
    m_pending_recvs.push_back( total_bytes );
    if( should_start_receive )
        StartRecv( total_bytes );
}

void Connection::DispatchTimer( const boost::system::error_code & error )
{
    boost::asio::post(m_io_strand,
boost::bind(&Connection::HandleKeepAliveTimer, shared_from_this(), error )
);
}

void Connection::ReconnectTimer( const boost::system::error_code & error )
{
    boost::asio::post(m_io_strand, boost::bind(
&Connection::HandleReconnectTimer, shared_from_this(), error ) );
}

void Connection::Connect(const std::string &host, const std::string &port)
{
    std::cout << "[" << __FUNCTION__ << "] connecting to "<< host << ":" <<
port << std::endl;
    boost::system::error_code ec;
    boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
    boost::asio::ip::tcp::resolver::query query( host, port );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(
query );
    m_socket.async_connect( *iterator,
boost::asio::bind_executor(m_io_strand, boost::bind(
&Connection::HandleConnect, shared_from_this(), _1 ) ) );
}

void Connection::Reconnect()
{
    m_hive->Reset();
    Connect("10.40.60.72", "23");
}

void Connection::Disconnect()
{
    boost::asio::post(m_io_strand,
boost::bind(&Connection::HandleKeepAliveTimer, shared_from_this(),
boost::asio::error::connection_reset ) );
}

void Connection::Recv( int32_t total_bytes )
{
    boost::asio::post(m_io_strand, boost::bind( &Connection::DispatchRecv,
shared_from_this(), total_bytes ) );
}

void Connection::Send(std::string message)
{
    std::vector< uint8_t > request;
    std::copy( message.begin(), message.end(), std::back_inserter( request )
);
    boost::asio::post(m_io_strand, boost::bind( &Connection::DispatchSend,
shared_from_this(), request ) );
}



--
Sent from: http://boost.2283326.n4.nabble.com/Boost-Users-f2553780.html
_______________________________________________
Boost-users mailing list
[hidden email]
https://lists.boost.org/mailman/listinfo.cgi/boost-users