1
0
mirror of https://github.com/bitwarden/server.git synced 2025-07-06 10:32:49 -05:00

[PM-17562] Initial POC of Distributed Events (#5323)

* Initial POC of Distributed Events

* Apply suggestions from code review

Co-authored-by: Justin Baur <19896123+justindbaur@users.noreply.github.com>

* Clean up files to support accepted changes. Address PR Feedback

* Removed unneeded using to fix lint warning

* Moved config into a common EventLogging top-level item. Fixed issues from PR review

* Optimized per suggestion from justinbaur

Co-authored-by: Justin Baur <19896123+justindbaur@users.noreply.github.com>

* Updated to add IAsyncDisposable as suggested in PR review

* Updated with suggestion to use KeyedSingleton for the IEventWriteService

* Changed key case to lowercase

---------

Co-authored-by: Justin Baur <19896123+justindbaur@users.noreply.github.com>
This commit is contained in:
Brant DeBow
2025-01-30 11:07:02 -06:00
committed by GitHub
parent 443a147433
commit 5efd68cf51
10 changed files with 311 additions and 4 deletions

View File

@ -0,0 +1,35 @@
using System.Net.Http.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;
namespace Bit.Core.Services;
public class RabbitMqEventHttpPostListener : RabbitMqEventListenerBase
{
private readonly HttpClient _httpClient;
private readonly string _httpPostUrl;
private readonly string _queueName;
protected override string QueueName => _queueName;
public const string HttpClientName = "EventHttpPostListenerHttpClient";
public RabbitMqEventHttpPostListener(
IHttpClientFactory httpClientFactory,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_httpClient = httpClientFactory.CreateClient(HttpClientName);
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl;
_queueName = globalSettings.EventLogging.RabbitMq.HttpPostQueueName;
}
protected override async Task HandleMessageAsync(EventMessage eventMessage)
{
var content = JsonContent.Create(eventMessage);
var response = await _httpClient.PostAsync(_httpPostUrl, content);
response.EnsureSuccessStatusCode();
}
}

View File

@ -0,0 +1,93 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public abstract class RabbitMqEventListenerBase : BackgroundService
{
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerBase> _logger;
protected abstract string QueueName { get; }
protected RabbitMqEventListenerBase(
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_logger = logger;
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
_connection = await _factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
await _channel.QueueDeclareAsync(queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: QueueName,
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, eventArgs) =>
{
try
{
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
await HandleMessageAsync(eventMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while processing the message");
}
};
await _channel.BasicConsumeAsync(QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1_000, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _channel.CloseAsync();
await _connection.CloseAsync();
await base.StopAsync(cancellationToken);
}
public override void Dispose()
{
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}
protected abstract Task HandleMessageAsync(EventMessage eventMessage);
}

View File

@ -0,0 +1,29 @@
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Bit.Core.Services;
public class RabbitMqEventRepositoryListener : RabbitMqEventListenerBase
{
private readonly IEventWriteService _eventWriteService;
private readonly string _queueName;
protected override string QueueName => _queueName;
public RabbitMqEventRepositoryListener(
[FromKeyedServices("persistent")] IEventWriteService eventWriteService,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_eventWriteService = eventWriteService;
_queueName = globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName;
}
protected override Task HandleMessageAsync(EventMessage eventMessage)
{
return _eventWriteService.CreateAsync(eventMessage);
}
}

View File

@ -0,0 +1,65 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using RabbitMQ.Client;
namespace Bit.Core.Services;
public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;
public RabbitMqEventWriteService(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}
public async Task CreateAsync(IEvent e)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
var body = JsonSerializer.SerializeToUtf8Bytes(e);
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
foreach (var e in events)
{
var body = JsonSerializer.SerializeToUtf8Bytes(e);
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
}
public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}
private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}
}