forked from madelson/DistributedLock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTestingPostgresDb.cs
More file actions
70 lines (58 loc) · 3.33 KB
/
TestingPostgresDb.cs
File metadata and controls
70 lines (58 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
using Medallion.Threading.Internal;
using Medallion.Threading.Tests.Data;
using Npgsql;
using NpgsqlTypes;
using NUnit.Framework;
using System.Data;
using System.Data.Common;
namespace Medallion.Threading.Tests.Postgres;
public sealed class TestingPostgresDb : TestingPrimaryClientDb
{
internal static readonly string DefaultConnectionString = PostgresDb.Container.GetConnectionString();
private readonly NpgsqlConnectionStringBuilder _connectionStringBuilder = new(DefaultConnectionString);
public override DbConnectionStringBuilder ConnectionStringBuilder => this._connectionStringBuilder;
// https://til.hashrocket.com/posts/8f87c65a0a-postgresqls-max-identifier-length-is-63-bytes
public override int MaxApplicationNameLength => 63;
/// <summary>
/// Technically Postgres does support this through xact advisory lock methods, but it is very unwieldy to use due to the transaction
/// abort semantics and largely unnecessary for our purposes since, unlike SQLServer, a connection-scoped Postgres lock can still
/// participate in an ongoing transaction.
/// </summary>
public override TransactionSupport TransactionSupport => TransactionSupport.ImplicitParticipation;
public override int CountActiveSessions(string applicationName)
{
Invariant.Require(applicationName.Length <= this.MaxApplicationNameLength);
using var connection = new NpgsqlConnection(DefaultConnectionString);
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = "SELECT COUNT(*)::int FROM pg_stat_activity WHERE application_name = @applicationName";
command.Parameters.AddWithValue("applicationName", applicationName);
return (int)command.ExecuteScalar()!;
}
public override IsolationLevel GetIsolationLevel(DbConnection connection)
{
using var command = connection.CreateCommand();
// values based on https://www.postgresql.org/docs/12/transaction-iso.html
command.CommandText = "SELECT REPLACE(current_setting('transaction_isolation'), ' ', '')";
return (IsolationLevel)Enum.Parse(typeof(IsolationLevel), (string)command.ExecuteScalar()!, ignoreCase: true);
}
public override DbConnection CreateConnection() => new NpgsqlConnection(this.ConnectionStringBuilder.ConnectionString);
public override async Task KillSessionsAsync(string applicationName, DateTimeOffset? idleSince)
{
using var connection = new NpgsqlConnection(DefaultConnectionString);
await connection.OpenAsync();
using var command = connection.CreateCommand();
// based on https://stackoverflow.com/questions/13236160/is-there-a-timeout-for-idle-postgresql-connections
command.CommandText = @"
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE application_name = @applicationName
AND (
@idleSince IS NULL
OR (state = 'idle' AND state_change < @idleSince)
)";
command.Parameters.AddWithValue("applicationName", applicationName);
command.Parameters.Add(new NpgsqlParameter("idleSince", idleSince?.ToUniversalTime() ?? DBNull.Value.As<object>()) { NpgsqlDbType = NpgsqlDbType.TimestampTz });
await command.ExecuteNonQueryAsync();
}
}