[asio] continuous reads into streambuf

Boost - Users mailing list
Hello all,

once again I find myself struggling with asio streambufs, but I wanted to ask here before I chicken out and go back to char buffers again.
My problem is that I cannot read more than one protocol loop successfully.

I have a TCP connection here that continuously receives messages in a simple protocol like this:

header_size CRLF
header             (contains payload size)
payload

<repeat>

I thought this is a prime use case for reading into a streambuf as I can easily deal with additional data being in there and the client pushing as fast as it can. 

Here is what I do (simplified quite a bit, sorry for copy and paste and simplify errors):

(m_socket and m_streambuf are only used by one io_context, one thread, no races as far as I can tell)



void read_header_size() {

  asio::async_read_until(m_socket, m_streambuf, "\r\n",
    [self](const boost::system::error_code, const std::size_t) {
        // < handle error skipped >
        std::string hsizestr;
        hsizestr.reserve(32);
        {
            std::istream is(&self->m_streambuf);
            std::getline(is, hsizestr);
        }
        std::size_t header_size = parse(hsizestr);
        self->read_header(header_size);
  });
}


void read_header(const std::size_t n_header_size) {

  asio::async_read(m_socket, m_streambuf, boost::asio::transfer_exactly(n_header_size),
    [self, packet, n_header_size](const boost::system::error_code, const std::size_t) {

        // < handle error skipped >
        // Tried two versions of parsing the header (manual consume and istreams of various sorts)
        // manual consume:
        std::size_t payload_size = parse_header(self->m_streambuf.data().data(), n_header_size);
        self->m_streambuf.consume(n_header_size);

        self->read_payload(payload_size);
    });
}

void read_payload(const std::size_t n_payload_size) {
  asio::async_read(m_socket, m_streambuf, boost::asio::transfer_exactly(n_payload_size),
    [self](const boost::system::error_code, const std::size_t) {
         // < handle error skipped >
         {
            std::istream is(&self->m_streambuf);
            is.read(target_buffer, target_buffer_size);                // target buffer is elsewhere and well guarded
         }

         // read next message
         self->read_header_size();
    });
}


When I run this and make sure the client pushes at least 2 packets, I can safely receive and process the first, but after that the next async_read() in read_header never returns. As in, it never calls its handler. From debugging and looking at the streambuf I can reasonably assume the buffer contains all the data after the first async_read_until, because the packets are very small (only about 60 bytes total).
This leads me to believe that transfer_exactly doesn't return unless something is actually transferred. But if that is the case, why does the first loop succeed?

I am quite lost here. If anybody has some suggestion it is much appreciated.

Thanks a bunch!
Stephan






_______________________________________________
Boost-users mailing list
[hidden email]
https://lists.boost.org/mailman/listinfo.cgi/boost-users

Re: [asio] continuous reads into streambuf

On Thu, May 24, 2018 at 7:59 AM, Stephan Menzel via Boost-users
<[hidden email]> wrote:
> I am quite lost here. If anybody has some suggestion it is much appreciated.

I have no idea why your code is malfunctioning because I am not
familiar with using std streams with asio (although I know asio pretty
well, asio::basic_streambuf in particular).

However, let me propose a wild idea. Set your current code aside for
the moment and reimplement it a different way. Instead of doing three
asynchronous reads, call async_read_some once into your
basic_streambuf. Use a reasonably large size in the call to prepare
(say, 2048 or higher). When you have data in your buffer, inspect the
contents and determine if you have a complete message. If not, then
keep looping and calling async_read_some until your buffer has an
entire message. Then extract the message, consume the bytes used, and
repeat. When extracting the bytes do not use asynchronous I/O.
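
A minimal sketch of the loop described above, kept independent of Asio so it can be run as-is. A std::string stands in for the stream buffer, and each feed() call stands in for the completion of one async_read_some. The wire format of the header is not given in this thread, so the sketch assumes the header is simply the payload size as decimal ASCII; Framer, feed and next are hypothetical names, and error handling for malformed input is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical framing helper. Assumed wire format (only partly specified in
// the thread): "<header_size>\r\n" + header + payload, where the header is
// assumed to hold the payload size as decimal ASCII.
class Framer {
public:
    // Stands in for the bytes delivered by one async_read_some completion.
    void feed(const std::string& chunk) { buf_ += chunk; }

    // Returns the next complete payload, or nullopt if more bytes are needed.
    std::optional<std::string> next() {
        const std::size_t eol = buf_.find("\r\n");
        if (eol == std::string::npos) return std::nullopt;  // size line incomplete

        const std::size_t header_size = std::stoul(buf_.substr(0, eol));
        const std::size_t header_pos = eol + 2;
        if (buf_.size() < header_pos + header_size) return std::nullopt;

        const std::size_t payload_size =
            std::stoul(buf_.substr(header_pos, header_size));
        const std::size_t payload_pos = header_pos + header_size;
        if (buf_.size() < payload_pos + payload_size) return std::nullopt;

        std::string payload = buf_.substr(payload_pos, payload_size);
        assert(payload.size() == payload_size);
        buf_.erase(0, payload_pos + payload_size);  // consume the whole message
        return payload;
    }

private:
    std::string buf_;
};
```

After every feed(), next() would be called repeatedly until it returns nullopt, so that several messages arriving in one read are all handled before the next read is started.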

What, you ask, is the purpose of all this? Calls to asynchronous
initiating functions are VERY expensive. Calling async_read_until to
get some data, then async_read to get the header, then async_read to
get the body, is going to kill performance. Instead, you should be
using async_read_some to transfer into your streambuf, all the
existing data that has already been received into the socket's kernel
buffer. It is possible that you will get more than one message into
your buffer when you do this (if your protocol is full-duplex). This
is great when it happens because you are getting the most work out of
each call to async_read_some (which as I stated, is quite expensive).

In the absence of any other information about why your current code is
malfunctioning, it is possible that this reimplementation will not
only make your code perform better and become easier to reason about,
but might also solve your unknown bug.

Regards

Re: [asio] continuous reads into streambuf

Hello Vinnie,

thanks for your reply

On Thu, May 24, 2018 at 5:22 PM, Vinnie Falco via Boost-users <[hidden email]> wrote:

> However, let me propose a wild idea. Set your current code aside for
> the moment and reimplement it a different way. Instead of doing three
> asynchronous reads, call async_read_some once into your
> basic_streambuf. Use a reasonably large size in the call to prepare
> (say, 2048 or higher). When you have data in your buffer, inspect the
> contents and determine if you have a complete message. If not, then
> keep looping and calling async_read_some until your buffer has an
> entire message. Then extract the message, consume the bytes used, and
> repeat. When extracting the bytes do not use asynchronous I/O.

Not a wild idea at all. Yet I see two problems with it.
This pretty much means taking over what asio calls 'composed operations' myself. Instead of letting asio do read_some calls and figure out when I have, say, a line completely read (until CRLF), I read and figure it out myself. Which is very much what I wanted to avoid by going the streambuf approach in the first place.
The second issue to me is the inspection of the contents. The packets are of variable size and can theoretically (though not practically) range up to about 64k. I don't know how large the payload is until I have read and parsed the header. Which means I would have to do just that: inspect the contents of the buffer. Which I have no idea how to do. Over the years, this is not the first time I'm trying this approach and I always folded when it came to inspecting the contents of a streambuf. I simply don't know how this can be done. I can't even reliably do it in the debugger. Sure, I see data but which is input and which is output and where are all those position markers and what would I really get if I read...

I guess you are right though. A new implementation using fixed buffers would probably solve the bug as well. As it always did in the past. I guess I just wanted to be fancy again.

Thanks for your suggestion...

Stephan


Re: [asio] continuous reads into streambuf

On Thu, May 24, 2018 at 10:49 AM, Stephan Menzel via Boost-users
<[hidden email]> wrote:
> This pretty much means taking over what asio calls 'composed operations'

You don't have to write a composed operation and accompanying
initiating function, you can do this right in the code you already
have. Just replace `async_read_until` with `stream.async_read_some`
and put that in a loop.

> ...I always folded when it came to
> inspecting the contents of a streambuf. I simply don't know how this can be
> done.

There are a few approaches. The easiest solution is to convert the
readable bytes of the stream buffer to a pair of iterators using these
asio utility functions, and apply std algorithms (or your own) to that
range:

<https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/buffers_begin.html>
<https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/buffers_end.html>

If you are using Boost.Asio 1.67, you can adapt a std::string into a
dynamic buffer using dynamic_string_buffer:

<https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/dynamic_string_buffer.html>

Once you have the message in a std::string it should be pretty
straightforward to use string algorithms on it.
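
To illustrate the iterator-based approach: once the readable bytes are exposed as an iterator pair, standard algorithms apply directly. The sketch below is written against any forward iterator range, so it runs without Asio on a plain char array; with a real streambuf sb, the pair would presumably come from buffers_begin(sb.data()) and buffers_end(sb.data()). line_length is a hypothetical helper name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <optional>

// Hypothetical helper: number of bytes up to and including the first CRLF in
// [first, last), or nullopt if the range holds no complete line yet.
template <class It>
std::optional<std::size_t> line_length(It first, It last) {
    static const char crlf[] = {'\r', '\n'};
    const It pos = std::search(first, last, std::begin(crlf), std::end(crlf));
    if (pos == last) return std::nullopt;  // no terminator found, read more
    const auto n = std::distance(first, pos) + 2;
    assert(n >= 2);
    return static_cast<std::size_t>(n);
}
```

The returned count is the number of bytes that make up the line including its terminator, which is also the amount to pass to consume() once the line has been parsed.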

Hope this helps!

Re: [asio] continuous reads into streambuf

On 25/05/2018 02:59, Stephan Menzel wrote:
> once again I find myself struggling with asio streambufs but wanted to
> ask here before chicken out to char buffers again.
> My problem is that I cannot read more than one protocol loop successfully.
[...]
>    asio::async_read_until(m_socket, m_streambuf, "\r\n",
[...]
>    asio::async_read(m_socket, m_streambuf,
> boost::asio::transfer_exactly(n_header_size),

Mixing read_until and read on the same socket is problematic, because
the way they actually behave -- while not actually wrong -- is not what
you first expect.

There are two key bits of information you need to realise the problem:

  1. async_read_until actually calls async_read_some under the hood to
read an arbitrary amount of data from the socket.  All the data is
stored in the streambuf and only a *subset* of that length is returned
to indicate where the terminator was found.

  2. async_read just calls async_read_some directly with the specified
length request.

In particular, even if #1 already read the entire message into the
streambuf, #2 will ignore that and still wait for the specified number
of new incoming bytes -- even if you are reading into the same streambuf.

The solution to this is pretty straightforward, once you realise that
you need to do it:

Instead of calling async_read with the total number of bytes you are
expecting in the message body, first check how many bytes are already
available in the streambuf (via size()), and only issue a read for the
balance (and if the balance is 0 or negative, then obviously don't issue
the read at all).

In an ideal world, async_read would do this for you, but it doesn't.
(And it does make sense that it doesn't, once you think about it --
otherwise multiple consecutive reads without consuming the data would
just return the same data over and over instead of appending it.)
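
The balance calculation described above can be sketched as a small pure function. This is only an illustration; bytes_to_request is a hypothetical name, `expected` is the size parsed from the previous protocol step, and `available` is what streambuf.size() reports after any already-parsed bytes have been consumed.

```cpp
#include <cstddef>

// Hypothetical helper: how many bytes still have to come from the socket when
// `expected` bytes are needed and `available` bytes are already buffered.
// A result of 0 means: parse directly, do not start another read.
constexpr std::size_t bytes_to_request(std::size_t expected,
                                       std::size_t available) {
    return expected > available ? expected - available : 0;
}
```

With the roughly 60-byte packets from the original post, a whole message is likely to be buffered already after the first read, so the result would be 0 and the header could be parsed straight away instead of waiting for bytes that never arrive.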


Re: [asio] continuous reads into streambuf

Mere moments ago, quoth I:
> Instead of calling async_read with the total number of bytes you are
> expecting in the message body, first check how many bytes are already
> available in the streambuf (via size()), and only issue a read for the
> balance (and if the balance is 0 or negative, then obviously don't issue
> the read at all).

Also, don't forget to call consume on the streambuf once you have
finished using a chunk of data.  (ie. after you have read the header
size, you need to consume all those bytes so that the remaining size()
no longer includes them, before you calculate the balance.)


Re: [asio] continuous reads into streambuf

On Fri, May 25, 2018 at 1:44 AM, Gavin Lambert via Boost-users <[hidden email]> wrote:
> Mixing read_until and read on the same socket is problematic, because the way they actually behave -- while not actually wrong -- is not what you first expect.
>
> There are two key bits of information you need to realise the problem:
>
>  1. async_read_until actually calls async_read_some under the hood to read an arbitrary amount of data from the socket.  All the data is stored in the streambuf and only a *subset* of that length is returned to indicate where the terminator was found.
>
>  2. async_read just calls async_read_some directly with the specified length request.
>
> In particular, even if #1 already read the entire message into the streambuf, #2 will ignore that and still wait for the specified number of new incoming bytes -- even if you are reading into the same streambuf.

This!
That was the key right here. I had this hunch but failed to confirm it or know how to prevent it. Thanks a bunch. I can't believe I have never read this crucial bit of info anywhere in the asio docs. I took your and Vinnie's advice and boiled this down to fewer async ops still reading into the streambuf and scratched the initial read_until, leaving only async_read. And it behaves nicely now.

Looking at all my past failures with similar streambuf based approaches I think it may very well be that I always had this mix of the two operations. This should be prominently placed in the asio docs.

So, for anybody who comes across this, the key elements are:

 * You can re-use the same streambuf for many reads and writes as long as you DO NOT MIX async_read() and async_read_until()
 * streambuf.size() gives you the data available for extraction
 * when extracting data, either read from streambuf.data() and then consume() the data you have read or use a stream that does the consume for you
 * when writing data into it either write into buffers given by prepare() and then call commit() or use a stream that does the commit for you
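
For readers who, like me, lose track of which part of the buffer is input and which is output, here is a toy model of the semantics in the list above. It is explicitly not Asio's implementation, only a simplified stand-in built on a std::string; ToyStreambuf is a hypothetical name and only the member names mirror the real streambuf.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Toy model only, not Asio's implementation: a readable region followed by a
// writable region, mirroring the streambuf member names used in this thread.
class ToyStreambuf {
public:
    // Number of bytes available for extraction.
    std::size_t size() const { return readable_; }

    // Copy of the readable bytes; does not remove them.
    std::string data() const { return storage_.substr(0, readable_); }

    // Reserve n writable bytes after the readable region.
    char* prepare(std::size_t n) {
        storage_.resize(readable_ + n);
        return &storage_[readable_];
    }

    // Move the first n prepared bytes into the readable region.
    void commit(std::size_t n) {
        assert(readable_ + n <= storage_.size());
        readable_ += n;
    }

    // Drop the first n readable bytes.
    void consume(std::size_t n) {
        assert(n <= readable_);
        storage_.erase(0, n);
        readable_ -= n;
    }

private:
    std::string storage_;
    std::size_t readable_ = 0;
};
```

In this model a read operation corresponds to prepare() followed by commit() with the number of bytes actually received, and parsing corresponds to looking at data() followed by consume().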

Thanks again,
Stephan





Re: [asio] continuous reads into streambuf

On 05/25/2018 06:40 AM, Stephan Menzel via Boost-users wrote:
> On Fri, May 25, 2018 at 1:44 AM, Gavin Lambert via Boost-users
> Looking at all my past failures with similar streambuf based approaches
> I think it may very well be that I always had this mix of the two
> operations. This should be prominently placed in the asio docs.

Would it be possible for you to post a modified snippet of your code
showing your new way of doing things?

>
> So, for anybody who comes across this, the key elements are:
>
>   * You can re-use the same streambuf for many reads and writes as long
> as you DO NOT MIX async_read() and async_read_until()
>   * streambuf.size() gives you the data available for extraction
>   * when extracting data, either read from streambuf.data() and then
> consume() the data you have read or use a stream that does the consume
> for you
>   * when writing data into it either write into buffers given by
> prepare() and then call commit() or use a stream that does the commit
> for you

--
Raymond Burkholder
[hidden email]
https://blog.raymond.burkholder.net


Re: [asio] continuous reads into streambuf

On Fri, May 25, 2018 at 2:49 PM, Raymond Burkholder via Boost-users <[hidden email]> wrote:

> Would it be possible for you to post a modified snippet of your code showing your new way of doing things?

I have changed many things and that wasn't real code. But the easiest way to transform my example would be to change the header size. I went from having a CRLF-terminated line to a 64-bit integer, which I serialize right into the buffer. This way I always know how large it is and don't need read_until anymore. Like this:
 

void read_header_size() {

  asio::async_read(m_socket, m_streambuf, asio::transfer_at_least(8),
    [self](const boost::system::error_code, const std::size_t) {
        // < handle error skipped >

        boost::uint64_t header_size = 0;

        std::istream is(&self->m_streambuf);
        is.read(reinterpret_cast<char *>(&header_size), sizeof(header_size));

        self->read_header(header_size);
     });
}


The rest could stay as it is. We're not mixing the functions anymore.
I changed more though. I gave my packet a method to de-serialize itself from a streambuf and return the number of missing bytes, or 0 if ready. I call this first, and only when it returns a non-zero number of bytes do I go for another async read, with the handler calling the de-serialize again.
Advantages are less code and I can use the same method on client and server alike.
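
A rough sketch of what such a de-serialize method could look like. This is not my real code. It assumes a simplified layout of an 8-byte length prefix in host byte order followed by that many body bytes, and it uses a std::string in place of the streambuf so it runs without Asio; Packet and deserialize are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical packet type. Assumed layout: an 8-byte length prefix in host
// byte order, followed by that many body bytes.
struct Packet {
    std::string body;

    // Tries to take one packet from the front of `buf`.
    // Returns 0 when the packet is complete (and removes it from `buf`),
    // otherwise the number of bytes known to be missing so far; `buf` is
    // then left untouched.
    std::size_t deserialize(std::string& buf) {
        std::uint64_t len = 0;
        if (buf.size() < sizeof(len)) return sizeof(len) - buf.size();
        std::memcpy(&len, buf.data(), sizeof(len));

        const std::size_t total = sizeof(len) + static_cast<std::size_t>(len);
        if (buf.size() < total) return total - buf.size();

        body.assign(buf, sizeof(len), static_cast<std::size_t>(len));
        buf.erase(0, total);  // consume prefix and body
        assert(body.size() == len);
        return 0;
    }
};
```

While the prefix itself is incomplete, the returned count only covers the rest of the prefix, so the method has to be called again after the next read. Because the count always refers to bytes that are not yet buffered, it can be used directly as the size of the next read.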

Cheers,
Stephan



Re: [asio] continuous reads into streambuf

On Fri, May 25, 2018 at 8:12 AM, Stephan Menzel via Boost-users
<[hidden email]> wrote:
> Gave my packet a method to de-serialize itself from a
> streambuf and return the number of missing bytes or 0 if ready.

That sounds better :) I do the same thing here:
<https://github.com/boostorg/beast/blob/develop/include/boost/beast/websocket/impl/stream.ipp#L331>

Regards