mirror of
https://github.com/bitwarden/server.git
synced 2025-07-06 10:32:49 -05:00
[PM-17562] Add Azure Service Bus for Distributed Events (#5382)
* [PM-17562] Add Azure Service Bus for Distributed Events * Fix failing test * Addressed issues mentioned in SonarQube * Respond to PR feedback * Respond to PR feedback - make webhook opt-in, remove message body from log
This commit is contained in:
@ -0,0 +1,13 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public abstract class EventLoggingListenerService : BackgroundService
|
||||
{
|
||||
protected readonly IEventMessageHandler _handler;
|
||||
|
||||
protected EventLoggingListenerService(IEventMessageHandler handler)
|
||||
{
|
||||
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
using System.Text.Json;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private readonly ILogger<AzureServiceBusEventListenerService> _logger;
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
|
||||
public AzureServiceBusEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
ILogger<AzureServiceBusEventListenerService> logger,
|
||||
GlobalSettings globalSettings,
|
||||
string subscriptionName) : base(handler)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.TopicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += async args =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var eventMessage = JsonSerializer.Deserialize<EventMessage>(args.Message.Body.ToString());
|
||||
|
||||
await _handler.HandleEventAsync(eventMessage);
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An error occured while processing message: {MessageId}",
|
||||
args.Message.MessageId
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
_processor.ProcessErrorAsync += args =>
|
||||
{
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_processor.DisposeAsync().GetAwaiter().GetResult();
|
||||
_client.DisposeAsync().GetAwaiter().GetResult();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
using System.Text.Json;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Services;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.AdminConsole.Services.Implementations;
|
||||
|
||||
public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _sender;
|
||||
|
||||
public AzureServiceBusEventWriteService(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.TopicName);
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e))
|
||||
{
|
||||
ContentType = "application/json"
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
foreach (var e in events)
|
||||
{
|
||||
await CreateAsync(e);
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureTableStorageEventHandler(
|
||||
[FromKeyedServices("persistent")] IEventWriteService eventWriteService)
|
||||
: IEventMessageHandler
|
||||
{
|
||||
public Task HandleEventAsync(EventMessage eventMessage)
|
||||
{
|
||||
return eventWriteService.CreateManyAsync(EventTableEntity.IndexEvent(eventMessage));
|
||||
}
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private IChannel _channel;
|
||||
private IConnection _connection;
|
||||
private readonly string _exchangeName;
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly ILogger<RabbitMqEventListenerService> _logger;
|
||||
private readonly string _queueName;
|
||||
|
||||
public RabbitMqEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
ILogger<RabbitMqEventListenerService> logger,
|
||||
GlobalSettings globalSettings,
|
||||
string queueName) : base(handler)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
|
||||
_logger = logger;
|
||||
_queueName = queueName;
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_connection = await _factory.CreateConnectionAsync(cancellationToken);
|
||||
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
|
||||
await _channel.ExchangeDeclareAsync(exchange: _exchangeName,
|
||||
type: ExchangeType.Fanout,
|
||||
durable: true,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueDeclareAsync(queue: _queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueBindAsync(queue: _queueName,
|
||||
exchange: _exchangeName,
|
||||
routingKey: string.Empty,
|
||||
cancellationToken: cancellationToken);
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var consumer = new AsyncEventingBasicConsumer(_channel);
|
||||
consumer.ReceivedAsync += async (_, eventArgs) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
|
||||
await _handler.HandleEventAsync(eventMessage);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "An error occurred while processing the message");
|
||||
}
|
||||
};
|
||||
|
||||
await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _channel.CloseAsync(cancellationToken);
|
||||
await _connection.CloseAsync(cancellationToken);
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_channel.Dispose();
|
||||
_connection.Dispose();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
@ -4,25 +4,25 @@ using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class HttpPostEventHandler : IEventMessageHandler
|
||||
public class WebhookEventHandler : IEventMessageHandler
|
||||
{
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly string _httpPostUrl;
|
||||
private readonly string _webhookUrl;
|
||||
|
||||
public const string HttpClientName = "HttpPostEventHandlerHttpClient";
|
||||
public const string HttpClientName = "WebhookEventHandlerHttpClient";
|
||||
|
||||
public HttpPostEventHandler(
|
||||
public WebhookEventHandler(
|
||||
IHttpClientFactory httpClientFactory,
|
||||
GlobalSettings globalSettings)
|
||||
{
|
||||
_httpClient = httpClientFactory.CreateClient(HttpClientName);
|
||||
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl;
|
||||
_webhookUrl = globalSettings.EventLogging.WebhookUrl;
|
||||
}
|
||||
|
||||
public async Task HandleEventAsync(EventMessage eventMessage)
|
||||
{
|
||||
var content = JsonContent.Create(eventMessage);
|
||||
var response = await _httpClient.PostAsync(_httpPostUrl, content);
|
||||
var response = await _httpClient.PostAsync(_webhookUrl, content);
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user