From 02262476d6b13915b5c2313353d048702e4a84a5 Mon Sep 17 00:00:00 2001 From: Brant DeBow <125889545+brant-livefront@users.noreply.github.com> Date: Tue, 11 Feb 2025 10:20:06 -0500 Subject: [PATCH] [PM-17562] Add Azure Service Bus for Distributed Events (#5382) * [PM-17562] Add Azure Service Bus for Distributed Events * Fix failing test * Addressed issues mentioned in SonarQube * Respond to PR feedback * Respond to PR feedback - make webhook opt-in, remove message body from log --- dev/docker-compose.yml | 15 ++++ dev/servicebusemulator_config.json | 38 ++++++++++ .../Services/EventLoggingListenerService.cs | 0 .../AzureServiceBusEventListenerService.cs | 73 +++++++++++++++++++ .../AzureServiceBusEventWriteService.cs | 43 +++++++++++ .../AzureTableStorageEventHandler.cs | 14 ++++ .../RabbitMqEventListenerService.cs | 18 ++--- ...EventHandler.cs => WebhookEventHandler.cs} | 12 +-- src/Core/Settings/GlobalSettings.cs | 26 ++++++- src/Events/Startup.cs | 14 ++-- src/EventsProcessor/Startup.cs | 33 ++++++++- .../Utilities/ServiceCollectionExtensions.cs | 10 ++- ...erTests.cs => WebhookEventHandlerTests.cs} | 18 ++--- 13 files changed, 278 insertions(+), 36 deletions(-) create mode 100644 dev/servicebusemulator_config.json rename src/Core/{ => AdminConsole}/Services/EventLoggingListenerService.cs (100%) create mode 100644 src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs create mode 100644 src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs create mode 100644 src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs rename src/Core/{ => AdminConsole}/Services/Implementations/RabbitMqEventListenerService.cs (88%) rename src/Core/AdminConsole/Services/Implementations/{HttpPostEventHandler.cs => WebhookEventHandler.cs} (59%) rename test/Core.Test/AdminConsole/Services/{HttpPostEventHandlerTests.cs => WebhookEventHandlerTests.cs} (74%) diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index d23eaefbb0..1bfbe0a9d7 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -109,6 +109,21 @@ services: profiles: - proxy + service-bus: + container_name: service-bus + image: mcr.microsoft.com/azure-messaging/servicebus-emulator:latest + pull_policy: always + volumes: + - "./servicebusemulator_config.json:/ServiceBus_Emulator/ConfigFiles/Config.json" + ports: + - "5672:5672" + environment: + SQL_SERVER: mssql + MSSQL_SA_PASSWORD: "${MSSQL_PASSWORD}" + ACCEPT_EULA: "Y" + profiles: + - servicebus + volumes: mssql_dev_data: postgres_dev_data: diff --git a/dev/servicebusemulator_config.json b/dev/servicebusemulator_config.json new file mode 100644 index 0000000000..f0e4279b06 --- /dev/null +++ b/dev/servicebusemulator_config.json @@ -0,0 +1,38 @@ +{ + "UserConfig": { + "Namespaces": [ + { + "Name": "sbemulatorns", + "Queues": [ + { + "Name": "queue.1", + "Properties": { + "DeadLetteringOnMessageExpiration": false, + "DefaultMessageTimeToLive": "PT1H", + "DuplicateDetectionHistoryTimeWindow": "PT20S", + "ForwardDeadLetteredMessagesTo": "", + "ForwardTo": "", + "LockDuration": "PT1M", + "MaxDeliveryCount": 3, + "RequiresDuplicateDetection": false, + "RequiresSession": false + } + } + ], + "Topics": [ + { + "Name": "event-logging", + "Subscriptions": [ + { + "Name": "events-write-subscription" + } + ] + } + ] + } + ], + "Logging": { + "Type": "File" + } + } +} diff --git a/src/Core/Services/EventLoggingListenerService.cs b/src/Core/AdminConsole/Services/EventLoggingListenerService.cs similarity index 100% rename from src/Core/Services/EventLoggingListenerService.cs rename to src/Core/AdminConsole/Services/EventLoggingListenerService.cs diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs new file mode 100644 index 0000000000..5c329ce8ad --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs @@ -0,0 +1,73 @@ +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Bit.Core.Models.Data; +using Bit.Core.Settings; +using Microsoft.Extensions.Logging; + +namespace Bit.Core.Services; + +public class AzureServiceBusEventListenerService : EventLoggingListenerService +{ + private readonly ILogger<AzureServiceBusEventListenerService> _logger; + private readonly ServiceBusClient _client; + private readonly ServiceBusProcessor _processor; + + public AzureServiceBusEventListenerService( + IEventMessageHandler handler, + ILogger<AzureServiceBusEventListenerService> logger, + GlobalSettings globalSettings, + string subscriptionName) : base(handler) + { + _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); + _processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.TopicName, subscriptionName, new ServiceBusProcessorOptions()); + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + _processor.ProcessMessageAsync += async args => + { + try + { + var eventMessage = JsonSerializer.Deserialize<EventMessage>(args.Message.Body.ToString()); + + await _handler.HandleEventAsync(eventMessage); + await args.CompleteMessageAsync(args.Message); + } + catch (Exception exception) + { + _logger.LogError( + exception, + "An error occured while processing message: {MessageId}", + args.Message.MessageId + ); + } + }; + + _processor.ProcessErrorAsync += args => + { + _logger.LogError( + args.Exception, + "An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}", + args.EntityPath, + args.ErrorSource + ); + return Task.CompletedTask; + }; + + await _processor.StartProcessingAsync(cancellationToken); + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + await _processor.StopProcessingAsync(cancellationToken); + await base.StopAsync(cancellationToken); + } + + public override void Dispose() + { + _processor.DisposeAsync().GetAwaiter().GetResult(); + _client.DisposeAsync().GetAwaiter().GetResult(); + base.Dispose(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs new file mode 100644 index 0000000000..ed8f45ed55 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs @@ -0,0 +1,43 @@ +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Bit.Core.Settings; + +namespace Bit.Core.AdminConsole.Services.Implementations; + +public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable +{ + private readonly ServiceBusClient _client; + private readonly ServiceBusSender _sender; + + public AzureServiceBusEventWriteService(GlobalSettings globalSettings) + { + _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); + _sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.TopicName); + } + + public async Task CreateAsync(IEvent e) + { + var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e)) + { + ContentType = "application/json" + }; + + await _sender.SendMessageAsync(message); + } + + public async Task CreateManyAsync(IEnumerable<IEvent> events) + { + foreach (var e in events) + { + await CreateAsync(e); + } + } + + public async ValueTask DisposeAsync() + { + await _sender.DisposeAsync(); + await _client.DisposeAsync(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs new file mode 100644 index 0000000000..2612ba0487 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs @@ -0,0 +1,14 @@ +using Bit.Core.Models.Data; +using Microsoft.Extensions.DependencyInjection; + +namespace Bit.Core.Services; + +public class AzureTableStorageEventHandler( + [FromKeyedServices("persistent")] IEventWriteService eventWriteService) + : IEventMessageHandler +{ + public Task HandleEventAsync(EventMessage eventMessage) + { + return eventWriteService.CreateManyAsync(EventTableEntity.IndexEvent(eventMessage)); + } +} diff --git a/src/Core/Services/Implementations/RabbitMqEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs similarity index 88% rename from src/Core/Services/Implementations/RabbitMqEventListenerService.cs rename to src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs index 9360170368..c302497142 100644 --- a/src/Core/Services/Implementations/RabbitMqEventListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs @@ -38,7 +38,10 @@ public class RabbitMqEventListenerService : EventLoggingListenerService _connection = await _factory.CreateConnectionAsync(cancellationToken); _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); + await _channel.ExchangeDeclareAsync(exchange: _exchangeName, + type: ExchangeType.Fanout, + durable: true, + cancellationToken: cancellationToken); await _channel.QueueDeclareAsync(queue: _queueName, durable: true, exclusive: false, @@ -52,7 +55,7 @@ public class RabbitMqEventListenerService : EventLoggingListenerService await base.StartAsync(cancellationToken); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken cancellationToken) { var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += async (_, eventArgs) => @@ -68,18 +71,13 @@ public class RabbitMqEventListenerService : EventLoggingListenerService } }; - await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken); - - while (!stoppingToken.IsCancellationRequested) - { - await Task.Delay(1_000, stoppingToken); - } + await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken); } public override async Task StopAsync(CancellationToken cancellationToken) { - await _channel.CloseAsync(); - await _connection.CloseAsync(); + await _channel.CloseAsync(cancellationToken); + await _connection.CloseAsync(cancellationToken); await base.StopAsync(cancellationToken); } diff --git a/src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs b/src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs similarity index 59% rename from src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs rename to src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs index 8aece0c1da..60abc198d8 100644 --- a/src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs @@ -4,25 +4,25 @@ using Bit.Core.Settings; namespace Bit.Core.Services; -public class HttpPostEventHandler : IEventMessageHandler +public class WebhookEventHandler : IEventMessageHandler { private readonly HttpClient _httpClient; - private readonly string _httpPostUrl; + private readonly string _webhookUrl; - public const string HttpClientName = "HttpPostEventHandlerHttpClient"; + public const string HttpClientName = "WebhookEventHandlerHttpClient"; - public HttpPostEventHandler( + public WebhookEventHandler( IHttpClientFactory httpClientFactory, GlobalSettings globalSettings) { _httpClient = httpClientFactory.CreateClient(HttpClientName); - _httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl; + _webhookUrl = globalSettings.EventLogging.WebhookUrl; } public async Task HandleEventAsync(EventMessage eventMessage) { var content = JsonContent.Create(eventMessage); - var response = await _httpClient.PostAsync(_httpPostUrl, content); + var response = await _httpClient.PostAsync(_webhookUrl, content); response.EnsureSuccessStatusCode(); } } diff --git a/src/Core/Settings/GlobalSettings.cs b/src/Core/Settings/GlobalSettings.cs index a63a36c1c0..a1c7a4fac6 100644 --- a/src/Core/Settings/GlobalSettings.cs +++ b/src/Core/Settings/GlobalSettings.cs @@ -260,8 +260,31 @@ public class GlobalSettings : IGlobalSettings public class EventLoggingSettings { + public AzureServiceBusSettings AzureServiceBus { get; set; } = new AzureServiceBusSettings(); + public virtual string WebhookUrl { get; set; } public RabbitMqSettings RabbitMq { get; set; } = new RabbitMqSettings(); + public class AzureServiceBusSettings + { + private string _connectionString; + private string _topicName; + + public virtual string EventRepositorySubscriptionName { get; set; } = "events-write-subscription"; + public virtual string WebhookSubscriptionName { get; set; } = "events-webhook-subscription"; + + public string ConnectionString + { + get => _connectionString; + set => _connectionString = value.Trim('"'); + } + + public string TopicName + { + get => _topicName; + set => _topicName = value.Trim('"'); + } + } + public class RabbitMqSettings { private string _hostName; @@ -270,8 +293,7 @@ public class GlobalSettings : IGlobalSettings private string _exchangeName; public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue"; - public virtual string HttpPostQueueName { get; set; } = "events-httpPost-queue"; - public virtual string HttpPostUrl { get; set; } + public virtual string WebhookQueueName { get; set; } = "events-webhook-queue"; public string HostName { diff --git a/src/Events/Startup.cs b/src/Events/Startup.cs index b692733a55..431f449708 100644 --- a/src/Events/Startup.cs +++ b/src/Events/Startup.cs @@ -95,20 +95,20 @@ public class Startup new RabbitMqEventListenerService( provider.GetRequiredService<EventRepositoryHandler>(), provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(), - provider.GetRequiredService<GlobalSettings>(), + globalSettings, globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName)); - if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HttpPostUrl)) + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.WebhookUrl)) { - services.AddSingleton<HttpPostEventHandler>(); - services.AddHttpClient(HttpPostEventHandler.HttpClientName); + services.AddSingleton<WebhookEventHandler>(); + services.AddHttpClient(WebhookEventHandler.HttpClientName); services.AddSingleton<IHostedService>(provider => new RabbitMqEventListenerService( - provider.GetRequiredService<HttpPostEventHandler>(), + provider.GetRequiredService<WebhookEventHandler>(), provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(), - provider.GetRequiredService<GlobalSettings>(), - globalSettings.EventLogging.RabbitMq.HttpPostQueueName)); + globalSettings, + globalSettings.EventLogging.RabbitMq.WebhookQueueName)); } } } diff --git a/src/EventsProcessor/Startup.cs b/src/EventsProcessor/Startup.cs index 2f64c0f926..65d1d36e24 100644 --- a/src/EventsProcessor/Startup.cs +++ b/src/EventsProcessor/Startup.cs @@ -1,8 +1,11 @@ using System.Globalization; +using Bit.Core.Repositories; +using Bit.Core.Services; using Bit.Core.Settings; using Bit.Core.Utilities; using Bit.SharedWeb.Utilities; using Microsoft.IdentityModel.Logging; +using TableStorageRepos = Bit.Core.Repositories.TableStorage; namespace Bit.EventsProcessor; @@ -24,9 +27,37 @@ public class Startup services.AddOptions(); // Settings - services.AddGlobalSettingsServices(Configuration, Environment); + var globalSettings = services.AddGlobalSettingsServices(Configuration, Environment); // Hosted Services + + // Optional Azure Service Bus Listeners + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && + CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName)) + { + services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>(); + services.AddSingleton<AzureTableStorageEventHandler>(); + services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent"); + services.AddSingleton<IHostedService>(provider => + new AzureServiceBusEventListenerService( + provider.GetRequiredService<AzureTableStorageEventHandler>(), + provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(), + globalSettings, + globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName)); + + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.WebhookUrl)) + { + services.AddSingleton<WebhookEventHandler>(); + services.AddHttpClient(WebhookEventHandler.HttpClientName); + + services.AddSingleton<IHostedService>(provider => + new AzureServiceBusEventListenerService( + provider.GetRequiredService<WebhookEventHandler>(), + provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(), + globalSettings, + globalSettings.EventLogging.AzureServiceBus.WebhookSubscriptionName)); + } + } services.AddHostedService<AzureQueueHostedService>(); } diff --git a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs index 622b3d7f39..192871bffc 100644 --- a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs +++ b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs @@ -321,7 +321,15 @@ public static class ServiceCollectionExtensions if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString)) { - services.AddSingleton<IEventWriteService, AzureQueueEventWriteService>(); + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && + CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName)) + { + services.AddSingleton<IEventWriteService, AzureServiceBusEventWriteService>(); + } + else + { + services.AddSingleton<IEventWriteService, AzureQueueEventWriteService>(); + } } else if (globalSettings.SelfHosted) { diff --git a/test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs b/test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs similarity index 74% rename from test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs rename to test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs index 414b1c54be..eab0be88a1 100644 --- a/test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs @@ -13,14 +13,14 @@ using GlobalSettings = Bit.Core.Settings.GlobalSettings; namespace Bit.Core.Test.Services; [SutProviderCustomize] -public class HttpPostEventHandlerTests +public class WebhookEventHandlerTests { private readonly MockedHttpMessageHandler _handler; private HttpClient _httpClient; - private const string _httpPostUrl = "http://localhost/test/event"; + private const string _webhookUrl = "http://localhost/test/event"; - public HttpPostEventHandlerTests() + public WebhookEventHandlerTests() { _handler = new MockedHttpMessageHandler(); _handler.Fallback @@ -29,15 +29,15 @@ public class HttpPostEventHandlerTests _httpClient = _handler.ToHttpClient(); } - public SutProvider<HttpPostEventHandler> GetSutProvider() + public SutProvider<WebhookEventHandler> GetSutProvider() { var clientFactory = Substitute.For<IHttpClientFactory>(); - clientFactory.CreateClient(HttpPostEventHandler.HttpClientName).Returns(_httpClient); + clientFactory.CreateClient(WebhookEventHandler.HttpClientName).Returns(_httpClient); var globalSettings = new GlobalSettings(); - globalSettings.EventLogging.RabbitMq.HttpPostUrl = _httpPostUrl; + globalSettings.EventLogging.WebhookUrl = _webhookUrl; - return new SutProvider<HttpPostEventHandler>() + return new SutProvider<WebhookEventHandler>() .SetDependency(globalSettings) .SetDependency(clientFactory) .Create(); @@ -51,7 +51,7 @@ public class HttpPostEventHandlerTests await sutProvider.Sut.HandleEventAsync(eventMessage); sutProvider.GetDependency<IHttpClientFactory>().Received(1).CreateClient( - Arg.Is(AssertHelper.AssertPropertyEqual<string>(HttpPostEventHandler.HttpClientName)) + Arg.Is(AssertHelper.AssertPropertyEqual<string>(WebhookEventHandler.HttpClientName)) ); Assert.Single(_handler.CapturedRequests); @@ -60,7 +60,7 @@ public class HttpPostEventHandlerTests var returned = await request.Content.ReadFromJsonAsync<EventMessage>(); Assert.Equal(HttpMethod.Post, request.Method); - Assert.Equal(_httpPostUrl, request.RequestUri.ToString()); + Assert.Equal(_webhookUrl, request.RequestUri.ToString()); AssertHelper.AssertPropertyEqual(eventMessage, returned, new[] { "IdempotencyId" }); } }