1
0
mirror of https://github.com/bitwarden/server.git synced 2025-05-20 19:14:32 -05:00

[PM-17562] Refactor to Support Multiple Message Payloads (#5400)

* [PM-17562] Refactor to Support Multiple Message Payloads

* Change signature as per PR suggestion
This commit is contained in:
Brant DeBow 2025-02-14 13:38:27 -05:00 committed by GitHub
parent 5709ea36f4
commit f80acaec0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 116 additions and 29 deletions

View File

@ -5,4 +5,6 @@ namespace Bit.Core.Services;
public interface IEventMessageHandler public interface IEventMessageHandler
{ {
Task HandleEventAsync(EventMessage eventMessage); Task HandleEventAsync(EventMessage eventMessage);
Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages);
} }

View File

@ -1,4 +1,5 @@
using System.Text.Json; using System.Text;
using System.Text.Json;
using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus;
using Bit.Core.Models.Data; using Bit.Core.Models.Data;
using Bit.Core.Settings; using Bit.Core.Settings;
@ -29,9 +30,20 @@ public class AzureServiceBusEventListenerService : EventLoggingListenerService
{ {
try try
{ {
var eventMessage = JsonSerializer.Deserialize<EventMessage>(args.Message.Body.ToString()); using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(args.Message.Body));
var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array)
{
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
await _handler.HandleManyEventsAsync(eventMessages);
}
else if (root.ValueKind == JsonValueKind.Object)
{
var eventMessage = root.Deserialize<EventMessage>();
await _handler.HandleEventAsync(eventMessage); await _handler.HandleEventAsync(eventMessage);
}
await args.CompleteMessageAsync(args.Message); await args.CompleteMessageAsync(args.Message);
} }
catch (Exception exception) catch (Exception exception)

View File

@ -29,10 +29,12 @@ public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDispos
public async Task CreateManyAsync(IEnumerable<IEvent> events) public async Task CreateManyAsync(IEnumerable<IEvent> events)
{ {
foreach (var e in events) var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(events))
{ {
await CreateAsync(e); ContentType = "application/json"
} };
await _sender.SendMessageAsync(message);
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()

View File

@ -11,4 +11,9 @@ public class AzureTableStorageEventHandler(
{ {
return eventWriteService.CreateManyAsync(EventTableEntity.IndexEvent(eventMessage)); return eventWriteService.CreateManyAsync(EventTableEntity.IndexEvent(eventMessage));
} }
public Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
{
return eventWriteService.CreateManyAsync(eventMessages.SelectMany(EventTableEntity.IndexEvent));
}
} }

View File

@ -11,4 +11,9 @@ public class EventRepositoryHandler(
{ {
return eventWriteService.CreateAsync(eventMessage); return eventWriteService.CreateAsync(eventMessage);
} }
public Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
{
return eventWriteService.CreateManyAsync(eventMessages);
}
} }

View File

@ -1,4 +1,5 @@
using System.Text.Json; using System.Text;
using System.Text.Json;
using Bit.Core.Models.Data; using Bit.Core.Models.Data;
using Bit.Core.Settings; using Bit.Core.Settings;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -62,8 +63,20 @@ public class RabbitMqEventListenerService : EventLoggingListenerService
{ {
try try
{ {
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span); using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(eventArgs.Body.Span));
var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array)
{
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
await _handler.HandleManyEventsAsync(eventMessages);
}
else if (root.ValueKind == JsonValueKind.Object)
{
var eventMessage = root.Deserialize<EventMessage>();
await _handler.HandleEventAsync(eventMessage); await _handler.HandleEventAsync(eventMessage);
}
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -41,13 +41,10 @@ public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
using var channel = await connection.CreateChannelAsync(); using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true); await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
foreach (var e in events) var body = JsonSerializer.SerializeToUtf8Bytes(events);
{
var body = JsonSerializer.SerializeToUtf8Bytes(e);
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body); await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
} }
}
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {

View File

@ -4,20 +4,15 @@ using Bit.Core.Settings;
namespace Bit.Core.Services; namespace Bit.Core.Services;
public class WebhookEventHandler : IEventMessageHandler public class WebhookEventHandler(
{
private readonly HttpClient _httpClient;
private readonly string _webhookUrl;
public const string HttpClientName = "WebhookEventHandlerHttpClient";
public WebhookEventHandler(
IHttpClientFactory httpClientFactory, IHttpClientFactory httpClientFactory,
GlobalSettings globalSettings) GlobalSettings globalSettings)
{ : IEventMessageHandler
_httpClient = httpClientFactory.CreateClient(HttpClientName); {
_webhookUrl = globalSettings.EventLogging.WebhookUrl; private readonly HttpClient _httpClient = httpClientFactory.CreateClient(HttpClientName);
} private readonly string _webhookUrl = globalSettings.EventLogging.WebhookUrl;
public const string HttpClientName = "WebhookEventHandlerHttpClient";
public async Task HandleEventAsync(EventMessage eventMessage) public async Task HandleEventAsync(EventMessage eventMessage)
{ {
@ -25,4 +20,11 @@ public class WebhookEventHandler : IEventMessageHandler
var response = await _httpClient.PostAsync(_webhookUrl, content); var response = await _httpClient.PostAsync(_webhookUrl, content);
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
} }
public async Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
{
var content = JsonContent.Create(eventMessages);
var response = await _httpClient.PostAsync(_webhookUrl, content);
response.EnsureSuccessStatusCode();
}
} }

View File

@ -1,4 +1,5 @@
using System.Globalization; using System.Globalization;
using Bit.Core.AdminConsole.Services.Implementations;
using Bit.Core.Context; using Bit.Core.Context;
using Bit.Core.IdentityServer; using Bit.Core.IdentityServer;
using Bit.Core.Services; using Bit.Core.Services;
@ -62,13 +63,31 @@ public class Startup
} }
services.AddScoped<IEventService, EventService>(); services.AddScoped<IEventService, EventService>();
if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString)) if (!globalSettings.SelfHosted && CoreHelpers.SettingHasValue(globalSettings.Events.ConnectionString))
{
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.TopicName))
{
services.AddSingleton<IEventWriteService, AzureServiceBusEventWriteService>();
}
else
{ {
services.AddSingleton<IEventWriteService, AzureQueueEventWriteService>(); services.AddSingleton<IEventWriteService, AzureQueueEventWriteService>();
} }
}
else
{
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HostName) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Username) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddSingleton<IEventWriteService, RabbitMqEventWriteService>();
}
else else
{ {
services.AddSingleton<IEventWriteService, RepositoryEventWriteService>(); services.AddSingleton<IEventWriteService, RepositoryEventWriteService>();
} }
}
services.AddOptionality(); services.AddOptionality();

View File

@ -21,4 +21,15 @@ public class EventRepositoryHandlerTests
Arg.Is(AssertHelper.AssertPropertyEqual<IEvent>(eventMessage)) Arg.Is(AssertHelper.AssertPropertyEqual<IEvent>(eventMessage))
); );
} }
[Theory, BitAutoData]
public async Task HandleManyEventAsync_WritesEventsToIEventWriteService(
IEnumerable<EventMessage> eventMessages,
SutProvider<EventRepositoryHandler> sutProvider)
{
await sutProvider.Sut.HandleManyEventsAsync(eventMessages);
await sutProvider.GetDependency<IEventWriteService>().Received(1).CreateManyAsync(
Arg.Is(AssertHelper.AssertPropertyEqual<IEvent>(eventMessages))
);
}
} }

View File

@ -44,10 +44,9 @@ public class WebhookEventHandlerTests
} }
[Theory, BitAutoData] [Theory, BitAutoData]
public async Task HandleEventAsync_PostsEventsToUrl(EventMessage eventMessage) public async Task HandleEventAsync_PostsEventToUrl(EventMessage eventMessage)
{ {
var sutProvider = GetSutProvider(); var sutProvider = GetSutProvider();
var content = JsonContent.Create(eventMessage);
await sutProvider.Sut.HandleEventAsync(eventMessage); await sutProvider.Sut.HandleEventAsync(eventMessage);
sutProvider.GetDependency<IHttpClientFactory>().Received(1).CreateClient( sutProvider.GetDependency<IHttpClientFactory>().Received(1).CreateClient(
@ -63,4 +62,24 @@ public class WebhookEventHandlerTests
Assert.Equal(_webhookUrl, request.RequestUri.ToString()); Assert.Equal(_webhookUrl, request.RequestUri.ToString());
AssertHelper.AssertPropertyEqual(eventMessage, returned, new[] { "IdempotencyId" }); AssertHelper.AssertPropertyEqual(eventMessage, returned, new[] { "IdempotencyId" });
} }
[Theory, BitAutoData]
public async Task HandleEventManyAsync_PostsEventsToUrl(IEnumerable<EventMessage> eventMessages)
{
var sutProvider = GetSutProvider();
await sutProvider.Sut.HandleManyEventsAsync(eventMessages);
sutProvider.GetDependency<IHttpClientFactory>().Received(1).CreateClient(
Arg.Is(AssertHelper.AssertPropertyEqual<string>(WebhookEventHandler.HttpClientName))
);
Assert.Single(_handler.CapturedRequests);
var request = _handler.CapturedRequests[0];
Assert.NotNull(request);
var returned = request.Content.ReadFromJsonAsAsyncEnumerable<EventMessage>();
Assert.Equal(HttpMethod.Post, request.Method);
Assert.Equal(_webhookUrl, request.RequestUri.ToString());
AssertHelper.AssertPropertyEqual(eventMessages, returned, new[] { "IdempotencyId" });
}
} }