Background
Dapper is a lightweight, high-performance ORM that executes hand-written SQL and maps the results to objects. This approach makes interacting with a database both performant and easy, since you can fine-tune the SQL to your taste. Of course, there are other high-performance ORMs available (EF Core 6 made performance a key area of improvement), but we'll focus on Dapper here. We want our solution to extend all of Dapper's built-in functionality and add a layer of resiliency that manages transient database errors and other error types. We'll handle these exceptions with a few different Polly policies.
Challenge
We want to gracefully handle transient errors, which means retrying the request with a minimal amount of boilerplate; Polly makes this trivial. Next, we want to gracefully handle networking errors that could corrupt our connection pool, both by retrying the request and by clearing the connection pool. We also want a high-level timeout for all our operations so that our end user isn't left waiting forever for a response. Finally, if we encounter a fatal exception, we want to bypass our retry logic and prevent additional retries of the poison request.
Solution
Error Numbers We'll Handle
First, let's go through the errors we're going to handle. We'll focus on SqlException and its Number property. These numbers indicate the type of error that caused the exception. If more than one error occurred, it's important to know that only the first error's number is reported. This information was gathered from multiple sources, including the official docs and the master.dbo.sysmessages table.
Transient Errors
- 40613 - Database is not currently available.
- 40197 - The service has encountered an error processing your request. Please try again.
- 40501 - The service is currently busy.
- 49918 - Cannot process request. Not enough resources to process request.
- 40549 - Session is terminated because you have a long running transaction.
- 40550 - The session has been terminated because it has acquired too many locks.
- 1205 - Transaction was deadlocked on resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
Networking Errors
- 258 - Cannot call methods on server.
- 26 - Error Locating server specified.
- 40 - Could not open a connection to the server.
- 10053 - A transport-level error has occurred when receiving results from the server.
The complete listing further down also treats -2, 10060, 0 and 64 as networking errors.
Constraint Violation Errors
- 2627 - Cannot insert duplicate key.
- 547 - The statement conflicted with the constraint.
- 2601 - Cannot insert duplicate key row in object.
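To make the routing concrete, here is a small, dependency-free C# sketch (top-level statements, plain integers instead of real SqlException instances) of how an error number selects a path. The three sets copy the numbers used in the complete SqlResiliencyPolicy listing later in this post; the Classify helper is hypothetical and only mirrors the Handle<SqlException>(ex => set.Contains(ex.Number)) predicates.

```csharp
using System;
using System.Collections.Generic;

// Numbers copied from SqlResiliencyPolicy; the networking set also includes -2, 10060, 0 and 64.
var transient = new HashSet<int> { 40613, 40197, 40501, 49918, 40549, 40550, 1205 };
var network = new HashSet<int> { 258, -2, 10060, 0, 64, 26, 40, 10053 };
var constraint = new HashSet<int> { 2627, 547, 2601 };

// Hypothetical helper mirroring the Handle<SqlException>(ex => set.Contains(ex.Number)) predicates.
// SqlException.Number reports the first error only, so that number alone picks the path.
string Classify(int number) =>
    transient.Contains(number) ? "transient"
    : network.Contains(number) ? "network"
    : constraint.Contains(number) ? "constraint"
    : "unhandled"; // no policy handles it, so it reaches the caller untouched

// If two sets overlapped, two policies would both act on the same error.
bool disjoint = !transient.Overlaps(network)
    && !transient.Overlaps(constraint)
    && !network.Overlaps(constraint);

Console.WriteLine($"{Classify(1205)} {Classify(10053)} {Classify(2627)} {Classify(208)} {disjoint}");
// prints: transient network constraint unhandled True
```

The disjointness check is worth keeping as a unit test: if a number ever landed in two sets, both policies would treat the same error.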
Our Policies
We'll want to handle these errors differently, executing a different path depending on the type of error received, so one policy won't do. Let's look at how we want each policy to behave.
First In Line, Timeout
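We want a timeout around everything that happens in a request. Dapper can set a timeout per command, but we want a single, global timeout on the entire request, retries included. Polly makes this easy with a simple TimeoutAsync policy. One detail matters here: Polly's default timeout strategy is optimistic, meaning it cancels through the CancellationToken passed to the executed delegate, and our Dapper calls never observe that token. We therefore use the pessimistic strategy, which lets the caller walk away on time; the abandoned command keeps running until it completes or hits its own command timeout.

```csharp
var timeoutPolicy = Policy.TimeoutAsync(
    maxTimeout ?? TimeSpan.FromMinutes(2),
    TimeoutStrategy.Pessimistic); // our delegates don't observe Polly's CancellationToken
```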
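Polly's timeout comes in two flavours: optimistic, which cancels through the CancellationToken handed to the executed delegate, and pessimistic, which lets the caller stop waiting even when the delegate ignores that token. The following BCL-only sketch illustrates the pessimistic, walk-away behaviour with Task.WhenAny. WithTimeout and SlowQuery are hypothetical helpers, not Polly's implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper illustrating a pessimistic ("walk away") timeout with the BCL only.
// The caller stops waiting after `timeout`, but `operation` itself is NOT cancelled.
static async Task<T> WithTimeout<T>(Task<T> operation, TimeSpan timeout)
{
    var winner = await Task.WhenAny(operation, Task.Delay(timeout));
    if (winner != operation)
        throw new TimeoutException($"Gave up waiting after {timeout.TotalMilliseconds} ms.");
    return await operation; // yields the result or rethrows the operation's own exception
}

// A slow "query" that ignores cancellation entirely, like a Dapper call without a token
Task<int> SlowQuery() => Task.Delay(TimeSpan.FromSeconds(5)).ContinueWith(_ => 1);

var quick = await WithTimeout(Task.FromResult(42), TimeSpan.FromSeconds(1));
Console.WriteLine(quick); // 42

try
{
    await WithTimeout(SlowQuery(), TimeSpan.FromMilliseconds(100));
}
catch (TimeoutException ex)
{
    Console.WriteLine(ex.Message); // the caller returns promptly; SlowQuery is still running
}
```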
Next, We Handle Transient Errors
For transient errors we want to retry and log a warning that includes the SQL and parameters involved. We do this using Polly's execution Context, described in more detail later, together with a WaitAndRetryAsync policy. The sleep duration grows exponentially: retry n waits 2^n seconds, so the default three retries wait 2, 4 and 8 seconds.

```csharp
var transientPolicy = Policy
    .Handle<SqlException>(ex => transientNumbers.Contains(ex.Number))
    .WaitAndRetryAsync(
        transientRetries,
        attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)), // 2s, 4s, 8s, ...
        (ex, _, ctx) => ctx.GetLogger()?.LogWarning(ex,
            "{@Operation} Encountered Transient SqlException. Params:{@Param} Sql:{@Sql}",
            ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]));
```
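The retry behaviour itself is simple to picture. The sketch below is a hypothetical, BCL-only equivalent of what WaitAndRetryAsync does for us, not Polly's code: shouldRetry plays the role of the Handle<SqlException> predicate, onRetry the role of the logging callback, and Backoff reproduces the 2^n second sleep duration. As in Polly, a retry count of n means at most n + 1 attempts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same sleep-duration formula as the transient and network policies: retry n waits 2^n seconds.
static TimeSpan Backoff(int attempt) => TimeSpan.FromSeconds(Math.Pow(2, attempt));

// Hypothetical BCL-only wait-and-retry loop (an illustration, not Polly's implementation).
static async Task<T> RetryAsync<T>(
    Func<Task<T>> action,
    Func<Exception, bool> shouldRetry,   // role of Handle<SqlException>(predicate)
    int retries,                         // at most retries + 1 attempts in total
    Func<int, TimeSpan> sleepDuration,   // role of the sleepDurationProvider
    Action<Exception, TimeSpan> onRetry) // role of the logging callback
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            return await action();
        }
        catch (Exception ex) when (attempt <= retries && shouldRetry(ex))
        {
            var wait = sleepDuration(attempt);
            onRetry(ex, wait);
            await Task.Delay(wait);
        }
        // Once retries are exhausted (or shouldRetry is false) the exception propagates.
    }
}

var schedule = new List<double>();
for (var n = 1; n <= 3; n++) schedule.Add(Backoff(n).TotalSeconds);
Console.WriteLine(string.Join(", ", schedule)); // 2, 4, 8
```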
Next, The Networking Errors
For networking errors we want another WaitAndRetryAsync policy that not only logs, as we did for transient errors, but also clears the connection pool in case it has become corrupted. Again, we get what we need from our Context.

```csharp
var networkPolicy = Policy
    .Handle<SqlException>(ex => networkingNumbers.Contains(ex.Number))
    .WaitAndRetryAsync(
        networkRetries,
        attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
        (ex, _, ctx) =>
        {
            ctx.GetLogger()?.LogWarning(ex,
                "{@Operation} Encountered a Network Error. Params:{@Param} Sql:{@Sql}",
                ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]);

            // Empty the pool behind this connection so the retry doesn't reuse a broken pooled connection
            if (ctx.TryGetConnection(out var connection))
                SqlConnection.ClearPool(connection);
        });
```
Next, Constraint Violations
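If a combination of query and parameters violates a constraint, a retry isn't going to solve the problem, and neither retry policy handles these numbers. In this case we create a circuit breaker that opens on the first violation and never closes again, logging the error and returning it straight back to the consumer. Polly rethrows the original SqlException on the call that breaks the circuit; any later call through the same policy instance is rejected with a BrokenCircuitException without reaching the database.

```csharp
var constraintPolicy = Policy
    .Handle<SqlException>(ex => constraintViolationNumbers.Contains(ex.Number))
    .CircuitBreakerAsync(
        1,                 // break on the first violation
        TimeSpan.MaxValue, // and never close again
        (ex, _, ctx) => ctx.GetLogger()?.LogError(ex,
            "{@Operation} Encountered a Constraint Violation. Params:{@Param} Sql:{@Sql}",
            ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]),
        ctx => { }); // onReset: nothing to do
```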
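As a mental model, a breaker created with one allowed exception and a break duration of TimeSpan.MaxValue behaves like the hypothetical OneWayBreaker below: the first handled failure is rethrown to the caller and opens the circuit, and every later call is rejected without running. This is a BCL-only sketch; Polly rejects with a BrokenCircuitException, which we stand in for here with an InvalidOperationException, and an ArgumentException stands in for a constraint violation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical one-way breaker: opens on the first handled failure and never closes,
// like CircuitBreakerAsync(1, TimeSpan.MaxValue, ...). Not Polly's implementation.
static Func<Func<Task<T>>, Task<T>> OneWayBreaker<T>(Func<Exception, bool> handles)
{
    Exception? openedBy = null;
    return async action =>
    {
        if (openedBy is not null) // open: reject without running the action
            throw new InvalidOperationException("Circuit is open.", openedBy);
        try
        {
            return await action();
        }
        catch (Exception ex) when (handles(ex))
        {
            openedBy = ex; // trip the breaker...
            throw;         // ...and hand the original error back to the caller
        }
    };
}

var breaker = OneWayBreaker<int>(e => e is ArgumentException);

try { await breaker(() => throw new ArgumentException("duplicate key")); }
catch (ArgumentException ex) { Console.WriteLine($"first call: {ex.Message}"); }

try { await breaker(() => Task.FromResult(1)); }
catch (InvalidOperationException ex) { Console.WriteLine($"second call: {ex.Message}"); }
```

Because each breaker instance holds its own state, creating one per scope (as the DI registration later in this post does) limits a tripped circuit to a single request.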
Putting The Policies Together
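Finally, we wrap our policies in the desired execution order and return the fully formed resiliency policy. Each WrapAsync call nests the next policy inside the ones before it, so the timeout is outermost and bounds the whole request, retries and backoff delays included, while the constraint circuit breaker is innermost, closest to the Dapper call.

```csharp
var resiliencyPolicy = timeoutPolicy // outermost
    .WrapAsync(transientPolicy)
    .WrapAsync(networkPolicy)
    .WrapAsync(constraintPolicy);    // innermost

return resiliencyPolicy;
```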
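One consequence of nesting the network retry inside the transient retry is that the inner policy's retry count restarts on every outer attempt. The hypothetical simulation below (delays omitted, error kinds as strings rather than SqlException numbers) counts attempts under a worst-case pattern where three network errors are always followed by a transient one: with the default three retries each, a single call can reach the database (3 + 1) × (3 + 1) = 16 times.

```csharp
using System;

// Hypothetical simulation of timeout -> transient retry -> network retry -> query, delays omitted.
// errorFor(n) gives the outcome of the n-th attempt: "ok", "network" or "transient".
static int CountAttempts(Func<int, string> errorFor, int transientRetries, int networkRetries)
{
    var attempts = 0;
    for (var outer = 0; outer <= transientRetries; outer++)   // transient policy (outer wrap)
    {
        var outcome = "ok";
        for (var inner = 0; inner <= networkRetries; inner++) // network policy: count restarts each outer attempt
        {
            outcome = errorFor(++attempts);
            if (outcome != "network") break; // success or a non-network error leaves the network policy
        }
        // Success, a network error that exhausted its retries, or any other error leaves the transient policy.
        if (outcome != "transient") break;
    }
    return attempts;
}

// Worst case: three network errors followed by a transient one, over and over.
var worst = CountAttempts(n => n % 4 == 0 ? "transient" : "network", 3, 3);
Console.WriteLine(worst); // 16
```

With the default backoff, that worst case sleeps through four rounds of 2 + 4 + 8 = 14 seconds of network delay plus 14 seconds of transient delay, 70 seconds in total before any query time is counted, which still fits inside the two-minute timeout.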
All Together Now
And here's what it all looks like together.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient; // or System.Data.SqlClient, depending on your provider package
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Timeout;
using static ContextHelper;

public static class SqlResiliencyPolicy
{
    private static readonly ISet<int> transientNumbers =
        new HashSet<int>(new[] { 40613, 40197, 40501, 49918, 40549, 40550, 1205 });

    private static readonly ISet<int> networkingNumbers =
        new HashSet<int>(new[] { 258, -2, 10060, 0, 64, 26, 40, 10053 });

    private static readonly ISet<int> constraintViolationNumbers =
        new HashSet<int>(new[] { 2627, 547, 2601 });

    public static IAsyncPolicy GetSqlResiliencyPolicy(
        TimeSpan? maxTimeout = null, int transientRetries = 3, int networkRetries = 3)
    {
        // Pessimistic: the Dapper delegates don't observe Polly's CancellationToken
        var timeoutPolicy = Policy.TimeoutAsync(
            maxTimeout ?? TimeSpan.FromMinutes(2),
            TimeoutStrategy.Pessimistic);

        var transientPolicy = Policy
            .Handle<SqlException>(ex => transientNumbers.Contains(ex.Number))
            .WaitAndRetryAsync(
                transientRetries,
                attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                (ex, _, ctx) => ctx.GetLogger()?.LogWarning(ex,
                    "{@Operation} Encountered Transient SqlException. Params:{@Param} Sql:{@Sql}",
                    ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]));

        var networkPolicy = Policy
            .Handle<SqlException>(ex => networkingNumbers.Contains(ex.Number))
            .WaitAndRetryAsync(
                networkRetries,
                attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                (ex, _, ctx) =>
                {
                    ctx.GetLogger()?.LogWarning(ex,
                        "{@Operation} Encountered a Network Error. Params:{@Param} Sql:{@Sql}",
                        ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]);
                    if (ctx.TryGetConnection(out var connection))
                        SqlConnection.ClearPool(connection);
                });

        var constraintPolicy = Policy
            .Handle<SqlException>(ex => constraintViolationNumbers.Contains(ex.Number))
            .CircuitBreakerAsync(
                1,
                TimeSpan.MaxValue,
                (ex, _, ctx) => ctx.GetLogger()?.LogError(ex,
                    "{@Operation} Encountered a Constraint Violation. Params:{@Param} Sql:{@Sql}",
                    ctx.OperationKey, ctx[ParamContextKey], ctx[SqlContextKey]),
                ctx => { });

        var resiliencyPolicy = timeoutPolicy
            .WrapAsync(transientPolicy)
            .WrapAsync(networkPolicy)
            .WrapAsync(constraintPolicy);

        return resiliencyPolicy;
    }
}
```
The Context
As you can see, we make heavy use of the Context in our policies. The Context object is scoped to a single execution of a policy and provides a way to pass contextual information about the delegate being executed (to learn more, see Using Execution Context in Polly). We use it to store the SQL script, the parameters, our logger and our connection. To make this simpler, we have a static Context helper class.

```csharp
using System.Collections.Generic;
using Microsoft.Data.SqlClient; // or System.Data.SqlClient
using Microsoft.Extensions.Logging;
using Polly;

public static class ContextHelper
{
    public static readonly string LoggerContextKey = nameof(LoggerContextKey);
    public static readonly string SqlContextKey = nameof(SqlContextKey);
    public static readonly string ParamContextKey = nameof(ParamContextKey);
    public static readonly string ConnectionContextKey = nameof(ConnectionContextKey);

    // Builds the per-execution Context that the policies read from
    public static Context NewContext(
        SqlConnection connection, ILogger logger, string sql, object param, string operationKey)
    {
        return new Context(operationKey, new Dictionary<string, object>
        {
            { ConnectionContextKey, connection },
            { LoggerContextKey, logger },
            { SqlContextKey, sql },
            { ParamContextKey, param }
        });
    }

    public static ILogger GetLogger(this Context ctx) => ctx[LoggerContextKey] as ILogger;

    public static bool TryGetConnection(this Context ctx, out SqlConnection connection) =>
        (connection = ctx[ConnectionContextKey] as SqlConnection) is not null;
}
```
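One caveat with these helpers: Polly's Context implements IDictionary<string, object>, and its indexer throws a KeyNotFoundException for a key that was never added. If a policy ever runs without a context built by NewContext, GetLogger would throw from inside the retry callback and mask the original SqlException. A TryGetValue-based read avoids that. The sketch below uses a plain Dictionary<string, object> as a stand-in for Context; GetOrDefault is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;

// A Dictionary<string, object> stands in for Polly's Context (which implements IDictionary<string, object>).
var data = new Dictionary<string, object> { ["SqlContextKey"] = "SELECT 1" };

// Hypothetical helper: returns null instead of throwing when a key was never added.
static T? GetOrDefault<T>(IDictionary<string, object> ctx, string key) where T : class =>
    ctx.TryGetValue(key, out var value) ? value as T : null;

Console.WriteLine(GetOrDefault<string>(data, "SqlContextKey"));            // SELECT 1
Console.WriteLine(GetOrDefault<string>(data, "LoggerContextKey") is null); // True

try
{
    _ = data["LoggerContextKey"]; // what an indexer-based lookup like ctx[LoggerContextKey] does
}
catch (KeyNotFoundException)
{
    Console.WriteLine("the indexer throws for a missing key");
}
```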
Registration With DI
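We need to register three items with our DI container: a factory for our SqlConnection, our resiliency policy, and our yet-to-be-revealed SqlDapperClient.

```csharp
using Microsoft.Data.SqlClient; // or System.Data.SqlClient
using Microsoft.Extensions.DependencyInjection;

public delegate SqlConnection SqlConnectionFactory();

public static class SqlDapperClientServiceCollectionExtensions // containing class name is illustrative
{
    public static void AddSqlDapperClient(this IServiceCollection services, string connectionString)
    {
        // Each call creates a new SqlConnection; ADO.NET pooling reuses the physical connections
        services.AddSingleton<SqlConnectionFactory>(() => new SqlConnection(connectionString));
        services.AddScoped(_ => SqlResiliencyPolicy.GetSqlResiliencyPolicy());
        services.AddScoped<ISqlDapperClient, SqlDapperClient>();
    }
}
```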
Notice that our resiliency policy and Dapper client are scoped. This means we get a new instance for each request and don't share the policy across requests. That matters for the constraint circuit breaker: shared across the whole application, a single violation would open the circuit for every caller, permanently. We could use a more elaborate setup if we needed our client to be a singleton, using a mutable Context to affect the state of our resiliency policy in cases such as the constraint violation, but we're keeping it scoped here for simplicity's sake.
The Dapper Wrapper, err... Client
Dapper is primarily a set of extension methods on IDbConnection. While it is highly recommended to integration test your data access layer, I find it's also useful to unit test the layer above it: the code that calls into data access should be tested to ensure it passes the correct SQL script and expected arguments. To facilitate this, we need a way to mock Dapper's behavior. Mocking the complete set of interactions from IDbConnection down to a SqlCommand is laborious. Instead, we can create a client whose only purpose is to expose Dapper's extension methods as a mockable interface, and which is itself covered by integration tests against a test database. We expose an interface with the methods we'd like to mock; this is just a sampling of what Dapper offers, but it can easily be expanded.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

public interface ISqlDapperClient
{
    Task<int> ExecuteAsync(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null);

    Task<T> ExecuteScalarAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null);

    Task<IEnumerable<T>> QueryAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null);

    Task<T> QueryFirstOrDefaultAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null);

    Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TReturn>(string sql,
        Func<TFirst, TSecond, TReturn> map, object param = null, IDbTransaction transaction = null,
        bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null);

    Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TThird, TReturn>(string sql,
        Func<TFirst, TSecond, TThird, TReturn> map, object param = null, IDbTransaction transaction = null,
        bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null);
}
```
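To show what that buys us, here is a dependency-free sketch of a unit test's recording fake. In a real test you would implement or mock ISqlDapperClient (for example with a mocking library); to keep this self-contained, the one method we need is modelled as a delegate with the same shape as ExecuteAsync(sql, param). RenameCustomerAsync, the Customers table and the Record fake are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical data-access method under test. It depends only on something shaped like
// ISqlDapperClient.ExecuteAsync(sql, param), modelled here as a delegate.
static Task<int> RenameCustomerAsync(Func<string, object, Task<int>> execute, int id, string name) =>
    execute("UPDATE Customers SET Name = @name WHERE Id = @id", new { id, name });

// Recording fake: captures what would have been sent to Dapper instead of touching a database.
var calls = new List<(string Sql, object Param)>();
Task<int> Record(string sql, object param)
{
    calls.Add((sql, param));
    return Task.FromResult(1); // pretend one row was affected
}

var affected = await RenameCustomerAsync(Record, 42, "Ada");

// Assert against the captured SQL and parameters (anonymous-type properties read via reflection).
var (sentSql, sentParam) = calls[0];
var sentId = sentParam.GetType().GetProperty("id")?.GetValue(sentParam);
Console.WriteLine($"{affected} {sentSql.Contains("UPDATE Customers")} {sentId}"); // 1 True 42
```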
Then, in the implementation, we inject our logger, resiliency policy and SqlConnection factory, and proxy the Dapper calls through our resiliency policy.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Data.SqlClient; // or System.Data.SqlClient, matching the rest of the project
using Microsoft.Extensions.Logging;
using Polly;
using static ContextHelper;

public class SqlDapperClient : ISqlDapperClient
{
    private readonly ILogger<SqlDapperClient> logger;
    private readonly SqlConnectionFactory connectionFactory;
    private readonly IAsyncPolicy resiliencyPolicy;

    public SqlDapperClient(
        ILogger<SqlDapperClient> logger,
        SqlConnectionFactory connectionFactory,
        IAsyncPolicy resiliencyPolicy)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        this.resiliencyPolicy = resiliencyPolicy ?? throw new ArgumentNullException(nameof(resiliencyPolicy));
    }

    public Task<int> ExecuteAsync(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.ExecuteAsync(s, p, transaction, commandTimeout, commandType), sql, param);

    public Task<T> ExecuteScalarAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.ExecuteScalarAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);

    public Task<T> QueryFirstOrDefaultAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.QueryFirstOrDefaultAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);

    public Task<IEnumerable<T>> QueryAsync<T>(string sql, object param = null, IDbTransaction transaction = null,
        int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.QueryAsync<T>(s, p, transaction, commandTimeout, commandType), sql, param);

    public Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TReturn>(string sql,
        Func<TFirst, TSecond, TReturn> map, object param = null, IDbTransaction transaction = null,
        bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.QueryAsync(s, map, p, transaction, buffered, splitOn, commandTimeout, commandType), sql, param);

    public Task<IEnumerable<TReturn>> QueryAsync<TFirst, TSecond, TThird, TReturn>(string sql,
        Func<TFirst, TSecond, TThird, TReturn> map, object param = null, IDbTransaction transaction = null,
        bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null) =>
        ExecuteWithResiliency((s, p, c) => c.QueryAsync(s, map, p, transaction, buffered, splitOn, commandTimeout, commandType), sql, param);

    // Every public method funnels through here: one connection and one Polly Context per call.
    // [CallerMemberName] makes the public method's name (e.g. "QueryAsync") the Context's OperationKey.
    private async Task<T> ExecuteWithResiliency<T>(
        Func<string, object, SqlConnection, Task<T>> connectionFunc,
        string sql,
        object param = null,
        [CallerMemberName] string operation = "")
    {
        using var connection = connectionFactory();
        return await resiliencyPolicy.ExecuteAsync(
            ctx => connectionFunc(sql, param, connection),
            NewContext(connection, logger, sql, param, operation));
    }
}
```
Notice that everything comes down to the ExecuteWithResiliency method, which captures the Dapper extension method we want to execute, the SQL we want to run and the parameters object we pass to Dapper. In this method we create the connection and the execution Context, and run the captured extension method with the provided arguments through the resiliency policy.
Summary
We have seen how to make a SqlDapperClient resilient to transient and networking errors, put a global timeout on its operations, and reject constraint violations. We've also seen how to make good use of Polly's execution Context to pass execution-specific details to our policies. Finally, we introduced a mockable wrapper for Dapper that lets us assert against the arguments passed to the underlying extension methods. That's it for this post; I hope you enjoyed it.