diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index d23eaefbb0..1bfbe0a9d7 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -109,6 +109,21 @@ services: profiles: - proxy + service-bus: + container_name: service-bus + image: mcr.microsoft.com/azure-messaging/servicebus-emulator:latest + pull_policy: always + volumes: + - "./servicebusemulator_config.json:/ServiceBus_Emulator/ConfigFiles/Config.json" + ports: + - "5672:5672" + environment: + SQL_SERVER: mssql + MSSQL_SA_PASSWORD: "${MSSQL_PASSWORD}" + ACCEPT_EULA: "Y" + profiles: + - servicebus + volumes: mssql_dev_data: postgres_dev_data: diff --git a/dev/servicebusemulator_config.json b/dev/servicebusemulator_config.json new file mode 100644 index 0000000000..f0e4279b06 --- /dev/null +++ b/dev/servicebusemulator_config.json @@ -0,0 +1,38 @@ +{ + "UserConfig": { + "Namespaces": [ + { + "Name": "sbemulatorns", + "Queues": [ + { + "Name": "queue.1", + "Properties": { + "DeadLetteringOnMessageExpiration": false, + "DefaultMessageTimeToLive": "PT1H", + "DuplicateDetectionHistoryTimeWindow": "PT20S", + "ForwardDeadLetteredMessagesTo": "", + "ForwardTo": "", + "LockDuration": "PT1M", + "MaxDeliveryCount": 3, + "RequiresDuplicateDetection": false, + "RequiresSession": false + } + } + ], + "Topics": [ + { + "Name": "event-logging", + "Subscriptions": [ + { + "Name": "events-write-subscription" + } + ] + } + ] + } + ], + "Logging": { + "Type": "File" + } + } +} diff --git a/src/Core/Services/EventLoggingListenerService.cs b/src/Core/AdminConsole/Services/EventLoggingListenerService.cs similarity index 100% rename from src/Core/Services/EventLoggingListenerService.cs rename to src/Core/AdminConsole/Services/EventLoggingListenerService.cs diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs new file mode 100644 index 0000000000..5c329ce8ad --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs @@ -0,0 +1,73 @@ +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Bit.Core.Models.Data; +using Bit.Core.Settings; +using Microsoft.Extensions.Logging; + +namespace Bit.Core.Services; + +public class AzureServiceBusEventListenerService : EventLoggingListenerService +{ + private readonly ILogger _logger; + private readonly ServiceBusClient _client; + private readonly ServiceBusProcessor _processor; + + public AzureServiceBusEventListenerService( + IEventMessageHandler handler, + ILogger logger, + GlobalSettings globalSettings, + string subscriptionName) : base(handler) + { + _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); + _processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.TopicName, subscriptionName, new ServiceBusProcessorOptions()); + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + _processor.ProcessMessageAsync += async args => + { + try + { + var eventMessage = JsonSerializer.Deserialize(args.Message.Body.ToString()); + + await _handler.HandleEventAsync(eventMessage); + await args.CompleteMessageAsync(args.Message); + } + catch (Exception exception) + { + _logger.LogError( + exception, + "An error occured while processing message: {MessageId}", + args.Message.MessageId + ); + } + }; + + _processor.ProcessErrorAsync += args => + { + _logger.LogError( + args.Exception, + "An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}", + args.EntityPath, + args.ErrorSource + ); + return Task.CompletedTask; + }; + + await _processor.StartProcessingAsync(cancellationToken); + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + await _processor.StopProcessingAsync(cancellationToken); + await base.StopAsync(cancellationToken); + } + + public override void Dispose() + { + _processor.DisposeAsync().GetAwaiter().GetResult(); + _client.DisposeAsync().GetAwaiter().GetResult(); + base.Dispose(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs new file mode 100644 index 0000000000..ed8f45ed55 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs @@ -0,0 +1,43 @@ +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Bit.Core.Settings; + +namespace Bit.Core.AdminConsole.Services.Implementations; + +public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable +{ + private readonly ServiceBusClient _client; + private readonly ServiceBusSender _sender; + + public AzureServiceBusEventWriteService(GlobalSettings globalSettings) + { + _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); + _sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.TopicName); + } + + public async Task CreateAsync(IEvent e) + { + var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e)) + { + ContentType = "application/json" + }; + + await _sender.SendMessageAsync(message); + } + + public async Task CreateManyAsync(IEnumerable events) + { + foreach (var e in events) + { + await CreateAsync(e); + } + } + + public async ValueTask DisposeAsync() + { + await _sender.DisposeAsync(); + await _client.DisposeAsync(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs new file mode 100644 index 0000000000..2612ba0487 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs @@ -0,0 +1,14 @@ +using Bit.Core.Models.Data; +using Microsoft.Extensions.DependencyInjection; + +namespace Bit.Core.Services; + +public class AzureTableStorageEventHandler( + [FromKeyedServices("persistent")] IEventWriteService eventWriteService) + : IEventMessageHandler +{ + public Task HandleEventAsync(EventMessage eventMessage) + { + return eventWriteService.CreateManyAsync(EventTableEntity.IndexEvent(eventMessage)); + } +} diff --git a/src/Core/Services/Implementations/RabbitMqEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs similarity index 88% rename from src/Core/Services/Implementations/RabbitMqEventListenerService.cs rename to src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs index 9360170368..c302497142 100644 --- a/src/Core/Services/Implementations/RabbitMqEventListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs @@ -38,7 +38,10 @@ public class RabbitMqEventListenerService : EventLoggingListenerService _connection = await _factory.CreateConnectionAsync(cancellationToken); _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); + await _channel.ExchangeDeclareAsync(exchange: _exchangeName, + type: ExchangeType.Fanout, + durable: true, + cancellationToken: cancellationToken); await _channel.QueueDeclareAsync(queue: _queueName, durable: true, exclusive: false, @@ -52,7 +55,7 @@ public class RabbitMqEventListenerService : EventLoggingListenerService await base.StartAsync(cancellationToken); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken cancellationToken) { var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += async (_, eventArgs) => @@ -68,18 +71,13 @@ public class RabbitMqEventListenerService : EventLoggingListenerService } }; - await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken); - - while (!stoppingToken.IsCancellationRequested) - { - await Task.Delay(1_000, stoppingToken); - } + await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken); } public override async Task StopAsync(CancellationToken cancellationToken) { - await _channel.CloseAsync(); - await _connection.CloseAsync(); + await _channel.CloseAsync(cancellationToken); + await _connection.CloseAsync(cancellationToken); await base.StopAsync(cancellationToken); } diff --git a/src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs b/src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs similarity index 59% rename from src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs rename to src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs index 8aece0c1da..60abc198d8 100644 --- a/src/Core/AdminConsole/Services/Implementations/HttpPostEventHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/WebhookEventHandler.cs @@ -4,25 +4,25 @@ using Bit.Core.Settings; namespace Bit.Core.Services; -public class HttpPostEventHandler : IEventMessageHandler +public class WebhookEventHandler : IEventMessageHandler { private readonly HttpClient _httpClient; - private readonly string _httpPostUrl; + private readonly string _webhookUrl; - public const string HttpClientName = "HttpPostEventHandlerHttpClient"; + public const string HttpClientName = "WebhookEventHandlerHttpClient"; - public HttpPostEventHandler( + public WebhookEventHandler( IHttpClientFactory httpClientFactory, GlobalSettings globalSettings) { _httpClient = httpClientFactory.CreateClient(HttpClientName); - _httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl; + _webhookUrl = globalSettings.EventLogging.WebhookUrl; } public async Task HandleEventAsync(EventMessage eventMessage) { var content = JsonContent.Create(eventMessage); - var response = await _httpClient.PostAsync(_httpPostUrl, content); + var response = await _httpClient.PostAsync(_webhookUrl, content); response.EnsureSuccessStatusCode(); } } diff --git a/src/Core/Settings/GlobalSettings.cs b/src/Core/Settings/GlobalSettings.cs index a63a36c1c0..a1c7a4fac6 100644 --- a/src/Core/Settings/GlobalSettings.cs +++ b/src/Core/Settings/GlobalSettings.cs @@ -260,8 +260,31 @@ public class GlobalSettings : IGlobalSettings public class EventLoggingSettings { + public AzureServiceBusSettings AzureServiceBus { get; set; } = new AzureServiceBusSettings(); + public virtual string WebhookUrl { get; set; } public RabbitMqSettings RabbitMq { get; set; } = new RabbitMqSettings(); + public class AzureServiceBusSettings + { + private string _connectionString; + private string _topicName; + + public virtual string EventRepositorySubscriptionName { get; set; } = "events-write-subscription"; + public virtual string WebhookSubscriptionName { get; set; } = "events-webhook-subscription"; + + public string ConnectionString + { + get => _connectionString; + set => _connectionString = value.Trim('"'); + } + + public string TopicName + { + get => _topicName; + set => _topicName = value.Trim('"'); + } + } + public class RabbitMqSettings { private string _hostName; @@ -270,8 +293,7 @@ public class GlobalSettings : IGlobalSettings private string _exchangeName; public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue"; - public virtual string HttpPostQueueName { get; set; } = "events-httpPost-queue"; - public virtual string HttpPostUrl { get; set; } + public virtual string WebhookQueueName { get; set; } = "events-webhook-queue"; public string HostName { diff --git a/src/Events/Startup.cs b/src/Events/Startup.cs index b692733a55..431f449708 100644 --- a/src/Events/Startup.cs +++ b/src/Events/Startup.cs @@ -95,20 +95,20 @@ public class Startup new RabbitMqEventListenerService( provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService(), + globalSettings, globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName)); - if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HttpPostUrl)) + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.WebhookUrl)) { - services.AddSingleton(); - services.AddHttpClient(HttpPostEventHandler.HttpClientName); + services.AddSingleton(); + services.AddHttpClient(WebhookEventHandler.HttpClientName); services.AddSingleton(provider => new RabbitMqEventListenerService( - provider.GetRequiredService(), + provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService(), - globalSettings.EventLogging.RabbitMq.HttpPostQueueName)); + globalSettings, + globalSettings.EventLogging.RabbitMq.WebhookQueueName)); } } } diff --git a/src/EventsProcessor/Startup.cs b/src/EventsProcessor/Startup.cs index 2f64c0f926..65d1d36e24 100644 --- a/src/EventsProcessor/Startup.cs +++ b/src/EventsProcessor/Startup.cs @@ -1,8 +1,11 @@ using System.Globalization; +using Bit.Core.Repositories; +using Bit.Core.Services; using Bit.Core.Settings; using Bit.Core.Utilities; using Bit.SharedWeb.Utilities; using Microsoft.IdentityModel.Logging; +using TableStorageRepos = Bit.Core.Repositories.TableStorage; namespace Bit.EventsProcessor; @@ -24,9 +27,37 @@ public class Startup services.AddOptions(); // Settings - services.AddGlobalSettingsServices(Configuration, Environment); + var globalSettings = services.AddGlobalSettingsServices(Configuration, Environment); // Hosted Services + + // Optional Azure Service Bus Listeners + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && + CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName)) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddKeyedSingleton("persistent"); + services.AddSingleton(provider => + new AzureServiceBusEventListenerService( + provider.GetRequiredService(), + provider.GetRequiredService>(), + globalSettings, + globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName)); + + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.WebhookUrl)) + { + services.AddSingleton(); + services.AddHttpClient(WebhookEventHandler.HttpClientName); + + services.AddSingleton(provider => + new AzureServiceBusEventListenerService( + provider.GetRequiredService(), + provider.GetRequiredService>(), + globalSettings, + globalSettings.EventLogging.AzureServiceBus.WebhookSubscriptionName)); + } + } services.AddHostedService(); } diff --git a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs index 622b3d7f39..192871bffc 100644 --- a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs +++ b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs @@ -321,7 +321,15 @@ public static class ServiceCollectionExtensions if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString)) { - services.AddSingleton(); + if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && + CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName)) + { + services.AddSingleton(); + } + else + { + services.AddSingleton(); + } } else if (globalSettings.SelfHosted) { diff --git a/test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs b/test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs similarity index 74% rename from test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs rename to test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs index 414b1c54be..eab0be88a1 100644 --- a/test/Core.Test/AdminConsole/Services/HttpPostEventHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/WebhookEventHandlerTests.cs @@ -13,14 +13,14 @@ using GlobalSettings = Bit.Core.Settings.GlobalSettings; namespace Bit.Core.Test.Services; [SutProviderCustomize] -public class HttpPostEventHandlerTests +public class WebhookEventHandlerTests { private readonly MockedHttpMessageHandler _handler; private HttpClient _httpClient; - private const string _httpPostUrl = "http://localhost/test/event"; + private const string _webhookUrl = "http://localhost/test/event"; - public HttpPostEventHandlerTests() + public WebhookEventHandlerTests() { _handler = new MockedHttpMessageHandler(); _handler.Fallback @@ -29,15 +29,15 @@ public class HttpPostEventHandlerTests _httpClient = _handler.ToHttpClient(); } - public SutProvider GetSutProvider() + public SutProvider GetSutProvider() { var clientFactory = Substitute.For(); - clientFactory.CreateClient(HttpPostEventHandler.HttpClientName).Returns(_httpClient); + clientFactory.CreateClient(WebhookEventHandler.HttpClientName).Returns(_httpClient); var globalSettings = new GlobalSettings(); - globalSettings.EventLogging.RabbitMq.HttpPostUrl = _httpPostUrl; + globalSettings.EventLogging.WebhookUrl = _webhookUrl; - return new SutProvider() + return new SutProvider() .SetDependency(globalSettings) .SetDependency(clientFactory) .Create(); @@ -51,7 +51,7 @@ public class HttpPostEventHandlerTests await sutProvider.Sut.HandleEventAsync(eventMessage); sutProvider.GetDependency().Received(1).CreateClient( - Arg.Is(AssertHelper.AssertPropertyEqual(HttpPostEventHandler.HttpClientName)) + Arg.Is(AssertHelper.AssertPropertyEqual(WebhookEventHandler.HttpClientName)) ); Assert.Single(_handler.CapturedRequests); @@ -60,7 +60,7 @@ public class HttpPostEventHandlerTests var returned = await request.Content.ReadFromJsonAsync(); Assert.Equal(HttpMethod.Post, request.Method); - Assert.Equal(_httpPostUrl, request.RequestUri.ToString()); + Assert.Equal(_webhookUrl, request.RequestUri.ToString()); AssertHelper.AssertPropertyEqual(eventMessage, returned, new[] { "IdempotencyId" }); } }