From 59f5fafb875bd201cec7f22885468565312ec3e2 Mon Sep 17 00:00:00 2001 From: Brant DeBow <125889545+brant-livefront@users.noreply.github.com> Date: Tue, 3 Jun 2025 10:48:24 -0400 Subject: [PATCH] [PM-17562] Add strict delay support for RabbitMQ; Refactor implementation (#5899) * [PM-17562] Add strict delay support for RabbitMQ * fix lint error * Added more robust FailureReason handling and some additional tests * Fix two issues noted by SonarQube * Fix typo; Add alternate handling if MessageId is null or empty * Set MessageId on all message publishers --- dev/docker-compose.yml | 4 +- .../Data/Integrations/IIntegrationMessage.cs | 9 +- .../Integrations/IntegrationHandlerResult.cs | 4 +- .../Data/Integrations/IntegrationMessage.cs | 24 +- .../Data/Integrations/SlackIntegration.cs | 4 +- .../SlackIntegrationConfiguration.cs | 4 +- .../SlackIntegrationConfigurationDetails.cs | 4 +- .../WebhookIntegrationConfiguration.cs | 4 +- .../WebhookIntegrationConfigurationDetails.cs | 4 +- .../Models/Slack/SlackApiResponse.cs | 3 +- .../Services/EventLoggingListenerService.cs | 80 +++++- .../Services/IAzureServiceBusService.cs | 10 + ...isher.cs => IEventIntegrationPublisher.cs} | 3 +- .../AdminConsole/Services/IRabbitMqService.cs | 19 ++ .../AzureServiceBusEventListenerService.cs | 79 +++--- .../AzureServiceBusEventWriteService.cs | 45 ---- ...ureServiceBusIntegrationListenerService.cs | 82 +++--- .../AzureServiceBusIntegrationPublisher.cs | 36 --- .../Implementations/AzureServiceBusService.cs | 70 +++++ .../AzureTableStorageEventHandler.cs | 4 +- .../EventIntegrationEventWriteService.cs | 32 +++ .../EventIntegrationHandler.cs | 12 +- .../Implementations/EventRepositoryHandler.cs | 4 +- .../Implementations/EventRouteService.cs | 4 +- .../RabbitMqEventListenerService.cs | 95 +++---- .../RabbitMqEventWriteService.cs | 62 ----- .../RabbitMqIntegrationListenerService.cs | 219 +++++++--------- .../RabbitMqIntegrationPublisher.cs | 54 ---- .../Implementations/RabbitMqService.cs | 244 ++++++++++++++++++ .../SlackIntegrationHandler.cs | 4 +- .../Services/Implementations/SlackService.cs | 22 +- .../WebhookIntegrationHandler.cs | 8 +- src/Core/Settings/GlobalSettings.cs | 1 + .../Utilities/ServiceCollectionExtensions.cs | 70 ++--- .../Integrations/IntegrationMessageTests.cs | 32 ++- ...zureServiceBusEventListenerServiceTests.cs | 133 ++++++++++ ...rviceBusIntegrationListenerServiceTests.cs | 124 +++++++++ .../EventIntegrationEventWriteServiceTests.cs | 57 ++++ .../Services/EventIntegrationHandlerTests.cs | 40 +-- .../Services/IntegrationHandlerTests.cs | 1 + .../RabbitMqEventListenerServiceTests.cs | 173 +++++++++++++ ...RabbitMqIntegrationListenerServiceTests.cs | 230 +++++++++++++++++ .../Services/SlackServiceTests.cs | 10 +- .../WebhookIntegrationHandlerTests.cs | 4 + 44 files changed, 1554 insertions(+), 573 deletions(-) create mode 100644 src/Core/AdminConsole/Services/IAzureServiceBusService.cs rename src/Core/AdminConsole/Services/{IIntegrationPublisher.cs => IEventIntegrationPublisher.cs} (58%) create mode 100644 src/Core/AdminConsole/Services/IRabbitMqService.cs delete mode 100644 src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs delete mode 100644 src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationPublisher.cs create mode 100644 src/Core/AdminConsole/Services/Implementations/AzureServiceBusService.cs create mode 100644 src/Core/AdminConsole/Services/Implementations/EventIntegrationEventWriteService.cs delete mode 100644 src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs delete mode 100644 src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationPublisher.cs create mode 100644 src/Core/AdminConsole/Services/Implementations/RabbitMqService.cs create mode 100644 test/Core.Test/AdminConsole/Services/AzureServiceBusEventListenerServiceTests.cs create mode 100644 test/Core.Test/AdminConsole/Services/AzureServiceBusIntegrationListenerServiceTests.cs create mode 100644 test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs create mode 100644 test/Core.Test/AdminConsole/Services/RabbitMqEventListenerServiceTests.cs create mode 100644 test/Core.Test/AdminConsole/Services/RabbitMqIntegrationListenerServiceTests.cs diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 601989a473..0ee4aa53a9 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -99,7 +99,7 @@ services: - idp rabbitmq: - image: rabbitmq:management + image: rabbitmq:4.1.0-management container_name: rabbitmq ports: - "5672:5672" @@ -108,7 +108,7 @@ services: RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER} RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS} volumes: - - rabbitmq_data:/var/lib/rabbitmq_data + - rabbitmq_data:/var/lib/rabbitmq profiles: - rabbitmq diff --git a/src/Core/AdminConsole/Models/Data/Integrations/IIntegrationMessage.cs b/src/Core/AdminConsole/Models/Data/Integrations/IIntegrationMessage.cs index bd1f280cad..c94794765b 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/IIntegrationMessage.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/IIntegrationMessage.cs @@ -1,12 +1,15 @@ -using Bit.Core.Enums; +#nullable enable + +using Bit.Core.Enums; namespace Bit.Core.AdminConsole.Models.Data.Integrations; public interface IIntegrationMessage { IntegrationType IntegrationType { get; } - int RetryCount { get; set; } - DateTime? DelayUntilDate { get; set; } + string MessageId { get; set; } + int RetryCount { get; } + DateTime? DelayUntilDate { get; } void ApplyRetry(DateTime? handlerDelayUntilDate); string ToJson(); } diff --git a/src/Core/AdminConsole/Models/Data/Integrations/IntegrationHandlerResult.cs b/src/Core/AdminConsole/Models/Data/Integrations/IntegrationHandlerResult.cs index d2f0bde693..ecf5d25c51 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/IntegrationHandlerResult.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/IntegrationHandlerResult.cs @@ -1,4 +1,6 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public class IntegrationHandlerResult { diff --git a/src/Core/AdminConsole/Models/Data/Integrations/IntegrationMessage.cs b/src/Core/AdminConsole/Models/Data/Integrations/IntegrationMessage.cs index 1f288914d0..018d453cb9 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/IntegrationMessage.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/IntegrationMessage.cs @@ -1,13 +1,15 @@ -using System.Text.Json; +#nullable enable + +using System.Text.Json; using Bit.Core.Enums; namespace Bit.Core.AdminConsole.Models.Data.Integrations; -public class IntegrationMessage : IIntegrationMessage +public class IntegrationMessage : IIntegrationMessage { public IntegrationType IntegrationType { get; set; } - public T Configuration { get; set; } - public string RenderedTemplate { get; set; } + public required string MessageId { get; set; } + public required string RenderedTemplate { get; set; } public int RetryCount { get; set; } = 0; public DateTime? DelayUntilDate { get; set; } @@ -22,12 +24,22 @@ public class IntegrationMessage : IIntegrationMessage DelayUntilDate = baseTime.AddSeconds(backoffSeconds + jitterSeconds); } - public string ToJson() + public virtual string ToJson() + { + return JsonSerializer.Serialize(this); + } +} + +public class IntegrationMessage : IntegrationMessage +{ + public required T Configuration { get; set; } + + public override string ToJson() { return JsonSerializer.Serialize(this); } - public static IntegrationMessage FromJson(string json) + public static IntegrationMessage? FromJson(string json) { return JsonSerializer.Deserialize>(json); } diff --git a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegration.cs b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegration.cs index 4fcce542ce..4f2c434ff6 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegration.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegration.cs @@ -1,3 +1,5 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public record SlackIntegration(string token); diff --git a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfiguration.cs b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfiguration.cs index 2930004cbf..18b13248ec 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfiguration.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfiguration.cs @@ -1,3 +1,5 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public record SlackIntegrationConfiguration(string channelId); diff --git a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfigurationDetails.cs b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfigurationDetails.cs index b81e50d403..a9b4150419 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfigurationDetails.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/SlackIntegrationConfigurationDetails.cs @@ -1,3 +1,5 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public record SlackIntegrationConfigurationDetails(string channelId, string token); diff --git a/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfiguration.cs b/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfiguration.cs index e8217d3ad3..47e014ee2a 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfiguration.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfiguration.cs @@ -1,3 +1,5 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public record WebhookIntegrationConfiguration(string url); diff --git a/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfigurationDetails.cs b/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfigurationDetails.cs index e3e92c900f..c4c41db24f 100644 --- a/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfigurationDetails.cs +++ b/src/Core/AdminConsole/Models/Data/Integrations/WebhookIntegrationConfigurationDetails.cs @@ -1,3 +1,5 @@ -namespace Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +namespace Bit.Core.AdminConsole.Models.Data.Integrations; public record WebhookIntegrationConfigurationDetails(string url); diff --git a/src/Core/AdminConsole/Models/Slack/SlackApiResponse.cs b/src/Core/AdminConsole/Models/Slack/SlackApiResponse.cs index 59debed746..ede2123f7e 100644 --- a/src/Core/AdminConsole/Models/Slack/SlackApiResponse.cs +++ b/src/Core/AdminConsole/Models/Slack/SlackApiResponse.cs @@ -1,4 +1,5 @@ - +#nullable enable + using System.Text.Json.Serialization; namespace Bit.Core.Models.Slack; diff --git a/src/Core/AdminConsole/Services/EventLoggingListenerService.cs b/src/Core/AdminConsole/Services/EventLoggingListenerService.cs index 60b8789a6b..ec2db121db 100644 --- a/src/Core/AdminConsole/Services/EventLoggingListenerService.cs +++ b/src/Core/AdminConsole/Services/EventLoggingListenerService.cs @@ -1,13 +1,87 @@ -using Microsoft.Extensions.Hosting; +#nullable enable + +using System.Text.Json; +using Bit.Core.Models.Data; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; namespace Bit.Core.Services; public abstract class EventLoggingListenerService : BackgroundService { protected readonly IEventMessageHandler _handler; + protected ILogger _logger; - protected EventLoggingListenerService(IEventMessageHandler handler) + protected EventLoggingListenerService(IEventMessageHandler handler, ILogger logger) { - _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + _handler = handler; + _logger = logger; + } + + internal async Task ProcessReceivedMessageAsync(string body, string? messageId) + { + try + { + using var jsonDocument = JsonDocument.Parse(body); + var root = jsonDocument.RootElement; + + if (root.ValueKind == JsonValueKind.Array) + { + var eventMessages = root.Deserialize>(); + await _handler.HandleManyEventsAsync(eventMessages); + } + else if (root.ValueKind == JsonValueKind.Object) + { + var eventMessage = root.Deserialize(); + await _handler.HandleEventAsync(eventMessage); + } + else + { + if (!string.IsNullOrEmpty(messageId)) + { + _logger.LogError("An error occurred while processing message: {MessageId} - Invalid JSON", messageId); + } + else + { + _logger.LogError("An Invalid JSON error occurred while processing a message with an empty message id"); + } + } + } + catch (JsonException exception) + { + if (!string.IsNullOrEmpty(messageId)) + { + _logger.LogError( + exception, + "An error occurred while processing message: {MessageId} - Invalid JSON", + messageId + ); + } + else + { + _logger.LogError( + exception, + "An Invalid JSON error occurred while processing a message with an empty message id" + ); + } + } + catch (Exception exception) + { + if (!string.IsNullOrEmpty(messageId)) + { + _logger.LogError( + exception, + "An error occurred while processing message: {MessageId}", + messageId + ); + } + else + { + _logger.LogError( + exception, + "An error occurred while processing a message with an empty message id" + ); + } + } } } diff --git a/src/Core/AdminConsole/Services/IAzureServiceBusService.cs b/src/Core/AdminConsole/Services/IAzureServiceBusService.cs new file mode 100644 index 0000000000..d254e763d5 --- /dev/null +++ b/src/Core/AdminConsole/Services/IAzureServiceBusService.cs @@ -0,0 +1,10 @@ +using Azure.Messaging.ServiceBus; +using Bit.Core.AdminConsole.Models.Data.Integrations; + +namespace Bit.Core.Services; + +public interface IAzureServiceBusService : IEventIntegrationPublisher, IAsyncDisposable +{ + ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options); + Task PublishToRetryAsync(IIntegrationMessage message); +} diff --git a/src/Core/AdminConsole/Services/IIntegrationPublisher.cs b/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs similarity index 58% rename from src/Core/AdminConsole/Services/IIntegrationPublisher.cs rename to src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs index 986ea776e1..560da576b7 100644 --- a/src/Core/AdminConsole/Services/IIntegrationPublisher.cs +++ b/src/Core/AdminConsole/Services/IEventIntegrationPublisher.cs @@ -2,7 +2,8 @@ namespace Bit.Core.Services; -public interface IIntegrationPublisher +public interface IEventIntegrationPublisher : IAsyncDisposable { Task PublishAsync(IIntegrationMessage message); + Task PublishEventAsync(string body); } diff --git a/src/Core/AdminConsole/Services/IRabbitMqService.cs b/src/Core/AdminConsole/Services/IRabbitMqService.cs new file mode 100644 index 0000000000..b0b9a72eac --- /dev/null +++ b/src/Core/AdminConsole/Services/IRabbitMqService.cs @@ -0,0 +1,19 @@ +using Bit.Core.AdminConsole.Models.Data.Integrations; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Bit.Core.Services; + +public interface IRabbitMqService : IEventIntegrationPublisher +{ + Task CreateChannelAsync(CancellationToken cancellationToken = default); + Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default); + Task CreateIntegrationQueuesAsync( + string queueName, + string retryQueueName, + string routingKey, + CancellationToken cancellationToken = default); + Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken); + Task PublishToDeadLetterAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken); + Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs); +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs index 2ab10418a3..8b00204775 100644 --- a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventListenerService.cs @@ -1,7 +1,7 @@ -using System.Text; -using System.Text.Json; +#nullable enable + +using System.Text; using Azure.Messaging.ServiceBus; -using Bit.Core.Models.Data; using Bit.Core.Settings; using Microsoft.Extensions.Logging; @@ -9,67 +9,47 @@ namespace Bit.Core.Services; public class AzureServiceBusEventListenerService : EventLoggingListenerService { - private readonly ILogger _logger; - private readonly ServiceBusClient _client; private readonly ServiceBusProcessor _processor; public AzureServiceBusEventListenerService( IEventMessageHandler handler, - ILogger logger, + IAzureServiceBusService serviceBusService, + string subscriptionName, GlobalSettings globalSettings, - string subscriptionName) : base(handler) + ILogger logger) : base(handler, logger) { - _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); - _processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.EventTopicName, subscriptionName, new ServiceBusProcessorOptions()); + _processor = serviceBusService.CreateProcessor( + globalSettings.EventLogging.AzureServiceBus.EventTopicName, + subscriptionName, + new ServiceBusProcessorOptions()); _logger = logger; } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { - _processor.ProcessMessageAsync += async args => - { - try - { - using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(args.Message.Body)); - var root = jsonDocument.RootElement; - - if (root.ValueKind == JsonValueKind.Array) - { - var eventMessages = root.Deserialize>(); - await _handler.HandleManyEventsAsync(eventMessages); - } - else if (root.ValueKind == JsonValueKind.Object) - { - var eventMessage = root.Deserialize(); - await _handler.HandleEventAsync(eventMessage); - - } - await args.CompleteMessageAsync(args.Message); - } - catch (Exception exception) - { - _logger.LogError( - exception, - "An error occured while processing message: {MessageId}", - args.Message.MessageId - ); - } - }; - - _processor.ProcessErrorAsync += args => - { - _logger.LogError( - args.Exception, - "An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}", - args.EntityPath, - args.ErrorSource - ); - return Task.CompletedTask; - }; + _processor.ProcessMessageAsync += ProcessReceivedMessageAsync; + _processor.ProcessErrorAsync += ProcessErrorAsync; await _processor.StartProcessingAsync(cancellationToken); } + internal Task ProcessErrorAsync(ProcessErrorEventArgs args) + { + _logger.LogError( + args.Exception, + "An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}", + args.EntityPath, + args.ErrorSource + ); + return Task.CompletedTask; + } + + private async Task ProcessReceivedMessageAsync(ProcessMessageEventArgs args) + { + await ProcessReceivedMessageAsync(Encoding.UTF8.GetString(args.Message.Body), args.Message.MessageId); + await args.CompleteMessageAsync(args.Message); + } + public override async Task StopAsync(CancellationToken cancellationToken) { await _processor.StopProcessingAsync(cancellationToken); @@ -79,7 +59,6 @@ public class AzureServiceBusEventListenerService : EventLoggingListenerService public override void Dispose() { _processor.DisposeAsync().GetAwaiter().GetResult(); - _client.DisposeAsync().GetAwaiter().GetResult(); base.Dispose(); } } diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs deleted file mode 100644 index 224f86a802..0000000000 --- a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusEventWriteService.cs +++ /dev/null @@ -1,45 +0,0 @@ -using System.Text.Json; -using Azure.Messaging.ServiceBus; -using Bit.Core.Models.Data; -using Bit.Core.Services; -using Bit.Core.Settings; - -namespace Bit.Core.AdminConsole.Services.Implementations; - -public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable -{ - private readonly ServiceBusClient _client; - private readonly ServiceBusSender _sender; - - public AzureServiceBusEventWriteService(GlobalSettings globalSettings) - { - _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); - _sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName); - } - - public async Task CreateAsync(IEvent e) - { - var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e)) - { - ContentType = "application/json" - }; - - await _sender.SendMessageAsync(message); - } - - public async Task CreateManyAsync(IEnumerable events) - { - var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(events)) - { - ContentType = "application/json" - }; - - await _sender.SendMessageAsync(message); - } - - public async ValueTask DisposeAsync() - { - await _sender.DisposeAsync(); - await _client.DisposeAsync(); - } -} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationListenerService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationListenerService.cs index 8244f39c09..55a39ec774 100644 --- a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationListenerService.cs @@ -1,7 +1,6 @@ #nullable enable using Azure.Messaging.ServiceBus; -using Bit.Core.Settings; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -10,39 +9,30 @@ namespace Bit.Core.Services; public class AzureServiceBusIntegrationListenerService : BackgroundService { private readonly int _maxRetries; - private readonly string _subscriptionName; - private readonly string _topicName; + private readonly IAzureServiceBusService _serviceBusService; private readonly IIntegrationHandler _handler; - private readonly ServiceBusClient _client; private readonly ServiceBusProcessor _processor; - private readonly ServiceBusSender _sender; private readonly ILogger _logger; - public AzureServiceBusIntegrationListenerService( - IIntegrationHandler handler, + public AzureServiceBusIntegrationListenerService(IIntegrationHandler handler, + string topicName, string subscriptionName, - GlobalSettings globalSettings, + int maxRetries, + IAzureServiceBusService serviceBusService, ILogger logger) { _handler = handler; _logger = logger; - _maxRetries = globalSettings.EventLogging.AzureServiceBus.MaxRetries; - _topicName = globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName; - _subscriptionName = subscriptionName; + _maxRetries = maxRetries; + _serviceBusService = serviceBusService; - _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); - _processor = _client.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions()); - _sender = _client.CreateSender(_topicName); + _processor = _serviceBusService.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions()); } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { _processor.ProcessMessageAsync += HandleMessageAsync; - _processor.ProcessErrorAsync += args => - { - _logger.LogError(args.Exception, "Azure Service Bus error"); - return Task.CompletedTask; - }; + _processor.ProcessErrorAsync += ProcessErrorAsync; await _processor.StartProcessingAsync(cancellationToken); } @@ -51,51 +41,67 @@ public class AzureServiceBusIntegrationListenerService : BackgroundService { await _processor.StopProcessingAsync(cancellationToken); await _processor.DisposeAsync(); - await _sender.DisposeAsync(); - await _client.DisposeAsync(); await base.StopAsync(cancellationToken); } - private async Task HandleMessageAsync(ProcessMessageEventArgs args) + internal Task ProcessErrorAsync(ProcessErrorEventArgs args) { - var json = args.Message.Body.ToString(); + _logger.LogError( + args.Exception, + "An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}", + args.EntityPath, + args.ErrorSource + ); + return Task.CompletedTask; + } + internal async Task HandleMessageAsync(string body) + { try { - var result = await _handler.HandleAsync(json); + var result = await _handler.HandleAsync(body); var message = result.Message; if (result.Success) { - await args.CompleteMessageAsync(args.Message); - return; + // Successful integration. Return true to indicate the message has been handled + return true; } message.ApplyRetry(result.DelayUntilDate); if (result.Retryable && message.RetryCount < _maxRetries) { - var scheduledTime = (DateTime)message.DelayUntilDate!; - var retryMsg = new ServiceBusMessage(message.ToJson()) - { - Subject = args.Message.Subject, - ScheduledEnqueueTime = scheduledTime - }; - - await _sender.SendMessageAsync(retryMsg); + // Publish message to the retry queue. It will be re-published for retry after a delay + // Return true to indicate the message has been handled + await _serviceBusService.PublishToRetryAsync(message); + return true; } else { - await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable"); - return; + // Non-recoverable failure or exceeded the max number of retries + // Return false to indicate this message should be dead-lettered + return false; } - - await args.CompleteMessageAsync(args.Message); } catch (Exception ex) { + // Unknown exception - log error, return true so the message will be acknowledged and not resent _logger.LogError(ex, "Unhandled error processing ASB message"); + return true; + } + } + + private async Task HandleMessageAsync(ProcessMessageEventArgs args) + { + var json = args.Message.Body.ToString(); + if (await HandleMessageAsync(json)) + { await args.CompleteMessageAsync(args.Message); } + else + { + await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable"); + } } } diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationPublisher.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationPublisher.cs deleted file mode 100644 index 4a906e719f..0000000000 --- a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusIntegrationPublisher.cs +++ /dev/null @@ -1,36 +0,0 @@ -using Azure.Messaging.ServiceBus; -using Bit.Core.AdminConsole.Models.Data.Integrations; -using Bit.Core.Enums; -using Bit.Core.Settings; - -namespace Bit.Core.Services; - -public class AzureServiceBusIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable -{ - private readonly ServiceBusClient _client; - private readonly ServiceBusSender _sender; - - public AzureServiceBusIntegrationPublisher(GlobalSettings globalSettings) - { - _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); - _sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName); - } - - public async Task PublishAsync(IIntegrationMessage message) - { - var json = message.ToJson(); - - var serviceBusMessage = new ServiceBusMessage(json) - { - Subject = message.IntegrationType.ToRoutingKey(), - }; - - await _sender.SendMessageAsync(serviceBusMessage); - } - - public async ValueTask DisposeAsync() - { - await _sender.DisposeAsync(); - await _client.DisposeAsync(); - } -} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureServiceBusService.cs b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusService.cs new file mode 100644 index 0000000000..7d24095819 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/AzureServiceBusService.cs @@ -0,0 +1,70 @@ +using Azure.Messaging.ServiceBus; +using Bit.Core.AdminConsole.Models.Data.Integrations; +using Bit.Core.Enums; +using Bit.Core.Settings; + +namespace Bit.Core.Services; + +public class AzureServiceBusService : IAzureServiceBusService +{ + private readonly ServiceBusClient _client; + private readonly ServiceBusSender _eventSender; + private readonly ServiceBusSender _integrationSender; + + public AzureServiceBusService(GlobalSettings globalSettings) + { + _client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); + _eventSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName); + _integrationSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName); + } + + public ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options) + { + return _client.CreateProcessor(topicName, subscriptionName, options); + } + + public async Task PublishAsync(IIntegrationMessage message) + { + var json = message.ToJson(); + + var serviceBusMessage = new ServiceBusMessage(json) + { + Subject = message.IntegrationType.ToRoutingKey(), + MessageId = message.MessageId + }; + + await _integrationSender.SendMessageAsync(serviceBusMessage); + } + + public async Task PublishToRetryAsync(IIntegrationMessage message) + { + var json = message.ToJson(); + + var serviceBusMessage = new ServiceBusMessage(json) + { + Subject = message.IntegrationType.ToRoutingKey(), + ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow, + MessageId = message.MessageId + }; + + await _integrationSender.SendMessageAsync(serviceBusMessage); + } + + public async Task PublishEventAsync(string body) + { + var message = new ServiceBusMessage(body) + { + ContentType = "application/json", + MessageId = Guid.NewGuid().ToString() + }; + + await _eventSender.SendMessageAsync(message); + } + + public async ValueTask DisposeAsync() + { + await _eventSender.DisposeAsync(); + await _integrationSender.DisposeAsync(); + await _client.DisposeAsync(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs index aa545913b1..578dde9485 100644 --- a/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/AzureTableStorageEventHandler.cs @@ -1,4 +1,6 @@ -using Bit.Core.Models.Data; +#nullable enable + +using Bit.Core.Models.Data; using Microsoft.Extensions.DependencyInjection; namespace Bit.Core.Services; diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrationEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrationEventWriteService.cs new file mode 100644 index 0000000000..519f8aeb32 --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrationEventWriteService.cs @@ -0,0 +1,32 @@ +#nullable enable + +using System.Text.Json; +using Bit.Core.Models.Data; + +namespace Bit.Core.Services; +public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDisposable +{ + private readonly IEventIntegrationPublisher _eventIntegrationPublisher; + + public EventIntegrationEventWriteService(IEventIntegrationPublisher eventIntegrationPublisher) + { + _eventIntegrationPublisher = eventIntegrationPublisher; + } + + public async Task CreateAsync(IEvent e) + { + var body = JsonSerializer.Serialize(e); + await _eventIntegrationPublisher.PublishEventAsync(body: body); + } + + public async Task CreateManyAsync(IEnumerable events) + { + var body = JsonSerializer.Serialize(events); + await _eventIntegrationPublisher.PublishEventAsync(body: body); + } + + public async ValueTask DisposeAsync() + { + await _eventIntegrationPublisher.DisposeAsync(); + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/EventIntegrationHandler.cs b/src/Core/AdminConsole/Services/Implementations/EventIntegrationHandler.cs index 9a80ed67b2..aa76fdf8bc 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventIntegrationHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventIntegrationHandler.cs @@ -1,4 +1,6 @@ -using System.Text.Json; +#nullable enable + +using System.Text.Json; using Bit.Core.AdminConsole.Models.Data.Integrations; using Bit.Core.AdminConsole.Utilities; using Bit.Core.Enums; @@ -7,11 +9,9 @@ using Bit.Core.Repositories; namespace Bit.Core.Services; -#nullable enable - public class EventIntegrationHandler( IntegrationType integrationType, - IIntegrationPublisher integrationPublisher, + IEventIntegrationPublisher eventIntegrationPublisher, IOrganizationIntegrationConfigurationRepository configurationRepository, IUserRepository userRepository, IOrganizationRepository organizationRepository) @@ -34,6 +34,7 @@ public class EventIntegrationHandler( var template = configuration.Template ?? string.Empty; var context = await BuildContextAsync(eventMessage, template); var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context); + var messageId = eventMessage.IdempotencyId ?? Guid.NewGuid(); var config = configuration.MergedConfiguration.Deserialize() ?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}"); @@ -41,13 +42,14 @@ public class EventIntegrationHandler( var message = new IntegrationMessage { IntegrationType = integrationType, + MessageId = messageId.ToString(), Configuration = config, RenderedTemplate = renderedTemplate, RetryCount = 0, DelayUntilDate = null }; - await integrationPublisher.PublishAsync(message); + await eventIntegrationPublisher.PublishAsync(message); } } diff --git a/src/Core/AdminConsole/Services/Implementations/EventRepositoryHandler.cs b/src/Core/AdminConsole/Services/Implementations/EventRepositoryHandler.cs index ee3a2d5db2..0fab787589 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventRepositoryHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventRepositoryHandler.cs @@ -1,4 +1,6 @@ -using Bit.Core.Models.Data; +#nullable enable + +using Bit.Core.Models.Data; using Microsoft.Extensions.DependencyInjection; namespace Bit.Core.Services; diff --git a/src/Core/AdminConsole/Services/Implementations/EventRouteService.cs b/src/Core/AdminConsole/Services/Implementations/EventRouteService.cs index a542e75a7b..df0819b409 100644 --- a/src/Core/AdminConsole/Services/Implementations/EventRouteService.cs +++ b/src/Core/AdminConsole/Services/Implementations/EventRouteService.cs @@ -1,4 +1,6 @@ -using Bit.Core.Models.Data; +#nullable enable + +using Bit.Core.Models.Data; using Microsoft.Extensions.DependencyInjection; namespace Bit.Core.Services; diff --git a/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs index 74833f38a0..bc2329930d 100644 --- a/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventListenerService.cs @@ -1,7 +1,6 @@ -using System.Text; -using System.Text.Json; -using Bit.Core.Models.Data; -using Bit.Core.Settings; +#nullable enable + +using System.Text; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -10,94 +9,60 @@ namespace Bit.Core.Services; public class RabbitMqEventListenerService : EventLoggingListenerService { - private IChannel _channel; - private IConnection _connection; - private readonly string _exchangeName; - private readonly ConnectionFactory _factory; - private readonly ILogger _logger; + private readonly Lazy> _lazyChannel; private readonly string _queueName; + private readonly IRabbitMqService _rabbitMqService; public RabbitMqEventListenerService( IEventMessageHandler handler, - ILogger logger, - GlobalSettings globalSettings, - string queueName) : base(handler) + string queueName, + IRabbitMqService rabbitMqService, + ILogger logger) : base(handler, logger) { - _factory = new ConnectionFactory - { - HostName = globalSettings.EventLogging.RabbitMq.HostName, - UserName = globalSettings.EventLogging.RabbitMq.Username, - Password = globalSettings.EventLogging.RabbitMq.Password - }; - _exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName; _logger = logger; _queueName = queueName; + _rabbitMqService = rabbitMqService; + _lazyChannel = new Lazy>(() => _rabbitMqService.CreateChannelAsync()); } public override async Task StartAsync(CancellationToken cancellationToken) { - _connection = await _factory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - - await _channel.ExchangeDeclareAsync(exchange: _exchangeName, - type: ExchangeType.Fanout, - durable: true, - cancellationToken: cancellationToken); - await _channel.QueueDeclareAsync(queue: _queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null, - cancellationToken: cancellationToken); - await _channel.QueueBindAsync(queue: _queueName, - exchange: _exchangeName, - routingKey: string.Empty, - cancellationToken: cancellationToken); + await _rabbitMqService.CreateEventQueueAsync(_queueName, cancellationToken); await base.StartAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { - var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.ReceivedAsync += async (_, eventArgs) => - { - try - { - using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(eventArgs.Body.Span)); - var root = jsonDocument.RootElement; + var channel = await _lazyChannel.Value; + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += async (_, eventArgs) => { await ProcessReceivedMessageAsync(eventArgs); }; - if (root.ValueKind == JsonValueKind.Array) - { - var eventMessages = root.Deserialize>(); - await _handler.HandleManyEventsAsync(eventMessages); - } - else if (root.ValueKind == JsonValueKind.Object) - { - var eventMessage = root.Deserialize(); - await _handler.HandleEventAsync(eventMessage); + await channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken); + } - } - } - catch (Exception ex) - { - _logger.LogError(ex, "An error occurred while processing the message"); - } - }; - - await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken); + internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs eventArgs) + { + await ProcessReceivedMessageAsync( + Encoding.UTF8.GetString(eventArgs.Body.Span), + eventArgs.BasicProperties.MessageId); } public override async Task StopAsync(CancellationToken cancellationToken) { - await _channel.CloseAsync(cancellationToken); - await _connection.CloseAsync(cancellationToken); + if (_lazyChannel.IsValueCreated) + { + var channel = await _lazyChannel.Value; + await channel.CloseAsync(cancellationToken); + } await base.StopAsync(cancellationToken); } public override void Dispose() { - _channel.Dispose(); - _connection.Dispose(); + if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully) + { + _lazyChannel.Value.Result.Dispose(); + } base.Dispose(); } } diff --git a/src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs deleted file mode 100644 index 05fcf71a92..0000000000 --- a/src/Core/AdminConsole/Services/Implementations/RabbitMqEventWriteService.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System.Text.Json; -using Bit.Core.Models.Data; -using Bit.Core.Settings; -using RabbitMQ.Client; - -namespace Bit.Core.Services; -public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable -{ - private readonly ConnectionFactory _factory; - private readonly Lazy> _lazyConnection; - private readonly string _exchangeName; - - public RabbitMqEventWriteService(GlobalSettings globalSettings) - { - _factory = new ConnectionFactory - { - HostName = globalSettings.EventLogging.RabbitMq.HostName, - UserName = globalSettings.EventLogging.RabbitMq.Username, - Password = globalSettings.EventLogging.RabbitMq.Password - }; - _exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName; - - _lazyConnection = new Lazy>(CreateConnectionAsync); - } - - public async Task CreateAsync(IEvent e) - { - var connection = await _lazyConnection.Value; - using var channel = await connection.CreateChannelAsync(); - - await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); - - var body = JsonSerializer.SerializeToUtf8Bytes(e); - - await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body); - } - - public async Task CreateManyAsync(IEnumerable events) - { - var connection = await _lazyConnection.Value; - using var channel = await connection.CreateChannelAsync(); - await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); - - var body = JsonSerializer.SerializeToUtf8Bytes(events); - - await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body); - } - - public async ValueTask DisposeAsync() - { - if (_lazyConnection.IsValueCreated) - { - var connection = await _lazyConnection.Value; - await connection.DisposeAsync(); - } - } - - private async Task CreateConnectionAsync() - { - return await _factory.CreateConnectionAsync(); - } -} diff --git a/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationListenerService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationListenerService.cs index 1d6910db95..5b18d8817c 100644 --- a/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationListenerService.cs +++ b/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationListenerService.cs @@ -1,5 +1,8 @@ -using System.Text; -using Bit.Core.Settings; +#nullable enable + +using System.Text; +using System.Text.Json; +using Bit.Core.AdminConsole.Models.Data.Integrations; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using RabbitMQ.Client; @@ -9,183 +12,137 @@ namespace Bit.Core.Services; public class RabbitMqIntegrationListenerService : BackgroundService { - private const string _deadLetterRoutingKey = "dead-letter"; - private IChannel _channel; - private IConnection _connection; - private readonly string _exchangeName; - private readonly string _queueName; - private readonly string _retryQueueName; - private readonly string _deadLetterQueueName; - private readonly string _routingKey; - private readonly string _retryRoutingKey; private readonly int _maxRetries; + private readonly string _queueName; + private readonly string _routingKey; + private readonly string _retryQueueName; private readonly IIntegrationHandler _handler; - private readonly ConnectionFactory _factory; + private readonly Lazy> _lazyChannel; + private readonly IRabbitMqService _rabbitMqService; private readonly ILogger _logger; - private readonly int _retryTiming; public RabbitMqIntegrationListenerService(IIntegrationHandler handler, string routingKey, string queueName, string retryQueueName, - string deadLetterQueueName, - GlobalSettings globalSettings, + int maxRetries, + IRabbitMqService rabbitMqService, ILogger logger) { _handler = handler; _routingKey = routingKey; - _retryRoutingKey = $"{_routingKey}-retry"; - _queueName = queueName; _retryQueueName = retryQueueName; - _deadLetterQueueName = deadLetterQueueName; + _queueName = queueName; + _rabbitMqService = rabbitMqService; _logger = logger; - _exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName; - _maxRetries = globalSettings.EventLogging.RabbitMq.MaxRetries; - _retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming; - - _factory = new ConnectionFactory - { - HostName = globalSettings.EventLogging.RabbitMq.HostName, - UserName = globalSettings.EventLogging.RabbitMq.Username, - Password = globalSettings.EventLogging.RabbitMq.Password - }; + _maxRetries = maxRetries; + _lazyChannel = new Lazy>(() => _rabbitMqService.CreateChannelAsync()); } public override async Task StartAsync(CancellationToken cancellationToken) { - _connection = await _factory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - - await _channel.ExchangeDeclareAsync(exchange: _exchangeName, - type: ExchangeType.Direct, - durable: true, - cancellationToken: cancellationToken); - - // Declare main queue - await _channel.QueueDeclareAsync(queue: _queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null, - cancellationToken: cancellationToken); - await _channel.QueueBindAsync(queue: _queueName, - exchange: _exchangeName, - routingKey: _routingKey, - cancellationToken: cancellationToken); - - // Declare retry queue (Configurable TTL, dead-letters back to main queue) - await _channel.QueueDeclareAsync(queue: _retryQueueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: new Dictionary - { - { "x-dead-letter-exchange", _exchangeName }, - { "x-dead-letter-routing-key", _routingKey }, - { "x-message-ttl", _retryTiming } - }, - cancellationToken: cancellationToken); - await _channel.QueueBindAsync(queue: _retryQueueName, - exchange: _exchangeName, - routingKey: _retryRoutingKey, - cancellationToken: cancellationToken); - - // Declare dead letter queue - await _channel.QueueDeclareAsync(queue: _deadLetterQueueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null, - cancellationToken: cancellationToken); - await _channel.QueueBindAsync(queue: _deadLetterQueueName, - exchange: _exchangeName, - routingKey: _deadLetterRoutingKey, - cancellationToken: cancellationToken); + await _rabbitMqService.CreateIntegrationQueuesAsync( + _queueName, + _retryQueueName, + _routingKey, + cancellationToken: cancellationToken); await base.StartAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { - var consumer = new AsyncEventingBasicConsumer(_channel); + var channel = await _lazyChannel.Value; + var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (_, ea) => + { + await ProcessReceivedMessageAsync(ea, cancellationToken); + }; + + await channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken); + } + + internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs ea, CancellationToken cancellationToken) + { + var channel = await _lazyChannel.Value; + try { var json = Encoding.UTF8.GetString(ea.Body.Span); - try + // Determine if the message came off of the retry queue too soon + // If so, place it back on the retry queue + var integrationMessage = JsonSerializer.Deserialize(json); + if (integrationMessage is not null && + integrationMessage.DelayUntilDate.HasValue && + integrationMessage.DelayUntilDate.Value > DateTime.UtcNow) { - var result = await _handler.HandleAsync(json); - var message = result.Message; + await _rabbitMqService.RepublishToRetryQueueAsync(channel, ea); + await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); + return; + } - if (result.Success) + var result = await _handler.HandleAsync(json); + var message = result.Message; + + if (result.Success) + { + // Successful integration send. Acknowledge message delivery and return + await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); + return; + } + + if (result.Retryable) + { + // Integration failed, but is retryable - apply delay and check max retries + message.ApplyRetry(result.DelayUntilDate); + + if (message.RetryCount < _maxRetries) { - // Successful integration send. Acknowledge message delivery and return - await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); - return; - } - - if (result.Retryable) - { - // Integration failed, but is retryable - apply delay and check max retries - message.ApplyRetry(result.DelayUntilDate); - - if (message.RetryCount < _maxRetries) - { - // Publish message to the retry queue. It will be re-published for retry after a delay - await _channel.BasicPublishAsync( - exchange: _exchangeName, - routingKey: _retryRoutingKey, - body: Encoding.UTF8.GetBytes(message.ToJson()), - cancellationToken: cancellationToken); - } - else - { - // Exceeded the max number of retries; fail and send to dead letter queue - await PublishToDeadLetterAsync(message.ToJson()); - _logger.LogWarning("Max retry attempts reached. Sent to DLQ."); - } + // Publish message to the retry queue. It will be re-published for retry after a delay + await _rabbitMqService.PublishToRetryAsync(channel, message, cancellationToken); } else { - // Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries - await PublishToDeadLetterAsync(message.ToJson()); - _logger.LogWarning("Non-retryable failure. Sent to DLQ."); + // Exceeded the max number of retries; fail and send to dead letter queue + await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken); + _logger.LogWarning("Max retry attempts reached. Sent to DLQ."); } - - // Message has been sent to retry or dead letter queues. - // Acknowledge receipt so Rabbit knows it's been processed - await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); } - catch (Exception ex) + else { - // Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error - _logger.LogError(ex, "Unhandled error processing integration message."); - await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); + // Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries + await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken); + _logger.LogWarning("Non-retryable failure. Sent to DLQ."); } - }; - await _channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken); - } - - private async Task PublishToDeadLetterAsync(string json) - { - await _channel.BasicPublishAsync( - exchange: _exchangeName, - routingKey: _deadLetterRoutingKey, - body: Encoding.UTF8.GetBytes(json)); + // Message has been sent to retry or dead letter queues. + // Acknowledge receipt so Rabbit knows it's been processed + await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); + } + catch (Exception ex) + { + // Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error + _logger.LogError(ex, "Unhandled error processing integration message."); + await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); + } } public override async Task StopAsync(CancellationToken cancellationToken) { - await _channel.CloseAsync(cancellationToken); - await _connection.CloseAsync(cancellationToken); + if (_lazyChannel.IsValueCreated) + { + var channel = await _lazyChannel.Value; + await channel.CloseAsync(cancellationToken); + } await base.StopAsync(cancellationToken); } public override void Dispose() { - _channel.Dispose(); - _connection.Dispose(); + if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully) + { + _lazyChannel.Value.Result.Dispose(); + } base.Dispose(); } } diff --git a/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationPublisher.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationPublisher.cs deleted file mode 100644 index 12801e3216..0000000000 --- a/src/Core/AdminConsole/Services/Implementations/RabbitMqIntegrationPublisher.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System.Text; -using Bit.Core.AdminConsole.Models.Data.Integrations; -using Bit.Core.Enums; -using Bit.Core.Settings; -using RabbitMQ.Client; - -namespace Bit.Core.Services; - -public class RabbitMqIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable -{ - private readonly ConnectionFactory _factory; - private readonly Lazy> _lazyConnection; - private readonly string _exchangeName; - - public RabbitMqIntegrationPublisher(GlobalSettings globalSettings) - { - _factory = new ConnectionFactory - { - HostName = globalSettings.EventLogging.RabbitMq.HostName, - UserName = globalSettings.EventLogging.RabbitMq.Username, - Password = globalSettings.EventLogging.RabbitMq.Password - }; - _exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName; - - _lazyConnection = new Lazy>(CreateConnectionAsync); - } - - public async Task PublishAsync(IIntegrationMessage message) - { - var routingKey = message.IntegrationType.ToRoutingKey(); - var connection = await _lazyConnection.Value; - await using var channel = await connection.CreateChannelAsync(); - - await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Direct, durable: true); - - var body = Encoding.UTF8.GetBytes(message.ToJson()); - - await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: routingKey, body: body); - } - - public async ValueTask DisposeAsync() - { - if (_lazyConnection.IsValueCreated) - { - var connection = await _lazyConnection.Value; - await connection.DisposeAsync(); - } - } - - private async Task CreateConnectionAsync() - { - return await _factory.CreateConnectionAsync(); - } -} diff --git a/src/Core/AdminConsole/Services/Implementations/RabbitMqService.cs b/src/Core/AdminConsole/Services/Implementations/RabbitMqService.cs new file mode 100644 index 0000000000..617d1b41fb --- /dev/null +++ b/src/Core/AdminConsole/Services/Implementations/RabbitMqService.cs @@ -0,0 +1,244 @@ +#nullable enable + +using System.Text; +using Bit.Core.AdminConsole.Models.Data.Integrations; +using Bit.Core.Enums; +using Bit.Core.Settings; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Bit.Core.Services; + +public class RabbitMqService : IRabbitMqService +{ + private const string _deadLetterRoutingKey = "dead-letter"; + + private readonly ConnectionFactory _factory; + private readonly Lazy> _lazyConnection; + private readonly string _deadLetterQueueName; + private readonly string _eventExchangeName; + private readonly string _integrationExchangeName; + private readonly int _retryTiming; + private readonly bool _useDelayPlugin; + + public RabbitMqService(GlobalSettings globalSettings) + { + _factory = new ConnectionFactory + { + HostName = globalSettings.EventLogging.RabbitMq.HostName, + UserName = globalSettings.EventLogging.RabbitMq.Username, + Password = globalSettings.EventLogging.RabbitMq.Password + }; + _deadLetterQueueName = globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName; + _eventExchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName; + _integrationExchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName; + _retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming; + _useDelayPlugin = globalSettings.EventLogging.RabbitMq.UseDelayPlugin; + + _lazyConnection = new Lazy>(CreateConnectionAsync); + } + + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) + { + var connection = await _lazyConnection.Value; + return await connection.CreateChannelAsync(cancellationToken: cancellationToken); + } + + public async Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default) + { + using var channel = await CreateChannelAsync(cancellationToken); + await channel.QueueDeclareAsync(queue: queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null, + cancellationToken: cancellationToken); + await channel.QueueBindAsync(queue: queueName, + exchange: _eventExchangeName, + routingKey: string.Empty, + cancellationToken: cancellationToken); + } + + public async Task CreateIntegrationQueuesAsync( + string queueName, + string retryQueueName, + string routingKey, + CancellationToken cancellationToken = default) + { + using var channel = await CreateChannelAsync(cancellationToken); + var retryRoutingKey = $"{routingKey}-retry"; + + // Declare main integration queue + await channel.QueueDeclareAsync( + queue: queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null, + cancellationToken: cancellationToken); + await channel.QueueBindAsync( + queue: queueName, + exchange: _integrationExchangeName, + routingKey: routingKey, + cancellationToken: cancellationToken); + + if (!_useDelayPlugin) + { + // Declare retry queue (Configurable TTL, dead-letters back to main queue) + // Only needed if NOT using delay plugin + await channel.QueueDeclareAsync(queue: retryQueueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: new Dictionary + { + { "x-dead-letter-exchange", _integrationExchangeName }, + { "x-dead-letter-routing-key", routingKey }, + { "x-message-ttl", _retryTiming } + }, + cancellationToken: cancellationToken); + await channel.QueueBindAsync(queue: retryQueueName, + exchange: _integrationExchangeName, + routingKey: retryRoutingKey, + cancellationToken: cancellationToken); + } + } + + public async Task PublishAsync(IIntegrationMessage message) + { + var routingKey = message.IntegrationType.ToRoutingKey(); + await using var channel = await CreateChannelAsync(); + + var body = Encoding.UTF8.GetBytes(message.ToJson()); + var properties = new BasicProperties + { + MessageId = message.MessageId, + Persistent = true + }; + + await channel.BasicPublishAsync( + exchange: _integrationExchangeName, + mandatory: true, + basicProperties: properties, + routingKey: routingKey, + body: body); + } + + public async Task PublishEventAsync(string body) + { + await using var channel = await CreateChannelAsync(); + var properties = new BasicProperties + { + MessageId = Guid.NewGuid().ToString(), + Persistent = true + }; + + await channel.BasicPublishAsync( + exchange: _eventExchangeName, + mandatory: true, + basicProperties: properties, + routingKey: string.Empty, + body: Encoding.UTF8.GetBytes(body)); + } + + public async Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken) + { + var routingKey = message.IntegrationType.ToRoutingKey(); + var retryRoutingKey = $"{routingKey}-retry"; + var properties = new BasicProperties + { + Persistent = true, + MessageId = message.MessageId, + Headers = _useDelayPlugin && message.DelayUntilDate.HasValue ? + new Dictionary + { + ["x-delay"] = Math.Max((int)(message.DelayUntilDate.Value - DateTime.UtcNow).TotalMilliseconds, 0) + } : + null + }; + + await channel.BasicPublishAsync( + exchange: _integrationExchangeName, + routingKey: _useDelayPlugin ? routingKey : retryRoutingKey, + mandatory: true, + basicProperties: properties, + body: Encoding.UTF8.GetBytes(message.ToJson()), + cancellationToken: cancellationToken); + } + + public async Task PublishToDeadLetterAsync( + IChannel channel, + IIntegrationMessage message, + CancellationToken cancellationToken) + { + var properties = new BasicProperties + { + MessageId = message.MessageId, + Persistent = true + }; + + await channel.BasicPublishAsync( + exchange: _integrationExchangeName, + mandatory: true, + basicProperties: properties, + routingKey: _deadLetterRoutingKey, + body: Encoding.UTF8.GetBytes(message.ToJson()), + cancellationToken: cancellationToken); + } + + public async Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs) + { + await channel.BasicPublishAsync( + exchange: _integrationExchangeName, + routingKey: eventArgs.RoutingKey, + mandatory: true, + basicProperties: new BasicProperties(eventArgs.BasicProperties), + body: eventArgs.Body); + } + + public async ValueTask DisposeAsync() + { + if (_lazyConnection.IsValueCreated) + { + var connection = await _lazyConnection.Value; + await connection.DisposeAsync(); + } + } + + private async Task CreateConnectionAsync() + { + var connection = await _factory.CreateConnectionAsync(); + using var channel = await connection.CreateChannelAsync(); + + // Declare Exchanges + await channel.ExchangeDeclareAsync(exchange: _eventExchangeName, type: ExchangeType.Fanout, durable: true); + if (_useDelayPlugin) + { + await channel.ExchangeDeclareAsync( + exchange: _integrationExchangeName, + type: "x-delayed-message", + durable: true, + arguments: new Dictionary + { + { "x-delayed-type", "direct" } + } + ); + } + else + { + await channel.ExchangeDeclareAsync(exchange: _integrationExchangeName, type: ExchangeType.Direct, durable: true); + } + + // Declare dead letter queue for Integration exchange + await channel.QueueDeclareAsync(queue: _deadLetterQueueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null); + await channel.QueueBindAsync(queue: _deadLetterQueueName, + exchange: _integrationExchangeName, + routingKey: _deadLetterRoutingKey); + + return connection; + } +} diff --git a/src/Core/AdminConsole/Services/Implementations/SlackIntegrationHandler.cs b/src/Core/AdminConsole/Services/Implementations/SlackIntegrationHandler.cs index 134e93e838..fe0f6eabe1 100644 --- a/src/Core/AdminConsole/Services/Implementations/SlackIntegrationHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/SlackIntegrationHandler.cs @@ -1,4 +1,6 @@ -using Bit.Core.AdminConsole.Models.Data.Integrations; +#nullable enable + +using Bit.Core.AdminConsole.Models.Data.Integrations; namespace Bit.Core.Services; diff --git a/src/Core/AdminConsole/Services/Implementations/SlackService.cs b/src/Core/AdminConsole/Services/Implementations/SlackService.cs index effcfdf1ce..3f82217830 100644 --- a/src/Core/AdminConsole/Services/Implementations/SlackService.cs +++ b/src/Core/AdminConsole/Services/Implementations/SlackService.cs @@ -1,4 +1,6 @@ -using System.Net.Http.Headers; +#nullable enable + +using System.Net.Http.Headers; using System.Net.Http.Json; using System.Web; using Bit.Core.Models.Slack; @@ -22,7 +24,7 @@ public class SlackService( public async Task GetChannelIdAsync(string token, string channelName) { - return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault(); + return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault() ?? string.Empty; } public async Task> GetChannelIdsAsync(string token, List channelNames) @@ -58,7 +60,7 @@ public class SlackService( } else { - logger.LogError("Error getting Channel Ids: {Error}", result.Error); + logger.LogError("Error getting Channel Ids: {Error}", result?.Error ?? "Unknown Error"); nextCursor = string.Empty; } @@ -89,7 +91,7 @@ public class SlackService( new KeyValuePair("redirect_uri", redirectUrl) })); - SlackOAuthResponse result; + SlackOAuthResponse? result; try { result = await tokenResponse.Content.ReadFromJsonAsync(); @@ -99,7 +101,7 @@ public class SlackService( result = null; } - if (result == null) + if (result is null) { logger.LogError("Error obtaining token via OAuth: Unknown error"); return string.Empty; @@ -130,6 +132,11 @@ public class SlackService( var response = await _httpClient.SendAsync(request); var result = await response.Content.ReadFromJsonAsync(); + if (result is null) + { + logger.LogError("Error retrieving Slack user ID: Unknown error"); + return string.Empty; + } if (!result.Ok) { logger.LogError("Error retrieving Slack user ID: {Error}", result.Error); @@ -151,6 +158,11 @@ public class SlackService( var response = await _httpClient.SendAsync(request); var result = await response.Content.ReadFromJsonAsync(); + if (result is null) + { + logger.LogError("Error opening DM channel: Unknown error"); + return string.Empty; + } if (!result.Ok) { logger.LogError("Error opening DM channel: {Error}", result.Error); diff --git a/src/Core/AdminConsole/Services/Implementations/WebhookIntegrationHandler.cs b/src/Core/AdminConsole/Services/Implementations/WebhookIntegrationHandler.cs index 5f9898afe8..df364b2a48 100644 --- a/src/Core/AdminConsole/Services/Implementations/WebhookIntegrationHandler.cs +++ b/src/Core/AdminConsole/Services/Implementations/WebhookIntegrationHandler.cs @@ -1,4 +1,6 @@ -using System.Globalization; +#nullable enable + +using System.Globalization; using System.Net; using System.Text; using Bit.Core.AdminConsole.Models.Data.Integrations; @@ -29,7 +31,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory) case HttpStatusCode.ServiceUnavailable: case HttpStatusCode.GatewayTimeout: result.Retryable = true; - result.FailureReason = response.ReasonPhrase; + result.FailureReason = response.ReasonPhrase ?? $"Failure with status code: {(int)response.StatusCode}"; if (response.Headers.TryGetValues("Retry-After", out var values)) { @@ -52,7 +54,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory) break; default: result.Retryable = false; - result.FailureReason = response.ReasonPhrase; + result.FailureReason = response.ReasonPhrase ?? $"Failure with status code {(int)response.StatusCode}"; break; } diff --git a/src/Core/Settings/GlobalSettings.cs b/src/Core/Settings/GlobalSettings.cs index b821f214db..f08d66c28f 100644 --- a/src/Core/Settings/GlobalSettings.cs +++ b/src/Core/Settings/GlobalSettings.cs @@ -327,6 +327,7 @@ public class GlobalSettings : IGlobalSettings public int MaxRetries { get; set; } = 3; public int RetryTiming { get; set; } = 30000; // 30s + public bool UseDelayPlugin { get; set; } = false; public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue"; public virtual string IntegrationDeadLetterQueueName { get; set; } = "integration-dead-letter-queue"; public virtual string SlackEventsQueueName { get; set; } = "events-slack-queue"; diff --git a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs index 7ee94d0dce..ccf2b5212f 100644 --- a/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs +++ b/src/SharedWeb/Utilities/ServiceCollectionExtensions.cs @@ -550,7 +550,8 @@ public static class ServiceCollectionExtensions if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName)) { - services.AddKeyedSingleton("broadcast"); + services.AddSingleton(); + services.AddKeyedSingleton("broadcast"); } else { @@ -563,7 +564,8 @@ public static class ServiceCollectionExtensions if (IsRabbitMqEnabled(globalSettings)) { - services.AddKeyedSingleton("broadcast"); + services.AddSingleton(); + services.AddKeyedSingleton("broadcast"); } else { @@ -585,13 +587,15 @@ public static class ServiceCollectionExtensions services.AddSingleton(); services.AddSingleton(); services.AddKeyedSingleton("persistent"); - services.AddSingleton(provider => new AzureServiceBusEventListenerService( handler: provider.GetRequiredService(), - logger: provider.GetRequiredService>(), + serviceBusService: provider.GetRequiredService(), + subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName, globalSettings: globalSettings, - subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName)); + logger: provider.GetRequiredService>() + ) + ); return services; } @@ -607,12 +611,10 @@ public static class ServiceCollectionExtensions { var routingKey = integrationType.ToRoutingKey(); - services.AddSingleton(); - services.AddKeyedSingleton(routingKey, (provider, _) => new EventIntegrationHandler( integrationType, - provider.GetRequiredService(), + provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService())); @@ -620,18 +622,22 @@ public static class ServiceCollectionExtensions services.AddSingleton(provider => new AzureServiceBusEventListenerService( handler: provider.GetRequiredKeyedService(routingKey), - logger: provider.GetRequiredService>(), + serviceBusService: provider.GetRequiredService(), + subscriptionName: eventSubscriptionName, globalSettings: globalSettings, - subscriptionName: eventSubscriptionName)); + logger: provider.GetRequiredService>() + ) + ); services.AddSingleton, THandler>(); - services.AddSingleton(provider => new AzureServiceBusIntegrationListenerService( handler: provider.GetRequiredService>(), + topicName: globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName, subscriptionName: integrationSubscriptionName, - logger: provider.GetRequiredService>(), - globalSettings: globalSettings)); + maxRetries: globalSettings.EventLogging.AzureServiceBus.MaxRetries, + serviceBusService: provider.GetRequiredService(), + logger: provider.GetRequiredService>())); return services; } @@ -642,6 +648,8 @@ public static class ServiceCollectionExtensions !CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName)) return services; + services.AddSingleton(); + services.AddSingleton(); services.AddAzureServiceBusEventRepositoryListener(globalSettings); services.AddSlackService(globalSettings); @@ -668,9 +676,9 @@ public static class ServiceCollectionExtensions services.AddSingleton(provider => new RabbitMqEventListenerService( provider.GetRequiredService(), - provider.GetRequiredService>(), - globalSettings, - globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName)); + globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName, + provider.GetRequiredService(), + provider.GetRequiredService>())); return services; } @@ -679,19 +687,17 @@ public static class ServiceCollectionExtensions string eventQueueName, string integrationQueueName, string integrationRetryQueueName, - string integrationDeadLetterQueueName, - IntegrationType integrationType, - GlobalSettings globalSettings) + int maxRetries, + IntegrationType integrationType) where TConfig : class where THandler : class, IIntegrationHandler { var routingKey = integrationType.ToRoutingKey(); - services.AddSingleton(); services.AddKeyedSingleton(routingKey, (provider, _) => new EventIntegrationHandler( integrationType, - provider.GetRequiredService(), + provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService())); @@ -699,9 +705,9 @@ public static class ServiceCollectionExtensions services.AddSingleton(provider => new RabbitMqEventListenerService( provider.GetRequiredKeyedService(routingKey), - provider.GetRequiredService>(), - globalSettings, - eventQueueName)); + eventQueueName, + provider.GetRequiredService(), + provider.GetRequiredService>())); services.AddSingleton, THandler>(); services.AddSingleton(provider => @@ -710,8 +716,8 @@ public static class ServiceCollectionExtensions routingKey: routingKey, queueName: integrationQueueName, retryQueueName: integrationRetryQueueName, - deadLetterQueueName: integrationDeadLetterQueueName, - globalSettings: globalSettings, + maxRetries: maxRetries, + rabbitMqService: provider.GetRequiredService(), logger: provider.GetRequiredService>())); return services; @@ -724,6 +730,8 @@ public static class ServiceCollectionExtensions return services; } + services.AddSingleton(); + services.AddSingleton(); services.AddRabbitMqEventRepositoryListener(globalSettings); services.AddSlackService(globalSettings); @@ -731,18 +739,16 @@ public static class ServiceCollectionExtensions globalSettings.EventLogging.RabbitMq.SlackEventsQueueName, globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName, globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName, - globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName, - IntegrationType.Slack, - globalSettings); + globalSettings.EventLogging.RabbitMq.MaxRetries, + IntegrationType.Slack); services.AddHttpClient(WebhookIntegrationHandler.HttpClientName); services.AddRabbitMqIntegration( globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName, globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName, globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName, - globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName, - IntegrationType.Webhook, - globalSettings); + globalSettings.EventLogging.RabbitMq.MaxRetries, + IntegrationType.Webhook); return services; } diff --git a/test/Core.Test/AdminConsole/Models/Data/Integrations/IntegrationMessageTests.cs b/test/Core.Test/AdminConsole/Models/Data/Integrations/IntegrationMessageTests.cs index 0946841347..6ed84717de 100644 --- a/test/Core.Test/AdminConsole/Models/Data/Integrations/IntegrationMessageTests.cs +++ b/test/Core.Test/AdminConsole/Models/Data/Integrations/IntegrationMessageTests.cs @@ -7,12 +7,17 @@ namespace Bit.Core.Test.Models.Data.Integrations; public class IntegrationMessageTests { + private const string _messageId = "TestMessageId"; + [Fact] public void ApplyRetry_IncrementsRetryCountAndSetsDelayUntilDate() { var message = new IntegrationMessage { + Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"), + MessageId = _messageId, RetryCount = 2, + RenderedTemplate = string.Empty, DelayUntilDate = null }; @@ -30,19 +35,22 @@ public class IntegrationMessageTests var message = new IntegrationMessage { Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"), + MessageId = _messageId, RenderedTemplate = "This is the message", IntegrationType = IntegrationType.Webhook, RetryCount = 2, - DelayUntilDate = null + DelayUntilDate = DateTime.UtcNow }; var json = message.ToJson(); var result = IntegrationMessage.FromJson(json); Assert.Equal(message.Configuration, result.Configuration); + Assert.Equal(message.MessageId, result.MessageId); Assert.Equal(message.RenderedTemplate, result.RenderedTemplate); Assert.Equal(message.IntegrationType, result.IntegrationType); Assert.Equal(message.RetryCount, result.RetryCount); + Assert.Equal(message.DelayUntilDate, result.DelayUntilDate); } [Fact] @@ -51,4 +59,26 @@ public class IntegrationMessageTests var json = "{ Invalid JSON"; Assert.Throws(() => IntegrationMessage.FromJson(json)); } + + [Fact] + public void ToJson_BaseIntegrationMessage_DeserializesCorrectly() + { + var message = new IntegrationMessage + { + MessageId = _messageId, + RenderedTemplate = "This is the message", + IntegrationType = IntegrationType.Webhook, + RetryCount = 2, + DelayUntilDate = DateTime.UtcNow + }; + + var json = message.ToJson(); + var result = JsonSerializer.Deserialize(json); + + Assert.Equal(message.MessageId, result.MessageId); + Assert.Equal(message.RenderedTemplate, result.RenderedTemplate); + Assert.Equal(message.IntegrationType, result.IntegrationType); + Assert.Equal(message.RetryCount, result.RetryCount); + Assert.Equal(message.DelayUntilDate, result.DelayUntilDate); + } } diff --git a/test/Core.Test/AdminConsole/Services/AzureServiceBusEventListenerServiceTests.cs b/test/Core.Test/AdminConsole/Services/AzureServiceBusEventListenerServiceTests.cs new file mode 100644 index 0000000000..13704817ca --- /dev/null +++ b/test/Core.Test/AdminConsole/Services/AzureServiceBusEventListenerServiceTests.cs @@ -0,0 +1,133 @@ +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Bit.Test.Common.AutoFixture; +using Bit.Test.Common.AutoFixture.Attributes; +using Bit.Test.Common.Helpers; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Xunit; + +namespace Bit.Core.Test.Services; + +[SutProviderCustomize] +public class AzureServiceBusEventListenerServiceTests +{ + private readonly IEventMessageHandler _handler = Substitute.For(); + private readonly ILogger _logger = + Substitute.For>(); + private const string _messageId = "messageId"; + + private SutProvider GetSutProvider() + { + return new SutProvider() + .SetDependency(_handler) + .SetDependency(_logger) + .SetDependency("test-subscription", "subscriptionName") + .Create(); + } + + [Theory, BitAutoData] + public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args) + { + var sutProvider = GetSutProvider(); + + await sutProvider.Sut.ProcessErrorAsync(args); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_EmptyJson_LogsError() + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync(string.Empty, _messageId); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJson_LogsError() + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync("{ Inavlid JSON }", _messageId); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Is(o => o.ToString().Contains("Invalid JSON")), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJsonArray_LogsError() + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync( + "{ \"not a valid\", \"list of event messages\" }", + _messageId + ); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJsonObject_LogsError() + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync( + JsonSerializer.Serialize(DateTime.UtcNow), // wrong object - not EventMessage + _messageId + ); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_SingleEvent_DelegatesToHandler(EventMessage message) + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync( + JsonSerializer.Serialize(message), + _messageId + ); + + await sutProvider.GetDependency().Received(1).HandleEventAsync( + Arg.Is(AssertHelper.AssertPropertyEqual(message, new[] { "IdempotencyId" }))); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_ManyEvents_DelegatesToHandler(IEnumerable messages) + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessReceivedMessageAsync( + JsonSerializer.Serialize(messages), + _messageId + ); + + await sutProvider.GetDependency().Received(1).HandleManyEventsAsync( + Arg.Is(AssertHelper.AssertPropertyEqual(messages, new[] { "IdempotencyId" }))); + } +} diff --git a/test/Core.Test/AdminConsole/Services/AzureServiceBusIntegrationListenerServiceTests.cs b/test/Core.Test/AdminConsole/Services/AzureServiceBusIntegrationListenerServiceTests.cs new file mode 100644 index 0000000000..b1eb117cf0 --- /dev/null +++ b/test/Core.Test/AdminConsole/Services/AzureServiceBusIntegrationListenerServiceTests.cs @@ -0,0 +1,124 @@ +#nullable enable + +using Azure.Messaging.ServiceBus; +using Bit.Core.AdminConsole.Models.Data.Integrations; +using Bit.Core.Services; +using Bit.Test.Common.AutoFixture; +using Bit.Test.Common.AutoFixture.Attributes; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Xunit; + +namespace Bit.Core.Test.Services; + +[SutProviderCustomize] +public class AzureServiceBusIntegrationListenerServiceTests +{ + private const int _maxRetries = 3; + private const string _topicName = "test_topic"; + private const string _subscriptionName = "test_subscription"; + private readonly IIntegrationHandler _handler = Substitute.For(); + private readonly IAzureServiceBusService _serviceBusService = Substitute.For(); + private readonly ILogger _logger = + Substitute.For>(); + + private SutProvider GetSutProvider() + { + return new SutProvider() + .SetDependency(_handler) + .SetDependency(_serviceBusService) + .SetDependency(_topicName, "topicName") + .SetDependency(_subscriptionName, "subscriptionName") + .SetDependency(_maxRetries, "maxRetries") + .SetDependency(_logger) + .Create(); + } + + [Theory, BitAutoData] + public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args) + { + var sutProvider = GetSutProvider(); + await sutProvider.Sut.ProcessErrorAsync(args); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Theory, BitAutoData] + public async Task HandleMessageAsync_FailureNotRetryable_PublishesToDeadLetterQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = 0; + + var result = new IntegrationHandlerResult(false, message); + result.Retryable = false; + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = (IntegrationMessage)IntegrationMessage.FromJson(message.ToJson())!; + + Assert.False(await sutProvider.Sut.HandleMessageAsync(message.ToJson())); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default); + } + + [Theory, BitAutoData] + public async Task HandleMessageAsync_FailureRetryableButTooManyRetries_PublishesToDeadLetterQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = _maxRetries; + var result = new IntegrationHandlerResult(false, message); + result.Retryable = true; + + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = (IntegrationMessage)IntegrationMessage.FromJson(message.ToJson())!; + + Assert.False(await sutProvider.Sut.HandleMessageAsync(message.ToJson())); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default); + } + + [Theory, BitAutoData] + public async Task HandleMessageAsync_FailureRetryable_PublishesToRetryQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = 0; + + var result = new IntegrationHandlerResult(false, message); + result.Retryable = true; + result.DelayUntilDate = DateTime.UtcNow.AddMinutes(1); + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = (IntegrationMessage)IntegrationMessage.FromJson(message.ToJson())!; + + Assert.True(await sutProvider.Sut.HandleMessageAsync(message.ToJson())); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + await _serviceBusService.Received(1).PublishToRetryAsync(message); + } + + [Theory, BitAutoData] + public async Task HandleMessageAsync_SuccessfulResult_Succeeds(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + var result = new IntegrationHandlerResult(true, message); + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = (IntegrationMessage)IntegrationMessage.FromJson(message.ToJson())!; + + Assert.True(await sutProvider.Sut.HandleMessageAsync(message.ToJson())); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default); + } +} diff --git a/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs b/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs new file mode 100644 index 0000000000..9369690d86 --- /dev/null +++ b/test/Core.Test/AdminConsole/Services/EventIntegrationEventWriteServiceTests.cs @@ -0,0 +1,57 @@ +using System.Text.Json; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Bit.Test.Common.AutoFixture.Attributes; +using Bit.Test.Common.Helpers; +using NSubstitute; +using Xunit; + +namespace Bit.Core.Test.Services; + +[SutProviderCustomize] +public class EventIntegrationEventWriteServiceTests +{ + private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For(); + private readonly EventIntegrationEventWriteService Subject; + + public EventIntegrationEventWriteServiceTests() + { + Subject = new EventIntegrationEventWriteService(_eventIntegrationPublisher); + } + + [Theory, BitAutoData] + public async Task CreateAsync_EventPublishedToEventQueue(EventMessage eventMessage) + { + var expected = JsonSerializer.Serialize(eventMessage); + await Subject.CreateAsync(eventMessage); + await _eventIntegrationPublisher.Received(1).PublishEventAsync( + Arg.Is(body => AssertJsonStringsMatch(eventMessage, body))); + } + + [Theory, BitAutoData] + public async Task CreateManyAsync_EventsPublishedToEventQueue(IEnumerable eventMessages) + { + await Subject.CreateManyAsync(eventMessages); + await _eventIntegrationPublisher.Received(1).PublishEventAsync( + Arg.Is(body => AssertJsonStringsMatch(eventMessages, body))); + } + + private static bool AssertJsonStringsMatch(EventMessage expected, string body) + { + var actual = JsonSerializer.Deserialize(body); + AssertHelper.AssertPropertyEqual(expected, actual, new[] { "IdempotencyId" }); + return true; + } + + private static bool AssertJsonStringsMatch(IEnumerable expected, string body) + { + using var actual = JsonSerializer.Deserialize>(body).GetEnumerator(); + + foreach (var expectedMessage in expected) + { + actual.MoveNext(); + AssertHelper.AssertPropertyEqual(expectedMessage, actual.Current, new[] { "IdempotencyId" }); + } + return true; + } +} diff --git a/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs b/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs index f0a0d1d724..0962df52cd 100644 --- a/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/EventIntegrationHandlerTests.cs @@ -24,7 +24,7 @@ public class EventIntegrationHandlerTests private const string _templateWithActingUser = "#ActingUserName#, #ActingUserEmail#"; private const string _url = "https://localhost"; private const string _url2 = "https://example.com"; - private readonly IIntegrationPublisher _integrationPublisher = Substitute.For(); + private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For(); private SutProvider> GetSutProvider( List configurations) @@ -35,7 +35,7 @@ public class EventIntegrationHandlerTests return new SutProvider>() .SetDependency(configurationRepository) - .SetDependency(_integrationPublisher) + .SetDependency(_eventIntegrationPublisher) .SetDependency(IntegrationType.Webhook) .Create(); } @@ -45,6 +45,7 @@ public class EventIntegrationHandlerTests return new IntegrationMessage() { IntegrationType = IntegrationType.Webhook, + MessageId = "TestMessageId", Configuration = new WebhookIntegrationConfigurationDetails(_url), RenderedTemplate = template, RetryCount = 0, @@ -87,7 +88,7 @@ public class EventIntegrationHandlerTests var sutProvider = GetSutProvider(NoConfigurations()); await sutProvider.Sut.HandleEventAsync(eventMessage); - Assert.Empty(_integrationPublisher.ReceivedCalls()); + Assert.Empty(_eventIntegrationPublisher.ReceivedCalls()); } [Theory, BitAutoData] @@ -101,8 +102,9 @@ public class EventIntegrationHandlerTests $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" ); - Assert.Single(_integrationPublisher.ReceivedCalls()); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + Assert.Single(_eventIntegrationPublisher.ReceivedCalls()); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); await sutProvider.GetDependency().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any()); await sutProvider.GetDependency().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any()); } @@ -120,8 +122,9 @@ public class EventIntegrationHandlerTests var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}"); - Assert.Single(_integrationPublisher.ReceivedCalls()); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + Assert.Single(_eventIntegrationPublisher.ReceivedCalls()); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); await sutProvider.GetDependency().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any()); await sutProvider.GetDependency().Received(1).GetByIdAsync(eventMessage.ActingUserId ?? Guid.Empty); } @@ -136,12 +139,13 @@ public class EventIntegrationHandlerTests sutProvider.GetDependency().GetByIdAsync(Arg.Any()).Returns(organization); await sutProvider.Sut.HandleEventAsync(eventMessage); - Assert.Single(_integrationPublisher.ReceivedCalls()); + Assert.Single(_eventIntegrationPublisher.ReceivedCalls()); var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"Org: {organization.Name}"); - Assert.Single(_integrationPublisher.ReceivedCalls()); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + Assert.Single(_eventIntegrationPublisher.ReceivedCalls()); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); await sutProvider.GetDependency().Received(1).GetByIdAsync(eventMessage.OrganizationId ?? Guid.Empty); await sutProvider.GetDependency().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any()); } @@ -159,8 +163,9 @@ public class EventIntegrationHandlerTests var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}"); - Assert.Single(_integrationPublisher.ReceivedCalls()); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + Assert.Single(_eventIntegrationPublisher.ReceivedCalls()); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); await sutProvider.GetDependency().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any()); await sutProvider.GetDependency().Received(1).GetByIdAsync(eventMessage.UserId ?? Guid.Empty); } @@ -171,7 +176,7 @@ public class EventIntegrationHandlerTests var sutProvider = GetSutProvider(NoConfigurations()); await sutProvider.Sut.HandleManyEventsAsync(eventMessages); - Assert.Empty(_integrationPublisher.ReceivedCalls()); + Assert.Empty(_eventIntegrationPublisher.ReceivedCalls()); } [Theory, BitAutoData] @@ -186,7 +191,8 @@ public class EventIntegrationHandlerTests var expectedMessage = EventIntegrationHandlerTests.expectedMessage( $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" ); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); } } @@ -203,10 +209,12 @@ public class EventIntegrationHandlerTests var expectedMessage = EventIntegrationHandlerTests.expectedMessage( $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" ); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); expectedMessage.Configuration = new WebhookIntegrationConfigurationDetails(_url2); - await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); + await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is( + AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" }))); } } } diff --git a/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs b/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs index 10f39665d5..10e42c92cc 100644 --- a/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/IntegrationHandlerTests.cs @@ -15,6 +15,7 @@ public class IntegrationHandlerTests var expected = new IntegrationMessage() { Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"), + MessageId = "TestMessageId", IntegrationType = IntegrationType.Webhook, RenderedTemplate = "Template", DelayUntilDate = null, diff --git a/test/Core.Test/AdminConsole/Services/RabbitMqEventListenerServiceTests.cs b/test/Core.Test/AdminConsole/Services/RabbitMqEventListenerServiceTests.cs new file mode 100644 index 0000000000..8fd7e460be --- /dev/null +++ b/test/Core.Test/AdminConsole/Services/RabbitMqEventListenerServiceTests.cs @@ -0,0 +1,173 @@ +using System.Text.Json; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Bit.Test.Common.AutoFixture; +using Bit.Test.Common.AutoFixture.Attributes; +using Bit.Test.Common.Helpers; +using Microsoft.Extensions.Logging; +using NSubstitute; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; + +namespace Bit.Core.Test.Services; + +[SutProviderCustomize] +public class RabbitMqEventListenerServiceTests +{ + private const string _queueName = "test_queue"; + private readonly IRabbitMqService _rabbitMqService = Substitute.For(); + private readonly ILogger _logger = Substitute.For>(); + + private SutProvider GetSutProvider() + { + return new SutProvider() + .SetDependency(_rabbitMqService) + .SetDependency(_logger) + .SetDependency(_queueName, "queueName") + .Create(); + } + + [Fact] + public async Task StartAsync_CreatesQueue() + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + await _rabbitMqService.Received(1).CreateEventQueueAsync( + Arg.Is(_queueName), + Arg.Is(cancellationToken) + ); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_EmptyJson_LogsError() + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: new byte[0]); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJson_LogsError() + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: JsonSerializer.SerializeToUtf8Bytes("{ Inavlid JSON")); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Is(o => o.ToString().Contains("Invalid JSON")), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJsonArray_LogsError() + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: JsonSerializer.SerializeToUtf8Bytes(new[] { "not a valid", "list of event messages" })); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Fact] + public async Task ProcessReceivedMessageAsync_InvalidJsonObject_LogsError() + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: JsonSerializer.SerializeToUtf8Bytes(DateTime.UtcNow)); // wrong object - not EventMessage + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + _logger.Received(1).Log( + LogLevel.Error, + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any>()); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_SingleEvent_DelegatesToHandler(EventMessage message) + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: JsonSerializer.SerializeToUtf8Bytes(message)); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + await sutProvider.GetDependency().Received(1).HandleEventAsync( + Arg.Is(AssertHelper.AssertPropertyEqual(message, new[] { "IdempotencyId" }))); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_ManyEvents_DelegatesToHandler(IEnumerable messages) + { + var sutProvider = GetSutProvider(); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: JsonSerializer.SerializeToUtf8Bytes(messages)); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs); + + await sutProvider.GetDependency().Received(1).HandleManyEventsAsync( + Arg.Is(AssertHelper.AssertPropertyEqual(messages, new[] { "IdempotencyId" }))); + } +} diff --git a/test/Core.Test/AdminConsole/Services/RabbitMqIntegrationListenerServiceTests.cs b/test/Core.Test/AdminConsole/Services/RabbitMqIntegrationListenerServiceTests.cs new file mode 100644 index 0000000000..92a51e1831 --- /dev/null +++ b/test/Core.Test/AdminConsole/Services/RabbitMqIntegrationListenerServiceTests.cs @@ -0,0 +1,230 @@ +using System.Text; +using Bit.Core.AdminConsole.Models.Data.Integrations; +using Bit.Core.Services; +using Bit.Test.Common.AutoFixture; +using Bit.Test.Common.AutoFixture.Attributes; +using Bit.Test.Common.Helpers; +using NSubstitute; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; + +namespace Bit.Core.Test.Services; + +[SutProviderCustomize] +public class RabbitMqIntegrationListenerServiceTests +{ + private const int _maxRetries = 3; + private const string _queueName = "test_queue"; + private const string _retryQueueName = "test_queue_retry"; + private const string _routingKey = "test_routing_key"; + private readonly IIntegrationHandler _handler = Substitute.For(); + private readonly IRabbitMqService _rabbitMqService = Substitute.For(); + + private SutProvider GetSutProvider() + { + return new SutProvider() + .SetDependency(_handler) + .SetDependency(_rabbitMqService) + .SetDependency(_queueName, "queueName") + .SetDependency(_retryQueueName, "retryQueueName") + .SetDependency(_routingKey, "routingKey") + .SetDependency(_maxRetries, "maxRetries") + .Create(); + } + + [Fact] + public async Task StartAsync_CreatesQueues() + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + await _rabbitMqService.Received(1).CreateIntegrationQueuesAsync( + Arg.Is(_queueName), + Arg.Is(_retryQueueName), + Arg.Is(_routingKey), + Arg.Is(cancellationToken) + ); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_FailureNotRetryable_PublishesToDeadLetterQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = 0; + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: Encoding.UTF8.GetBytes(message.ToJson()) + ); + var result = new IntegrationHandlerResult(false, message); + result.Retryable = false; + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = IntegrationMessage.FromJson(message.ToJson()); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + + await _rabbitMqService.Received(1).PublishToDeadLetterAsync( + Arg.Any(), + Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })), + Arg.Any()); + + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .RepublishToRetryQueueAsync(default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToRetryAsync(default, default, default); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_FailureRetryableButTooManyRetries_PublishesToDeadLetterQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = _maxRetries; + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: Encoding.UTF8.GetBytes(message.ToJson()) + ); + var result = new IntegrationHandlerResult(false, message); + result.Retryable = true; + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = IntegrationMessage.FromJson(message.ToJson()); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken); + + expected.ApplyRetry(result.DelayUntilDate); + await _rabbitMqService.Received(1).PublishToDeadLetterAsync( + Arg.Any(), + Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })), + Arg.Any()); + + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .RepublishToRetryQueueAsync(default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToRetryAsync(default, default, default); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_FailureRetryable_PublishesToRetryQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + message.RetryCount = 0; + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: Encoding.UTF8.GetBytes(message.ToJson()) + ); + var result = new IntegrationHandlerResult(false, message); + result.Retryable = true; + result.DelayUntilDate = DateTime.UtcNow.AddMinutes(1); + _handler.HandleAsync(Arg.Any()).Returns(result); + + var expected = IntegrationMessage.FromJson(message.ToJson()); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken); + + await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson())); + + expected.ApplyRetry(result.DelayUntilDate); + await _rabbitMqService.Received(1).PublishToRetryAsync( + Arg.Any(), + Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })), + Arg.Any()); + + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .RepublishToRetryQueueAsync(default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToDeadLetterAsync(default, default, default); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_SuccessfulResult_Succeeds(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: Encoding.UTF8.GetBytes(message.ToJson()) + ); + var result = new IntegrationHandlerResult(true, message); + _handler.HandleAsync(Arg.Any()).Returns(result); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken); + + await _handler.Received(1).HandleAsync(Arg.Is(message.ToJson())); + + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .RepublishToRetryQueueAsync(default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToRetryAsync(default, default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToDeadLetterAsync(default, default, default); + } + + [Theory, BitAutoData] + public async Task ProcessReceivedMessageAsync_TooEarlyRetry_RepublishesToRetryQueue(IntegrationMessage message) + { + var sutProvider = GetSutProvider(); + var cancellationToken = CancellationToken.None; + await sutProvider.Sut.StartAsync(cancellationToken); + + message.DelayUntilDate = DateTime.UtcNow.AddMinutes(1); + var eventArgs = new BasicDeliverEventArgs( + consumerTag: string.Empty, + deliveryTag: 0, + redelivered: true, + exchange: string.Empty, + routingKey: string.Empty, + new BasicProperties(), + body: Encoding.UTF8.GetBytes(message.ToJson()) + ); + + await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken); + + await _rabbitMqService.Received(1) + .RepublishToRetryQueueAsync(Arg.Any(), Arg.Any()); + + await _handler.DidNotReceiveWithAnyArgs().HandleAsync(default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToRetryAsync(default, default, default); + await _rabbitMqService.DidNotReceiveWithAnyArgs() + .PublishToDeadLetterAsync(default, default, default); + } +} diff --git a/test/Core.Test/AdminConsole/Services/SlackServiceTests.cs b/test/Core.Test/AdminConsole/Services/SlackServiceTests.cs index 93c0aa8dd4..92544551e0 100644 --- a/test/Core.Test/AdminConsole/Services/SlackServiceTests.cs +++ b/test/Core.Test/AdminConsole/Services/SlackServiceTests.cs @@ -1,4 +1,6 @@ -using System.Net; +#nullable enable + +using System.Net; using System.Text.Json; using Bit.Core.Services; using Bit.Test.Common.AutoFixture; @@ -257,10 +259,10 @@ public class SlackServiceTests public void GetRedirectUrl_ReturnsCorrectUrl() { var sutProvider = GetSutProvider(); - var ClientId = sutProvider.GetDependency().Slack.ClientId; - var Scopes = sutProvider.GetDependency().Slack.Scopes; + var clientId = sutProvider.GetDependency().Slack.ClientId; + var scopes = sutProvider.GetDependency().Slack.Scopes; var redirectUrl = "https://example.com/callback"; - var expectedUrl = $"https://slack.com/oauth/v2/authorize?client_id={ClientId}&scope={Scopes}&redirect_uri={redirectUrl}"; + var expectedUrl = $"https://slack.com/oauth/v2/authorize?client_id={clientId}&scope={scopes}&redirect_uri={redirectUrl}"; var result = sutProvider.Sut.GetRedirectUrl(redirectUrl); Assert.Equal(expectedUrl, result); } diff --git a/test/Core.Test/AdminConsole/Services/WebhookIntegrationHandlerTests.cs b/test/Core.Test/AdminConsole/Services/WebhookIntegrationHandlerTests.cs index 79c7569ea3..7870f543d1 100644 --- a/test/Core.Test/AdminConsole/Services/WebhookIntegrationHandlerTests.cs +++ b/test/Core.Test/AdminConsole/Services/WebhookIntegrationHandlerTests.cs @@ -79,6 +79,7 @@ public class WebhookIntegrationHandlerTests Assert.Equal(result.Message, message); Assert.True(result.DelayUntilDate.HasValue); Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61)); + Assert.Equal("Too Many Requests", result.FailureReason); } [Theory, BitAutoData] @@ -99,6 +100,7 @@ public class WebhookIntegrationHandlerTests Assert.Equal(result.Message, message); Assert.True(result.DelayUntilDate.HasValue); Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61)); + Assert.Equal("Too Many Requests", result.FailureReason); } [Theory, BitAutoData] @@ -117,6 +119,7 @@ public class WebhookIntegrationHandlerTests Assert.True(result.Retryable); Assert.Equal(result.Message, message); Assert.False(result.DelayUntilDate.HasValue); + Assert.Equal("Internal Server Error", result.FailureReason); } [Theory, BitAutoData] @@ -135,5 +138,6 @@ public class WebhookIntegrationHandlerTests Assert.False(result.Retryable); Assert.Equal(result.Message, message); Assert.Null(result.DelayUntilDate); + Assert.Equal("Temporary Redirect", result.FailureReason); } }