1
0
mirror of https://github.com/bitwarden/server.git synced 2025-06-17 00:03:17 -05:00

[PM-17562] Add strict delay support for RabbitMQ; Refactor implementation (#5899)

* [PM-17562] Add strict delay support for RabbitMQ

* fix lint error

* Added more robust FailureReason handling and some additional tests

* Fix two issues noted by SonarQube

* Fix typo; Add alternate handling if MessageId is null or empty

* Set MessageId on all message publishers
This commit is contained in:
Brant DeBow 2025-06-03 10:48:24 -04:00 committed by GitHub
parent 8165651285
commit 59f5fafb87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 1554 additions and 573 deletions

View File

@ -99,7 +99,7 @@ services:
- idp - idp
rabbitmq: rabbitmq:
image: rabbitmq:management image: rabbitmq:4.1.0-management
container_name: rabbitmq container_name: rabbitmq
ports: ports:
- "5672:5672" - "5672:5672"
@ -108,7 +108,7 @@ services:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER} RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS} RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
volumes: volumes:
- rabbitmq_data:/var/lib/rabbitmq_data - rabbitmq_data:/var/lib/rabbitmq
profiles: profiles:
- rabbitmq - rabbitmq

View File

@ -1,12 +1,15 @@
using Bit.Core.Enums; #nullable enable
using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.Integrations; namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public interface IIntegrationMessage public interface IIntegrationMessage
{ {
IntegrationType IntegrationType { get; } IntegrationType IntegrationType { get; }
int RetryCount { get; set; } string MessageId { get; set; }
DateTime? DelayUntilDate { get; set; } int RetryCount { get; }
DateTime? DelayUntilDate { get; }
void ApplyRetry(DateTime? handlerDelayUntilDate); void ApplyRetry(DateTime? handlerDelayUntilDate);
string ToJson(); string ToJson();
} }

View File

@ -1,4 +1,6 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public class IntegrationHandlerResult public class IntegrationHandlerResult
{ {

View File

@ -1,13 +1,15 @@
using System.Text.Json; #nullable enable
using System.Text.Json;
using Bit.Core.Enums; using Bit.Core.Enums;
namespace Bit.Core.AdminConsole.Models.Data.Integrations; namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public class IntegrationMessage<T> : IIntegrationMessage public class IntegrationMessage : IIntegrationMessage
{ {
public IntegrationType IntegrationType { get; set; } public IntegrationType IntegrationType { get; set; }
public T Configuration { get; set; } public required string MessageId { get; set; }
public string RenderedTemplate { get; set; } public required string RenderedTemplate { get; set; }
public int RetryCount { get; set; } = 0; public int RetryCount { get; set; } = 0;
public DateTime? DelayUntilDate { get; set; } public DateTime? DelayUntilDate { get; set; }
@ -22,12 +24,22 @@ public class IntegrationMessage<T> : IIntegrationMessage
DelayUntilDate = baseTime.AddSeconds(backoffSeconds + jitterSeconds); DelayUntilDate = baseTime.AddSeconds(backoffSeconds + jitterSeconds);
} }
public string ToJson() public virtual string ToJson()
{
return JsonSerializer.Serialize(this);
}
}
public class IntegrationMessage<T> : IntegrationMessage
{
public required T Configuration { get; set; }
public override string ToJson()
{ {
return JsonSerializer.Serialize(this); return JsonSerializer.Serialize(this);
} }
public static IntegrationMessage<T> FromJson(string json) public static IntegrationMessage<T>? FromJson(string json)
{ {
return JsonSerializer.Deserialize<IntegrationMessage<T>>(json); return JsonSerializer.Deserialize<IntegrationMessage<T>>(json);
} }

View File

@ -1,3 +1,5 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegration(string token); public record SlackIntegration(string token);

View File

@ -1,3 +1,5 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegrationConfiguration(string channelId); public record SlackIntegrationConfiguration(string channelId);

View File

@ -1,3 +1,5 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record SlackIntegrationConfigurationDetails(string channelId, string token); public record SlackIntegrationConfigurationDetails(string channelId, string token);

View File

@ -1,3 +1,5 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record WebhookIntegrationConfiguration(string url); public record WebhookIntegrationConfiguration(string url);

View File

@ -1,3 +1,5 @@
namespace Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
public record WebhookIntegrationConfigurationDetails(string url); public record WebhookIntegrationConfigurationDetails(string url);

View File

@ -1,4 +1,5 @@
 #nullable enable
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
namespace Bit.Core.Models.Slack; namespace Bit.Core.Models.Slack;

View File

@ -1,13 +1,87 @@
using Microsoft.Extensions.Hosting; #nullable enable
using System.Text.Json;
using Bit.Core.Models.Data;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Bit.Core.Services; namespace Bit.Core.Services;
public abstract class EventLoggingListenerService : BackgroundService public abstract class EventLoggingListenerService : BackgroundService
{ {
protected readonly IEventMessageHandler _handler; protected readonly IEventMessageHandler _handler;
protected ILogger<EventLoggingListenerService> _logger;
protected EventLoggingListenerService(IEventMessageHandler handler) protected EventLoggingListenerService(IEventMessageHandler handler, ILogger<EventLoggingListenerService> logger)
{ {
_handler = handler ?? throw new ArgumentNullException(nameof(handler)); _handler = handler;
_logger = logger;
}
internal async Task ProcessReceivedMessageAsync(string body, string? messageId)
{
try
{
using var jsonDocument = JsonDocument.Parse(body);
var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array)
{
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
await _handler.HandleManyEventsAsync(eventMessages);
}
else if (root.ValueKind == JsonValueKind.Object)
{
var eventMessage = root.Deserialize<EventMessage>();
await _handler.HandleEventAsync(eventMessage);
}
else
{
if (!string.IsNullOrEmpty(messageId))
{
_logger.LogError("An error occurred while processing message: {MessageId} - Invalid JSON", messageId);
}
else
{
_logger.LogError("An Invalid JSON error occurred while processing a message with an empty message id");
}
}
}
catch (JsonException exception)
{
if (!string.IsNullOrEmpty(messageId))
{
_logger.LogError(
exception,
"An error occurred while processing message: {MessageId} - Invalid JSON",
messageId
);
}
else
{
_logger.LogError(
exception,
"An Invalid JSON error occurred while processing a message with an empty message id"
);
}
}
catch (Exception exception)
{
if (!string.IsNullOrEmpty(messageId))
{
_logger.LogError(
exception,
"An error occurred while processing message: {MessageId}",
messageId
);
}
else
{
_logger.LogError(
exception,
"An error occurred while processing a message with an empty message id"
);
}
}
} }
} }

View File

@ -0,0 +1,10 @@
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.Integrations;
namespace Bit.Core.Services;
public interface IAzureServiceBusService : IEventIntegrationPublisher, IAsyncDisposable
{
ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options);
Task PublishToRetryAsync(IIntegrationMessage message);
}

View File

@ -2,7 +2,8 @@
namespace Bit.Core.Services; namespace Bit.Core.Services;
public interface IIntegrationPublisher public interface IEventIntegrationPublisher : IAsyncDisposable
{ {
Task PublishAsync(IIntegrationMessage message); Task PublishAsync(IIntegrationMessage message);
Task PublishEventAsync(string body);
} }

View File

@ -0,0 +1,19 @@
using Bit.Core.AdminConsole.Models.Data.Integrations;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public interface IRabbitMqService : IEventIntegrationPublisher
{
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default);
Task CreateIntegrationQueuesAsync(
string queueName,
string retryQueueName,
string routingKey,
CancellationToken cancellationToken = default);
Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken);
Task PublishToDeadLetterAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken);
Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs);
}

View File

@ -1,7 +1,7 @@
using System.Text; #nullable enable
using System.Text.Json;
using System.Text;
using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus;
using Bit.Core.Models.Data;
using Bit.Core.Settings; using Bit.Core.Settings;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -9,67 +9,47 @@ namespace Bit.Core.Services;
public class AzureServiceBusEventListenerService : EventLoggingListenerService public class AzureServiceBusEventListenerService : EventLoggingListenerService
{ {
private readonly ILogger<AzureServiceBusEventListenerService> _logger;
private readonly ServiceBusClient _client;
private readonly ServiceBusProcessor _processor; private readonly ServiceBusProcessor _processor;
public AzureServiceBusEventListenerService( public AzureServiceBusEventListenerService(
IEventMessageHandler handler, IEventMessageHandler handler,
ILogger<AzureServiceBusEventListenerService> logger, IAzureServiceBusService serviceBusService,
string subscriptionName,
GlobalSettings globalSettings, GlobalSettings globalSettings,
string subscriptionName) : base(handler) ILogger<AzureServiceBusEventListenerService> logger) : base(handler, logger)
{ {
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); _processor = serviceBusService.CreateProcessor(
_processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.EventTopicName, subscriptionName, new ServiceBusProcessorOptions()); globalSettings.EventLogging.AzureServiceBus.EventTopicName,
subscriptionName,
new ServiceBusProcessorOptions());
_logger = logger; _logger = logger;
} }
protected override async Task ExecuteAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
_processor.ProcessMessageAsync += async args => _processor.ProcessMessageAsync += ProcessReceivedMessageAsync;
{ _processor.ProcessErrorAsync += ProcessErrorAsync;
try
{
using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(args.Message.Body));
var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array)
{
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
await _handler.HandleManyEventsAsync(eventMessages);
}
else if (root.ValueKind == JsonValueKind.Object)
{
var eventMessage = root.Deserialize<EventMessage>();
await _handler.HandleEventAsync(eventMessage);
}
await args.CompleteMessageAsync(args.Message);
}
catch (Exception exception)
{
_logger.LogError(
exception,
"An error occured while processing message: {MessageId}",
args.Message.MessageId
);
}
};
_processor.ProcessErrorAsync += args =>
{
_logger.LogError(
args.Exception,
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
args.EntityPath,
args.ErrorSource
);
return Task.CompletedTask;
};
await _processor.StartProcessingAsync(cancellationToken); await _processor.StartProcessingAsync(cancellationToken);
} }
internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(
args.Exception,
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
args.EntityPath,
args.ErrorSource
);
return Task.CompletedTask;
}
private async Task ProcessReceivedMessageAsync(ProcessMessageEventArgs args)
{
await ProcessReceivedMessageAsync(Encoding.UTF8.GetString(args.Message.Body), args.Message.MessageId);
await args.CompleteMessageAsync(args.Message);
}
public override async Task StopAsync(CancellationToken cancellationToken) public override async Task StopAsync(CancellationToken cancellationToken)
{ {
await _processor.StopProcessingAsync(cancellationToken); await _processor.StopProcessingAsync(cancellationToken);
@ -79,7 +59,6 @@ public class AzureServiceBusEventListenerService : EventLoggingListenerService
public override void Dispose() public override void Dispose()
{ {
_processor.DisposeAsync().GetAwaiter().GetResult(); _processor.DisposeAsync().GetAwaiter().GetResult();
_client.DisposeAsync().GetAwaiter().GetResult();
base.Dispose(); base.Dispose();
} }
} }

View File

@ -1,45 +0,0 @@
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Core.Settings;
namespace Bit.Core.AdminConsole.Services.Implementations;
public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public AzureServiceBusEventWriteService(GlobalSettings globalSettings)
{
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
}
public async Task CreateAsync(IEvent e)
{
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e))
{
ContentType = "application/json"
};
await _sender.SendMessageAsync(message);
}
public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(events))
{
ContentType = "application/json"
};
await _sender.SendMessageAsync(message);
}
public async ValueTask DisposeAsync()
{
await _sender.DisposeAsync();
await _client.DisposeAsync();
}
}

View File

@ -1,7 +1,6 @@
#nullable enable #nullable enable
using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -10,39 +9,30 @@ namespace Bit.Core.Services;
public class AzureServiceBusIntegrationListenerService : BackgroundService public class AzureServiceBusIntegrationListenerService : BackgroundService
{ {
private readonly int _maxRetries; private readonly int _maxRetries;
private readonly string _subscriptionName; private readonly IAzureServiceBusService _serviceBusService;
private readonly string _topicName;
private readonly IIntegrationHandler _handler; private readonly IIntegrationHandler _handler;
private readonly ServiceBusClient _client;
private readonly ServiceBusProcessor _processor; private readonly ServiceBusProcessor _processor;
private readonly ServiceBusSender _sender;
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger; private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger;
public AzureServiceBusIntegrationListenerService( public AzureServiceBusIntegrationListenerService(IIntegrationHandler handler,
IIntegrationHandler handler, string topicName,
string subscriptionName, string subscriptionName,
GlobalSettings globalSettings, int maxRetries,
IAzureServiceBusService serviceBusService,
ILogger<AzureServiceBusIntegrationListenerService> logger) ILogger<AzureServiceBusIntegrationListenerService> logger)
{ {
_handler = handler; _handler = handler;
_logger = logger; _logger = logger;
_maxRetries = globalSettings.EventLogging.AzureServiceBus.MaxRetries; _maxRetries = maxRetries;
_topicName = globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName; _serviceBusService = serviceBusService;
_subscriptionName = subscriptionName;
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString); _processor = _serviceBusService.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
_processor = _client.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions());
_sender = _client.CreateSender(_topicName);
} }
protected override async Task ExecuteAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
_processor.ProcessMessageAsync += HandleMessageAsync; _processor.ProcessMessageAsync += HandleMessageAsync;
_processor.ProcessErrorAsync += args => _processor.ProcessErrorAsync += ProcessErrorAsync;
{
_logger.LogError(args.Exception, "Azure Service Bus error");
return Task.CompletedTask;
};
await _processor.StartProcessingAsync(cancellationToken); await _processor.StartProcessingAsync(cancellationToken);
} }
@ -51,51 +41,67 @@ public class AzureServiceBusIntegrationListenerService : BackgroundService
{ {
await _processor.StopProcessingAsync(cancellationToken); await _processor.StopProcessingAsync(cancellationToken);
await _processor.DisposeAsync(); await _processor.DisposeAsync();
await _sender.DisposeAsync();
await _client.DisposeAsync();
await base.StopAsync(cancellationToken); await base.StopAsync(cancellationToken);
} }
private async Task HandleMessageAsync(ProcessMessageEventArgs args) internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
{ {
var json = args.Message.Body.ToString(); _logger.LogError(
args.Exception,
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
args.EntityPath,
args.ErrorSource
);
return Task.CompletedTask;
}
internal async Task<bool> HandleMessageAsync(string body)
{
try try
{ {
var result = await _handler.HandleAsync(json); var result = await _handler.HandleAsync(body);
var message = result.Message; var message = result.Message;
if (result.Success) if (result.Success)
{ {
await args.CompleteMessageAsync(args.Message); // Successful integration. Return true to indicate the message has been handled
return; return true;
} }
message.ApplyRetry(result.DelayUntilDate); message.ApplyRetry(result.DelayUntilDate);
if (result.Retryable && message.RetryCount < _maxRetries) if (result.Retryable && message.RetryCount < _maxRetries)
{ {
var scheduledTime = (DateTime)message.DelayUntilDate!; // Publish message to the retry queue. It will be re-published for retry after a delay
var retryMsg = new ServiceBusMessage(message.ToJson()) // Return true to indicate the message has been handled
{ await _serviceBusService.PublishToRetryAsync(message);
Subject = args.Message.Subject, return true;
ScheduledEnqueueTime = scheduledTime
};
await _sender.SendMessageAsync(retryMsg);
} }
else else
{ {
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable"); // Non-recoverable failure or exceeded the max number of retries
return; // Return false to indicate this message should be dead-lettered
return false;
} }
await args.CompleteMessageAsync(args.Message);
} }
catch (Exception ex) catch (Exception ex)
{ {
// Unknown exception - log error, return true so the message will be acknowledged and not resent
_logger.LogError(ex, "Unhandled error processing ASB message"); _logger.LogError(ex, "Unhandled error processing ASB message");
return true;
}
}
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
{
var json = args.Message.Body.ToString();
if (await HandleMessageAsync(json))
{
await args.CompleteMessageAsync(args.Message); await args.CompleteMessageAsync(args.Message);
} }
else
{
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable");
}
} }
} }

View File

@ -1,36 +0,0 @@
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Settings;
namespace Bit.Core.Services;
public class AzureServiceBusIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public AzureServiceBusIntegrationPublisher(GlobalSettings globalSettings)
{
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
}
public async Task PublishAsync(IIntegrationMessage message)
{
var json = message.ToJson();
var serviceBusMessage = new ServiceBusMessage(json)
{
Subject = message.IntegrationType.ToRoutingKey(),
};
await _sender.SendMessageAsync(serviceBusMessage);
}
public async ValueTask DisposeAsync()
{
await _sender.DisposeAsync();
await _client.DisposeAsync();
}
}

View File

@ -0,0 +1,70 @@
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Settings;
namespace Bit.Core.Services;
public class AzureServiceBusService : IAzureServiceBusService
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _eventSender;
private readonly ServiceBusSender _integrationSender;
public AzureServiceBusService(GlobalSettings globalSettings)
{
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
_eventSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
_integrationSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
}
public ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options)
{
return _client.CreateProcessor(topicName, subscriptionName, options);
}
public async Task PublishAsync(IIntegrationMessage message)
{
var json = message.ToJson();
var serviceBusMessage = new ServiceBusMessage(json)
{
Subject = message.IntegrationType.ToRoutingKey(),
MessageId = message.MessageId
};
await _integrationSender.SendMessageAsync(serviceBusMessage);
}
public async Task PublishToRetryAsync(IIntegrationMessage message)
{
var json = message.ToJson();
var serviceBusMessage = new ServiceBusMessage(json)
{
Subject = message.IntegrationType.ToRoutingKey(),
ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow,
MessageId = message.MessageId
};
await _integrationSender.SendMessageAsync(serviceBusMessage);
}
public async Task PublishEventAsync(string body)
{
var message = new ServiceBusMessage(body)
{
ContentType = "application/json",
MessageId = Guid.NewGuid().ToString()
};
await _eventSender.SendMessageAsync(message);
}
public async ValueTask DisposeAsync()
{
await _eventSender.DisposeAsync();
await _integrationSender.DisposeAsync();
await _client.DisposeAsync();
}
}

View File

@ -1,4 +1,6 @@
using Bit.Core.Models.Data; #nullable enable
using Bit.Core.Models.Data;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace Bit.Core.Services; namespace Bit.Core.Services;

View File

@ -0,0 +1,32 @@
#nullable enable
using System.Text.Json;
using Bit.Core.Models.Data;
namespace Bit.Core.Services;
public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDisposable
{
private readonly IEventIntegrationPublisher _eventIntegrationPublisher;
public EventIntegrationEventWriteService(IEventIntegrationPublisher eventIntegrationPublisher)
{
_eventIntegrationPublisher = eventIntegrationPublisher;
}
public async Task CreateAsync(IEvent e)
{
var body = JsonSerializer.Serialize(e);
await _eventIntegrationPublisher.PublishEventAsync(body: body);
}
public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var body = JsonSerializer.Serialize(events);
await _eventIntegrationPublisher.PublishEventAsync(body: body);
}
public async ValueTask DisposeAsync()
{
await _eventIntegrationPublisher.DisposeAsync();
}
}

View File

@ -1,4 +1,6 @@
using System.Text.Json; #nullable enable
using System.Text.Json;
using Bit.Core.AdminConsole.Models.Data.Integrations; using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.AdminConsole.Utilities; using Bit.Core.AdminConsole.Utilities;
using Bit.Core.Enums; using Bit.Core.Enums;
@ -7,11 +9,9 @@ using Bit.Core.Repositories;
namespace Bit.Core.Services; namespace Bit.Core.Services;
#nullable enable
public class EventIntegrationHandler<T>( public class EventIntegrationHandler<T>(
IntegrationType integrationType, IntegrationType integrationType,
IIntegrationPublisher integrationPublisher, IEventIntegrationPublisher eventIntegrationPublisher,
IOrganizationIntegrationConfigurationRepository configurationRepository, IOrganizationIntegrationConfigurationRepository configurationRepository,
IUserRepository userRepository, IUserRepository userRepository,
IOrganizationRepository organizationRepository) IOrganizationRepository organizationRepository)
@ -34,6 +34,7 @@ public class EventIntegrationHandler<T>(
var template = configuration.Template ?? string.Empty; var template = configuration.Template ?? string.Empty;
var context = await BuildContextAsync(eventMessage, template); var context = await BuildContextAsync(eventMessage, template);
var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context); var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context);
var messageId = eventMessage.IdempotencyId ?? Guid.NewGuid();
var config = configuration.MergedConfiguration.Deserialize<T>() var config = configuration.MergedConfiguration.Deserialize<T>()
?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}"); ?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}");
@ -41,13 +42,14 @@ public class EventIntegrationHandler<T>(
var message = new IntegrationMessage<T> var message = new IntegrationMessage<T>
{ {
IntegrationType = integrationType, IntegrationType = integrationType,
MessageId = messageId.ToString(),
Configuration = config, Configuration = config,
RenderedTemplate = renderedTemplate, RenderedTemplate = renderedTemplate,
RetryCount = 0, RetryCount = 0,
DelayUntilDate = null DelayUntilDate = null
}; };
await integrationPublisher.PublishAsync(message); await eventIntegrationPublisher.PublishAsync(message);
} }
} }

View File

@ -1,4 +1,6 @@
using Bit.Core.Models.Data; #nullable enable
using Bit.Core.Models.Data;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace Bit.Core.Services; namespace Bit.Core.Services;

View File

@ -1,4 +1,6 @@
using Bit.Core.Models.Data; #nullable enable
using Bit.Core.Models.Data;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace Bit.Core.Services; namespace Bit.Core.Services;

View File

@ -1,7 +1,6 @@
using System.Text; #nullable enable
using System.Text.Json;
using Bit.Core.Models.Data; using System.Text;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Events; using RabbitMQ.Client.Events;
@ -10,94 +9,60 @@ namespace Bit.Core.Services;
public class RabbitMqEventListenerService : EventLoggingListenerService public class RabbitMqEventListenerService : EventLoggingListenerService
{ {
private IChannel _channel; private readonly Lazy<Task<IChannel>> _lazyChannel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerService> _logger;
private readonly string _queueName; private readonly string _queueName;
private readonly IRabbitMqService _rabbitMqService;
public RabbitMqEventListenerService( public RabbitMqEventListenerService(
IEventMessageHandler handler, IEventMessageHandler handler,
ILogger<RabbitMqEventListenerService> logger, string queueName,
GlobalSettings globalSettings, IRabbitMqService rabbitMqService,
string queueName) : base(handler) ILogger<RabbitMqEventListenerService> logger) : base(handler, logger)
{ {
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
_logger = logger; _logger = logger;
_queueName = queueName; _queueName = queueName;
_rabbitMqService = rabbitMqService;
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
} }
public override async Task StartAsync(CancellationToken cancellationToken) public override async Task StartAsync(CancellationToken cancellationToken)
{ {
_connection = await _factory.CreateConnectionAsync(cancellationToken); await _rabbitMqService.CreateEventQueueAsync(_queueName, cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
await _channel.ExchangeDeclareAsync(exchange: _exchangeName,
type: ExchangeType.Fanout,
durable: true,
cancellationToken: cancellationToken);
await _channel.QueueDeclareAsync(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _queueName,
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken); await base.StartAsync(cancellationToken);
} }
protected override async Task ExecuteAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
var consumer = new AsyncEventingBasicConsumer(_channel); var channel = await _lazyChannel.Value;
consumer.ReceivedAsync += async (_, eventArgs) => var consumer = new AsyncEventingBasicConsumer(channel);
{ consumer.ReceivedAsync += async (_, eventArgs) => { await ProcessReceivedMessageAsync(eventArgs); };
try
{
using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(eventArgs.Body.Span));
var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array) await channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
{ }
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
await _handler.HandleManyEventsAsync(eventMessages);
}
else if (root.ValueKind == JsonValueKind.Object)
{
var eventMessage = root.Deserialize<EventMessage>();
await _handler.HandleEventAsync(eventMessage);
} internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs eventArgs)
} {
catch (Exception ex) await ProcessReceivedMessageAsync(
{ Encoding.UTF8.GetString(eventArgs.Body.Span),
_logger.LogError(ex, "An error occurred while processing the message"); eventArgs.BasicProperties.MessageId);
}
};
await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
} }
public override async Task StopAsync(CancellationToken cancellationToken) public override async Task StopAsync(CancellationToken cancellationToken)
{ {
await _channel.CloseAsync(cancellationToken); if (_lazyChannel.IsValueCreated)
await _connection.CloseAsync(cancellationToken); {
var channel = await _lazyChannel.Value;
await channel.CloseAsync(cancellationToken);
}
await base.StopAsync(cancellationToken); await base.StopAsync(cancellationToken);
} }
public override void Dispose() public override void Dispose()
{ {
_channel.Dispose(); if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
_connection.Dispose(); {
_lazyChannel.Value.Result.Dispose();
}
base.Dispose(); base.Dispose();
} }
} }

View File

@ -1,62 +0,0 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using RabbitMQ.Client;
namespace Bit.Core.Services;
public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;
public RabbitMqEventWriteService(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}
public async Task CreateAsync(IEvent e)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
var body = JsonSerializer.SerializeToUtf8Bytes(e);
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
public async Task CreateManyAsync(IEnumerable<IEvent> events)
{
var connection = await _lazyConnection.Value;
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
var body = JsonSerializer.SerializeToUtf8Bytes(events);
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
}
public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}
private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}
}

View File

@ -1,5 +1,8 @@
using System.Text; #nullable enable
using Bit.Core.Settings;
using System.Text;
using System.Text.Json;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using RabbitMQ.Client; using RabbitMQ.Client;
@ -9,183 +12,137 @@ namespace Bit.Core.Services;
public class RabbitMqIntegrationListenerService : BackgroundService public class RabbitMqIntegrationListenerService : BackgroundService
{ {
private const string _deadLetterRoutingKey = "dead-letter";
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _retryQueueName;
private readonly string _deadLetterQueueName;
private readonly string _routingKey;
private readonly string _retryRoutingKey;
private readonly int _maxRetries; private readonly int _maxRetries;
private readonly string _queueName;
private readonly string _routingKey;
private readonly string _retryQueueName;
private readonly IIntegrationHandler _handler; private readonly IIntegrationHandler _handler;
private readonly ConnectionFactory _factory; private readonly Lazy<Task<IChannel>> _lazyChannel;
private readonly IRabbitMqService _rabbitMqService;
private readonly ILogger<RabbitMqIntegrationListenerService> _logger; private readonly ILogger<RabbitMqIntegrationListenerService> _logger;
private readonly int _retryTiming;
public RabbitMqIntegrationListenerService(IIntegrationHandler handler, public RabbitMqIntegrationListenerService(IIntegrationHandler handler,
string routingKey, string routingKey,
string queueName, string queueName,
string retryQueueName, string retryQueueName,
string deadLetterQueueName, int maxRetries,
GlobalSettings globalSettings, IRabbitMqService rabbitMqService,
ILogger<RabbitMqIntegrationListenerService> logger) ILogger<RabbitMqIntegrationListenerService> logger)
{ {
_handler = handler; _handler = handler;
_routingKey = routingKey; _routingKey = routingKey;
_retryRoutingKey = $"{_routingKey}-retry";
_queueName = queueName;
_retryQueueName = retryQueueName; _retryQueueName = retryQueueName;
_deadLetterQueueName = deadLetterQueueName; _queueName = queueName;
_rabbitMqService = rabbitMqService;
_logger = logger; _logger = logger;
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName; _maxRetries = maxRetries;
_maxRetries = globalSettings.EventLogging.RabbitMq.MaxRetries; _lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
} }
public override async Task StartAsync(CancellationToken cancellationToken) public override async Task StartAsync(CancellationToken cancellationToken)
{ {
_connection = await _factory.CreateConnectionAsync(cancellationToken); await _rabbitMqService.CreateIntegrationQueuesAsync(
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); _queueName,
_retryQueueName,
await _channel.ExchangeDeclareAsync(exchange: _exchangeName, _routingKey,
type: ExchangeType.Direct, cancellationToken: cancellationToken);
durable: true,
cancellationToken: cancellationToken);
// Declare main queue
await _channel.QueueDeclareAsync(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _queueName,
exchange: _exchangeName,
routingKey: _routingKey,
cancellationToken: cancellationToken);
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
await _channel.QueueDeclareAsync(queue: _retryQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{ "x-dead-letter-exchange", _exchangeName },
{ "x-dead-letter-routing-key", _routingKey },
{ "x-message-ttl", _retryTiming }
},
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _retryQueueName,
exchange: _exchangeName,
routingKey: _retryRoutingKey,
cancellationToken: cancellationToken);
// Declare dead letter queue
await _channel.QueueDeclareAsync(queue: _deadLetterQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: _deadLetterQueueName,
exchange: _exchangeName,
routingKey: _deadLetterRoutingKey,
cancellationToken: cancellationToken);
await base.StartAsync(cancellationToken); await base.StartAsync(cancellationToken);
} }
protected override async Task ExecuteAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
var consumer = new AsyncEventingBasicConsumer(_channel); var channel = await _lazyChannel.Value;
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) => consumer.ReceivedAsync += async (_, ea) =>
{
await ProcessReceivedMessageAsync(ea, cancellationToken);
};
await channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
}
internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs ea, CancellationToken cancellationToken)
{
var channel = await _lazyChannel.Value;
try
{ {
var json = Encoding.UTF8.GetString(ea.Body.Span); var json = Encoding.UTF8.GetString(ea.Body.Span);
try // Determine if the message came off of the retry queue too soon
// If so, place it back on the retry queue
var integrationMessage = JsonSerializer.Deserialize<IntegrationMessage>(json);
if (integrationMessage is not null &&
integrationMessage.DelayUntilDate.HasValue &&
integrationMessage.DelayUntilDate.Value > DateTime.UtcNow)
{ {
var result = await _handler.HandleAsync(json); await _rabbitMqService.RepublishToRetryQueueAsync(channel, ea);
var message = result.Message; await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
return;
}
if (result.Success) var result = await _handler.HandleAsync(json);
var message = result.Message;
if (result.Success)
{
// Successful integration send. Acknowledge message delivery and return
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
return;
}
if (result.Retryable)
{
// Integration failed, but is retryable - apply delay and check max retries
message.ApplyRetry(result.DelayUntilDate);
if (message.RetryCount < _maxRetries)
{ {
// Successful integration send. Acknowledge message delivery and return // Publish message to the retry queue. It will be re-published for retry after a delay
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); await _rabbitMqService.PublishToRetryAsync(channel, message, cancellationToken);
return;
}
if (result.Retryable)
{
// Integration failed, but is retryable - apply delay and check max retries
message.ApplyRetry(result.DelayUntilDate);
if (message.RetryCount < _maxRetries)
{
// Publish message to the retry queue. It will be re-published for retry after a delay
await _channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: _retryRoutingKey,
body: Encoding.UTF8.GetBytes(message.ToJson()),
cancellationToken: cancellationToken);
}
else
{
// Exceeded the max number of retries; fail and send to dead letter queue
await PublishToDeadLetterAsync(message.ToJson());
_logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
}
} }
else else
{ {
// Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries // Exceeded the max number of retries; fail and send to dead letter queue
await PublishToDeadLetterAsync(message.ToJson()); await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
_logger.LogWarning("Non-retryable failure. Sent to DLQ."); _logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
} }
// Message has been sent to retry or dead letter queues.
// Acknowledge receipt so Rabbit knows it's been processed
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
} }
catch (Exception ex) else
{ {
// Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error // Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries
_logger.LogError(ex, "Unhandled error processing integration message."); await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken); _logger.LogWarning("Non-retryable failure. Sent to DLQ.");
} }
};
await _channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken); // Message has been sent to retry or dead letter queues.
} // Acknowledge receipt so Rabbit knows it's been processed
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
private async Task PublishToDeadLetterAsync(string json) }
{ catch (Exception ex)
await _channel.BasicPublishAsync( {
exchange: _exchangeName, // Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error
routingKey: _deadLetterRoutingKey, _logger.LogError(ex, "Unhandled error processing integration message.");
body: Encoding.UTF8.GetBytes(json)); await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
}
} }
public override async Task StopAsync(CancellationToken cancellationToken) public override async Task StopAsync(CancellationToken cancellationToken)
{ {
await _channel.CloseAsync(cancellationToken); if (_lazyChannel.IsValueCreated)
await _connection.CloseAsync(cancellationToken); {
var channel = await _lazyChannel.Value;
await channel.CloseAsync(cancellationToken);
}
await base.StopAsync(cancellationToken); await base.StopAsync(cancellationToken);
} }
public override void Dispose() public override void Dispose()
{ {
_channel.Dispose(); if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
_connection.Dispose(); {
_lazyChannel.Value.Result.Dispose();
}
base.Dispose(); base.Dispose();
} }
} }

View File

@ -1,54 +0,0 @@
using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Settings;
using RabbitMQ.Client;
namespace Bit.Core.Services;
public class RabbitMqIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
{
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _exchangeName;
public RabbitMqIntegrationPublisher(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}
public async Task PublishAsync(IIntegrationMessage message)
{
var routingKey = message.IntegrationType.ToRoutingKey();
var connection = await _lazyConnection.Value;
await using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Direct, durable: true);
var body = Encoding.UTF8.GetBytes(message.ToJson());
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: routingKey, body: body);
}
public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}
private async Task<IConnection> CreateConnectionAsync()
{
return await _factory.CreateConnectionAsync();
}
}

View File

@ -0,0 +1,244 @@
#nullable enable
using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Enums;
using Bit.Core.Settings;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Bit.Core.Services;
public class RabbitMqService : IRabbitMqService
{
private const string _deadLetterRoutingKey = "dead-letter";
private readonly ConnectionFactory _factory;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly string _deadLetterQueueName;
private readonly string _eventExchangeName;
private readonly string _integrationExchangeName;
private readonly int _retryTiming;
private readonly bool _useDelayPlugin;
public RabbitMqService(GlobalSettings globalSettings)
{
_factory = new ConnectionFactory
{
HostName = globalSettings.EventLogging.RabbitMq.HostName,
UserName = globalSettings.EventLogging.RabbitMq.Username,
Password = globalSettings.EventLogging.RabbitMq.Password
};
_deadLetterQueueName = globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName;
_eventExchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
_integrationExchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
_useDelayPlugin = globalSettings.EventLogging.RabbitMq.UseDelayPlugin;
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
}
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
var connection = await _lazyConnection.Value;
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
}
public async Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default)
{
using var channel = await CreateChannelAsync(cancellationToken);
await channel.QueueDeclareAsync(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(queue: queueName,
exchange: _eventExchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
}
public async Task CreateIntegrationQueuesAsync(
string queueName,
string retryQueueName,
string routingKey,
CancellationToken cancellationToken = default)
{
using var channel = await CreateChannelAsync(cancellationToken);
var retryRoutingKey = $"{routingKey}-retry";
// Declare main integration queue
await channel.QueueDeclareAsync(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await channel.QueueBindAsync(
queue: queueName,
exchange: _integrationExchangeName,
routingKey: routingKey,
cancellationToken: cancellationToken);
if (!_useDelayPlugin)
{
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
// Only needed if NOT using delay plugin
await channel.QueueDeclareAsync(queue: retryQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object?>
{
{ "x-dead-letter-exchange", _integrationExchangeName },
{ "x-dead-letter-routing-key", routingKey },
{ "x-message-ttl", _retryTiming }
},
cancellationToken: cancellationToken);
await channel.QueueBindAsync(queue: retryQueueName,
exchange: _integrationExchangeName,
routingKey: retryRoutingKey,
cancellationToken: cancellationToken);
}
}
public async Task PublishAsync(IIntegrationMessage message)
{
var routingKey = message.IntegrationType.ToRoutingKey();
await using var channel = await CreateChannelAsync();
var body = Encoding.UTF8.GetBytes(message.ToJson());
var properties = new BasicProperties
{
MessageId = message.MessageId,
Persistent = true
};
await channel.BasicPublishAsync(
exchange: _integrationExchangeName,
mandatory: true,
basicProperties: properties,
routingKey: routingKey,
body: body);
}
public async Task PublishEventAsync(string body)
{
await using var channel = await CreateChannelAsync();
var properties = new BasicProperties
{
MessageId = Guid.NewGuid().ToString(),
Persistent = true
};
await channel.BasicPublishAsync(
exchange: _eventExchangeName,
mandatory: true,
basicProperties: properties,
routingKey: string.Empty,
body: Encoding.UTF8.GetBytes(body));
}
public async Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken)
{
var routingKey = message.IntegrationType.ToRoutingKey();
var retryRoutingKey = $"{routingKey}-retry";
var properties = new BasicProperties
{
Persistent = true,
MessageId = message.MessageId,
Headers = _useDelayPlugin && message.DelayUntilDate.HasValue ?
new Dictionary<string, object?>
{
["x-delay"] = Math.Max((int)(message.DelayUntilDate.Value - DateTime.UtcNow).TotalMilliseconds, 0)
} :
null
};
await channel.BasicPublishAsync(
exchange: _integrationExchangeName,
routingKey: _useDelayPlugin ? routingKey : retryRoutingKey,
mandatory: true,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(message.ToJson()),
cancellationToken: cancellationToken);
}
public async Task PublishToDeadLetterAsync(
IChannel channel,
IIntegrationMessage message,
CancellationToken cancellationToken)
{
var properties = new BasicProperties
{
MessageId = message.MessageId,
Persistent = true
};
await channel.BasicPublishAsync(
exchange: _integrationExchangeName,
mandatory: true,
basicProperties: properties,
routingKey: _deadLetterRoutingKey,
body: Encoding.UTF8.GetBytes(message.ToJson()),
cancellationToken: cancellationToken);
}
public async Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs)
{
await channel.BasicPublishAsync(
exchange: _integrationExchangeName,
routingKey: eventArgs.RoutingKey,
mandatory: true,
basicProperties: new BasicProperties(eventArgs.BasicProperties),
body: eventArgs.Body);
}
public async ValueTask DisposeAsync()
{
if (_lazyConnection.IsValueCreated)
{
var connection = await _lazyConnection.Value;
await connection.DisposeAsync();
}
}
private async Task<IConnection> CreateConnectionAsync()
{
var connection = await _factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
// Declare Exchanges
await channel.ExchangeDeclareAsync(exchange: _eventExchangeName, type: ExchangeType.Fanout, durable: true);
if (_useDelayPlugin)
{
await channel.ExchangeDeclareAsync(
exchange: _integrationExchangeName,
type: "x-delayed-message",
durable: true,
arguments: new Dictionary<string, object?>
{
{ "x-delayed-type", "direct" }
}
);
}
else
{
await channel.ExchangeDeclareAsync(exchange: _integrationExchangeName, type: ExchangeType.Direct, durable: true);
}
// Declare dead letter queue for Integration exchange
await channel.QueueDeclareAsync(queue: _deadLetterQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
await channel.QueueBindAsync(queue: _deadLetterQueueName,
exchange: _integrationExchangeName,
routingKey: _deadLetterRoutingKey);
return connection;
}
}

View File

@ -1,4 +1,6 @@
using Bit.Core.AdminConsole.Models.Data.Integrations; #nullable enable
using Bit.Core.AdminConsole.Models.Data.Integrations;
namespace Bit.Core.Services; namespace Bit.Core.Services;

View File

@ -1,4 +1,6 @@
using System.Net.Http.Headers; #nullable enable
using System.Net.Http.Headers;
using System.Net.Http.Json; using System.Net.Http.Json;
using System.Web; using System.Web;
using Bit.Core.Models.Slack; using Bit.Core.Models.Slack;
@ -22,7 +24,7 @@ public class SlackService(
public async Task<string> GetChannelIdAsync(string token, string channelName) public async Task<string> GetChannelIdAsync(string token, string channelName)
{ {
return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault(); return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault() ?? string.Empty;
} }
public async Task<List<string>> GetChannelIdsAsync(string token, List<string> channelNames) public async Task<List<string>> GetChannelIdsAsync(string token, List<string> channelNames)
@ -58,7 +60,7 @@ public class SlackService(
} }
else else
{ {
logger.LogError("Error getting Channel Ids: {Error}", result.Error); logger.LogError("Error getting Channel Ids: {Error}", result?.Error ?? "Unknown Error");
nextCursor = string.Empty; nextCursor = string.Empty;
} }
@ -89,7 +91,7 @@ public class SlackService(
new KeyValuePair<string, string>("redirect_uri", redirectUrl) new KeyValuePair<string, string>("redirect_uri", redirectUrl)
})); }));
SlackOAuthResponse result; SlackOAuthResponse? result;
try try
{ {
result = await tokenResponse.Content.ReadFromJsonAsync<SlackOAuthResponse>(); result = await tokenResponse.Content.ReadFromJsonAsync<SlackOAuthResponse>();
@ -99,7 +101,7 @@ public class SlackService(
result = null; result = null;
} }
if (result == null) if (result is null)
{ {
logger.LogError("Error obtaining token via OAuth: Unknown error"); logger.LogError("Error obtaining token via OAuth: Unknown error");
return string.Empty; return string.Empty;
@ -130,6 +132,11 @@ public class SlackService(
var response = await _httpClient.SendAsync(request); var response = await _httpClient.SendAsync(request);
var result = await response.Content.ReadFromJsonAsync<SlackUserResponse>(); var result = await response.Content.ReadFromJsonAsync<SlackUserResponse>();
if (result is null)
{
logger.LogError("Error retrieving Slack user ID: Unknown error");
return string.Empty;
}
if (!result.Ok) if (!result.Ok)
{ {
logger.LogError("Error retrieving Slack user ID: {Error}", result.Error); logger.LogError("Error retrieving Slack user ID: {Error}", result.Error);
@ -151,6 +158,11 @@ public class SlackService(
var response = await _httpClient.SendAsync(request); var response = await _httpClient.SendAsync(request);
var result = await response.Content.ReadFromJsonAsync<SlackDmResponse>(); var result = await response.Content.ReadFromJsonAsync<SlackDmResponse>();
if (result is null)
{
logger.LogError("Error opening DM channel: Unknown error");
return string.Empty;
}
if (!result.Ok) if (!result.Ok)
{ {
logger.LogError("Error opening DM channel: {Error}", result.Error); logger.LogError("Error opening DM channel: {Error}", result.Error);

View File

@ -1,4 +1,6 @@
using System.Globalization; #nullable enable
using System.Globalization;
using System.Net; using System.Net;
using System.Text; using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations; using Bit.Core.AdminConsole.Models.Data.Integrations;
@ -29,7 +31,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
case HttpStatusCode.ServiceUnavailable: case HttpStatusCode.ServiceUnavailable:
case HttpStatusCode.GatewayTimeout: case HttpStatusCode.GatewayTimeout:
result.Retryable = true; result.Retryable = true;
result.FailureReason = response.ReasonPhrase; result.FailureReason = response.ReasonPhrase ?? $"Failure with status code: {(int)response.StatusCode}";
if (response.Headers.TryGetValues("Retry-After", out var values)) if (response.Headers.TryGetValues("Retry-After", out var values))
{ {
@ -52,7 +54,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
break; break;
default: default:
result.Retryable = false; result.Retryable = false;
result.FailureReason = response.ReasonPhrase; result.FailureReason = response.ReasonPhrase ?? $"Failure with status code {(int)response.StatusCode}";
break; break;
} }

View File

@ -327,6 +327,7 @@ public class GlobalSettings : IGlobalSettings
public int MaxRetries { get; set; } = 3; public int MaxRetries { get; set; } = 3;
public int RetryTiming { get; set; } = 30000; // 30s public int RetryTiming { get; set; } = 30000; // 30s
public bool UseDelayPlugin { get; set; } = false;
public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue"; public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";
public virtual string IntegrationDeadLetterQueueName { get; set; } = "integration-dead-letter-queue"; public virtual string IntegrationDeadLetterQueueName { get; set; } = "integration-dead-letter-queue";
public virtual string SlackEventsQueueName { get; set; } = "events-slack-queue"; public virtual string SlackEventsQueueName { get; set; } = "events-slack-queue";

View File

@ -550,7 +550,8 @@ public static class ServiceCollectionExtensions
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) && if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName)) CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
{ {
services.AddKeyedSingleton<IEventWriteService, AzureServiceBusEventWriteService>("broadcast"); services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
} }
else else
{ {
@ -563,7 +564,8 @@ public static class ServiceCollectionExtensions
if (IsRabbitMqEnabled(globalSettings)) if (IsRabbitMqEnabled(globalSettings))
{ {
services.AddKeyedSingleton<IEventWriteService, RabbitMqEventWriteService>("broadcast"); services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
} }
else else
{ {
@ -585,13 +587,15 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>(); services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
services.AddSingleton<AzureTableStorageEventHandler>(); services.AddSingleton<AzureTableStorageEventHandler>();
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent"); services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService( new AzureServiceBusEventListenerService(
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(), handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(), serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName,
globalSettings: globalSettings, globalSettings: globalSettings,
subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName)); logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
)
);
return services; return services;
} }
@ -607,12 +611,10 @@ public static class ServiceCollectionExtensions
{ {
var routingKey = integrationType.ToRoutingKey(); var routingKey = integrationType.ToRoutingKey();
services.AddSingleton<IIntegrationPublisher, AzureServiceBusIntegrationPublisher>();
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) => services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
new EventIntegrationHandler<TConfig>( new EventIntegrationHandler<TConfig>(
integrationType, integrationType,
provider.GetRequiredService<IIntegrationPublisher>(), provider.GetRequiredService<IEventIntegrationPublisher>(),
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(), provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
provider.GetRequiredService<IUserRepository>(), provider.GetRequiredService<IUserRepository>(),
provider.GetRequiredService<IOrganizationRepository>())); provider.GetRequiredService<IOrganizationRepository>()));
@ -620,18 +622,22 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusEventListenerService( new AzureServiceBusEventListenerService(
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey), handler: provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(), serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
subscriptionName: eventSubscriptionName,
globalSettings: globalSettings, globalSettings: globalSettings,
subscriptionName: eventSubscriptionName)); logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
)
);
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>(); services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
new AzureServiceBusIntegrationListenerService( new AzureServiceBusIntegrationListenerService(
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(), handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
topicName: globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName,
subscriptionName: integrationSubscriptionName, subscriptionName: integrationSubscriptionName,
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService>>(), maxRetries: globalSettings.EventLogging.AzureServiceBus.MaxRetries,
globalSettings: globalSettings)); serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService>>()));
return services; return services;
} }
@ -642,6 +648,8 @@ public static class ServiceCollectionExtensions
!CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName)) !CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
return services; return services;
services.AddSingleton<IAzureServiceBusService, AzureServiceBusService>();
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
services.AddAzureServiceBusEventRepositoryListener(globalSettings); services.AddAzureServiceBusEventRepositoryListener(globalSettings);
services.AddSlackService(globalSettings); services.AddSlackService(globalSettings);
@ -668,9 +676,9 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService( new RabbitMqEventListenerService(
provider.GetRequiredService<EventRepositoryHandler>(), provider.GetRequiredService<EventRepositoryHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(), globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName,
globalSettings, provider.GetRequiredService<IRabbitMqService>(),
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName)); provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
return services; return services;
} }
@ -679,19 +687,17 @@ public static class ServiceCollectionExtensions
string eventQueueName, string eventQueueName,
string integrationQueueName, string integrationQueueName,
string integrationRetryQueueName, string integrationRetryQueueName,
string integrationDeadLetterQueueName, int maxRetries,
IntegrationType integrationType, IntegrationType integrationType)
GlobalSettings globalSettings)
where TConfig : class where TConfig : class
where THandler : class, IIntegrationHandler<TConfig> where THandler : class, IIntegrationHandler<TConfig>
{ {
var routingKey = integrationType.ToRoutingKey(); var routingKey = integrationType.ToRoutingKey();
services.AddSingleton<IIntegrationPublisher, RabbitMqIntegrationPublisher>();
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) => services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
new EventIntegrationHandler<TConfig>( new EventIntegrationHandler<TConfig>(
integrationType, integrationType,
provider.GetRequiredService<IIntegrationPublisher>(), provider.GetRequiredService<IEventIntegrationPublisher>(),
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(), provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
provider.GetRequiredService<IUserRepository>(), provider.GetRequiredService<IUserRepository>(),
provider.GetRequiredService<IOrganizationRepository>())); provider.GetRequiredService<IOrganizationRepository>()));
@ -699,9 +705,9 @@ public static class ServiceCollectionExtensions
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService( new RabbitMqEventListenerService(
provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey), provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(), eventQueueName,
globalSettings, provider.GetRequiredService<IRabbitMqService>(),
eventQueueName)); provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>(); services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
services.AddSingleton<IHostedService>(provider => services.AddSingleton<IHostedService>(provider =>
@ -710,8 +716,8 @@ public static class ServiceCollectionExtensions
routingKey: routingKey, routingKey: routingKey,
queueName: integrationQueueName, queueName: integrationQueueName,
retryQueueName: integrationRetryQueueName, retryQueueName: integrationRetryQueueName,
deadLetterQueueName: integrationDeadLetterQueueName, maxRetries: maxRetries,
globalSettings: globalSettings, rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService>>())); logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService>>()));
return services; return services;
@ -724,6 +730,8 @@ public static class ServiceCollectionExtensions
return services; return services;
} }
services.AddSingleton<IRabbitMqService, RabbitMqService>();
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
services.AddRabbitMqEventRepositoryListener(globalSettings); services.AddRabbitMqEventRepositoryListener(globalSettings);
services.AddSlackService(globalSettings); services.AddSlackService(globalSettings);
@ -731,18 +739,16 @@ public static class ServiceCollectionExtensions
globalSettings.EventLogging.RabbitMq.SlackEventsQueueName, globalSettings.EventLogging.RabbitMq.SlackEventsQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName, globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName, globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName, globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Slack, IntegrationType.Slack);
globalSettings);
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName); services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>( services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName, globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName, globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName,
globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName, globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName,
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName, globalSettings.EventLogging.RabbitMq.MaxRetries,
IntegrationType.Webhook, IntegrationType.Webhook);
globalSettings);
return services; return services;
} }

View File

@ -7,12 +7,17 @@ namespace Bit.Core.Test.Models.Data.Integrations;
public class IntegrationMessageTests public class IntegrationMessageTests
{ {
private const string _messageId = "TestMessageId";
[Fact] [Fact]
public void ApplyRetry_IncrementsRetryCountAndSetsDelayUntilDate() public void ApplyRetry_IncrementsRetryCountAndSetsDelayUntilDate()
{ {
var message = new IntegrationMessage<WebhookIntegrationConfigurationDetails> var message = new IntegrationMessage<WebhookIntegrationConfigurationDetails>
{ {
Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"),
MessageId = _messageId,
RetryCount = 2, RetryCount = 2,
RenderedTemplate = string.Empty,
DelayUntilDate = null DelayUntilDate = null
}; };
@ -30,19 +35,22 @@ public class IntegrationMessageTests
var message = new IntegrationMessage<WebhookIntegrationConfigurationDetails> var message = new IntegrationMessage<WebhookIntegrationConfigurationDetails>
{ {
Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"), Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"),
MessageId = _messageId,
RenderedTemplate = "This is the message", RenderedTemplate = "This is the message",
IntegrationType = IntegrationType.Webhook, IntegrationType = IntegrationType.Webhook,
RetryCount = 2, RetryCount = 2,
DelayUntilDate = null DelayUntilDate = DateTime.UtcNow
}; };
var json = message.ToJson(); var json = message.ToJson();
var result = IntegrationMessage<WebhookIntegrationConfigurationDetails>.FromJson(json); var result = IntegrationMessage<WebhookIntegrationConfigurationDetails>.FromJson(json);
Assert.Equal(message.Configuration, result.Configuration); Assert.Equal(message.Configuration, result.Configuration);
Assert.Equal(message.MessageId, result.MessageId);
Assert.Equal(message.RenderedTemplate, result.RenderedTemplate); Assert.Equal(message.RenderedTemplate, result.RenderedTemplate);
Assert.Equal(message.IntegrationType, result.IntegrationType); Assert.Equal(message.IntegrationType, result.IntegrationType);
Assert.Equal(message.RetryCount, result.RetryCount); Assert.Equal(message.RetryCount, result.RetryCount);
Assert.Equal(message.DelayUntilDate, result.DelayUntilDate);
} }
[Fact] [Fact]
@ -51,4 +59,26 @@ public class IntegrationMessageTests
var json = "{ Invalid JSON"; var json = "{ Invalid JSON";
Assert.Throws<JsonException>(() => IntegrationMessage<WebhookIntegrationConfigurationDetails>.FromJson(json)); Assert.Throws<JsonException>(() => IntegrationMessage<WebhookIntegrationConfigurationDetails>.FromJson(json));
} }
[Fact]
public void ToJson_BaseIntegrationMessage_DeserializesCorrectly()
{
var message = new IntegrationMessage
{
MessageId = _messageId,
RenderedTemplate = "This is the message",
IntegrationType = IntegrationType.Webhook,
RetryCount = 2,
DelayUntilDate = DateTime.UtcNow
};
var json = message.ToJson();
var result = JsonSerializer.Deserialize<IntegrationMessage>(json);
Assert.Equal(message.MessageId, result.MessageId);
Assert.Equal(message.RenderedTemplate, result.RenderedTemplate);
Assert.Equal(message.IntegrationType, result.IntegrationType);
Assert.Equal(message.RetryCount, result.RetryCount);
Assert.Equal(message.DelayUntilDate, result.DelayUntilDate);
}
} }

View File

@ -0,0 +1,133 @@
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using Microsoft.Extensions.Logging;
using NSubstitute;
using Xunit;
namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class AzureServiceBusEventListenerServiceTests
{
private readonly IEventMessageHandler _handler = Substitute.For<IEventMessageHandler>();
private readonly ILogger<AzureServiceBusEventListenerService> _logger =
Substitute.For<ILogger<AzureServiceBusEventListenerService>>();
private const string _messageId = "messageId";
private SutProvider<AzureServiceBusEventListenerService> GetSutProvider()
{
return new SutProvider<AzureServiceBusEventListenerService>()
.SetDependency(_handler)
.SetDependency(_logger)
.SetDependency("test-subscription", "subscriptionName")
.Create();
}
[Theory, BitAutoData]
public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessErrorAsync(args);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<Exception>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_EmptyJson_LogsError()
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(string.Empty, _messageId);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJson_LogsError()
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync("{ Inavlid JSON }", _messageId);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Is<object>(o => o.ToString().Contains("Invalid JSON")),
Arg.Any<Exception>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJsonArray_LogsError()
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(
"{ \"not a valid\", \"list of event messages\" }",
_messageId
);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJsonObject_LogsError()
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(
JsonSerializer.Serialize(DateTime.UtcNow), // wrong object - not EventMessage
_messageId
);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_SingleEvent_DelegatesToHandler(EventMessage message)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(
JsonSerializer.Serialize(message),
_messageId
);
await sutProvider.GetDependency<IEventMessageHandler>().Received(1).HandleEventAsync(
Arg.Is(AssertHelper.AssertPropertyEqual(message, new[] { "IdempotencyId" })));
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_ManyEvents_DelegatesToHandler(IEnumerable<EventMessage> messages)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessReceivedMessageAsync(
JsonSerializer.Serialize(messages),
_messageId
);
await sutProvider.GetDependency<IEventMessageHandler>().Received(1).HandleManyEventsAsync(
Arg.Is(AssertHelper.AssertPropertyEqual(messages, new[] { "IdempotencyId" })));
}
}

View File

@ -0,0 +1,124 @@
#nullable enable
using Azure.Messaging.ServiceBus;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Microsoft.Extensions.Logging;
using NSubstitute;
using Xunit;
namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class AzureServiceBusIntegrationListenerServiceTests
{
private const int _maxRetries = 3;
private const string _topicName = "test_topic";
private const string _subscriptionName = "test_subscription";
private readonly IIntegrationHandler _handler = Substitute.For<IIntegrationHandler>();
private readonly IAzureServiceBusService _serviceBusService = Substitute.For<IAzureServiceBusService>();
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger =
Substitute.For<ILogger<AzureServiceBusIntegrationListenerService>>();
private SutProvider<AzureServiceBusIntegrationListenerService> GetSutProvider()
{
return new SutProvider<AzureServiceBusIntegrationListenerService>()
.SetDependency(_handler)
.SetDependency(_serviceBusService)
.SetDependency(_topicName, "topicName")
.SetDependency(_subscriptionName, "subscriptionName")
.SetDependency(_maxRetries, "maxRetries")
.SetDependency(_logger)
.Create();
}
[Theory, BitAutoData]
public async Task ProcessErrorAsync_LogsError(ProcessErrorEventArgs args)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.ProcessErrorAsync(args);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<Exception>(),
Arg.Any<Func<object, Exception?, string>>());
}
[Theory, BitAutoData]
public async Task HandleMessageAsync_FailureNotRetryable_PublishesToDeadLetterQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = 0;
var result = new IntegrationHandlerResult(false, message);
result.Retryable = false;
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = (IntegrationMessage<WebhookIntegrationConfiguration>)IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson())!;
Assert.False(await sutProvider.Sut.HandleMessageAsync(message.ToJson()));
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default);
}
[Theory, BitAutoData]
public async Task HandleMessageAsync_FailureRetryableButTooManyRetries_PublishesToDeadLetterQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = _maxRetries;
var result = new IntegrationHandlerResult(false, message);
result.Retryable = true;
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = (IntegrationMessage<WebhookIntegrationConfiguration>)IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson())!;
Assert.False(await sutProvider.Sut.HandleMessageAsync(message.ToJson()));
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default);
}
[Theory, BitAutoData]
public async Task HandleMessageAsync_FailureRetryable_PublishesToRetryQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = 0;
var result = new IntegrationHandlerResult(false, message);
result.Retryable = true;
result.DelayUntilDate = DateTime.UtcNow.AddMinutes(1);
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = (IntegrationMessage<WebhookIntegrationConfiguration>)IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson())!;
Assert.True(await sutProvider.Sut.HandleMessageAsync(message.ToJson()));
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
await _serviceBusService.Received(1).PublishToRetryAsync(message);
}
[Theory, BitAutoData]
public async Task HandleMessageAsync_SuccessfulResult_Succeeds(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
var result = new IntegrationHandlerResult(true, message);
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = (IntegrationMessage<WebhookIntegrationConfiguration>)IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson())!;
Assert.True(await sutProvider.Sut.HandleMessageAsync(message.ToJson()));
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
await _serviceBusService.DidNotReceiveWithAnyArgs().PublishToRetryAsync(default);
}
}

View File

@ -0,0 +1,57 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using NSubstitute;
using Xunit;
namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class EventIntegrationEventWriteServiceTests
{
private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For<IEventIntegrationPublisher>();
private readonly EventIntegrationEventWriteService Subject;
public EventIntegrationEventWriteServiceTests()
{
Subject = new EventIntegrationEventWriteService(_eventIntegrationPublisher);
}
[Theory, BitAutoData]
public async Task CreateAsync_EventPublishedToEventQueue(EventMessage eventMessage)
{
var expected = JsonSerializer.Serialize(eventMessage);
await Subject.CreateAsync(eventMessage);
await _eventIntegrationPublisher.Received(1).PublishEventAsync(
Arg.Is<string>(body => AssertJsonStringsMatch(eventMessage, body)));
}
[Theory, BitAutoData]
public async Task CreateManyAsync_EventsPublishedToEventQueue(IEnumerable<EventMessage> eventMessages)
{
await Subject.CreateManyAsync(eventMessages);
await _eventIntegrationPublisher.Received(1).PublishEventAsync(
Arg.Is<string>(body => AssertJsonStringsMatch(eventMessages, body)));
}
private static bool AssertJsonStringsMatch(EventMessage expected, string body)
{
var actual = JsonSerializer.Deserialize<EventMessage>(body);
AssertHelper.AssertPropertyEqual(expected, actual, new[] { "IdempotencyId" });
return true;
}
private static bool AssertJsonStringsMatch(IEnumerable<EventMessage> expected, string body)
{
using var actual = JsonSerializer.Deserialize<IEnumerable<EventMessage>>(body).GetEnumerator();
foreach (var expectedMessage in expected)
{
actual.MoveNext();
AssertHelper.AssertPropertyEqual(expectedMessage, actual.Current, new[] { "IdempotencyId" });
}
return true;
}
}

View File

@ -24,7 +24,7 @@ public class EventIntegrationHandlerTests
private const string _templateWithActingUser = "#ActingUserName#, #ActingUserEmail#"; private const string _templateWithActingUser = "#ActingUserName#, #ActingUserEmail#";
private const string _url = "https://localhost"; private const string _url = "https://localhost";
private const string _url2 = "https://example.com"; private const string _url2 = "https://example.com";
private readonly IIntegrationPublisher _integrationPublisher = Substitute.For<IIntegrationPublisher>(); private readonly IEventIntegrationPublisher _eventIntegrationPublisher = Substitute.For<IEventIntegrationPublisher>();
private SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>> GetSutProvider( private SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>> GetSutProvider(
List<OrganizationIntegrationConfigurationDetails> configurations) List<OrganizationIntegrationConfigurationDetails> configurations)
@ -35,7 +35,7 @@ public class EventIntegrationHandlerTests
return new SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>>() return new SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>>()
.SetDependency(configurationRepository) .SetDependency(configurationRepository)
.SetDependency(_integrationPublisher) .SetDependency(_eventIntegrationPublisher)
.SetDependency(IntegrationType.Webhook) .SetDependency(IntegrationType.Webhook)
.Create(); .Create();
} }
@ -45,6 +45,7 @@ public class EventIntegrationHandlerTests
return new IntegrationMessage<WebhookIntegrationConfigurationDetails>() return new IntegrationMessage<WebhookIntegrationConfigurationDetails>()
{ {
IntegrationType = IntegrationType.Webhook, IntegrationType = IntegrationType.Webhook,
MessageId = "TestMessageId",
Configuration = new WebhookIntegrationConfigurationDetails(_url), Configuration = new WebhookIntegrationConfigurationDetails(_url),
RenderedTemplate = template, RenderedTemplate = template,
RetryCount = 0, RetryCount = 0,
@ -87,7 +88,7 @@ public class EventIntegrationHandlerTests
var sutProvider = GetSutProvider(NoConfigurations()); var sutProvider = GetSutProvider(NoConfigurations());
await sutProvider.Sut.HandleEventAsync(eventMessage); await sutProvider.Sut.HandleEventAsync(eventMessage);
Assert.Empty(_integrationPublisher.ReceivedCalls()); Assert.Empty(_eventIntegrationPublisher.ReceivedCalls());
} }
[Theory, BitAutoData] [Theory, BitAutoData]
@ -101,8 +102,9 @@ public class EventIntegrationHandlerTests
$"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}"
); );
Assert.Single(_integrationPublisher.ReceivedCalls()); Assert.Single(_eventIntegrationPublisher.ReceivedCalls());
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>()); await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>());
await sutProvider.GetDependency<IUserRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>()); await sutProvider.GetDependency<IUserRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>());
} }
@ -120,8 +122,9 @@ public class EventIntegrationHandlerTests
var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}"); var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}");
Assert.Single(_integrationPublisher.ReceivedCalls()); Assert.Single(_eventIntegrationPublisher.ReceivedCalls());
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>()); await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>());
await sutProvider.GetDependency<IUserRepository>().Received(1).GetByIdAsync(eventMessage.ActingUserId ?? Guid.Empty); await sutProvider.GetDependency<IUserRepository>().Received(1).GetByIdAsync(eventMessage.ActingUserId ?? Guid.Empty);
} }
@ -136,12 +139,13 @@ public class EventIntegrationHandlerTests
sutProvider.GetDependency<IOrganizationRepository>().GetByIdAsync(Arg.Any<Guid>()).Returns(organization); sutProvider.GetDependency<IOrganizationRepository>().GetByIdAsync(Arg.Any<Guid>()).Returns(organization);
await sutProvider.Sut.HandleEventAsync(eventMessage); await sutProvider.Sut.HandleEventAsync(eventMessage);
Assert.Single(_integrationPublisher.ReceivedCalls()); Assert.Single(_eventIntegrationPublisher.ReceivedCalls());
var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"Org: {organization.Name}"); var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"Org: {organization.Name}");
Assert.Single(_integrationPublisher.ReceivedCalls()); Assert.Single(_eventIntegrationPublisher.ReceivedCalls());
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
await sutProvider.GetDependency<IOrganizationRepository>().Received(1).GetByIdAsync(eventMessage.OrganizationId ?? Guid.Empty); await sutProvider.GetDependency<IOrganizationRepository>().Received(1).GetByIdAsync(eventMessage.OrganizationId ?? Guid.Empty);
await sutProvider.GetDependency<IUserRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>()); await sutProvider.GetDependency<IUserRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>());
} }
@ -159,8 +163,9 @@ public class EventIntegrationHandlerTests
var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}"); var expectedMessage = EventIntegrationHandlerTests.expectedMessage($"{user.Name}, {user.Email}");
Assert.Single(_integrationPublisher.ReceivedCalls()); Assert.Single(_eventIntegrationPublisher.ReceivedCalls());
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>()); await sutProvider.GetDependency<IOrganizationRepository>().DidNotReceiveWithAnyArgs().GetByIdAsync(Arg.Any<Guid>());
await sutProvider.GetDependency<IUserRepository>().Received(1).GetByIdAsync(eventMessage.UserId ?? Guid.Empty); await sutProvider.GetDependency<IUserRepository>().Received(1).GetByIdAsync(eventMessage.UserId ?? Guid.Empty);
} }
@ -171,7 +176,7 @@ public class EventIntegrationHandlerTests
var sutProvider = GetSutProvider(NoConfigurations()); var sutProvider = GetSutProvider(NoConfigurations());
await sutProvider.Sut.HandleManyEventsAsync(eventMessages); await sutProvider.Sut.HandleManyEventsAsync(eventMessages);
Assert.Empty(_integrationPublisher.ReceivedCalls()); Assert.Empty(_eventIntegrationPublisher.ReceivedCalls());
} }
[Theory, BitAutoData] [Theory, BitAutoData]
@ -186,7 +191,8 @@ public class EventIntegrationHandlerTests
var expectedMessage = EventIntegrationHandlerTests.expectedMessage( var expectedMessage = EventIntegrationHandlerTests.expectedMessage(
$"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}"
); );
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
} }
} }
@ -203,10 +209,12 @@ public class EventIntegrationHandlerTests
var expectedMessage = EventIntegrationHandlerTests.expectedMessage( var expectedMessage = EventIntegrationHandlerTests.expectedMessage(
$"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}" $"Date: {eventMessage.Date}, Type: {eventMessage.Type}, UserId: {eventMessage.UserId}"
); );
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
expectedMessage.Configuration = new WebhookIntegrationConfigurationDetails(_url2); expectedMessage.Configuration = new WebhookIntegrationConfigurationDetails(_url2);
await _integrationPublisher.Received(1).PublishAsync(Arg.Is(AssertHelper.AssertPropertyEqual(expectedMessage))); await _eventIntegrationPublisher.Received(1).PublishAsync(Arg.Is(
AssertHelper.AssertPropertyEqual(expectedMessage, new[] { "MessageId" })));
} }
} }
} }

View File

@ -15,6 +15,7 @@ public class IntegrationHandlerTests
var expected = new IntegrationMessage<WebhookIntegrationConfigurationDetails>() var expected = new IntegrationMessage<WebhookIntegrationConfigurationDetails>()
{ {
Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"), Configuration = new WebhookIntegrationConfigurationDetails("https://localhost"),
MessageId = "TestMessageId",
IntegrationType = IntegrationType.Webhook, IntegrationType = IntegrationType.Webhook,
RenderedTemplate = "Template", RenderedTemplate = "Template",
DelayUntilDate = null, DelayUntilDate = null,

View File

@ -0,0 +1,173 @@
using System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using Microsoft.Extensions.Logging;
using NSubstitute;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class RabbitMqEventListenerServiceTests
{
private const string _queueName = "test_queue";
private readonly IRabbitMqService _rabbitMqService = Substitute.For<IRabbitMqService>();
private readonly ILogger<RabbitMqEventListenerService> _logger = Substitute.For<ILogger<RabbitMqEventListenerService>>();
private SutProvider<RabbitMqEventListenerService> GetSutProvider()
{
return new SutProvider<RabbitMqEventListenerService>()
.SetDependency(_rabbitMqService)
.SetDependency(_logger)
.SetDependency(_queueName, "queueName")
.Create();
}
[Fact]
public async Task StartAsync_CreatesQueue()
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
await _rabbitMqService.Received(1).CreateEventQueueAsync(
Arg.Is(_queueName),
Arg.Is(cancellationToken)
);
}
[Fact]
public async Task ProcessReceivedMessageAsync_EmptyJson_LogsError()
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: new byte[0]);
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJson_LogsError()
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes("{ Inavlid JSON"));
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Is<object>(o => o.ToString().Contains("Invalid JSON")),
Arg.Any<Exception>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJsonArray_LogsError()
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes(new[] { "not a valid", "list of event messages" }));
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Fact]
public async Task ProcessReceivedMessageAsync_InvalidJsonObject_LogsError()
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes(DateTime.UtcNow)); // wrong object - not EventMessage
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
_logger.Received(1).Log(
LogLevel.Error,
Arg.Any<EventId>(),
Arg.Any<object>(),
Arg.Any<JsonException>(),
Arg.Any<Func<object, Exception, string>>());
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_SingleEvent_DelegatesToHandler(EventMessage message)
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes(message));
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
await sutProvider.GetDependency<IEventMessageHandler>().Received(1).HandleEventAsync(
Arg.Is(AssertHelper.AssertPropertyEqual(message, new[] { "IdempotencyId" })));
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_ManyEvents_DelegatesToHandler(IEnumerable<EventMessage> messages)
{
var sutProvider = GetSutProvider();
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: JsonSerializer.SerializeToUtf8Bytes(messages));
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs);
await sutProvider.GetDependency<IEventMessageHandler>().Received(1).HandleManyEventsAsync(
Arg.Is(AssertHelper.AssertPropertyEqual(messages, new[] { "IdempotencyId" })));
}
}

View File

@ -0,0 +1,230 @@
using System.Text;
using Bit.Core.AdminConsole.Models.Data.Integrations;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using NSubstitute;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
namespace Bit.Core.Test.Services;
[SutProviderCustomize]
public class RabbitMqIntegrationListenerServiceTests
{
private const int _maxRetries = 3;
private const string _queueName = "test_queue";
private const string _retryQueueName = "test_queue_retry";
private const string _routingKey = "test_routing_key";
private readonly IIntegrationHandler _handler = Substitute.For<IIntegrationHandler>();
private readonly IRabbitMqService _rabbitMqService = Substitute.For<IRabbitMqService>();
private SutProvider<RabbitMqIntegrationListenerService> GetSutProvider()
{
return new SutProvider<RabbitMqIntegrationListenerService>()
.SetDependency(_handler)
.SetDependency(_rabbitMqService)
.SetDependency(_queueName, "queueName")
.SetDependency(_retryQueueName, "retryQueueName")
.SetDependency(_routingKey, "routingKey")
.SetDependency(_maxRetries, "maxRetries")
.Create();
}
[Fact]
public async Task StartAsync_CreatesQueues()
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
await _rabbitMqService.Received(1).CreateIntegrationQueuesAsync(
Arg.Is(_queueName),
Arg.Is(_retryQueueName),
Arg.Is(_routingKey),
Arg.Is(cancellationToken)
);
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_FailureNotRetryable_PublishesToDeadLetterQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = 0;
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: Encoding.UTF8.GetBytes(message.ToJson())
);
var result = new IntegrationHandlerResult(false, message);
result.Retryable = false;
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson());
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken);
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
await _rabbitMqService.Received(1).PublishToDeadLetterAsync(
Arg.Any<IChannel>(),
Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })),
Arg.Any<CancellationToken>());
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.RepublishToRetryQueueAsync(default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToRetryAsync(default, default, default);
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_FailureRetryableButTooManyRetries_PublishesToDeadLetterQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = _maxRetries;
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: Encoding.UTF8.GetBytes(message.ToJson())
);
var result = new IntegrationHandlerResult(false, message);
result.Retryable = true;
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson());
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken);
expected.ApplyRetry(result.DelayUntilDate);
await _rabbitMqService.Received(1).PublishToDeadLetterAsync(
Arg.Any<IChannel>(),
Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })),
Arg.Any<CancellationToken>());
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.RepublishToRetryQueueAsync(default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToRetryAsync(default, default, default);
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_FailureRetryable_PublishesToRetryQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
message.RetryCount = 0;
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: Encoding.UTF8.GetBytes(message.ToJson())
);
var result = new IntegrationHandlerResult(false, message);
result.Retryable = true;
result.DelayUntilDate = DateTime.UtcNow.AddMinutes(1);
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
var expected = IntegrationMessage<WebhookIntegrationConfiguration>.FromJson(message.ToJson());
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken);
await _handler.Received(1).HandleAsync(Arg.Is(expected.ToJson()));
expected.ApplyRetry(result.DelayUntilDate);
await _rabbitMqService.Received(1).PublishToRetryAsync(
Arg.Any<IChannel>(),
Arg.Is(AssertHelper.AssertPropertyEqual(expected, new[] { "DelayUntilDate" })),
Arg.Any<CancellationToken>());
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.RepublishToRetryQueueAsync(default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToDeadLetterAsync(default, default, default);
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_SuccessfulResult_Succeeds(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(-1);
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: Encoding.UTF8.GetBytes(message.ToJson())
);
var result = new IntegrationHandlerResult(true, message);
_handler.HandleAsync(Arg.Any<string>()).Returns(result);
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken);
await _handler.Received(1).HandleAsync(Arg.Is(message.ToJson()));
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.RepublishToRetryQueueAsync(default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToRetryAsync(default, default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToDeadLetterAsync(default, default, default);
}
[Theory, BitAutoData]
public async Task ProcessReceivedMessageAsync_TooEarlyRetry_RepublishesToRetryQueue(IntegrationMessage<WebhookIntegrationConfiguration> message)
{
var sutProvider = GetSutProvider();
var cancellationToken = CancellationToken.None;
await sutProvider.Sut.StartAsync(cancellationToken);
message.DelayUntilDate = DateTime.UtcNow.AddMinutes(1);
var eventArgs = new BasicDeliverEventArgs(
consumerTag: string.Empty,
deliveryTag: 0,
redelivered: true,
exchange: string.Empty,
routingKey: string.Empty,
new BasicProperties(),
body: Encoding.UTF8.GetBytes(message.ToJson())
);
await sutProvider.Sut.ProcessReceivedMessageAsync(eventArgs, cancellationToken);
await _rabbitMqService.Received(1)
.RepublishToRetryQueueAsync(Arg.Any<IChannel>(), Arg.Any<BasicDeliverEventArgs>());
await _handler.DidNotReceiveWithAnyArgs().HandleAsync(default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToRetryAsync(default, default, default);
await _rabbitMqService.DidNotReceiveWithAnyArgs()
.PublishToDeadLetterAsync(default, default, default);
}
}

View File

@ -1,4 +1,6 @@
using System.Net; #nullable enable
using System.Net;
using System.Text.Json; using System.Text.Json;
using Bit.Core.Services; using Bit.Core.Services;
using Bit.Test.Common.AutoFixture; using Bit.Test.Common.AutoFixture;
@ -257,10 +259,10 @@ public class SlackServiceTests
public void GetRedirectUrl_ReturnsCorrectUrl() public void GetRedirectUrl_ReturnsCorrectUrl()
{ {
var sutProvider = GetSutProvider(); var sutProvider = GetSutProvider();
var ClientId = sutProvider.GetDependency<GlobalSettings>().Slack.ClientId; var clientId = sutProvider.GetDependency<GlobalSettings>().Slack.ClientId;
var Scopes = sutProvider.GetDependency<GlobalSettings>().Slack.Scopes; var scopes = sutProvider.GetDependency<GlobalSettings>().Slack.Scopes;
var redirectUrl = "https://example.com/callback"; var redirectUrl = "https://example.com/callback";
var expectedUrl = $"https://slack.com/oauth/v2/authorize?client_id={ClientId}&scope={Scopes}&redirect_uri={redirectUrl}"; var expectedUrl = $"https://slack.com/oauth/v2/authorize?client_id={clientId}&scope={scopes}&redirect_uri={redirectUrl}";
var result = sutProvider.Sut.GetRedirectUrl(redirectUrl); var result = sutProvider.Sut.GetRedirectUrl(redirectUrl);
Assert.Equal(expectedUrl, result); Assert.Equal(expectedUrl, result);
} }

View File

@ -79,6 +79,7 @@ public class WebhookIntegrationHandlerTests
Assert.Equal(result.Message, message); Assert.Equal(result.Message, message);
Assert.True(result.DelayUntilDate.HasValue); Assert.True(result.DelayUntilDate.HasValue);
Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61)); Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61));
Assert.Equal("Too Many Requests", result.FailureReason);
} }
[Theory, BitAutoData] [Theory, BitAutoData]
@ -99,6 +100,7 @@ public class WebhookIntegrationHandlerTests
Assert.Equal(result.Message, message); Assert.Equal(result.Message, message);
Assert.True(result.DelayUntilDate.HasValue); Assert.True(result.DelayUntilDate.HasValue);
Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61)); Assert.InRange(result.DelayUntilDate.Value, DateTime.UtcNow.AddSeconds(59), DateTime.UtcNow.AddSeconds(61));
Assert.Equal("Too Many Requests", result.FailureReason);
} }
[Theory, BitAutoData] [Theory, BitAutoData]
@ -117,6 +119,7 @@ public class WebhookIntegrationHandlerTests
Assert.True(result.Retryable); Assert.True(result.Retryable);
Assert.Equal(result.Message, message); Assert.Equal(result.Message, message);
Assert.False(result.DelayUntilDate.HasValue); Assert.False(result.DelayUntilDate.HasValue);
Assert.Equal("Internal Server Error", result.FailureReason);
} }
[Theory, BitAutoData] [Theory, BitAutoData]
@ -135,5 +138,6 @@ public class WebhookIntegrationHandlerTests
Assert.False(result.Retryable); Assert.False(result.Retryable);
Assert.Equal(result.Message, message); Assert.Equal(result.Message, message);
Assert.Null(result.DelayUntilDate); Assert.Null(result.DelayUntilDate);
Assert.Equal("Temporary Redirect", result.FailureReason);
} }
} }