mirror of
https://github.com/bitwarden/server.git
synced 2025-07-18 16:11:28 -05:00
[PM-17562] Add in-memory cache for event integrations (#6085)
* [PM-17562] Add in-memory cache for event integrations * Fix Sql error * Fix failing test * Add additional tests for new cache service * PR suggestions addressed
This commit is contained in:
@ -8,6 +8,7 @@ namespace Bit.Core.Models.Data.Organizations;
|
||||
public class OrganizationIntegrationConfigurationDetails
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
public Guid OrganizationId { get; set; }
|
||||
public Guid OrganizationIntegrationId { get; set; }
|
||||
public IntegrationType IntegrationType { get; set; }
|
||||
public EventType EventType { get; set; }
|
||||
|
@ -10,4 +10,6 @@ public interface IOrganizationIntegrationConfigurationRepository : IRepository<O
|
||||
Guid organizationId,
|
||||
IntegrationType integrationType,
|
||||
EventType eventType);
|
||||
|
||||
Task<List<OrganizationIntegrationConfigurationDetails>> GetAllConfigurationDetailsAsync();
|
||||
}
|
||||
|
@ -0,0 +1,14 @@
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public interface IIntegrationConfigurationDetailsCache
|
||||
{
|
||||
List<OrganizationIntegrationConfigurationDetails> GetConfigurationDetails(
|
||||
Guid organizationId,
|
||||
IntegrationType integrationType,
|
||||
EventType eventType);
|
||||
}
|
@ -14,7 +14,7 @@ public class EventIntegrationHandler<T>(
|
||||
IntegrationType integrationType,
|
||||
IEventIntegrationPublisher eventIntegrationPublisher,
|
||||
IIntegrationFilterService integrationFilterService,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository,
|
||||
IIntegrationConfigurationDetailsCache configurationCache,
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository,
|
||||
ILogger<EventIntegrationHandler<T>> logger)
|
||||
@ -27,7 +27,7 @@ public class EventIntegrationHandler<T>(
|
||||
return;
|
||||
}
|
||||
|
||||
var configurations = await configurationRepository.GetConfigurationDetailsAsync(
|
||||
var configurations = configurationCache.GetConfigurationDetails(
|
||||
organizationId,
|
||||
integrationType,
|
||||
eventMessage.Type);
|
||||
|
@ -0,0 +1,73 @@
|
||||
using System.Diagnostics;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
using Bit.Core.Repositories;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class IntegrationConfigurationDetailsCacheService : BackgroundService, IIntegrationConfigurationDetailsCache
|
||||
{
|
||||
private readonly record struct IntegrationCacheKey(Guid OrganizationId, IntegrationType IntegrationType, EventType EventType);
|
||||
private readonly IOrganizationIntegrationConfigurationRepository _repository;
|
||||
private readonly ILogger<IntegrationConfigurationDetailsCacheService> _logger;
|
||||
private readonly TimeSpan _refreshInterval;
|
||||
private Dictionary<IntegrationCacheKey, List<OrganizationIntegrationConfigurationDetails>> _cache = new();
|
||||
|
||||
public IntegrationConfigurationDetailsCacheService(
|
||||
IOrganizationIntegrationConfigurationRepository repository,
|
||||
GlobalSettings globalSettings,
|
||||
ILogger<IntegrationConfigurationDetailsCacheService> logger
|
||||
)
|
||||
{
|
||||
_repository = repository;
|
||||
_logger = logger;
|
||||
_refreshInterval = TimeSpan.FromMinutes(globalSettings.EventLogging.IntegrationCacheRefreshIntervalMinutes);
|
||||
}
|
||||
|
||||
public List<OrganizationIntegrationConfigurationDetails> GetConfigurationDetails(
|
||||
Guid organizationId,
|
||||
IntegrationType integrationType,
|
||||
EventType eventType)
|
||||
{
|
||||
var key = new IntegrationCacheKey(organizationId, integrationType, eventType);
|
||||
return _cache.TryGetValue(key, out var value)
|
||||
? value
|
||||
: new List<OrganizationIntegrationConfigurationDetails>();
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await RefreshAsync();
|
||||
|
||||
var timer = new PeriodicTimer(_refreshInterval);
|
||||
while (await timer.WaitForNextTickAsync(stoppingToken))
|
||||
{
|
||||
await RefreshAsync();
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task RefreshAsync()
|
||||
{
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
try
|
||||
{
|
||||
var newCache = (await _repository.GetAllConfigurationDetailsAsync())
|
||||
.GroupBy(x => new IntegrationCacheKey(x.OrganizationId, x.IntegrationType, x.EventType))
|
||||
.ToDictionary(g => g.Key, g => g.ToList());
|
||||
_cache = newCache;
|
||||
|
||||
stopwatch.Stop();
|
||||
_logger.LogInformation(
|
||||
"[IntegrationConfigurationDetailsCacheService] Refreshed successfully: {Count} entries in {Duration}ms",
|
||||
newCache.Count,
|
||||
stopwatch.Elapsed.TotalMilliseconds);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError("[IntegrationConfigurationDetailsCacheService] Refresh failed: {ex}", ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -290,6 +290,35 @@ graph TD
|
||||
C1 -->|Has many| B1_2[IntegrationFilterRule]
|
||||
C1 -->|Can contain| C2[IntegrationFilterGroup...]
|
||||
```
|
||||
## Caching
|
||||
|
||||
To reduce database load and improve performance, integration configurations are cached in-memory as a Dictionary
|
||||
with a periodic load of all configurations. Without caching, each incoming `EventMessage` would trigger a database
|
||||
query to retrieve the relevant `OrganizationIntegrationConfigurationDetails`.
|
||||
|
||||
By loading all configurations into memory on a fixed interval, we ensure:
|
||||
|
||||
- Consistent performance for reads.
|
||||
- Reduced database pressure.
|
||||
- Predictable refresh timing, independent of event activity.
|
||||
|
||||
### Architecture / Design
|
||||
|
||||
- The cache is read-only for consumers. It is only updated in bulk by a background refresh process.
|
||||
- The cache is fully replaced on each refresh to avoid locking or partial state.
|
||||
- Reads return a `List<OrganizationIntegrationConfigurationDetails>` for a given key or an empty list if no
|
||||
match exists.
|
||||
- Failures or delays in the loading process do not affect the existing cache state. The cache will continue serving
|
||||
the last known good state until the update replaces the whole cache.
|
||||
|
||||
### Background Refresh
|
||||
|
||||
A hosted service (`IntegrationConfigurationDetailsCacheService`) runs in the background and:
|
||||
|
||||
- Loads all configuration records at application startup.
|
||||
- Refreshes the cache on a configurable interval.
|
||||
- Logs timing and entry count on success.
|
||||
- Logs exceptions on failure without disrupting application flow.
|
||||
|
||||
# Building a new integration
|
||||
|
||||
|
@ -287,6 +287,7 @@ public class GlobalSettings : IGlobalSettings
|
||||
{
|
||||
public AzureServiceBusSettings AzureServiceBus { get; set; } = new AzureServiceBusSettings();
|
||||
public RabbitMqSettings RabbitMq { get; set; } = new RabbitMqSettings();
|
||||
public int IntegrationCacheRefreshIntervalMinutes { get; set; } = 10;
|
||||
|
||||
public class AzureServiceBusSettings
|
||||
{
|
||||
|
@ -40,4 +40,16 @@ public class OrganizationIntegrationConfigurationRepository : Repository<Organiz
|
||||
return results.ToList();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<List<OrganizationIntegrationConfigurationDetails>> GetAllConfigurationDetailsAsync()
|
||||
{
|
||||
using (var connection = new SqlConnection(ConnectionString))
|
||||
{
|
||||
var results = await connection.QueryAsync<OrganizationIntegrationConfigurationDetails>(
|
||||
"[dbo].[OrganizationIntegrationConfigurationDetails_ReadMany]",
|
||||
commandType: CommandType.StoredProcedure);
|
||||
|
||||
return results.ToList();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,4 +30,14 @@ public class OrganizationIntegrationConfigurationRepository : Repository<Core.Ad
|
||||
return await query.Run(dbContext).ToListAsync();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<List<OrganizationIntegrationConfigurationDetails>> GetAllConfigurationDetailsAsync()
|
||||
{
|
||||
using (var scope = ServiceScopeFactory.CreateScope())
|
||||
{
|
||||
var dbContext = GetDatabaseContext(scope);
|
||||
var query = new OrganizationIntegrationConfigurationDetailsReadManyQuery();
|
||||
return await query.Run(dbContext).ToListAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
using Bit.Core.Enums;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
|
||||
namespace Bit.Infrastructure.EntityFramework.Repositories.Queries;
|
||||
@ -27,6 +29,7 @@ public class OrganizationIntegrationConfigurationDetailsReadManyByEventTypeOrgan
|
||||
select new OrganizationIntegrationConfigurationDetails()
|
||||
{
|
||||
Id = oic.Id,
|
||||
OrganizationId = oi.OrganizationId,
|
||||
OrganizationIntegrationId = oic.OrganizationIntegrationId,
|
||||
IntegrationType = oi.Type,
|
||||
EventType = oic.EventType,
|
||||
|
@ -0,0 +1,28 @@
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
|
||||
namespace Bit.Infrastructure.EntityFramework.Repositories.Queries;
|
||||
|
||||
public class OrganizationIntegrationConfigurationDetailsReadManyQuery : IQuery<OrganizationIntegrationConfigurationDetails>
|
||||
{
|
||||
public IQueryable<OrganizationIntegrationConfigurationDetails> Run(DatabaseContext dbContext)
|
||||
{
|
||||
var query = from oic in dbContext.OrganizationIntegrationConfigurations
|
||||
join oi in dbContext.OrganizationIntegrations on oic.OrganizationIntegrationId equals oi.Id into oioic
|
||||
from oi in dbContext.OrganizationIntegrations
|
||||
select new OrganizationIntegrationConfigurationDetails()
|
||||
{
|
||||
Id = oic.Id,
|
||||
OrganizationId = oi.OrganizationId,
|
||||
OrganizationIntegrationId = oic.OrganizationIntegrationId,
|
||||
IntegrationType = oi.Type,
|
||||
EventType = oic.EventType,
|
||||
Configuration = oic.Configuration,
|
||||
Filters = oic.Filters,
|
||||
IntegrationConfiguration = oi.Configuration,
|
||||
Template = oic.Template
|
||||
};
|
||||
return query;
|
||||
}
|
||||
}
|
@ -618,7 +618,7 @@ public static class ServiceCollectionExtensions
|
||||
integrationType,
|
||||
provider.GetRequiredService<IEventIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IIntegrationFilterService>(),
|
||||
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
|
||||
provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
|
||||
provider.GetRequiredService<IUserRepository>(),
|
||||
provider.GetRequiredService<IOrganizationRepository>(),
|
||||
provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()));
|
||||
@ -652,6 +652,10 @@ public static class ServiceCollectionExtensions
|
||||
!CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
|
||||
return services;
|
||||
|
||||
services.AddSingleton<IntegrationConfigurationDetailsCacheService>();
|
||||
services.AddSingleton<IIntegrationConfigurationDetailsCache>(provider =>
|
||||
provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
|
||||
services.AddHostedService(provider => provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
|
||||
services.AddSingleton<IIntegrationFilterService, IntegrationFilterService>();
|
||||
services.AddSingleton<IAzureServiceBusService, AzureServiceBusService>();
|
||||
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
|
||||
@ -664,6 +668,7 @@ public static class ServiceCollectionExtensions
|
||||
integrationType: IntegrationType.Slack,
|
||||
globalSettings: globalSettings);
|
||||
|
||||
services.TryAddSingleton(TimeProvider.System);
|
||||
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
|
||||
services.AddAzureServiceBusIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
|
||||
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.WebhookEventSubscriptionName,
|
||||
@ -711,7 +716,7 @@ public static class ServiceCollectionExtensions
|
||||
integrationType,
|
||||
provider.GetRequiredService<IEventIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IIntegrationFilterService>(),
|
||||
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
|
||||
provider.GetRequiredService<IIntegrationConfigurationDetailsCache>(),
|
||||
provider.GetRequiredService<IUserRepository>(),
|
||||
provider.GetRequiredService<IOrganizationRepository>(),
|
||||
provider.GetRequiredService<ILogger<EventIntegrationHandler<TConfig>>>()));
|
||||
@ -745,6 +750,10 @@ public static class ServiceCollectionExtensions
|
||||
return services;
|
||||
}
|
||||
|
||||
services.AddSingleton<IntegrationConfigurationDetailsCacheService>();
|
||||
services.AddSingleton<IIntegrationConfigurationDetailsCache>(provider =>
|
||||
provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
|
||||
services.AddHostedService(provider => provider.GetRequiredService<IntegrationConfigurationDetailsCacheService>());
|
||||
services.AddSingleton<IIntegrationFilterService, IntegrationFilterService>();
|
||||
services.AddSingleton<IRabbitMqService, RabbitMqService>();
|
||||
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
|
||||
|
@ -0,0 +1,11 @@
|
||||
CREATE PROCEDURE [dbo].[OrganizationIntegrationConfigurationDetails_ReadMany]
|
||||
AS
|
||||
BEGIN
|
||||
SET NOCOUNT ON
|
||||
|
||||
SELECT
|
||||
oic.*
|
||||
FROM
|
||||
[dbo].[OrganizationIntegrationConfigurationDetailsView] oic
|
||||
END
|
||||
GO
|
@ -32,12 +32,12 @@ public class EventIntegrationHandlerTests
|
||||
private SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>> GetSutProvider(
|
||||
List<OrganizationIntegrationConfigurationDetails> configurations)
|
||||
{
|
||||
var configurationRepository = Substitute.For<IOrganizationIntegrationConfigurationRepository>();
|
||||
configurationRepository.GetConfigurationDetailsAsync(Arg.Any<Guid>(),
|
||||
var configurationCache = Substitute.For<IIntegrationConfigurationDetailsCache>();
|
||||
configurationCache.GetConfigurationDetails(Arg.Any<Guid>(),
|
||||
IntegrationType.Webhook, Arg.Any<EventType>()).Returns(configurations);
|
||||
|
||||
return new SutProvider<EventIntegrationHandler<WebhookIntegrationConfigurationDetails>>()
|
||||
.SetDependency(configurationRepository)
|
||||
.SetDependency(configurationCache)
|
||||
.SetDependency(_eventIntegrationPublisher)
|
||||
.SetDependency(IntegrationType.Webhook)
|
||||
.SetDependency(_logger)
|
||||
|
@ -0,0 +1,133 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
using Bit.Core.Repositories;
|
||||
using Bit.Core.Services;
|
||||
using Bit.Test.Common.AutoFixture;
|
||||
using Bit.Test.Common.AutoFixture.Attributes;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NSubstitute;
|
||||
using NSubstitute.ExceptionExtensions;
|
||||
using Xunit;
|
||||
|
||||
namespace Bit.Core.Test.Services;
|
||||
|
||||
[SutProviderCustomize]
|
||||
public class IntegrationConfigurationDetailsCacheServiceTests
|
||||
{
|
||||
private SutProvider<IntegrationConfigurationDetailsCacheService> GetSutProvider(
|
||||
List<OrganizationIntegrationConfigurationDetails> configurations)
|
||||
{
|
||||
var configurationRepository = Substitute.For<IOrganizationIntegrationConfigurationRepository>();
|
||||
configurationRepository.GetAllConfigurationDetailsAsync().Returns(configurations);
|
||||
|
||||
return new SutProvider<IntegrationConfigurationDetailsCacheService>()
|
||||
.SetDependency(configurationRepository)
|
||||
.Create();
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task GetConfigurationDetails_KeyExists_ReturnsExpectedList(OrganizationIntegrationConfigurationDetails config)
|
||||
{
|
||||
var sutProvider = GetSutProvider([config]);
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
var result = sutProvider.Sut.GetConfigurationDetails(
|
||||
config.OrganizationId,
|
||||
config.IntegrationType,
|
||||
config.EventType);
|
||||
Assert.Single(result);
|
||||
Assert.Same(config, result[0]);
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task GetConfigurationDetails_KeyMissing_ReturnsEmptyList(OrganizationIntegrationConfigurationDetails config)
|
||||
{
|
||||
var sutProvider = GetSutProvider([config]);
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
var result = sutProvider.Sut.GetConfigurationDetails(
|
||||
Guid.NewGuid(),
|
||||
config.IntegrationType,
|
||||
config.EventType);
|
||||
Assert.Empty(result);
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task GetConfigurationDetails_ReturnsCachedValue_EvenIfRepositoryChanges(OrganizationIntegrationConfigurationDetails config)
|
||||
{
|
||||
var sutProvider = GetSutProvider([config]);
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
|
||||
var newConfig = JsonSerializer.Deserialize<OrganizationIntegrationConfigurationDetails>(JsonSerializer.Serialize(config));
|
||||
Assert.NotNull(newConfig);
|
||||
newConfig.Template = "Changed";
|
||||
sutProvider.GetDependency<IOrganizationIntegrationConfigurationRepository>().GetAllConfigurationDetailsAsync()
|
||||
.Returns([newConfig]);
|
||||
|
||||
var result = sutProvider.Sut.GetConfigurationDetails(
|
||||
config.OrganizationId,
|
||||
config.IntegrationType,
|
||||
config.EventType);
|
||||
Assert.Single(result);
|
||||
Assert.NotEqual("Changed", result[0].Template); // should not yet pick up change from repository
|
||||
|
||||
await sutProvider.Sut.RefreshAsync(); // Pick up changes
|
||||
|
||||
result = sutProvider.Sut.GetConfigurationDetails(
|
||||
config.OrganizationId,
|
||||
config.IntegrationType,
|
||||
config.EventType);
|
||||
Assert.Single(result);
|
||||
Assert.Equal("Changed", result[0].Template); // Should have the new value
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task RefreshAsync_GroupsByCompositeKey(OrganizationIntegrationConfigurationDetails config1)
|
||||
{
|
||||
var config2 = JsonSerializer.Deserialize<OrganizationIntegrationConfigurationDetails>(
|
||||
JsonSerializer.Serialize(config1))!;
|
||||
config2.Template = "Another";
|
||||
|
||||
var sutProvider = GetSutProvider([config1, config2]);
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
|
||||
var results = sutProvider.Sut.GetConfigurationDetails(
|
||||
config1.OrganizationId,
|
||||
config1.IntegrationType,
|
||||
config1.EventType);
|
||||
|
||||
Assert.Equal(2, results.Count);
|
||||
Assert.Contains(results, r => r.Template == config1.Template);
|
||||
Assert.Contains(results, r => r.Template == config2.Template);
|
||||
}
|
||||
|
||||
[Theory, BitAutoData]
|
||||
public async Task RefreshAsync_LogsInformationOnSuccess(OrganizationIntegrationConfigurationDetails config)
|
||||
{
|
||||
var sutProvider = GetSutProvider([config]);
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
|
||||
sutProvider.GetDependency<ILogger<IntegrationConfigurationDetailsCacheService>>().Received().Log(
|
||||
LogLevel.Information,
|
||||
Arg.Any<EventId>(),
|
||||
Arg.Is<object>(o => o.ToString()!.Contains("Refreshed successfully")),
|
||||
null,
|
||||
Arg.Any<Func<object, Exception?, string>>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RefreshAsync_OnException_LogsError()
|
||||
{
|
||||
var sutProvider = GetSutProvider([]);
|
||||
sutProvider.GetDependency<IOrganizationIntegrationConfigurationRepository>().GetAllConfigurationDetailsAsync()
|
||||
.Throws(new Exception("Database failure"));
|
||||
await sutProvider.Sut.RefreshAsync();
|
||||
|
||||
sutProvider.GetDependency<ILogger<IntegrationConfigurationDetailsCacheService>>().Received(1).Log(
|
||||
LogLevel.Error,
|
||||
Arg.Any<EventId>(),
|
||||
Arg.Is<object>(o => o.ToString()!.Contains("Refresh failed")),
|
||||
Arg.Any<Exception>(),
|
||||
Arg.Any<Func<object, Exception?, string>>());
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
CREATE OR ALTER PROCEDURE [dbo].[OrganizationIntegrationConfigurationDetails_ReadMany]
|
||||
AS
|
||||
BEGIN
|
||||
SET NOCOUNT ON
|
||||
|
||||
SELECT
|
||||
oic.*
|
||||
FROM
|
||||
[dbo].[OrganizationIntegrationConfigurationDetailsView] oic
|
||||
END
|
||||
GO
|
Reference in New Issue
Block a user