1
0
mirror of https://github.com/bitwarden/server.git synced 2025-06-30 15:42:48 -05:00

[PM-17562] Refactor existing RabbitMq implementation (#5357)

* [PM-17562] Refactor existing RabbitMq implementation

* Fixed issues noted in PR review
This commit is contained in:
Brant DeBow
2025-02-04 08:02:43 -06:00
committed by GitHub
parent f1b9bd9a09
commit 3f3da558b6
11 changed files with 162 additions and 57 deletions

View File

@ -0,0 +1,13 @@
using Microsoft.Extensions.Hosting;
namespace Bit.Core.Services;
public abstract class EventLoggingListenerService : BackgroundService
{
protected readonly IEventMessageHandler _handler;
protected EventLoggingListenerService(IEventMessageHandler handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}
}

View File

@ -0,0 +1,92 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public class RabbitMqEventListenerService : EventLoggingListenerService
{
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerService> _logger;
private readonly string _queueName;
public RabbitMqEventListenerService(
IEventMessageHandler handler,
ILogger<RabbitMqEventListenerService> logger,
GlobalSettings globalSettings,
string queueName) : base(handler)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_logger = logger;
_queueName = queueName;
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
_connection = await _factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
await _channel.QueueDeclareAsync(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _queueName,
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, eventArgs) =>
{
try
{
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
await _handler.HandleEventAsync(eventMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while processing the message");
}
};
await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1_000, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _channel.CloseAsync();
await _connection.CloseAsync();
await base.StopAsync(cancellationToken);
}
public override void Dispose()
{
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}
}