[asio] Order of asynchronous messages

Boost - Dev mailing list
Hello,

In a distributed application (local area network only) we are experiencing issues which *may* stem from the order of asynchronous messages sent over sockets.

We create a send buffer:

    auto buffer = std::make_shared<std::vector<double>>();

and fill it: buffer->push_back(some_data).

This buffer is then given to our communication routine

    request = aSend(*buffer, recipient);

To keep the send buffer alive, we place it in a class attribute:

    bufferedRequests.emplace_back(request, buffer);

The aSend method from above works like this (asio is Boost.Asio, of course):

PtrRequest aSend(std::vector<double> const & itemsToSend, int recipient)
{
  PtrRequest request(new SocketRequest);

  asio::async_write(*_sockets[recipient],
                    asio::buffer(itemsToSend),
                    [request](boost::system::error_code const &, std::size_t) {
                      std::static_pointer_cast<SocketRequest>(request)->complete();
                    });
  return request;
}

This way, we can check for completion using request->test(), which returns true once the completion handler has called request->complete().
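
(SocketRequest itself is not shown here; for this discussion you can think
of it as just a thread-safe completion flag, something like the following,
with the base class and the atomic flag being a simplification:)

#include <atomic>
#include <memory>

struct Request
{
  virtual ~Request() = default;
  virtual bool test() const = 0;
};

struct SocketRequest : Request
{
  // called from the async_write completion handler
  void complete() { _complete.store(true); }

  // polled by checkBufferedRequests(); atomic, so the polling thread can
  // safely observe a flag set on the io_service thread
  bool test() const override { return _complete.load(); }

private:
  std::atomic<bool> _complete{false};
};

using PtrRequest = std::shared_ptr<Request>;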

To check for completed requests, we have a loop that is called regularly:

void checkBufferedRequests(bool blocking)
{
  do {
    for (auto it = bufferedRequests.begin(); it != bufferedRequests.end();) {
      if (it->first->test())
        it = bufferedRequests.erase(it);
      else
        ++it;
    }
    if (bufferedRequests.empty())
      return;
    if (blocking)
      std::this_thread::yield(); // give up our time slice, so communication may make progress
  } while (blocking);
}

which operates on bufferedRequests, which is a list<pair<PtrRequest, PtrBuffer>>.

We use this so that the sending peer is not blocked by a slow receiving peer.
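
For reference, the declarations behind this are, roughly:

#include <list>
#include <memory>
#include <utility>
#include <vector>

using PtrBuffer = std::shared_ptr<std::vector<double>>;

std::list<std::pair<PtrRequest, PtrBuffer>> bufferedRequests;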

The sockets are created like this:

    tcp::acceptor acceptor(*_ioService);

    {
      tcp::endpoint endpoint(tcp::v4(), _portNumber);

      acceptor.open(endpoint.protocol());
      acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
      acceptor.bind(endpoint);
      acceptor.listen();
    }

Now we are having problems with mangled data that we cannot explain right now.

- Is the order of requests guaranteed to be preserved? I think so, given that asio works on TCP.

- What happens if another read or write is performed on the socket? I think there is no way to guarantee that this does not interfere with the order of the asynchronous sends or queued requests.


Last but not least: Do you see any potential problems with that code?

Thanks a lot!
Florian

Re: [asio] Order of asynchronous messages

Boost - Dev mailing list
Hi Florian,

I suspect the issue is the assumption that one call to `async_write()`
finishes writing its whole buffer before a subsequent call to
`async_write()` on the same socket starts writing its own. The use of
`*_sockets[recipient]` makes it look like you're reusing sockets.

As per the docs [1], `async_write()` calls `async_write_some()`, possibly
multiple times, depending on how much data can be sent at a time. So with
your code, the `async_write_some()` invocations belonging to different
`async_write()` calls may interleave if you reuse a socket while an
earlier write is still outstanding.

You may want to have your `checkBufferedRequests()` loop submit the
`async_write()`s and have `aSend()` just add the buffer to a queue.
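
Untested sketch of that idea (names invented; here the next write is
started from the previous write's completion handler rather than from
your polling loop, and it assumes a single thread runs the io_service,
or that everything below goes through a strand):

#include <boost/asio.hpp>
#include <deque>
#include <map>
#include <memory>
#include <vector>

namespace asio = boost::asio;
using PtrBuffer = std::shared_ptr<std::vector<double>>;

std::map<int, std::shared_ptr<asio::ip::tcp::socket>> _sockets; // as in your code
std::map<int, std::deque<PtrBuffer>> _pendingWrites;            // one queue per peer

void startWrite(int recipient);

// aSend() would only enqueue; at most one async_write per socket is in
// flight at any time.
void queueSend(PtrBuffer buffer, int recipient)
{
  auto& queue = _pendingWrites[recipient];
  bool writeInProgress = !queue.empty();
  queue.push_back(std::move(buffer));
  if (!writeInProgress)
    startWrite(recipient);
}

void startWrite(int recipient)
{
  auto& queue = _pendingWrites[recipient];
  asio::async_write(*_sockets[recipient],
                    asio::buffer(*queue.front()),
                    [recipient](boost::system::error_code const& ec, std::size_t) {
                      auto& queue = _pendingWrites[recipient];
                      queue.pop_front();        // this buffer is fully written
                      if (!ec && !queue.empty())
                        startWrite(recipient);  // chain the next queued write
                      // on error: remaining entries are left; real error
                      // handling omitted here
                    });
}

The completion tracking (request->complete()) is left out for brevity;
you would call it in the same handler, before starting the next write.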

Cheers,
Darren

[1]
https://www.boost.org/doc/libs/1_68_0/doc/html/boost_asio/reference/async_write/overload1.html

Re: [asio] Order of asynchronous messages

Boost - Dev mailing list
On 2/10/2018 04:23, Florian Lindner wrote:
> Now we are having problems with mangled data that are unexplainable right now.
>
> - Is the order of requests guaranteed to be preserved? I think so, based that asio works on TCP.
>
> - What happens if there is another read or write performed on the socket. I think that there is no way to guarantee that this does not interfere with the order of the asynchronous send or queued requests.
There is no guarantee about transmission order for interleaved writes.
Don't do it.

In general, you should have at most one read and one write operation
"in flight" at any given moment for any given stream socket, if you
expect the stream to stay coherent.

(Also, avoid mixing read with read_until.  There is a way to do this
correctly but it's non-intuitive.)
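
One way to do it is to keep using a single streambuf per socket for both
operations, so that any bytes read_until buffered past the delimiter are
not lost. Rough sketch (the header format and parseBodySize are invented
for illustration):

#include <boost/asio.hpp>
#include <cstddef>
#include <istream>
#include <string>

namespace asio = boost::asio;
using asio::ip::tcp;

asio::streambuf buf;  // one per socket; must outlive the async operations

// invented: assume the header line is just the body size in ASCII
std::size_t parseBodySize(std::string const& header) { return std::stoul(header); }

void readHeaderThenBody(tcp::socket& socket)  // socket must outlive the operations
{
  // async_read_until may read past the '\n'; the surplus stays in buf
  asio::async_read_until(socket, buf, '\n',
    [&socket](boost::system::error_code ec, std::size_t) {
      if (ec) return;
      std::istream is(&buf);
      std::string header;
      std::getline(is, header);  // consumes the header line including '\n'
      std::size_t bodySize = parseBodySize(header);

      // part (or all) of the body may already be in buf, so only ask
      // async_read for the bytes that are still missing
      std::size_t already   = buf.size();
      std::size_t remaining = bodySize > already ? bodySize - already : 0;
      asio::async_read(socket, buf, asio::transfer_exactly(remaining),
        [bodySize](boost::system::error_code ec2, std::size_t) {
          if (ec2) return;
          // the first bodySize bytes in buf are now the message body
        });
    });
}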
