Removed connection_pool::async_get_connection timeout overloads

Please use asio::cancel_after instead.
Replaced client_errc::timeout and client_errc::cancelled with
    client_errc::no_connection_available and client_errc::pool_cancelled.
async_get_connection no longer uses the last connect error code as its failure error code,
    but embeds this information in the output diagnostics.

close #349
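
A migration sketch (illustrative, not part of this commit's diff), assuming `pool`,
`diag` and `token` already exist:

    // Before this commit: explicit timeout argument
    // pool.async_get_connection(std::chrono::seconds(30), diag, token);

    // After this commit: wrap the completion token with asio::cancel_after
    pool.async_get_connection(diag, asio::cancel_after(std::chrono::seconds(30), token));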
Anarthal (Rubén Pérez) 2024-09-26 08:30:52 +02:00 committed by GitHub
parent a84b774bef
commit e72b4906af
29 changed files with 1069 additions and 1364 deletions

View File

@ -64,8 +64,7 @@ The resizing algorithm works like this:
* If a connection is requested, but all available connections are in use, a
new one is created, until `max_size` is reached.
* If a connection is requested, and there are `max_size` connections in use,
[refmem connection_pool async_get_connection] waits for a connection to become available,
up to a certain period of time. If no connection is available after this period, the operation fails.
[refmem connection_pool async_get_connection] waits for a connection to become available.
* Once created, connections never get deallocated.
By default, [refmem pool_params max_size] is 151, which is
@ -83,6 +82,22 @@ This is how you configure pool sizes:
[connection_pool_configure_size]
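
For reference, a minimal configuration sketch (a hedged illustration, not the imported snippet;
the `ctx` execution context and the connection details are assumptions):

    boost::mysql::pool_params params;
    params.server_address.emplace_host_and_port("localhost");
    params.username = "user";
    params.password = "password";
    params.initial_size = 10;  // connections created and kept ready up-front
    params.max_size = 100;     // hard cap; further requests wait for a connection to be returned
    boost::mysql::connection_pool pool(ctx, std::move(params));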
[heading Applying a timeout to async_get_connection]
By default, [refmem connection_pool async_get_connection] waits until a connection is available.
This means that, if the server is unavailable, `async_get_connection` may wait forever.
For this reason, you may consider applying a timeout to `async_get_connection`.
You can do this using [asioreflink cancel_after cancel_after], which uses Asio's
per-operation cancellation mechanism:
[connection_pool_apply_timeout]
You might consider setting the timeout at a higher level, instead. For instance,
if you're handling an HTTP request, you can use `cancel_after` to apply a timeout
to the entire request. The [link mysql.examples.connection_pool connection pool example]
takes this approach.
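
For reference, the snippet imported by [connection_pool_apply_timeout] boils down to the
following (sketched here with an assumed enclosing coroutine):

    boost::asio::awaitable<void> get_with_timeout(boost::mysql::connection_pool& pool)
    {
        // Get a connection from the pool, but don't wait more than 5 seconds
        auto conn = co_await pool.async_get_connection(
            boost::asio::cancel_after(std::chrono::seconds(5))
        );
        // Use conn->async_execute(...) as usual; the connection returns to the pool
        // when conn goes out of scope
    }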
[heading Session state]
@ -171,6 +186,10 @@ the created strand, avoiding data races at the cost of some performance.
Thread-safety only protects the pool. Individual connections are [*not] thread-safe.
]
Thread-safety extends to per-operation cancellation, too.
Cancelling an operation on a thread-safe pool is safe.
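
For instance, a `cancel_after` timer armed for `async_get_connection` may fire on a different
thread than the one running the pool. A minimal sketch (setup and names are assumptions, not
part of this diff):

    boost::mysql::pool_params params;
    // ... server address and credentials as usual ...
    params.thread_safe = true;  // pool state is protected by an internal strand

    boost::mysql::connection_pool pool(ctx, std::move(params));
    pool.async_run(boost::asio::detached);

    // Safe even if ctx is run by several threads: the cancellation signal emitted by
    // cancel_after is dispatched to the pool's strand internally
    pool.async_get_connection(boost::asio::cancel_after(
        std::chrono::seconds(10),
        [](boost::mysql::error_code ec, boost::mysql::pooled_connection conn) {
            // ec == client_errc::no_connection_available if the timeout fired first
        }
    ));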
[heading Transport types and TLS]
You can use the same set of transports as when working with [reflink any_connection]:

View File

@ -19,6 +19,8 @@
#include <boost/mysql/error_code.hpp>
#include <boost/mysql/error_with_diagnostics.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/value_from.hpp>
@ -27,9 +29,9 @@
#include <boost/url/parse.hpp>
#include <boost/variant2/variant.hpp>
#include <chrono>
#include <cstdint>
#include <exception>
#include <iostream>
#include <string>
#include "handle_request.hpp"
@ -325,8 +327,13 @@ public:
{
try
{
// Attempt to handle the request
return handle_request_impl(yield);
// Attempt to handle the request. We use cancel_after to set
// a timeout to the overall operation
return asio::spawn(
yield.get_executor(),
[this](asio::yield_context yield2) { return handle_request_impl(yield2); },
asio::cancel_after(std::chrono::seconds(30), yield)
);
}
catch (const boost::mysql::error_with_diagnostics& err)
{

View File

@ -81,18 +81,24 @@ enum class client_errc : int
/// The static interface encountered an error when parsing a field into a C++ data structure.
static_row_parsing_error,
/// (EXPERIMENTAL) An operation controlled by Boost.MySQL timed out.
timeout,
/// (EXPERIMENTAL) An operation controlled by Boost.MySQL was cancelled.
cancelled,
/**
* \brief (EXPERIMENTAL) Getting a connection from a connection_pool failed because the
* pool is not running. Ensure that you're calling connection_pool::async_run.
*/
pool_not_running,
/**
* \brief (EXPERIMENTAL) Getting a connection from a connection_pool failed because the
* pool was cancelled.
*/
pool_cancelled,
/**
* \brief (EXPERIMENTAL) Getting a connection from a connection_pool was cancelled before
* a connection was available.
*/
no_connection_available,
/// (EXPERIMENTAL) An invalid byte sequence was found while trying to decode a string.
invalid_encoding,
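// Illustrative only (not part of this header): how a caller might tell the new
// error codes apart after async_get_connection completes. `pool` and `diag` are
// assumed to exist.
//
//   pool.async_get_connection(
//       diag,
//       [](boost::mysql::error_code ec, boost::mysql::pooled_connection conn) {
//           if (ec == boost::mysql::client_errc::pool_not_running)
//           {
//               // async_run was never called on this pool
//           }
//           else if (ec == boost::mysql::client_errc::pool_cancelled)
//           {
//               // the pool was cancelled before or while waiting
//           }
//           else if (ec == boost::mysql::client_errc::no_connection_available)
//           {
//               // per-operation cancellation (e.g. a cancel_after timeout) fired;
//               // diag may describe the last failed connection attempt
//           }
//       }
//   );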

View File

@ -281,11 +281,6 @@ class connection_pool
friend struct detail::access;
#endif
static constexpr std::chrono::steady_clock::duration get_default_timeout() noexcept
{
return std::chrono::seconds(30);
}
struct initiate_run : detail::initiation_base
{
using detail::initiation_base::initiation_base;
@ -309,37 +304,26 @@ class connection_pool
using detail::initiation_base::initiation_base;
template <class Handler>
void operator()(
Handler&& h,
diagnostics* diag,
std::shared_ptr<detail::pool_impl> self,
std::chrono::steady_clock::duration timeout
)
void operator()(Handler&& h, diagnostics* diag, std::shared_ptr<detail::pool_impl> self)
{
async_get_connection_erased(std::move(self), timeout, diag, std::forward<Handler>(h));
async_get_connection_erased(std::move(self), diag, std::forward<Handler>(h));
}
};
BOOST_MYSQL_DECL
static void async_get_connection_erased(
std::shared_ptr<detail::pool_impl> pool,
std::chrono::steady_clock::duration timeout,
diagnostics* diag,
asio::any_completion_handler<void(error_code, pooled_connection)> handler
);
template <class CompletionToken>
auto async_get_connection_impl(
std::chrono::steady_clock::duration timeout,
diagnostics* diag,
CompletionToken&& token
)
auto async_get_connection_impl(diagnostics* diag, CompletionToken&& token)
-> decltype(asio::async_initiate<CompletionToken, void(error_code, pooled_connection)>(
std::declval<initiate_get_connection>(),
token,
diag,
impl_,
timeout
impl_
))
{
BOOST_ASSERT(valid());
@ -347,8 +331,7 @@ class connection_pool
initiate_get_connection{get_executor()},
token,
diag,
impl_,
timeout
impl_
);
}
@ -525,9 +508,9 @@ public:
* to succeed.
*
* The async operation will run indefinitely, until the pool is cancelled
* (by being destroyed or calling \ref cancel). The operation completes once
* all internal connection operations (including connects, pings and resets)
* complete.
* (by calling \ref cancel or using per-operation cancellation on the `async_run` operation).
* The operation completes once all internal connection operations
* (including connects, pings and resets) complete.
*
* It is safe to call this function after calling \ref cancel.
*
@ -585,157 +568,81 @@ public:
);
}
/// \copydoc async_get_connection(diagnostics&,CompletionToken&&)
/**
* \brief Retrieves a connection from the pool.
* \details
* Retrieves an idle connection from the pool to be used.
*
* If this function completes successfully (empty error code), the returned \ref pooled_connection
* will have `valid() == true` and will be usable. If it completes with a non-empty error code,
* it will have `valid() == false`.
*
* If a connection is idle when the operation is started, it will complete immediately
* with that connection. Otherwise, it will wait for a connection to become idle
* (possibly creating one in the process, if pool configuration allows it), until
* the operation is cancelled (by emitting a cancellation signal) or the pool
* is cancelled (by calling \ref connection_pool::cancel).
* If the pool is not running, the operation fails immediately.
*
* If the operation is cancelled, and the overload with \ref diagnostics was used,
* the output diagnostics will contain the most recent error generated by
* the connections attempting to connect (via \ref any_connection::async_connect), if any.
* In cases where \ref async_get_connection doesn't complete because connections are unable
* to connect, this feature can help figure out where the problem is.
*
* \par Preconditions
* `this->valid() == true` \n
*
* \par Object lifetimes
* While the operation is outstanding, the pool's internal data will be kept alive.
* It is safe to destroy `*this` while the operation is outstanding.
*
* \par Handler signature
* The handler signature for this operation is
* `void(boost::mysql::error_code, boost::mysql::pooled_connection)`
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation.
* Cancelling `async_get_connection` has no observable side effects.
* The following `asio::cancellation_type_t` values are supported:
*
* - `asio::cancellation_type_t::terminal`
* - `asio::cancellation_type_t::partial`
* - `asio::cancellation_type_t::total`
*
* \par Errors
* - \ref client_errc::no_connection_available, if the `async_get_connection`
* operation is cancelled before a connection becomes available.
* - \ref client_errc::pool_not_running, if the pool is not running
* when the operation is started.
* - \ref client_errc::pool_cancelled, if the pool is cancelled before
* the operation completes, or `async_get_connection` is called
* on a pool that has been cancelled.
*
* \par Thread-safety
* Safe for pools built with \ref pool_params::thread_safe. Can be called
* concurrently with other safe functions. For thread-safe pools, cancellation
* signals can be safely emitted from any thread.
*/
template <
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(::boost::mysql::error_code, ::boost::mysql::pooled_connection))
CompletionToken = with_diagnostics_t<asio::deferred_t>>
auto async_get_connection(CompletionToken&& token = {}) BOOST_MYSQL_RETURN_TYPE(
decltype(async_get_connection_impl({}, nullptr, std::forward<CompletionToken>(token)))
decltype(async_get_connection_impl(nullptr, std::forward<CompletionToken>(token)))
)
{
return async_get_connection_impl(
get_default_timeout(),
nullptr,
std::forward<CompletionToken>(token)
);
return async_get_connection_impl(nullptr, std::forward<CompletionToken>(token));
}
/**
* \brief Retrieves a connection from the pool.
* \details
* Retrieves an idle connection from the pool to be used.
*
* If this function completes successfully (empty error code), the return \ref pooled_connection
* will have `valid() == true` and will be usable. If it completes with a non-empty error code,
* it will have `valid() == false`.
*
* If a connection is idle when the operation is started, it will complete immediately
* with that connection. Otherwise, it will wait for a connection to become idle
* (possibly creating one in the process, if pool configuration allows it), up to
* a duration of 30 seconds.
*
* If a timeout happens because connection establishment has failed, appropriate
* diagnostics will be returned.
*
* \par Preconditions
* `this->valid() == true` \n
*
* \par Object lifetimes
* While the operation is outstanding, the pool's internal data will be kept alive.
* It is safe to destroy `*this` while the operation is outstanding.
*
* \par Handler signature
* The handler signature for this operation is
* `void(boost::mysql::error_code, boost::mysql::pooled_connection)`
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation.
* Cancelling `async_get_connection` has no observable side effects.
* The following `asio::cancellation_type_t` values are supported:
*
* - `asio::cancellation_type_t::terminal`
* - `asio::cancellation_type_t::partial`
* - `asio::cancellation_type_t::total`
*
* \par Errors
* \li Any error returned by \ref any_connection::async_connect, if a timeout
* happens because connection establishment is failing.
* \li \ref client_errc::timeout, if a timeout happens for any other reason
* (e.g. all connections are in use and limits forbid creating more).
* \li \ref client_errc::cancelled if \ref cancel was called before the operation is started or while
* it is outstanding, or if the pool is not running.
*
* \par Thread-safety
* Safe for pools built with \ref pool_params::thread_safe. Can be called
* concurrently with other safe functions. For thread-safe pools, cancellation
* signals can be safely emitted from any thread.
*/
/// \copydoc async_get_connection
template <
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(::boost::mysql::error_code, ::boost::mysql::pooled_connection))
CompletionToken = with_diagnostics_t<asio::deferred_t>>
auto async_get_connection(diagnostics& diag, CompletionToken&& token = {}) BOOST_MYSQL_RETURN_TYPE(
decltype(async_get_connection_impl({}, nullptr, std::forward<CompletionToken>(token)))
decltype(async_get_connection_impl(nullptr, std::forward<CompletionToken>(token)))
)
{
return async_get_connection_impl(get_default_timeout(), &diag, std::forward<CompletionToken>(token));
}
/// \copydoc async_get_connection(std::chrono::steady_clock::duration,diagnostics&,CompletionToken&&)
template <
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(::boost::mysql::error_code, ::boost::mysql::pooled_connection))
CompletionToken = with_diagnostics_t<asio::deferred_t>>
auto async_get_connection(std::chrono::steady_clock::duration timeout, CompletionToken&& token = {})
BOOST_MYSQL_RETURN_TYPE(
decltype(async_get_connection_impl({}, nullptr, std::forward<CompletionToken>(token)))
)
{
return async_get_connection_impl(timeout, nullptr, std::forward<CompletionToken>(token));
}
/**
* \brief Retrieves a connection from the pool.
* \details
* Retrieves an idle connection from the pool to be used.
*
* If this function completes successfully (empty error code), the return \ref pooled_connection
* will have `valid() == true` and will be usable. If it completes with a non-empty error code,
* it will have `valid() == false`.
*
* If a connection is idle when the operation is started, it will complete immediately
* with that connection. Otherwise, it will wait for a connection to become idle
* (possibly creating one in the process, if pool configuration allows it), up to
* a duration of `timeout`. A zero timeout disables it.
*
* If a timeout happens because connection establishment has failed, appropriate
* diagnostics will be returned.
*
* \par Preconditions
* `this->valid() == true` \n
* Timeout values must be positive: `timeout.count() >= 0`.
*
* \par Object lifetimes
* While the operation is outstanding, the pool's internal data will be kept alive.
* It is safe to destroy `*this` while the operation is outstanding.
*
* \par Handler signature
* The handler signature for this operation is
* `void(boost::mysql::error_code, boost::mysql::pooled_connection)`
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation.
* Cancelling `async_get_connection` has no observable side effects.
* The following `asio::cancellation_type_t` values are supported:
*
* - `asio::cancellation_type_t::terminal`
* - `asio::cancellation_type_t::partial`
* - `asio::cancellation_type_t::total`
*
* \par Errors
* \li Any error returned by \ref any_connection::async_connect, if a timeout
* happens because connection establishment is failing.
* \li \ref client_errc::timeout, if a timeout happens for any other reason
* (e.g. all connections are in use and limits forbid creating more).
* \li \ref client_errc::cancelled if \ref cancel was called before the operation is started or while
* it is outstanding, or if the pool is not running.
*
* \par Thread-safety
* Safe for pools built with \ref pool_params::thread_safe. Can be called
* concurrently with other safe functions. For thread-safe pools, cancellation
* signals can be safely emitted from any thread.
*/
template <
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(::boost::mysql::error_code, ::boost::mysql::pooled_connection))
CompletionToken = with_diagnostics_t<asio::deferred_t>>
auto async_get_connection(
std::chrono::steady_clock::duration timeout,
diagnostics& diag,
CompletionToken&& token = {}
)
BOOST_MYSQL_RETURN_TYPE(
decltype(async_get_connection_impl({}, nullptr, std::forward<CompletionToken>(token)))
)
{
return async_get_connection_impl(timeout, &diag, std::forward<CompletionToken>(token));
return async_get_connection_impl(&diag, std::forward<CompletionToken>(token));
}
/**
@ -745,10 +652,9 @@ public:
*
* \li Stops the currently outstanding \ref async_run operation, if any, which will complete
* with a success error code.
* \li Cancels any outstanding \ref async_get_connection operations, which will complete with
* \ref client_errc::cancelled.
* \li Marks the pool as cancelled. Successive `async_get_connection` calls will complete
* immediately with \ref client_errc::cancelled.
* \li Cancels any outstanding \ref async_get_connection operations.
* \li Marks the pool as cancelled. Successive `async_get_connection` calls will
* fail immediately.
*
* This function will return immediately, without waiting for the cancelled operations to complete.
*

View File

@ -100,6 +100,7 @@ private:
msg = std::move(from);
is_server = true;
}
} impl_;
friend bool operator==(const diagnostics& lhs, const diagnostics& rhs) noexcept;

View File

@ -53,12 +53,11 @@ void boost::mysql::connection_pool::async_run_erased(
void boost::mysql::connection_pool::async_get_connection_erased(
std::shared_ptr<detail::pool_impl> pool,
std::chrono::steady_clock::duration timeout,
diagnostics* diag,
asio::any_completion_handler<void(error_code, pooled_connection)> handler
)
{
pool->async_get_connection(timeout, diag, std::move(handler));
pool->async_get_connection(diag, std::move(handler));
}
void boost::mysql::connection_pool::cancel()

View File

@ -56,11 +56,14 @@ inline const char* error_to_string(client_errc error)
case client_errc::row_type_mismatch:
return "The StaticRow type passed to read_some_rows does not correspond to the resultset type being "
"read";
case client_errc::timeout: return "An operation controlled by Boost.MySQL timed out";
case client_errc::cancelled: return "An operation controlled by Boost.MySQL was cancelled";
case client_errc::pool_not_running:
return "Getting a connection from a connection_pool failed because the pool is not running. Ensure "
"that you're calling connection_pool::async_run.";
case client_errc::pool_cancelled:
return "Getting a connection from a connection_pool failed because the pool was cancelled.";
case client_errc::no_connection_available:
return "Getting a connection from a connection_pool was cancelled before "
"a connection was available.";
case client_errc::invalid_encoding:
return "A string passed to a formatting function contains a byte sequence that can't be decoded with "
"the current character set.";

View File

@ -15,17 +15,18 @@
#include <boost/mysql/error_code.hpp>
#include <boost/mysql/pipeline.hpp>
#include <boost/mysql/detail/access.hpp>
#include <boost/mysql/detail/connection_pool_fwd.hpp>
#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
#include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
#include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
#include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
@ -42,11 +43,24 @@ namespace detail {
template <class ConnectionType, class ClockType>
struct conn_shared_state
{
// The list of connections that are currently idle. Non-owning.
intrusive::list<basic_connection_node<ConnectionType, ClockType>> idle_list;
timer_list<ClockType> pending_requests;
// Timer acting as a condition variable to wait for idle connections
asio::basic_waitable_timer<ClockType> idle_connections_cv;
// The number of pending connections (currently getting ready).
// Ensures we don't create new connections while others are still connecting
std::size_t num_pending_connections{0};
error_code last_ec;
diagnostics last_diag;
// Info about the last connection attempt. Already processed, suitable to be used
// as the result of an async_get_connection op
diagnostics last_connect_diag;
conn_shared_state(asio::any_io_executor ex)
: idle_connections_cv(std::move(ex), (ClockType::time_point::max)())
{
}
};
// The templated type is never exposed to the user. We template
@ -77,7 +91,7 @@ class basic_connection_node : public intrusive::list_base_hook<>,
void entering_idle()
{
shared_st_->idle_list.push_back(*this);
shared_st_->pending_requests.notify_one();
shared_st_->idle_connections_cv.cancel_one();
}
void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
void entering_pending() { ++shared_st_->num_pending_connections; }
@ -86,8 +100,7 @@ class basic_connection_node : public intrusive::list_base_hook<>,
// Helpers
void propagate_connect_diag(error_code ec)
{
shared_st_->last_ec = ec;
shared_st_->last_diag = connect_diag_;
shared_st_->last_connect_diag = create_connect_diagnostics(ec, connect_diag_);
}
struct connection_task_op

View File

@ -19,7 +19,6 @@
#include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
#include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
#include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
#include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
#include <boost/mysql/impl/internal/coroutine.hpp>
@ -38,7 +37,6 @@
#include <boost/asio/immediate.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/core/ignore_unused.hpp>
#include <chrono>
#include <cstddef>
@ -66,7 +64,6 @@ class basic_pool_impl
using this_type = basic_pool_impl<ConnectionType, ClockType, ConnectionWrapper>;
using node_type = basic_connection_node<ConnectionType, ClockType>;
using timer_type = asio::basic_waitable_timer<ClockType>;
using timer_block_type = timer_block<ClockType>;
using shared_state_type = conn_shared_state<ConnectionType, ClockType>;
enum class state_t
@ -114,21 +111,23 @@ class basic_pool_impl
wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
}
error_code get_diagnostics(diagnostics* diag) const
void maybe_create_connection()
{
if (state_ == state_t::cancelled)
if (can_create_connection())
create_connection();
}
node_type* try_get_connection()
{
if (!shared_st_.idle_list.empty())
{
return client_errc::cancelled;
}
else if (shared_st_.last_ec)
{
if (diag)
*diag = shared_st_.last_diag;
return shared_st_.last_ec;
node_type& res = shared_st_.idle_list.front();
res.mark_as_in_use();
return &res;
}
else
{
return client_errc::timeout;
return nullptr;
}
}
@ -144,6 +143,20 @@ class basic_pool_impl
asio::post(get_executor(), std::move(self));
}
template <class OpSelf>
void wait_for_connections(OpSelf& self)
{
// Having this encapsulated helps prevent subtle use-after-move errors
if (params_.thread_safe)
{
shared_st_.idle_connections_cv.async_wait(asio::bind_executor(pool_ex_, std::move(self)));
}
else
{
shared_st_.idle_connections_cv.async_wait(std::move(self));
}
}
struct run_op
{
int resume_point_{0};
@ -167,10 +180,8 @@ class basic_pool_impl
};
template <class Self>
void operator()(Self& self, error_code ec = {})
void operator()(Self& self, error_code = {})
{
boost::ignore_unused(ec);
switch (resume_point_)
{
case 0:
@ -207,7 +218,7 @@ class basic_pool_impl
obj_->state_ = state_t::cancelled;
for (auto& conn : obj_->all_conns_)
conn.cancel();
obj_->shared_st_.pending_requests.notify_all();
obj_->shared_st_.idle_connections_cv.expires_at((ClockType::time_point::min)());
// Wait for all connection tasks to exit
BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
@ -222,55 +233,63 @@ class basic_pool_impl
struct get_connection_op
{
int resume_point_{0};
std::shared_ptr<this_type> obj_;
diagnostics* diag_;
std::chrono::steady_clock::time_point timeout_;
std::unique_ptr<timer_block_type> timer_;
error_code result_ec_;
node_type* result_conn_{};
std::shared_ptr<asio::cancellation_signal> sig_;
// Operation arguments
std::shared_ptr<this_type> obj;
diagnostics* diag;
// The proxy signal. Used in thread-safe mode. Present here only for
// lifetime management
std::shared_ptr<asio::cancellation_signal> sig;
// The original cancellation slot. Needed for proper cleanup after we proxy
// the signal in thread-safe mode
asio::cancellation_slot parent_slot;
// State
int resume_point{0};
error_code result_ec;
node_type* result_conn{};
bool has_waited{false};
get_connection_op(
std::shared_ptr<this_type> obj,
std::chrono::steady_clock::time_point timeout,
diagnostics* diag,
std::shared_ptr<asio::cancellation_signal> sig
std::shared_ptr<asio::cancellation_signal> sig,
asio::cancellation_slot parent_slot
) noexcept
: obj_(std::move(obj)), diag_(diag), timeout_(timeout), sig_(std::move(sig))
: obj(std::move(obj)), diag(diag), sig(std::move(sig)), parent_slot(parent_slot)
{
}
bool thread_safe() const { return obj_->params_.thread_safe; }
bool thread_safe() const { return obj->params_.thread_safe; }
template <class Self>
void do_complete(Self& self)
{
auto wr = result_ec_ ? ConnectionWrapper() : ConnectionWrapper(*result_conn_, std::move(obj_));
obj_.reset();
sig_.reset();
self.complete(result_ec_, std::move(wr));
auto wr = result_ec ? ConnectionWrapper() : ConnectionWrapper(*result_conn, std::move(obj));
parent_slot.clear();
sig.reset();
self.complete(result_ec, std::move(wr));
}
template <class Self>
void operator()(Self& self, error_code ec = {})
void operator()(Self& self, error_code = {})
{
bool has_waited{};
timer_type* tim{};
switch (resume_point_)
switch (resume_point)
{
case 0:
// This op supports total cancellation. Must be explicitly enabled,
// as composed ops only support terminal cancellation by default.
self.reset_cancellation_state(asio::enable_total_cancellation());
// Clear diagnostics
if (diag_)
diag_->clear();
if (diag)
diag->clear();
// Enter the strand
if (thread_safe())
{
BOOST_MYSQL_YIELD(resume_point_, 1, obj_->enter_strand(self))
BOOST_MYSQL_YIELD(resume_point, 1, obj->enter_strand(self))
}
// This loop guards us against possible race conditions
@ -279,82 +298,56 @@ class basic_pool_impl
while (true)
{
// If we're not running yet, or were cancelled, just return
if (obj_->state_ == state_t::initial)
if (obj->state_ == state_t::initial)
{
result_ec_ = client_errc::pool_not_running;
result_ec = client_errc::pool_not_running;
break;
}
else if (obj_->state_ == state_t::cancelled ||
get_connection_supports_cancel_type(self.cancelled()))
else if (obj->state_ == state_t::cancelled)
{
result_ec_ = client_errc::cancelled;
// The pool was cancelled
result_ec = client_errc::pool_cancelled;
break;
}
else if (get_connection_supports_cancel_type(self.cancelled()))
{
// The operation was cancelled. Try to provide diagnostics
result_ec = client_errc::no_connection_available;
if (diag)
*diag = obj->shared_st_.last_connect_diag;
break;
}
// Try to get a connection
if (!obj_->shared_st_.idle_list.empty())
if ((result_conn = obj->try_get_connection()) != nullptr)
{
// There was a connection
result_conn_ = &obj_->shared_st_.idle_list.front();
result_conn_->mark_as_in_use();
break;
}
// No luck. If there is room for more connections, create one.
if (obj_->can_create_connection())
{
obj_->create_connection();
}
obj->maybe_create_connection();
// Allocate a timer to perform waits.
if (!timer_)
{
timer_.reset(new timer_block_type(obj_->pool_ex_));
obj_->shared_st_.pending_requests.push_back(*timer_);
}
// Wait to be notified, or until a cancellation happens
BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self);)
// Wait to be notified, or until a timeout happens
// Moving self may cause the unique_ptr we store to be set to null before the method call
tim = &timer_->timer;
tim->expires_at(timeout_);
if (thread_safe())
{
BOOST_MYSQL_YIELD(
resume_point_,
2,
tim->async_wait(asio::bind_executor(obj_->pool_ex_, std::move(self)))
)
}
else
{
BOOST_MYSQL_YIELD(resume_point_, 3, tim->async_wait(std::move(self)))
}
if (!ec)
{
// We've got a timeout. Try to give as much info as possible
result_ec_ = obj_->get_diagnostics(diag_);
break;
}
// Remember that we have waited, so completions are dispatched
// correctly
has_waited = true;
}
// Clean up the timer, which removes it from the list.
// Must be done within the strand, as it affects global state
has_waited = timer_.get();
timer_.reset();
// Perform any required dispatching before completing
if (thread_safe())
{
// Exit the strand
BOOST_MYSQL_YIELD(resume_point_, 4, obj_->exit_strand(self))
BOOST_MYSQL_YIELD(resume_point, 3, obj->exit_strand(self))
}
else if (!has_waited)
{
// This is an immediate completion
BOOST_MYSQL_YIELD(
resume_point_,
5,
resume_point,
4,
asio::async_immediate(self.get_io_executor(), std::move(self))
)
}
@ -365,6 +358,54 @@ class basic_pool_impl
}
};
// Cancel handler to use for get_connection in thread-safe mode.
// This imitates what Asio does for composed ops
struct get_connection_cancel_handler
{
// Pointer to the proxy cancellation signal
// Lifetime managed by the get_connection composed op
std::weak_ptr<asio::cancellation_signal> sig;
// Pointer to the pool object
std::weak_ptr<this_type> obj;
get_connection_cancel_handler(
std::weak_ptr<asio::cancellation_signal> sig,
std::weak_ptr<this_type> obj
) noexcept
: sig(std::move(sig)), obj(std::move(obj))
{
}
void operator()(asio::cancellation_type_t type)
{
if (get_connection_supports_cancel_type(type))
{
// Try to get the pool object back
std::shared_ptr<this_type> obj_shared = obj.lock();
if (obj_shared)
{
// Dispatch to the strand. We don't need to keep a reference to the
// pool because even if it was destroyed before running the handler,
// the strand would be alive.
auto sig_copy = sig;
asio::dispatch(asio::bind_executor(obj_shared->strand(), [sig_copy, type]() {
// If the operation has already completed, the weak ptr will be
// invalid
auto sig_shared = sig_copy.lock();
if (sig_shared)
{
sig_shared->emit(type);
}
}));
}
}
}
};
// Not thread-safe
void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
public:
basic_pool_impl(pool_executor_params&& ex_params, pool_params&& params)
: pool_ex_(
@ -373,36 +414,76 @@ public:
),
conn_ex_(std::move(ex_params.connection_executor)),
params_(make_internal_pool_params(std::move(params))),
shared_st_(pool_ex_),
wait_gp_(pool_ex_),
cancel_timer_(pool_ex_, (std::chrono::steady_clock::time_point::max)())
{
}
using executor_type = asio::any_io_executor;
asio::strand<asio::any_io_executor> strand()
{
BOOST_ASSERT(params_.thread_safe);
return *pool_ex_.template target<asio::strand<asio::any_io_executor>>();
}
using executor_type = asio::any_io_executor;
executor_type get_executor() { return params_.thread_safe ? strand().get_inner_executor() : pool_ex_; }
template <class CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
async_run(CompletionToken&& token)
void async_run(asio::any_completion_handler<void(error_code)> handler)
{
// Completely disable composed cancellation handling, as it's not what we want
auto slot = asio::get_associated_cancellation_slot(token);
auto token_without_slot = asio::bind_cancellation_slot(
asio::cancellation_slot(),
std::forward<CompletionToken>(token)
);
// Completely disable composed cancellation handling, as it's not what we
// want
auto slot = asio::get_associated_cancellation_slot(handler);
auto token_without_slot = asio::bind_cancellation_slot(asio::cancellation_slot(), std::move(handler));
// Initiate passing the original token's slot manually
return asio::async_compose<decltype(token_without_slot), void(error_code)>(
asio::async_compose<decltype(token_without_slot), void(error_code)>(
run_op(shared_from_this_wrapper(), slot),
token_without_slot,
pool_ex_
);
}
// Not thread-safe
void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
void async_get_connection(
diagnostics* diag,
asio::any_completion_handler<void(error_code, ConnectionWrapper)> handler
)
{
// The slot to pass for cleanup
asio::cancellation_slot parent_slot;
// The signal pointer. Will only be created if required
std::shared_ptr<asio::cancellation_signal> sig;
// In thread-safe mode, and if we have a connected slot, create a proxy
// signal that dispatches to the strand
if (params_.thread_safe)
{
parent_slot = asio::get_associated_cancellation_slot(handler);
if (parent_slot.is_connected())
{
// Create a signal. In rare cases, the memory acquired here may outlive
// the async operation (e.g. the completion handler runs after the
// signal is emitted and before the strand dispatch runs). This means we
// can't use the handler's allocator.
sig = std::make_shared<asio::cancellation_signal>();
// Emplace the handler
parent_slot.template emplace<get_connection_cancel_handler>(sig, shared_from_this_wrapper());
// Bind the handler to the slot
handler = asio::bind_cancellation_slot(sig->slot(), std::move(handler));
}
}
// Start
using handler_type = asio::any_completion_handler<void(error_code, ConnectionWrapper)>;
asio::async_compose<handler_type, void(error_code, ConnectionWrapper)>(
get_connection_op(shared_from_this_wrapper(), diag, std::move(sig), parent_slot),
handler,
pool_ex_
);
}
void cancel()
{
@ -465,139 +546,6 @@ public:
}
}
// Cancel handler to use for get_connection in thread-safe mode.
// This imitates what Asio does for composed ops
struct get_connection_cancel_handler
{
// The child cancellation signal to be emitted.
// Lifetime managed by the get_connection composed op
std::weak_ptr<asio::cancellation_signal> sig_ptr;
// The pool, required to get the strand
std::shared_ptr<this_type> self;
get_connection_cancel_handler(
std::weak_ptr<asio::cancellation_signal> sig_ptr,
std::shared_ptr<this_type> self
) noexcept
: sig_ptr(std::move(sig_ptr)), self(std::move(self))
{
}
void operator()(asio::cancellation_type_t type)
{
// The handler that should run through the strand
struct dispatch_handler
{
std::weak_ptr<asio::cancellation_signal> sig_ptr;
std::shared_ptr<this_type> self;
asio::cancellation_type_t cancel_type;
using executor_type = asio::any_io_executor;
executor_type get_executor() const { return self->strand(); }
void operator()()
{
// If the operation hasn't completed yet
if (auto signal = sig_ptr.lock())
{
// Emit the child signal, which effectively causes the op to be cancelled
signal->emit(cancel_type);
}
}
};
if (get_connection_supports_cancel_type(type))
{
asio::dispatch(dispatch_handler{std::move(sig_ptr), std::move(self), type});
}
}
};
struct get_connection_initiation
{
template <class Handler>
void operator()(
Handler&& handler,
diagnostics* diag,
std::shared_ptr<this_type> self,
std::chrono::steady_clock::time_point timeout
)
{
// In thread-safe mode, the cancel handler must be run through the strand
auto slot = asio::get_associated_cancellation_slot(handler);
if (self->params().thread_safe && slot.is_connected())
{
// Create the child signal
auto sig = std::make_shared<asio::cancellation_signal>();
// The original slot will call the handler, which dispatches to the strand
slot.template emplace<get_connection_cancel_handler>(sig, self);
// Bind the handler to the new slot
auto bound_handler = asio::bind_cancellation_slot(
sig->slot(),
std::forward<Handler>(handler)
);
// Start
asio::async_compose<decltype(bound_handler), void(error_code, ConnectionWrapper)>(
get_connection_op(self, timeout, diag, std::move(sig)),
bound_handler,
self->pool_ex_
);
}
else
{
// Just start
asio::async_compose<Handler, void(error_code, ConnectionWrapper)>(
get_connection_op(self, timeout, diag, nullptr),
handler,
self->pool_ex_
);
}
}
};
template <class CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
async_get_connection(
std::chrono::steady_clock::time_point timeout,
diagnostics* diag,
CompletionToken&& token
)
{
return asio::async_initiate<CompletionToken, void(error_code, ConnectionWrapper)>(
get_connection_initiation{},
token,
std::move(diag),
shared_from_this_wrapper(),
timeout
);
}
template <class CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
async_get_connection(
std::chrono::steady_clock::duration timeout,
diagnostics* diag,
CompletionToken&& token
)
{
return async_get_connection(
timeout.count() > 0 ? std::chrono::steady_clock::now() + timeout
: (std::chrono::steady_clock::time_point::max)(),
diag,
std::forward<CompletionToken>(token)
);
}
asio::strand<asio::any_io_executor> strand()
{
BOOST_ASSERT(params_.thread_safe);
return *pool_ex_.template target<asio::strand<asio::any_io_executor>>();
}
// Exposed for testing
static bool run_supports_cancel_type(asio::cancellation_type_t v)
{

View File

@ -15,6 +15,7 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/error.hpp>
#include <chrono>
#include <cstddef>
@ -80,13 +81,13 @@ struct run_with_timeout_state
{
std::shared_ptr<this_type> st;
void operator()(error_code ec)
void operator()(error_code)
{
// If the op has already completed, we don't care about the timer's result
// Emitting the signal may call the handler inline, so we decrement first
if (st->remaining-- == 2u)
{
st->final_ec = ec ? client_errc::cancelled : client_errc::timeout;
st->final_ec = asio::error::operation_aborted;
st->op_signal.emit(asio::cancellation_type::terminal);
}
@ -128,8 +129,7 @@ struct run_with_timeout_state
// Handler must be a suitable completion handler. Arbitrary completion tokens are not supported.
// Handler is called with the following error code:
// - If the op finishes first, with op's error code.
// - If the timer finishes first, without interruptions, with client_errc::timeout.
// - If the timer finishes first because it was cancelled, with client_errc::cancelled.
// - If the timer finishes first, with asio::error::operation_aborted
// Both op and timer are run within the timer's executor.
// If timeout == 0, the timeout is disabled.
template <class Op, class Timer, class Handler>

View File

@ -11,6 +11,7 @@
#include <boost/mysql/diagnostics.hpp>
#include <boost/mysql/error_code.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
namespace boost {
@ -194,6 +195,51 @@ public:
connection_status status() const noexcept { return status_; }
};
// Composes a diagnostics object containing info about the last connect error.
// Suitable for the diagnostics output of async_get_connection
inline diagnostics create_connect_diagnostics(error_code connect_ec, const diagnostics& connect_diag)
{
diagnostics res;
if (connect_ec)
{
// Manipulating the internal representations is more efficient here,
// and better than using stringstream
auto& res_impl = access::get_impl(res);
const auto& connect_diag_impl = access::get_impl(connect_diag);
if (connect_ec == asio::error::operation_aborted)
{
// operation_aborted in this context means timeout
res_impl.msg = "Last connection attempt timed out";
res_impl.is_server = false;
}
else
{
// Add the error code information
res_impl.msg = "Last connection attempt failed with: ";
res_impl.msg += connect_ec.message();
res_impl.msg += " [";
res_impl.msg += connect_ec.to_string();
res_impl.msg += "]";
// Add any diagnostics
if (connect_diag_impl.msg.empty())
{
// The resulting object doesn't contain server-supplied info
res_impl.is_server = false;
}
else
{
// The resulting object may contain server-supplied info
res_impl.msg += ": ";
res_impl.msg += connect_diag_impl.msg;
res_impl.is_server = connect_diag_impl.is_server;
}
}
}
return res;
}
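// A rough usage sketch (assumption, not part of this header): given a failed connect
// attempt, the resulting diagnostics read roughly as follows.
//
//   error_code ec = make_error_code(common_server_errc::er_access_denied_error);
//   diagnostics connect_diag;  // as filled by any_connection::async_connect
//   diagnostics d = create_connect_diagnostics(ec, connect_diag);
//   // d now holds: "Last connection attempt failed with: <ec.message()> [<ec.to_string()>]"
//   // plus ": <server message>" if connect_diag carried one.
//   // For ec == asio::error::operation_aborted it holds "Last connection attempt timed out".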
} // namespace detail
} // namespace mysql
} // namespace boost

View File

@ -1,58 +0,0 @@
//
// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_TIMER_LIST_HPP
#define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_TIMER_LIST_HPP
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
#include <cstddef>
namespace boost {
namespace mysql {
namespace detail {
template <class ClockType>
struct timer_block : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink>>
{
asio::basic_waitable_timer<ClockType> timer;
timer_block(asio::any_io_executor ex) : timer(std::move(ex)) {}
};
template <class ClockType>
class timer_list
{
intrusive::list<timer_block<ClockType>, intrusive::constant_time_size<false>> requests_;
public:
timer_list() = default;
void push_back(timer_block<ClockType>& req) noexcept { requests_.push_back(req); }
void notify_one()
{
for (auto& req : requests_)
{
if (req.timer.cancel())
return;
}
}
void notify_all()
{
for (auto& req : requests_)
req.timer.cancel();
}
std::size_t size() const noexcept { return requests_.size(); }
};
} // namespace detail
} // namespace mysql
} // namespace boost
#endif

View File

@ -78,11 +78,6 @@ bool boost::mysql::is_fatal_error(error_code ec) noexcept
case client_errc::row_type_mismatch:
case client_errc::static_row_parsing_error:
// While these are currently produced only by the connection pool,
// any timed out or cancelled operation would leave the connection in an undefined state
case client_errc::timeout:
case client_errc::cancelled:
// These are only produced by handshake. We categorize them as fatal because they need reconnection,
// although anything affecting handshake effectively does.
case client_errc::server_doesnt_support_ssl:

View File

@ -67,6 +67,13 @@ void poll_global_context(
source_location loc = BOOST_MYSQL_CURRENT_LOCATION
); // poll until done() == true
// TODO: refactor this
void poll_context(
asio::any_io_executor ex,
const bool* done,
source_location loc = BOOST_MYSQL_CURRENT_LOCATION
);
} // namespace test
} // namespace mysql
} // namespace boost

View File

@ -248,6 +248,35 @@ bool boost::mysql::test::is_initiation_function() { return g_is_running_initiati
static boost::asio::io_context g_ctx;
static void poll_context_impl(
boost::asio::io_context& ctx,
const std::function<bool()>& done,
boost::source_location loc
)
{
BOOST_TEST_CONTEXT("Called from " << loc)
{
using std::chrono::steady_clock;
// Restart the context, in case it was stopped
ctx.restart();
// Poll until this time point
constexpr std::chrono::seconds timeout(5);
auto timeout_tp = steady_clock::now() + timeout;
// Perform the polling
while (!done() && steady_clock::now() < timeout_tp)
{
ctx.poll();
std::this_thread::yield();
}
// Check for timeout
BOOST_TEST_REQUIRE(done());
}
}
boost::asio::any_io_executor boost::mysql::test::global_context_executor() { return g_ctx.get_executor(); }
void boost::mysql::test::poll_global_context()
@ -258,30 +287,15 @@ void boost::mysql::test::poll_global_context()
void boost::mysql::test::poll_global_context(const bool* done, source_location loc)
{
poll_global_context([done]() { return *done; }, loc);
poll_context_impl(g_ctx, [done]() { return *done; }, loc);
}
void boost::mysql::test::poll_global_context(const std::function<bool()>& done, source_location loc)
{
BOOST_TEST_CONTEXT("Called from " << loc)
{
using std::chrono::steady_clock;
poll_context_impl(g_ctx, done, loc);
}
// Restart the context, in case it was stopped
g_ctx.restart();
// Poll until this time point
constexpr std::chrono::seconds timeout(5);
auto timeout_tp = steady_clock::now() + timeout;
// Perform the polling
while (!done() && steady_clock::now() < timeout_tp)
{
g_ctx.poll();
std::this_thread::yield();
}
// Check for timeout
BOOST_TEST_REQUIRE(done());
}
void boost::mysql::test::poll_context(asio::any_io_executor ex, const bool* done, source_location loc)
{
poll_context_impl(static_cast<asio::io_context&>(ex.context()), [done]() { return *done; }, loc);
}

View File

@ -12,25 +12,33 @@
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/assert/source_location.hpp>
#include <functional>
#include "test_common/source_location.hpp"
#include "test_common/tracker_executor.hpp"
namespace boost {
namespace mysql {
namespace test {
#ifdef BOOST_ASIO_HAS_CO_AWAIT
inline void run_coro(boost::asio::any_io_executor ex, std::function<boost::asio::awaitable<void>(void)> fn)
inline void run_coro(
boost::asio::any_io_executor ex,
std::function<boost::asio::awaitable<void>(void)> fn,
source_location loc = BOOST_MYSQL_CURRENT_LOCATION
)
{
boost::asio::co_spawn(ex, fn, [](std::exception_ptr ptr) {
bool done = false;
boost::asio::co_spawn(ex, fn, [&](std::exception_ptr ptr) {
done = true;
if (ptr)
{
std::rethrow_exception(ptr);
}
});
auto& ctx = static_cast<boost::asio::io_context&>(ex.context());
ctx.restart();
ctx.run();
poll_context(ex, &done, loc);
}
#endif

View File

@ -298,8 +298,10 @@ BOOST_FIXTURE_TEST_CASE(connection_upper_limit, fixture)
// Getting another connection will block until one is returned.
// Since we won't return the one we have, the function times out
pool.async_get_connection(std::chrono::milliseconds(1), diag, as_netresult)
.validate_error(client_errc::timeout);
pool.async_get_connection(diag, asio::cancel_after(std::chrono::milliseconds(1), asio::deferred))(
as_netresult
)
.validate_error(client_errc::no_connection_available);
// Cleanup the pool
pool.cancel();
@ -354,10 +356,10 @@ BOOST_DATA_TEST_CASE_F(fixture, cancel_get_connection, data::make({false, true})
// Cancel. This will make run and get_connection return
pool.cancel();
std::move(run_result).validate_no_error_nodiag();
std::move(getconn_result).validate_error(client_errc::cancelled);
std::move(getconn_result).validate_error(client_errc::pool_cancelled);
// Calling get_connection after cancel will return client_errc::cancelled
pool.async_get_connection(diag, as_netresult).validate_error(client_errc::cancelled);
// Calling get_connection after cancel will error
pool.async_get_connection(diag, as_netresult).validate_error(client_errc::pool_cancelled);
}
BOOST_FIXTURE_TEST_CASE(get_connection_pool_not_running, fixture)
@ -428,46 +430,15 @@ BOOST_FIXTURE_TEST_CASE(cancel_extends_pool_lifetime, fixture)
poll_global_context();
}
// Spotcheck: the different async_get_connection overloads work
BOOST_FIXTURE_TEST_CASE(get_connection_overloads, fixture)
// Spotcheck: the overload without diagnostics works
BOOST_FIXTURE_TEST_CASE(get_connection_no_diag, fixture)
{
connection_pool pool(global_context_executor(), create_pool_params());
auto run_result = pool.async_run(as_netresult);
// With all params
auto conn = pool.async_get_connection(std::chrono::hours(1), diag, as_netresult).get();
auto conn = pool.async_get_connection(as_netresult).get_nodiag();
conn->async_ping(as_netresult).validate_no_error();
// With timeout, without diag
conn = pool.async_get_connection(std::chrono::hours(1), as_netresult).get_nodiag();
conn->async_ping(as_netresult).validate_no_error();
// With diag, without timeout
conn = pool.async_get_connection(diag, as_netresult).get();
conn->async_ping(as_netresult).validate_no_error();
// Without diag, without timeout
conn = pool.async_get_connection(as_netresult).get_nodiag();
conn->async_ping(as_netresult).validate_no_error();
// Cleanup the pool
pool.cancel();
std::move(run_result).validate_no_error_nodiag();
}
// Spotcheck: async_get_connection timeouts work
BOOST_FIXTURE_TEST_CASE(get_connection_timeout, fixture)
{
// Create and run the pool
auto params = create_pool_params();
params.password = "bad_password"; // Guarantee that no connection will ever become available
connection_pool pool(global_context_executor(), std::move(params));
auto run_result = pool.async_run(as_netresult);
// Getting a connection will timeout. The error may be a generic
// timeout or a "bad password" error, depending on timing
pool.async_get_connection(std::chrono::milliseconds(1), diag, as_netresult).validate_any_error();
// Cleanup the pool
pool.cancel();
std::move(run_result).validate_no_error_nodiag();
@ -557,13 +528,6 @@ BOOST_FIXTURE_TEST_CASE(zero_timeuts, fixture)
auto conn = pool.async_get_connection(diag, as_netresult).get();
conn->async_ping(as_netresult).validate_no_error();
// Return the connection
conn = pooled_connection();
// Get the same connection again. A zero timeout for async_get_connection works, too
conn = pool.async_get_connection(std::chrono::seconds(0), diag, as_netresult).get();
conn->async_ping(as_netresult).validate_no_error();
// Cleanup the pool
pool.cancel();
std::move(run_result).validate_no_error_nodiag();
@ -585,11 +549,6 @@ BOOST_FIXTURE_TEST_CASE(cancel_after, fixture)
.get();
conn->async_ping(as_netresult).validate_no_error();
// The overload with a timeout also works
conn = pool.async_get_connection(timeout, diag, asio::cancel_after(timeout, asio::deferred))(as_netresult)
.get();
conn->async_ping(as_netresult).validate_no_error();
// Cleanup the pool
pool.cancel();
}
@ -600,7 +559,7 @@ BOOST_FIXTURE_TEST_CASE(async_run_per_operation_cancellation, fixture)
connection_pool pool(global_context_executor(), create_pool_params());
pool.async_run(asio::cancel_after(std::chrono::microseconds(1), asio::deferred))(as_netresult)
.validate_no_error_nodiag();
pool.async_get_connection(diag, as_netresult).validate_error(client_errc::cancelled);
pool.async_get_connection(diag, as_netresult).validate_error(client_errc::pool_cancelled);
}
// Spotcheck: per-operation cancellation works with async_get_connection
@ -617,7 +576,7 @@ BOOST_FIXTURE_TEST_CASE(async_get_connection_per_operation_cancellation, fixture
pool.async_get_connection(diag, asio::cancel_after(std::chrono::microseconds(1), asio::deferred))(
as_netresult
)
.validate_error(client_errc::cancelled);
.validate_error(client_errc::no_connection_available);
// Cleanup the pool
pool.cancel();
@ -689,7 +648,7 @@ BOOST_FIXTURE_TEST_CASE(cancel_after_partial_token, fixture)
co_await pool.async_get_connection(asio::cancel_after(std::chrono::microseconds(1))),
error_with_diagnostics,
[](const error_with_diagnostics& err) {
BOOST_TEST(err.code() == client_errc::cancelled);
BOOST_TEST(err.code() == client_errc::no_connection_available);
BOOST_TEST(err.get_diagnostics() == diagnostics());
return true;
}

View File

@ -13,6 +13,7 @@
#include <boost/mysql/with_diagnostics.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/thread_pool.hpp>
@ -39,12 +40,12 @@ boost::asio::awaitable<std::int64_t> get_num_employees(boost::mysql::connection_
{
// Get a fresh connection from the pool.
// pooled_connection is a proxy to an any_connection object.
boost::mysql::pooled_connection conn = co_await pool.async_get_connection(boost::asio::use_awaitable);
boost::mysql::pooled_connection conn = co_await pool.async_get_connection();
// Use pooled_connection::operator-> to access the underlying any_connection.
// Let's use the connection
results result;
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result, boost::asio::use_awaitable);
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result);
co_return result.rows().at(0).at(0).as_int64();
// When conn is destroyed, the connection is returned to the pool
@ -55,18 +56,28 @@ boost::asio::awaitable<void> return_without_reset(boost::mysql::connection_pool&
{
//[connection_pool_return_without_reset
// Get a connection from the pool
boost::mysql::pooled_connection conn = co_await pool.async_get_connection(boost::asio::use_awaitable);
boost::mysql::pooled_connection conn = co_await pool.async_get_connection();
// Use the connection in a way that doesn't mutate session state.
// We're not setting variables, preparing statements or starting transactions,
// so it's safe to skip reset
boost::mysql::results result;
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result, boost::asio::use_awaitable);
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result);
// Explicitly return the connection to the pool, skipping reset
conn.return_without_reset();
//]
}
boost::asio::awaitable<void> apply_timeout(boost::mysql::connection_pool& pool)
{
//[connection_pool_apply_timeout
// Get a connection from the pool, but don't wait more than 5 seconds
auto conn = co_await pool.async_get_connection(boost::asio::cancel_after(std::chrono::seconds(5)));
//]
conn.return_without_reset();
}
#endif
//[connection_pool_sync
@ -102,7 +113,18 @@ public:
// use_future returns a std::future<pooled_connection>.
// Calling get() waits for the future to complete and throws an exception on failure.
// with_diagnostics ensures that the exception contains any server-supplied information.
return conn_pool_.async_get_connection(timeout, with_diagnostics(boost::asio::use_future)).get();
// cancel_after applies a timeout to the operation
// <-
// clang-format off
// ->
return conn_pool_
.async_get_connection(
with_diagnostics(boost::asio::cancel_after(timeout, boost::asio::use_future))
)
.get();
// <-
// clang-format on
// ->
}
};
//]
@ -168,6 +190,7 @@ BOOST_AUTO_TEST_CASE(section_connection_pool)
pool.async_run(boost::asio::detached);
run_coro(ctx.get_executor(), [&pool]() -> boost::asio::awaitable<void> {
co_await return_without_reset(pool);
co_await apply_timeout(pool);
pool.cancel();
});
#endif

View File

@ -65,7 +65,7 @@ public:
asio::bind_executor(
strand_,
[this](error_code ec, mysql::pooled_connection c) {
if (ec == boost::mysql::client_errc::cancelled)
if (ec == boost::mysql::client_errc::no_connection_available)
{
// We got a cancellation. Increase timeout so we don't get stuck
// and try again

View File

@ -51,7 +51,6 @@ add_executable(
test/execution_processor/results_impl.cpp
test/execution_processor/static_results_impl.cpp
test/connection_pool/timer_list.cpp
test/connection_pool/wait_group.cpp
test/connection_pool/run_with_timeout.cpp
test/connection_pool/sansio_connection_node.cpp

View File

@ -60,7 +60,6 @@ run
test/execution_processor/results_impl.cpp
test/execution_processor/static_results_impl.cpp
test/connection_pool/timer_list.cpp
test/connection_pool/wait_group.cpp
test/connection_pool/run_with_timeout.cpp
test/connection_pool/sansio_connection_node.cpp

View File

@ -324,11 +324,8 @@ void deferred_spotcheck()
{
connection_pool pool(test::global_context_executor(), pool_params());
diagnostics diag;
std::chrono::seconds timeout(5);
(void)pool.async_run(asio::deferred);
(void)pool.async_get_connection(timeout, diag, asio::deferred);
(void)pool.async_get_connection(timeout, asio::deferred);
(void)pool.async_get_connection(diag, asio::deferred);
(void)pool.async_get_connection(asio::deferred);
}
@ -339,11 +336,8 @@ asio::awaitable<void> spotcheck_default_tokens()
{
connection_pool pool(test::global_context_executor(), pool_params());
diagnostics diag;
std::chrono::seconds timeout(5);
co_await pool.async_run();
co_await pool.async_get_connection(timeout, diag);
co_await pool.async_get_connection(timeout);
co_await pool.async_get_connection(diag);
co_await pool.async_get_connection();
}
@ -368,12 +362,9 @@ void spotcheck_partial_tokens()
{
connection_pool pool(test::global_context_executor(), pool_params());
diagnostics diag;
std::chrono::seconds timeout(5);
auto tok = asio::cancel_after(std::chrono::seconds(10));
check_op(pool.async_run(tok));
check_op(pool.async_get_connection(timeout, diag, tok));
check_op(pool.async_get_connection(timeout, tok));
check_op(pool.async_get_connection(diag, tok));
check_op(pool.async_get_connection(tok));
}

File diff suppressed because it is too large

View File

@ -176,7 +176,7 @@ BOOST_FIXTURE_TEST_CASE(timer_first_ok, fixture)
{
// Run the op
run_with_timeout(io.async_f(asio::deferred), tim, seconds(60), [this](error_code ec) {
BOOST_TEST(ec == client_errc::timeout);
BOOST_TEST(ec == asio::error::operation_aborted);
set_finished();
});
@ -190,7 +190,7 @@ BOOST_FIXTURE_TEST_CASE(timer_first_cancelled, fixture)
{
// Run the op
run_with_timeout(io.async_f(asio::deferred), tim, seconds(60), [this](error_code ec) {
BOOST_TEST(ec == client_errc::cancelled);
BOOST_TEST(ec == asio::error::operation_aborted);
set_finished();
});

View File

@ -6,19 +6,31 @@
//
#include <boost/mysql/client_errc.hpp>
#include <boost/mysql/common_server_errc.hpp>
#include <boost/mysql/diagnostics.hpp>
#include <boost/mysql/error_categories.hpp>
#include <boost/mysql/error_code.hpp>
#include <boost/mysql/mysql_server_errc.hpp>
#include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
#include <boost/asio/error.hpp>
#include <boost/test/unit_test.hpp>
#include <cstddef>
#include <string>
#include "test_common/create_diagnostics.hpp"
#include "test_common/printing.hpp"
#include "test_unit/printing.hpp"
using namespace boost::mysql::detail;
using boost::mysql::client_errc;
using boost::mysql::error_code;
using namespace boost::mysql;
using namespace boost::mysql::test;
namespace asio = boost::asio;
using detail::collection_state;
using detail::connection_status;
using detail::next_connection_action;
using detail::sansio_connection_node;
BOOST_AUTO_TEST_SUITE(test_sansio_connection_node)
@ -153,7 +165,7 @@ BOOST_AUTO_TEST_CASE(connect_error)
mock_node nod(connection_status::connect_in_progress);
// Fail connecting
auto act = nod.resume(client_errc::timeout, collection_state::none);
auto act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::sleep_connect_failed);
nod.check(connection_status::sleep_connect_failed_in_progress, 0);
@ -179,7 +191,7 @@ BOOST_AUTO_TEST_CASE(ping_error)
nod.check(connection_status::ping_in_progress, exit_idle | enter_pending);
// Ping fails
act = nod.resume(client_errc::cancelled, collection_state::none);
act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::connect);
nod.check(connection_status::connect_in_progress, 0);
@ -200,7 +212,7 @@ BOOST_AUTO_TEST_CASE(reset_error)
nod.check(connection_status::reset_in_progress, enter_pending);
// Reset fails
act = nod.resume(client_errc::timeout, collection_state::none);
act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::connect);
nod.check(connection_status::connect_in_progress, 0);
@ -219,12 +231,12 @@ BOOST_AUTO_TEST_CASE(sleep_between_retries_fail)
mock_node nod(connection_status::connect_in_progress);
// Fail connecting
auto act = nod.resume(client_errc::timeout, collection_state::none);
auto act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::sleep_connect_failed);
nod.check(connection_status::sleep_connect_failed_in_progress, 0);
// Sleep reports an error. It will get ignored
act = nod.resume(client_errc::cancelled, collection_state::none);
act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::connect);
nod.check(connection_status::connect_in_progress, 0);
}
@ -238,7 +250,7 @@ BOOST_AUTO_TEST_CASE(idle_wait_fail)
mock_node nod(connection_status::idle);
// Idle wait failed. Error gets ignored
auto act = nod.resume(client_errc::cancelled, collection_state::none);
auto act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::ping);
nod.check(connection_status::ping_in_progress, exit_idle | enter_pending);
}
@ -252,7 +264,7 @@ BOOST_AUTO_TEST_CASE(idle_wait_fail_in_use)
mock_node nod(connection_status::in_use);
// Idle wait failed. Error gets ignored
auto act = nod.resume(client_errc::cancelled, collection_state::needs_collect_with_reset);
auto act = nod.resume(asio::error::operation_aborted, collection_state::needs_collect_with_reset);
BOOST_TEST(act == next_connection_action::reset);
nod.check(connection_status::reset_in_progress, enter_pending);
}
@ -283,17 +295,95 @@ BOOST_AUTO_TEST_CASE(cancel)
nod.cancel();
// Next action will always return none
auto act = nod.resume(client_errc::cancelled, collection_state::none);
auto act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::none);
nod.check(connection_status::terminated, tc.hooks);
// Cancel again does nothing
nod.cancel();
act = nod.resume(client_errc::cancelled, collection_state::none);
act = nod.resume(asio::error::operation_aborted, collection_state::none);
BOOST_TEST(act == next_connection_action::none);
nod.check(connection_status::terminated, 0);
}
}
}
// Connect diagnostics creation
BOOST_AUTO_TEST_CASE(create_connect_diagnostics_)
{
struct
{
string_view name;
error_code input_ec;
diagnostics input_diag;
diagnostics expected;
} test_cases[]{
// Success
{"no_error", error_code(), diagnostics(), diagnostics()},
// Edge case: no error but diagnostics is set. Just ignore its value
{"no_error_diag", error_code(), create_server_diag("something"), diagnostics()},
// Timeout (operation_aborted) gets special handling
{"timeout",
asio::error::operation_aborted,
diagnostics(),
create_client_diag("Last connection attempt timed out")},
// Network error numbers are OS-specific
{"network_error",
asio::error::network_reset,
diagnostics(),
create_client_diag(
"Last connection attempt failed with: " + error_code(asio::error::network_reset).message() +
" [system:" + std::to_string(static_cast<int>(asio::error::network_reset)) + "]"
)},
// Common server, with diagnostics
{"server_error_diag",
common_server_errc::er_no_such_table,
create_server_diag("Table 'abc' does not exist"),
create_server_diag("Last connection attempt failed with: er_no_such_table "
"[mysql.common-server:1146]: Table 'abc' does not exist")},
// Common server, without diagnostics. Results in a client message, because it contains no server
// output
{"server_error_nodiag",
common_server_errc::er_no_such_table,
create_server_diag(""),
create_client_diag("Last connection attempt failed with: er_no_such_table [mysql.common-server:1146]"
)},
// MySQL/MariaDB specific errors
{"specific_server_error",
error_code(mysql_server_errc::er_binlog_fatal_error, get_mysql_server_category()),
create_server_diag("something failed"),
create_server_diag("Last connection attempt failed with: er_binlog_fatal_error "
"[mysql.mysql-server:1593]: something failed")},
// A client error with diagnostics
{"client_error_diag",
client_errc::auth_plugin_requires_ssl,
create_client_diag("Something client-side failed"),
create_client_diag("Last connection attempt failed with: The authentication plugin requires the "
"connection to use SSL [mysql.client:7]: Something client-side failed")},
// A client error, no diagnostics
{"client_error_nodiag",
client_errc::auth_plugin_requires_ssl,
diagnostics(),
create_client_diag("Last connection attempt failed with: The authentication plugin requires the "
"connection to use SSL [mysql.client:7]")},
};
for (const auto& tc : test_cases)
{
BOOST_TEST_CONTEXT(tc.name)
{
const auto actual = detail::create_connect_diagnostics(tc.input_ec, tc.input_diag);
BOOST_TEST(actual == tc.expected);
}
}
}
BOOST_AUTO_TEST_SUITE_END()
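
As the cases above show, the failure of the last connection attempt is now reported through diagnostics rather than through the completion error code. A minimal sketch of how user code might surface it, assuming a coroutine-based caller and a hypothetical helper name (the 10-second cancel_after timeout is arbitrary):

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/mysql/connection_pool.hpp>
#include <boost/mysql/diagnostics.hpp>
#include <boost/mysql/pooled_connection.hpp>
#include <chrono>
#include <iostream>

namespace asio = boost::asio;
namespace mysql = boost::mysql;

// Hypothetical helper: try to get a connection, printing why the last
// connection attempt failed if none could be obtained.
asio::awaitable<void> report_connect_failure(mysql::connection_pool& pool)
{
    mysql::diagnostics diag;
    auto [ec, conn] = co_await pool.async_get_connection(
        diag,
        asio::cancel_after(std::chrono::seconds(10), asio::as_tuple(asio::use_awaitable))
    );
    if (ec)
    {
        // The details live in the diagnostics object, e.g.
        // "Last connection attempt timed out" or
        // "Last connection attempt failed with: ..." (see the test cases above)
        std::cerr << ec << ": " << diag.client_message() << diag.server_message() << '\n';
        co_return;
    }
    // conn is a valid pooled_connection here and can be used normally
}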

View File

@ -1,232 +0,0 @@
//
// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/test/unit_test_suite.hpp>
#include <chrono>
#include <memory>
using namespace boost::mysql::detail;
namespace asio = boost::asio;
BOOST_AUTO_TEST_SUITE(test_timer_list)
// We use real timers to verify that cancel semantics integrate
// correctly with our code
using list_t = timer_list<std::chrono::steady_clock>;
using block_t = timer_block<std::chrono::steady_clock>;
struct fixture
{
list_t l;
asio::io_context ctx;
static void add_wait(block_t& blk)
{
blk.timer.expires_after(std::chrono::minutes(30));
blk.timer.async_wait(asio::detached);
}
};
BOOST_FIXTURE_TEST_CASE(notify_one_empty, fixture)
{
BOOST_TEST(l.size() == 0u);
// notify_one doesn't crash if the list is empty
BOOST_CHECK_NO_THROW(l.notify_one());
}
BOOST_FIXTURE_TEST_CASE(notify_one_several_timers, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
// Add waits on them
add_wait(t1);
add_wait(t2);
// Add them to the list
l.push_back(t1);
l.push_back(t2);
BOOST_TEST(l.size() == 2u);
// Notify causes the first one to be cancelled
l.notify_one();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 1u);
}
BOOST_FIXTURE_TEST_CASE(notify_one_timer_multiple_waits, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
// Add multiple waits to the first one
add_wait(t1);
add_wait(t2);
t1.timer.async_wait(asio::detached);
// Add them to the list
l.push_back(t1);
l.push_back(t2);
BOOST_TEST(l.size() == 2u);
// Notify causes all waits on the first one to be cancelled
l.notify_one();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 1u);
}
BOOST_FIXTURE_TEST_CASE(notify_one_timer_already_cancelled, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
block_t t3(ctx.get_executor());
// Add waits on them
add_wait(t1);
add_wait(t2);
add_wait(t3);
// Add them to the list
l.push_back(t1);
l.push_back(t2);
l.push_back(t3);
BOOST_TEST(l.size() == 3u);
// The first notify cancels the first timer, the second notify, the second timer
l.notify_one();
l.notify_one();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 0u);
BOOST_TEST(t3.timer.cancel() == 1u);
}
BOOST_FIXTURE_TEST_CASE(notify_one_all_timers_cancelled, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
// Add waits on them
add_wait(t1);
add_wait(t2);
// Add them to the list
l.push_back(t1);
l.push_back(t2);
BOOST_TEST(l.size() == 2u);
// The third call will find all timers already cancelled
l.notify_one();
l.notify_one();
l.notify_one();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 0u);
}
BOOST_FIXTURE_TEST_CASE(notify_one_timer_without_wait, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
block_t t3(ctx.get_executor());
// Add waits on t2 and t3, but not t1. This can happen
// if we insert the timer on the list before adding a wait
add_wait(t2);
add_wait(t3);
// Insert them on the list
l.push_back(t1);
l.push_back(t2);
l.push_back(t3);
BOOST_TEST(l.size() == 3u);
// Since there's nothing to notify on t1, we notify t2
l.notify_one();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 0u);
BOOST_TEST(t3.timer.cancel() == 1u);
}
BOOST_FIXTURE_TEST_CASE(notify_all_empty, fixture)
{
// notify_all doesn't crash if the list is empty
BOOST_CHECK_NO_THROW(l.notify_all());
}
BOOST_FIXTURE_TEST_CASE(notify_all_some_timers, fixture)
{
// Create timers
block_t t1(ctx.get_executor());
block_t t2(ctx.get_executor());
// Add waits on them
add_wait(t1);
add_wait(t2);
// Add them to the list
l.push_back(t1);
l.push_back(t2);
BOOST_TEST(l.size() == 2u);
// Notify cancels all timers
l.notify_all();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 0u);
// Notifying again is a no-op
l.notify_all();
BOOST_TEST(t1.timer.cancel() == 0u);
BOOST_TEST(t2.timer.cancel() == 0u);
}
BOOST_FIXTURE_TEST_CASE(auto_unlink, fixture)
{
// Create timers (in dynamic memory so we can delete them easily)
std::unique_ptr<block_t> t1{new block_t(ctx.get_executor())};
std::unique_ptr<block_t> t2{new block_t(ctx.get_executor())};
std::unique_ptr<block_t> t3{new block_t(ctx.get_executor())};
// Add waits on some of them
add_wait(*t2);
add_wait(*t3);
// Add them to the list
l.push_back(*t1);
l.push_back(*t2);
l.push_back(*t3);
BOOST_TEST(l.size() == 3u);
// Delete t2
t2.reset();
BOOST_TEST(l.size() == 2u);
// Delete t1
t1.reset();
BOOST_TEST(l.size() == 1u);
// notify_one doesn't try to notify deleted timers
l.notify_one();
BOOST_TEST(t3->timer.cancel() == 0u);
// Delete the last one
t3.reset();
BOOST_TEST(l.size() == 0u);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -123,7 +123,7 @@ BOOST_AUTO_TEST_CASE(move_assign)
std::unique_ptr<archetype_context> source{new archetype_context(opts, string_archetype(42))};
source->append_raw("SELECT ");
archetype_context ctx(ascii_opts, string_archetype(42));
ctx.append_raw("abc").add_error(client_errc::cancelled);
ctx.append_raw("abc").add_error(client_errc::wrong_num_params);
// Assign
ctx = std::move(*source);

View File

@ -191,11 +191,11 @@ BOOST_AUTO_TEST_CASE(error_formatting_element)
{
auto fn = [](int v, format_context_base& ctx) {
if (v == 42)
ctx.add_error(client_errc::cancelled);
ctx.add_error(client_errc::wrong_num_params);
format_sql_to(ctx, "{}", v);
};
std::vector<int> col{1, 42, 10};
BOOST_TEST(format_single_error("{}", sequence(col, fn)) == client_errc::cancelled);
BOOST_TEST(format_single_error("{}", sequence(col, fn)) == client_errc::wrong_num_params);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -66,10 +66,6 @@ BOOST_AUTO_TEST_CASE(test_is_fatal_error)
{"row_type_mismatch", client_errc::row_type_mismatch, true },
{"static_row_parsing_error", client_errc::static_row_parsing_error, true },
// Client errors representing cancellations
{"client_timeout", client_errc::timeout, true },
{"client_cancelled", client_errc::cancelled, true },
// Client errors affecting handshake
{"server_unsupported", client_errc::server_unsupported, true },
{"unknown_auth_plugin", client_errc::unknown_auth_plugin, true },