mirror of
https://github.com/bitwarden/server.git
synced 2025-06-20 02:48:03 -05:00
Merge branch 'km/db-signing-keys' into km/signing-api-changes
This commit is contained in:
commit
7900bcbfb3
14
.github/workflows/build.yml
vendored
14
.github/workflows/build.yml
vendored
@ -11,6 +11,9 @@ on:
|
||||
types: [opened, synchronize]
|
||||
workflow_call:
|
||||
inputs: {}
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
_AZ_REGISTRY: "bitwardenprod.azurecr.io"
|
||||
@ -237,18 +240,10 @@ jobs:
|
||||
fi
|
||||
echo "tags=$TAGS" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Generate image full name
|
||||
id: cache-name
|
||||
env:
|
||||
PROJECT_NAME: ${{ steps.setup.outputs.project_name }}
|
||||
run: echo "name=${_AZ_REGISTRY}/${PROJECT_NAME}:buildcache" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Build Docker image
|
||||
id: build-artifacts
|
||||
uses: docker/build-push-action@67a2d409c0a876cbe6b11854e3e25193efe4e62d # v6.12.0
|
||||
with:
|
||||
cache-from: type=registry,ref=${{ steps.cache-name.outputs.name }}
|
||||
cache-to: type=registry,ref=${{ steps.cache-name.outputs.name}},mode=max
|
||||
context: .
|
||||
file: ${{ matrix.base_path }}/${{ matrix.project_name }}/Dockerfile
|
||||
platforms: |
|
||||
@ -603,8 +598,9 @@ jobs:
|
||||
uses: bitwarden/gh-actions/.github/workflows/_ephemeral_environment_manager.yml@main
|
||||
with:
|
||||
project: server
|
||||
pull_request_number: ${{ github.event.number }}
|
||||
pull_request_number: ${{ github.event.number || 0 }}
|
||||
secrets: inherit
|
||||
permissions: read-all
|
||||
|
||||
check-failures:
|
||||
name: Check for failures
|
||||
|
@ -99,7 +99,7 @@ services:
|
||||
- idp
|
||||
|
||||
rabbitmq:
|
||||
image: rabbitmq:management
|
||||
image: rabbitmq:4.1.0-management
|
||||
container_name: rabbitmq
|
||||
ports:
|
||||
- "5672:5672"
|
||||
@ -108,7 +108,7 @@ services:
|
||||
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
|
||||
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
|
||||
volumes:
|
||||
- rabbitmq_data:/var/lib/rabbitmq_data
|
||||
- rabbitmq_data:/var/lib/rabbitmq
|
||||
profiles:
|
||||
- rabbitmq
|
||||
|
||||
|
@ -11,7 +11,7 @@ $corsRules = (@{
|
||||
AllowedMethods = @("Get", "PUT");
|
||||
});
|
||||
$containers = "attachments", "sendfiles", "misc";
|
||||
$queues = "event", "notifications", "reference-events", "mail";
|
||||
$queues = "event", "notifications", "mail";
|
||||
$tables = "event", "metadata", "installationdevice";
|
||||
# End configuration
|
||||
|
||||
|
@ -5,6 +5,6 @@
|
||||
},
|
||||
"msbuild-sdks": {
|
||||
"Microsoft.Build.Traversal": "4.1.0",
|
||||
"Microsoft.Build.Sql": "0.1.9-preview"
|
||||
"Microsoft.Build.Sql": "1.0.0"
|
||||
}
|
||||
}
|
||||
|
@ -81,13 +81,6 @@ public class ProviderBillingController(
|
||||
[FromRoute] Guid providerId,
|
||||
[FromBody] UpdatePaymentMethodRequestBody requestBody)
|
||||
{
|
||||
var allowProviderPaymentMethod = featureService.IsEnabled(FeatureFlagKeys.PM18794_ProviderPaymentMethod);
|
||||
|
||||
if (!allowProviderPaymentMethod)
|
||||
{
|
||||
return TypedResults.NotFound();
|
||||
}
|
||||
|
||||
var (provider, result) = await TryGetBillableProviderForAdminOperation(providerId);
|
||||
|
||||
if (provider == null)
|
||||
@ -111,13 +104,6 @@ public class ProviderBillingController(
|
||||
[FromRoute] Guid providerId,
|
||||
[FromBody] VerifyBankAccountRequestBody requestBody)
|
||||
{
|
||||
var allowProviderPaymentMethod = featureService.IsEnabled(FeatureFlagKeys.PM18794_ProviderPaymentMethod);
|
||||
|
||||
if (!allowProviderPaymentMethod)
|
||||
{
|
||||
return TypedResults.NotFound();
|
||||
}
|
||||
|
||||
var (provider, result) = await TryGetBillableProviderForAdminOperation(providerId);
|
||||
|
||||
if (provider == null)
|
||||
|
@ -1064,7 +1064,7 @@ public class CiphersController : Controller
|
||||
|
||||
[HttpPut("share")]
|
||||
[HttpPost("share")]
|
||||
public async Task<CipherMiniResponseModel[]> PutShareMany([FromBody] CipherBulkShareRequestModel model)
|
||||
public async Task<ListResponseModel<CipherMiniResponseModel>> PutShareMany([FromBody] CipherBulkShareRequestModel model)
|
||||
{
|
||||
var organizationId = new Guid(model.Ciphers.First().OrganizationId);
|
||||
if (!await _currentContext.OrganizationUser(organizationId))
|
||||
@ -1086,7 +1086,7 @@ public class CiphersController : Controller
|
||||
}
|
||||
}
|
||||
|
||||
var shareCiphers = new List<(Cipher, DateTime?)>();
|
||||
var shareCiphers = new List<(CipherDetails, DateTime?)>();
|
||||
foreach (var cipher in model.Ciphers)
|
||||
{
|
||||
if (!ciphersDict.TryGetValue(cipher.Id.Value, out var existingCipher))
|
||||
@ -1096,7 +1096,7 @@ public class CiphersController : Controller
|
||||
|
||||
ValidateClientVersionForFido2CredentialSupport(existingCipher);
|
||||
|
||||
shareCiphers.Add(((Cipher)existingCipher, cipher.LastKnownRevisionDate));
|
||||
shareCiphers.Add((cipher.ToCipherDetails(existingCipher), cipher.LastKnownRevisionDate));
|
||||
}
|
||||
|
||||
var updated = await _cipherService.ShareManyAsync(
|
||||
@ -1106,7 +1106,8 @@ public class CiphersController : Controller
|
||||
userId
|
||||
);
|
||||
|
||||
return updated.Select(c => new CipherMiniResponseModel(c, _globalSettings, false)).ToArray();
|
||||
var response = updated.Select(c => new CipherMiniResponseModel(c, _globalSettings, c.OrganizationUseTotp));
|
||||
return new ListResponseModel<CipherMiniResponseModel>(response);
|
||||
}
|
||||
|
||||
[HttpPost("purge")]
|
||||
|
@ -1,9 +1,7 @@
|
||||
using Bit.Api.Models.Response;
|
||||
using Bit.Api.Vault.Models.Request;
|
||||
using Bit.Api.Vault.Models.Response;
|
||||
using Bit.Core;
|
||||
using Bit.Core.Services;
|
||||
using Bit.Core.Utilities;
|
||||
using Bit.Core.Vault.Commands.Interfaces;
|
||||
using Bit.Core.Vault.Entities;
|
||||
using Bit.Core.Vault.Enums;
|
||||
@ -15,7 +13,6 @@ namespace Bit.Api.Vault.Controllers;
|
||||
|
||||
[Route("tasks")]
|
||||
[Authorize("Application")]
|
||||
[RequireFeature(FeatureFlagKeys.SecurityTasks)]
|
||||
public class SecurityTaskController : Controller
|
||||
{
|
||||
private readonly IUserService _userService;
|
||||
|
@ -17,6 +17,7 @@ public enum PolicyType : byte
|
||||
AutomaticAppLogIn = 12,
|
||||
FreeFamiliesSponsorshipPolicy = 13,
|
||||
RemoveUnlockWithPin = 14,
|
||||
RestrictedItemTypesPolicy = 15,
|
||||
}
|
||||
|
||||
public static class PolicyTypeExtensions
|
||||
@ -43,7 +44,8 @@ public static class PolicyTypeExtensions
|
||||
PolicyType.ActivateAutofill => "Active auto-fill",
|
||||
PolicyType.AutomaticAppLogIn => "Automatically log in users for allowed applications",
|
||||
PolicyType.FreeFamiliesSponsorshipPolicy => "Remove Free Bitwarden Families sponsorship",
|
||||
PolicyType.RemoveUnlockWithPin => "Remove unlock with PIN"
|
||||
PolicyType.RemoveUnlockWithPin => "Remove unlock with PIN",
|
||||
PolicyType.RestrictedItemTypesPolicy => "Restricted item types",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,15 @@
|
||||
using Bit.Core.Enums;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Enums;
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public interface IIntegrationMessage
|
||||
{
|
||||
IntegrationType IntegrationType { get; }
|
||||
int RetryCount { get; set; }
|
||||
DateTime? DelayUntilDate { get; set; }
|
||||
string MessageId { get; set; }
|
||||
int RetryCount { get; }
|
||||
DateTime? DelayUntilDate { get; }
|
||||
void ApplyRetry(DateTime? handlerDelayUntilDate);
|
||||
string ToJson();
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public class IntegrationHandlerResult
|
||||
{
|
||||
|
@ -1,13 +1,15 @@
|
||||
using System.Text.Json;
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Enums;
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public class IntegrationMessage<T> : IIntegrationMessage
|
||||
public class IntegrationMessage : IIntegrationMessage
|
||||
{
|
||||
public IntegrationType IntegrationType { get; set; }
|
||||
public T Configuration { get; set; }
|
||||
public string RenderedTemplate { get; set; }
|
||||
public required string MessageId { get; set; }
|
||||
public required string RenderedTemplate { get; set; }
|
||||
public int RetryCount { get; set; } = 0;
|
||||
public DateTime? DelayUntilDate { get; set; }
|
||||
|
||||
@ -22,12 +24,22 @@ public class IntegrationMessage<T> : IIntegrationMessage
|
||||
DelayUntilDate = baseTime.AddSeconds(backoffSeconds + jitterSeconds);
|
||||
}
|
||||
|
||||
public string ToJson()
|
||||
public virtual string ToJson()
|
||||
{
|
||||
return JsonSerializer.Serialize(this);
|
||||
}
|
||||
}
|
||||
|
||||
public class IntegrationMessage<T> : IntegrationMessage
|
||||
{
|
||||
public required T Configuration { get; set; }
|
||||
|
||||
public override string ToJson()
|
||||
{
|
||||
return JsonSerializer.Serialize(this);
|
||||
}
|
||||
|
||||
public static IntegrationMessage<T> FromJson(string json)
|
||||
public static IntegrationMessage<T>? FromJson(string json)
|
||||
{
|
||||
return JsonSerializer.Deserialize<IntegrationMessage<T>>(json);
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public record SlackIntegration(string token);
|
||||
|
@ -1,3 +1,5 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public record SlackIntegrationConfiguration(string channelId);
|
||||
|
@ -1,3 +1,5 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public record SlackIntegrationConfigurationDetails(string channelId, string token);
|
||||
|
@ -1,3 +1,5 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public record WebhookIntegrationConfiguration(string url);
|
||||
|
@ -1,3 +1,5 @@
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
public record WebhookIntegrationConfigurationDetails(string url);
|
||||
|
@ -1,4 +1,5 @@
|
||||
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace Bit.Core.Models.Slack;
|
||||
|
@ -1,13 +1,87 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public abstract class EventLoggingListenerService : BackgroundService
|
||||
{
|
||||
protected readonly IEventMessageHandler _handler;
|
||||
protected ILogger<EventLoggingListenerService> _logger;
|
||||
|
||||
protected EventLoggingListenerService(IEventMessageHandler handler)
|
||||
protected EventLoggingListenerService(IEventMessageHandler handler, ILogger<EventLoggingListenerService> logger)
|
||||
{
|
||||
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
|
||||
_handler = handler;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
internal async Task ProcessReceivedMessageAsync(string body, string? messageId)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var jsonDocument = JsonDocument.Parse(body);
|
||||
var root = jsonDocument.RootElement;
|
||||
|
||||
if (root.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
|
||||
await _handler.HandleManyEventsAsync(eventMessages);
|
||||
}
|
||||
else if (root.ValueKind == JsonValueKind.Object)
|
||||
{
|
||||
var eventMessage = root.Deserialize<EventMessage>();
|
||||
await _handler.HandleEventAsync(eventMessage);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!string.IsNullOrEmpty(messageId))
|
||||
{
|
||||
_logger.LogError("An error occurred while processing message: {MessageId} - Invalid JSON", messageId);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogError("An Invalid JSON error occurred while processing a message with an empty message id");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (JsonException exception)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(messageId))
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An error occurred while processing message: {MessageId} - Invalid JSON",
|
||||
messageId
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An Invalid JSON error occurred while processing a message with an empty message id"
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(messageId))
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An error occurred while processing message: {MessageId}",
|
||||
messageId
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An error occurred while processing a message with an empty message id"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
10
src/Core/AdminConsole/Services/IAzureServiceBusService.cs
Normal file
10
src/Core/AdminConsole/Services/IAzureServiceBusService.cs
Normal file
@ -0,0 +1,10 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public interface IAzureServiceBusService : IEventIntegrationPublisher, IAsyncDisposable
|
||||
{
|
||||
ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options);
|
||||
Task PublishToRetryAsync(IIntegrationMessage message);
|
||||
}
|
@ -2,7 +2,8 @@
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public interface IIntegrationPublisher
|
||||
public interface IEventIntegrationPublisher : IAsyncDisposable
|
||||
{
|
||||
Task PublishAsync(IIntegrationMessage message);
|
||||
Task PublishEventAsync(string body);
|
||||
}
|
19
src/Core/AdminConsole/Services/IRabbitMqService.cs
Normal file
19
src/Core/AdminConsole/Services/IRabbitMqService.cs
Normal file
@ -0,0 +1,19 @@
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public interface IRabbitMqService : IEventIntegrationPublisher
|
||||
{
|
||||
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
|
||||
Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default);
|
||||
Task CreateIntegrationQueuesAsync(
|
||||
string queueName,
|
||||
string retryQueueName,
|
||||
string routingKey,
|
||||
CancellationToken cancellationToken = default);
|
||||
Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken);
|
||||
Task PublishToDeadLetterAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken);
|
||||
Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs);
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
@ -9,67 +9,47 @@ namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private readonly ILogger<AzureServiceBusEventListenerService> _logger;
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
|
||||
public AzureServiceBusEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
ILogger<AzureServiceBusEventListenerService> logger,
|
||||
IAzureServiceBusService serviceBusService,
|
||||
string subscriptionName,
|
||||
GlobalSettings globalSettings,
|
||||
string subscriptionName) : base(handler)
|
||||
ILogger<AzureServiceBusEventListenerService> logger) : base(handler, logger)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.EventTopicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
_processor = serviceBusService.CreateProcessor(
|
||||
globalSettings.EventLogging.AzureServiceBus.EventTopicName,
|
||||
subscriptionName,
|
||||
new ServiceBusProcessorOptions());
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += async args =>
|
||||
{
|
||||
try
|
||||
{
|
||||
using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(args.Message.Body));
|
||||
var root = jsonDocument.RootElement;
|
||||
|
||||
if (root.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
|
||||
await _handler.HandleManyEventsAsync(eventMessages);
|
||||
}
|
||||
else if (root.ValueKind == JsonValueKind.Object)
|
||||
{
|
||||
var eventMessage = root.Deserialize<EventMessage>();
|
||||
await _handler.HandleEventAsync(eventMessage);
|
||||
|
||||
}
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogError(
|
||||
exception,
|
||||
"An error occured while processing message: {MessageId}",
|
||||
args.Message.MessageId
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
_processor.ProcessErrorAsync += args =>
|
||||
{
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
_processor.ProcessMessageAsync += ProcessReceivedMessageAsync;
|
||||
_processor.ProcessErrorAsync += ProcessErrorAsync;
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
|
||||
internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
|
||||
{
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task ProcessReceivedMessageAsync(ProcessMessageEventArgs args)
|
||||
{
|
||||
await ProcessReceivedMessageAsync(Encoding.UTF8.GetString(args.Message.Body), args.Message.MessageId);
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
@ -79,7 +59,6 @@ public class AzureServiceBusEventListenerService : EventLoggingListenerService
|
||||
public override void Dispose()
|
||||
{
|
||||
_processor.DisposeAsync().GetAwaiter().GetResult();
|
||||
_client.DisposeAsync().GetAwaiter().GetResult();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
|
@ -1,45 +0,0 @@
|
||||
using System.Text.Json;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Services;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.AdminConsole.Services.Implementations;
|
||||
|
||||
public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDisposable
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _sender;
|
||||
|
||||
public AzureServiceBusEventWriteService(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(e))
|
||||
{
|
||||
ContentType = "application/json"
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(events))
|
||||
{
|
||||
ContentType = "application/json"
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
#nullable enable
|
||||
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
@ -10,39 +9,30 @@ namespace Bit.Core.Services;
|
||||
public class AzureServiceBusIntegrationListenerService : BackgroundService
|
||||
{
|
||||
private readonly int _maxRetries;
|
||||
private readonly string _subscriptionName;
|
||||
private readonly string _topicName;
|
||||
private readonly IAzureServiceBusService _serviceBusService;
|
||||
private readonly IIntegrationHandler _handler;
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
private readonly ServiceBusSender _sender;
|
||||
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger;
|
||||
|
||||
public AzureServiceBusIntegrationListenerService(
|
||||
IIntegrationHandler handler,
|
||||
public AzureServiceBusIntegrationListenerService(IIntegrationHandler handler,
|
||||
string topicName,
|
||||
string subscriptionName,
|
||||
GlobalSettings globalSettings,
|
||||
int maxRetries,
|
||||
IAzureServiceBusService serviceBusService,
|
||||
ILogger<AzureServiceBusIntegrationListenerService> logger)
|
||||
{
|
||||
_handler = handler;
|
||||
_logger = logger;
|
||||
_maxRetries = globalSettings.EventLogging.AzureServiceBus.MaxRetries;
|
||||
_topicName = globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName;
|
||||
_subscriptionName = subscriptionName;
|
||||
_maxRetries = maxRetries;
|
||||
_serviceBusService = serviceBusService;
|
||||
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_processor = _client.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions());
|
||||
_sender = _client.CreateSender(_topicName);
|
||||
_processor = _serviceBusService.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += HandleMessageAsync;
|
||||
_processor.ProcessErrorAsync += args =>
|
||||
{
|
||||
_logger.LogError(args.Exception, "Azure Service Bus error");
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
_processor.ProcessErrorAsync += ProcessErrorAsync;
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
@ -51,51 +41,67 @@ public class AzureServiceBusIntegrationListenerService : BackgroundService
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
await _processor.DisposeAsync();
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
|
||||
internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
|
||||
{
|
||||
var json = args.Message.Body.ToString();
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
internal async Task<bool> HandleMessageAsync(string body)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await _handler.HandleAsync(json);
|
||||
var result = await _handler.HandleAsync(body);
|
||||
var message = result.Message;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
return;
|
||||
// Successful integration. Return true to indicate the message has been handled
|
||||
return true;
|
||||
}
|
||||
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (result.Retryable && message.RetryCount < _maxRetries)
|
||||
{
|
||||
var scheduledTime = (DateTime)message.DelayUntilDate!;
|
||||
var retryMsg = new ServiceBusMessage(message.ToJson())
|
||||
{
|
||||
Subject = args.Message.Subject,
|
||||
ScheduledEnqueueTime = scheduledTime
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(retryMsg);
|
||||
// Publish message to the retry queue. It will be re-published for retry after a delay
|
||||
// Return true to indicate the message has been handled
|
||||
await _serviceBusService.PublishToRetryAsync(message);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable");
|
||||
return;
|
||||
// Non-recoverable failure or exceeded the max number of retries
|
||||
// Return false to indicate this message should be dead-lettered
|
||||
return false;
|
||||
}
|
||||
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Unknown exception - log error, return true so the message will be acknowledged and not resent
|
||||
_logger.LogError(ex, "Unhandled error processing ASB message");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
|
||||
{
|
||||
var json = args.Message.Body.ToString();
|
||||
if (await HandleMessageAsync(json))
|
||||
{
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
else
|
||||
{
|
||||
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,36 +0,0 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _sender;
|
||||
|
||||
public AzureServiceBusIntegrationPublisher(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
@ -0,0 +1,70 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusService : IAzureServiceBusService
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _eventSender;
|
||||
private readonly ServiceBusSender _integrationSender;
|
||||
|
||||
public AzureServiceBusService(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_eventSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
|
||||
_integrationSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
|
||||
}
|
||||
|
||||
public ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options)
|
||||
{
|
||||
return _client.CreateProcessor(topicName, subscriptionName, options);
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
MessageId = message.MessageId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async Task PublishToRetryAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow,
|
||||
MessageId = message.MessageId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
{
|
||||
var message = new ServiceBusMessage(body)
|
||||
{
|
||||
ContentType = "application/json",
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
};
|
||||
|
||||
await _eventSender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _eventSender.DisposeAsync();
|
||||
await _integrationSender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
using Bit.Core.Models.Data;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
@ -0,0 +1,32 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDisposable
|
||||
{
|
||||
private readonly IEventIntegrationPublisher _eventIntegrationPublisher;
|
||||
|
||||
public EventIntegrationEventWriteService(IEventIntegrationPublisher eventIntegrationPublisher)
|
||||
{
|
||||
_eventIntegrationPublisher = eventIntegrationPublisher;
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(e);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(events);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _eventIntegrationPublisher.DisposeAsync();
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
using System.Text.Json;
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.AdminConsole.Utilities;
|
||||
using Bit.Core.Enums;
|
||||
@ -7,11 +9,9 @@ using Bit.Core.Repositories;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class EventIntegrationHandler<T>(
|
||||
IntegrationType integrationType,
|
||||
IIntegrationPublisher integrationPublisher,
|
||||
IEventIntegrationPublisher eventIntegrationPublisher,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository,
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository)
|
||||
@ -34,6 +34,7 @@ public class EventIntegrationHandler<T>(
|
||||
var template = configuration.Template ?? string.Empty;
|
||||
var context = await BuildContextAsync(eventMessage, template);
|
||||
var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context);
|
||||
var messageId = eventMessage.IdempotencyId ?? Guid.NewGuid();
|
||||
|
||||
var config = configuration.MergedConfiguration.Deserialize<T>()
|
||||
?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}");
|
||||
@ -41,13 +42,14 @@ public class EventIntegrationHandler<T>(
|
||||
var message = new IntegrationMessage<T>
|
||||
{
|
||||
IntegrationType = integrationType,
|
||||
MessageId = messageId.ToString(),
|
||||
Configuration = config,
|
||||
RenderedTemplate = renderedTemplate,
|
||||
RetryCount = 0,
|
||||
DelayUntilDate = null
|
||||
};
|
||||
|
||||
await integrationPublisher.PublishAsync(message);
|
||||
await eventIntegrationPublisher.PublishAsync(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,6 @@
|
||||
using Bit.Core.Models.Data;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
@ -1,4 +1,6 @@
|
||||
using Bit.Core.Models.Data;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
@ -1,7 +1,6 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Settings;
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
@ -10,94 +9,60 @@ namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private IChannel _channel;
|
||||
private IConnection _connection;
|
||||
private readonly string _exchangeName;
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly ILogger<RabbitMqEventListenerService> _logger;
|
||||
private readonly Lazy<Task<IChannel>> _lazyChannel;
|
||||
private readonly string _queueName;
|
||||
private readonly IRabbitMqService _rabbitMqService;
|
||||
|
||||
public RabbitMqEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
ILogger<RabbitMqEventListenerService> logger,
|
||||
GlobalSettings globalSettings,
|
||||
string queueName) : base(handler)
|
||||
string queueName,
|
||||
IRabbitMqService rabbitMqService,
|
||||
ILogger<RabbitMqEventListenerService> logger) : base(handler, logger)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
|
||||
_logger = logger;
|
||||
_queueName = queueName;
|
||||
_rabbitMqService = rabbitMqService;
|
||||
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_connection = await _factory.CreateConnectionAsync(cancellationToken);
|
||||
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
|
||||
await _channel.ExchangeDeclareAsync(exchange: _exchangeName,
|
||||
type: ExchangeType.Fanout,
|
||||
durable: true,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueDeclareAsync(queue: _queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueBindAsync(queue: _queueName,
|
||||
exchange: _exchangeName,
|
||||
routingKey: string.Empty,
|
||||
cancellationToken: cancellationToken);
|
||||
await _rabbitMqService.CreateEventQueueAsync(_queueName, cancellationToken);
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var consumer = new AsyncEventingBasicConsumer(_channel);
|
||||
consumer.ReceivedAsync += async (_, eventArgs) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
using var jsonDocument = JsonDocument.Parse(Encoding.UTF8.GetString(eventArgs.Body.Span));
|
||||
var root = jsonDocument.RootElement;
|
||||
var channel = await _lazyChannel.Value;
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, eventArgs) => { await ProcessReceivedMessageAsync(eventArgs); };
|
||||
|
||||
if (root.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
var eventMessages = root.Deserialize<IEnumerable<EventMessage>>();
|
||||
await _handler.HandleManyEventsAsync(eventMessages);
|
||||
}
|
||||
else if (root.ValueKind == JsonValueKind.Object)
|
||||
{
|
||||
var eventMessage = root.Deserialize<EventMessage>();
|
||||
await _handler.HandleEventAsync(eventMessage);
|
||||
await channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "An error occurred while processing the message");
|
||||
}
|
||||
};
|
||||
|
||||
await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
|
||||
internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs eventArgs)
|
||||
{
|
||||
await ProcessReceivedMessageAsync(
|
||||
Encoding.UTF8.GetString(eventArgs.Body.Span),
|
||||
eventArgs.BasicProperties.MessageId);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _channel.CloseAsync(cancellationToken);
|
||||
await _connection.CloseAsync(cancellationToken);
|
||||
if (_lazyChannel.IsValueCreated)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
await channel.CloseAsync(cancellationToken);
|
||||
}
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_channel.Dispose();
|
||||
_connection.Dispose();
|
||||
if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
|
||||
{
|
||||
_lazyChannel.Value.Result.Dispose();
|
||||
}
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
|
@ -1,62 +0,0 @@
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Settings;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
public class RabbitMqEventWriteService : IEventWriteService, IAsyncDisposable
|
||||
{
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly Lazy<Task<IConnection>> _lazyConnection;
|
||||
private readonly string _exchangeName;
|
||||
|
||||
public RabbitMqEventWriteService(GlobalSettings globalSettings)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_exchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
|
||||
|
||||
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
using var channel = await connection.CreateChannelAsync();
|
||||
|
||||
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
|
||||
|
||||
var body = JsonSerializer.SerializeToUtf8Bytes(e);
|
||||
|
||||
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
using var channel = await connection.CreateChannelAsync();
|
||||
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
|
||||
|
||||
var body = JsonSerializer.SerializeToUtf8Bytes(events);
|
||||
|
||||
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: string.Empty, body: body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_lazyConnection.IsValueCreated)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IConnection> CreateConnectionAsync()
|
||||
{
|
||||
return await _factory.CreateConnectionAsync();
|
||||
}
|
||||
}
|
@ -1,5 +1,8 @@
|
||||
using System.Text;
|
||||
using Bit.Core.Settings;
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
@ -9,183 +12,137 @@ namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqIntegrationListenerService : BackgroundService
|
||||
{
|
||||
private const string _deadLetterRoutingKey = "dead-letter";
|
||||
private IChannel _channel;
|
||||
private IConnection _connection;
|
||||
private readonly string _exchangeName;
|
||||
private readonly string _queueName;
|
||||
private readonly string _retryQueueName;
|
||||
private readonly string _deadLetterQueueName;
|
||||
private readonly string _routingKey;
|
||||
private readonly string _retryRoutingKey;
|
||||
private readonly int _maxRetries;
|
||||
private readonly string _queueName;
|
||||
private readonly string _routingKey;
|
||||
private readonly string _retryQueueName;
|
||||
private readonly IIntegrationHandler _handler;
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly Lazy<Task<IChannel>> _lazyChannel;
|
||||
private readonly IRabbitMqService _rabbitMqService;
|
||||
private readonly ILogger<RabbitMqIntegrationListenerService> _logger;
|
||||
private readonly int _retryTiming;
|
||||
|
||||
public RabbitMqIntegrationListenerService(IIntegrationHandler handler,
|
||||
string routingKey,
|
||||
string queueName,
|
||||
string retryQueueName,
|
||||
string deadLetterQueueName,
|
||||
GlobalSettings globalSettings,
|
||||
int maxRetries,
|
||||
IRabbitMqService rabbitMqService,
|
||||
ILogger<RabbitMqIntegrationListenerService> logger)
|
||||
{
|
||||
_handler = handler;
|
||||
_routingKey = routingKey;
|
||||
_retryRoutingKey = $"{_routingKey}-retry";
|
||||
_queueName = queueName;
|
||||
_retryQueueName = retryQueueName;
|
||||
_deadLetterQueueName = deadLetterQueueName;
|
||||
_queueName = queueName;
|
||||
_rabbitMqService = rabbitMqService;
|
||||
_logger = logger;
|
||||
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
|
||||
_maxRetries = globalSettings.EventLogging.RabbitMq.MaxRetries;
|
||||
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
|
||||
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_maxRetries = maxRetries;
|
||||
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_connection = await _factory.CreateConnectionAsync(cancellationToken);
|
||||
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
|
||||
await _channel.ExchangeDeclareAsync(exchange: _exchangeName,
|
||||
type: ExchangeType.Direct,
|
||||
durable: true,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
// Declare main queue
|
||||
await _channel.QueueDeclareAsync(queue: _queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueBindAsync(queue: _queueName,
|
||||
exchange: _exchangeName,
|
||||
routingKey: _routingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
|
||||
await _channel.QueueDeclareAsync(queue: _retryQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: new Dictionary<string, object>
|
||||
{
|
||||
{ "x-dead-letter-exchange", _exchangeName },
|
||||
{ "x-dead-letter-routing-key", _routingKey },
|
||||
{ "x-message-ttl", _retryTiming }
|
||||
},
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueBindAsync(queue: _retryQueueName,
|
||||
exchange: _exchangeName,
|
||||
routingKey: _retryRoutingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
// Declare dead letter queue
|
||||
await _channel.QueueDeclareAsync(queue: _deadLetterQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.QueueBindAsync(queue: _deadLetterQueueName,
|
||||
exchange: _exchangeName,
|
||||
routingKey: _deadLetterRoutingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
await _rabbitMqService.CreateIntegrationQueuesAsync(
|
||||
_queueName,
|
||||
_retryQueueName,
|
||||
_routingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var consumer = new AsyncEventingBasicConsumer(_channel);
|
||||
var channel = await _lazyChannel.Value;
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, ea) =>
|
||||
{
|
||||
await ProcessReceivedMessageAsync(ea, cancellationToken);
|
||||
};
|
||||
|
||||
await channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs ea, CancellationToken cancellationToken)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
try
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(ea.Body.Span);
|
||||
|
||||
try
|
||||
// Determine if the message came off of the retry queue too soon
|
||||
// If so, place it back on the retry queue
|
||||
var integrationMessage = JsonSerializer.Deserialize<IntegrationMessage>(json);
|
||||
if (integrationMessage is not null &&
|
||||
integrationMessage.DelayUntilDate.HasValue &&
|
||||
integrationMessage.DelayUntilDate.Value > DateTime.UtcNow)
|
||||
{
|
||||
var result = await _handler.HandleAsync(json);
|
||||
var message = result.Message;
|
||||
await _rabbitMqService.RepublishToRetryQueueAsync(channel, ea);
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.Success)
|
||||
var result = await _handler.HandleAsync(json);
|
||||
var message = result.Message;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
// Successful integration send. Acknowledge message delivery and return
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.Retryable)
|
||||
{
|
||||
// Integration failed, but is retryable - apply delay and check max retries
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (message.RetryCount < _maxRetries)
|
||||
{
|
||||
// Successful integration send. Acknowledge message delivery and return
|
||||
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.Retryable)
|
||||
{
|
||||
// Integration failed, but is retryable - apply delay and check max retries
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (message.RetryCount < _maxRetries)
|
||||
{
|
||||
// Publish message to the retry queue. It will be re-published for retry after a delay
|
||||
await _channel.BasicPublishAsync(
|
||||
exchange: _exchangeName,
|
||||
routingKey: _retryRoutingKey,
|
||||
body: Encoding.UTF8.GetBytes(message.ToJson()),
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Exceeded the max number of retries; fail and send to dead letter queue
|
||||
await PublishToDeadLetterAsync(message.ToJson());
|
||||
_logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
|
||||
}
|
||||
// Publish message to the retry queue. It will be re-published for retry after a delay
|
||||
await _rabbitMqService.PublishToRetryAsync(channel, message, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries
|
||||
await PublishToDeadLetterAsync(message.ToJson());
|
||||
_logger.LogWarning("Non-retryable failure. Sent to DLQ.");
|
||||
// Exceeded the max number of retries; fail and send to dead letter queue
|
||||
await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
|
||||
_logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
|
||||
}
|
||||
|
||||
// Message has been sent to retry or dead letter queues.
|
||||
// Acknowledge receipt so Rabbit knows it's been processed
|
||||
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
else
|
||||
{
|
||||
// Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error
|
||||
_logger.LogError(ex, "Unhandled error processing integration message.");
|
||||
await _channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
// Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries
|
||||
await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
|
||||
_logger.LogWarning("Non-retryable failure. Sent to DLQ.");
|
||||
}
|
||||
};
|
||||
|
||||
await _channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
private async Task PublishToDeadLetterAsync(string json)
|
||||
{
|
||||
await _channel.BasicPublishAsync(
|
||||
exchange: _exchangeName,
|
||||
routingKey: _deadLetterRoutingKey,
|
||||
body: Encoding.UTF8.GetBytes(json));
|
||||
// Message has been sent to retry or dead letter queues.
|
||||
// Acknowledge receipt so Rabbit knows it's been processed
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error
|
||||
_logger.LogError(ex, "Unhandled error processing integration message.");
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _channel.CloseAsync(cancellationToken);
|
||||
await _connection.CloseAsync(cancellationToken);
|
||||
if (_lazyChannel.IsValueCreated)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
await channel.CloseAsync(cancellationToken);
|
||||
}
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_channel.Dispose();
|
||||
_connection.Dispose();
|
||||
if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
|
||||
{
|
||||
_lazyChannel.Value.Result.Dispose();
|
||||
}
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
|
@ -1,54 +0,0 @@
|
||||
using System.Text;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
|
||||
{
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly Lazy<Task<IConnection>> _lazyConnection;
|
||||
private readonly string _exchangeName;
|
||||
|
||||
public RabbitMqIntegrationPublisher(GlobalSettings globalSettings)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_exchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
|
||||
|
||||
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var routingKey = message.IntegrationType.ToRoutingKey();
|
||||
var connection = await _lazyConnection.Value;
|
||||
await using var channel = await connection.CreateChannelAsync();
|
||||
|
||||
await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Direct, durable: true);
|
||||
|
||||
var body = Encoding.UTF8.GetBytes(message.ToJson());
|
||||
|
||||
await channel.BasicPublishAsync(exchange: _exchangeName, routingKey: routingKey, body: body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_lazyConnection.IsValueCreated)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IConnection> CreateConnectionAsync()
|
||||
{
|
||||
return await _factory.CreateConnectionAsync();
|
||||
}
|
||||
}
|
@ -0,0 +1,244 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqService : IRabbitMqService
|
||||
{
|
||||
private const string _deadLetterRoutingKey = "dead-letter";
|
||||
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly Lazy<Task<IConnection>> _lazyConnection;
|
||||
private readonly string _deadLetterQueueName;
|
||||
private readonly string _eventExchangeName;
|
||||
private readonly string _integrationExchangeName;
|
||||
private readonly int _retryTiming;
|
||||
private readonly bool _useDelayPlugin;
|
||||
|
||||
public RabbitMqService(GlobalSettings globalSettings)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_deadLetterQueueName = globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName;
|
||||
_eventExchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
|
||||
_integrationExchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
|
||||
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
|
||||
_useDelayPlugin = globalSettings.EventLogging.RabbitMq.UseDelayPlugin;
|
||||
|
||||
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
|
||||
}
|
||||
|
||||
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var channel = await CreateChannelAsync(cancellationToken);
|
||||
await channel.QueueDeclareAsync(queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(queue: queueName,
|
||||
exchange: _eventExchangeName,
|
||||
routingKey: string.Empty,
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task CreateIntegrationQueuesAsync(
|
||||
string queueName,
|
||||
string retryQueueName,
|
||||
string routingKey,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var channel = await CreateChannelAsync(cancellationToken);
|
||||
var retryRoutingKey = $"{routingKey}-retry";
|
||||
|
||||
// Declare main integration queue
|
||||
await channel.QueueDeclareAsync(
|
||||
queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(
|
||||
queue: queueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: routingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
if (!_useDelayPlugin)
|
||||
{
|
||||
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
|
||||
// Only needed if NOT using delay plugin
|
||||
await channel.QueueDeclareAsync(queue: retryQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: new Dictionary<string, object?>
|
||||
{
|
||||
{ "x-dead-letter-exchange", _integrationExchangeName },
|
||||
{ "x-dead-letter-routing-key", routingKey },
|
||||
{ "x-message-ttl", _retryTiming }
|
||||
},
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(queue: retryQueueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: retryRoutingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var routingKey = message.IntegrationType.ToRoutingKey();
|
||||
await using var channel = await CreateChannelAsync();
|
||||
|
||||
var body = Encoding.UTF8.GetBytes(message.ToJson());
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = message.MessageId,
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: routingKey,
|
||||
body: body);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
{
|
||||
await using var channel = await CreateChannelAsync();
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = Guid.NewGuid().ToString(),
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _eventExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: string.Empty,
|
||||
body: Encoding.UTF8.GetBytes(body));
|
||||
}
|
||||
|
||||
public async Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken)
|
||||
{
|
||||
var routingKey = message.IntegrationType.ToRoutingKey();
|
||||
var retryRoutingKey = $"{routingKey}-retry";
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
Persistent = true,
|
||||
MessageId = message.MessageId,
|
||||
Headers = _useDelayPlugin && message.DelayUntilDate.HasValue ?
|
||||
new Dictionary<string, object?>
|
||||
{
|
||||
["x-delay"] = Math.Max((int)(message.DelayUntilDate.Value - DateTime.UtcNow).TotalMilliseconds, 0)
|
||||
} :
|
||||
null
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: _useDelayPlugin ? routingKey : retryRoutingKey,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
body: Encoding.UTF8.GetBytes(message.ToJson()),
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task PublishToDeadLetterAsync(
|
||||
IChannel channel,
|
||||
IIntegrationMessage message,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = message.MessageId,
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: _deadLetterRoutingKey,
|
||||
body: Encoding.UTF8.GetBytes(message.ToJson()),
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs)
|
||||
{
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: eventArgs.RoutingKey,
|
||||
mandatory: true,
|
||||
basicProperties: new BasicProperties(eventArgs.BasicProperties),
|
||||
body: eventArgs.Body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_lazyConnection.IsValueCreated)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IConnection> CreateConnectionAsync()
|
||||
{
|
||||
var connection = await _factory.CreateConnectionAsync();
|
||||
using var channel = await connection.CreateChannelAsync();
|
||||
|
||||
// Declare Exchanges
|
||||
await channel.ExchangeDeclareAsync(exchange: _eventExchangeName, type: ExchangeType.Fanout, durable: true);
|
||||
if (_useDelayPlugin)
|
||||
{
|
||||
await channel.ExchangeDeclareAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
type: "x-delayed-message",
|
||||
durable: true,
|
||||
arguments: new Dictionary<string, object?>
|
||||
{
|
||||
{ "x-delayed-type", "direct" }
|
||||
}
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
await channel.ExchangeDeclareAsync(exchange: _integrationExchangeName, type: ExchangeType.Direct, durable: true);
|
||||
}
|
||||
|
||||
// Declare dead letter queue for Integration exchange
|
||||
await channel.QueueDeclareAsync(queue: _deadLetterQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null);
|
||||
await channel.QueueBindAsync(queue: _deadLetterQueueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: _deadLetterRoutingKey);
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
|
@ -1,4 +1,6 @@
|
||||
using System.Net.Http.Headers;
|
||||
#nullable enable
|
||||
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Http.Json;
|
||||
using System.Web;
|
||||
using Bit.Core.Models.Slack;
|
||||
@ -22,7 +24,7 @@ public class SlackService(
|
||||
|
||||
public async Task<string> GetChannelIdAsync(string token, string channelName)
|
||||
{
|
||||
return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault();
|
||||
return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault() ?? string.Empty;
|
||||
}
|
||||
|
||||
public async Task<List<string>> GetChannelIdsAsync(string token, List<string> channelNames)
|
||||
@ -58,7 +60,7 @@ public class SlackService(
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogError("Error getting Channel Ids: {Error}", result.Error);
|
||||
logger.LogError("Error getting Channel Ids: {Error}", result?.Error ?? "Unknown Error");
|
||||
nextCursor = string.Empty;
|
||||
}
|
||||
|
||||
@ -89,7 +91,7 @@ public class SlackService(
|
||||
new KeyValuePair<string, string>("redirect_uri", redirectUrl)
|
||||
}));
|
||||
|
||||
SlackOAuthResponse result;
|
||||
SlackOAuthResponse? result;
|
||||
try
|
||||
{
|
||||
result = await tokenResponse.Content.ReadFromJsonAsync<SlackOAuthResponse>();
|
||||
@ -99,7 +101,7 @@ public class SlackService(
|
||||
result = null;
|
||||
}
|
||||
|
||||
if (result == null)
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error obtaining token via OAuth: Unknown error");
|
||||
return string.Empty;
|
||||
@ -130,6 +132,11 @@ public class SlackService(
|
||||
var response = await _httpClient.SendAsync(request);
|
||||
var result = await response.Content.ReadFromJsonAsync<SlackUserResponse>();
|
||||
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error retrieving Slack user ID: Unknown error");
|
||||
return string.Empty;
|
||||
}
|
||||
if (!result.Ok)
|
||||
{
|
||||
logger.LogError("Error retrieving Slack user ID: {Error}", result.Error);
|
||||
@ -151,6 +158,11 @@ public class SlackService(
|
||||
var response = await _httpClient.SendAsync(request);
|
||||
var result = await response.Content.ReadFromJsonAsync<SlackDmResponse>();
|
||||
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error opening DM channel: Unknown error");
|
||||
return string.Empty;
|
||||
}
|
||||
if (!result.Ok)
|
||||
{
|
||||
logger.LogError("Error opening DM channel: {Error}", result.Error);
|
||||
|
@ -1,4 +1,6 @@
|
||||
using System.Globalization;
|
||||
#nullable enable
|
||||
|
||||
using System.Globalization;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
@ -29,7 +31,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
|
||||
case HttpStatusCode.ServiceUnavailable:
|
||||
case HttpStatusCode.GatewayTimeout:
|
||||
result.Retryable = true;
|
||||
result.FailureReason = response.ReasonPhrase;
|
||||
result.FailureReason = response.ReasonPhrase ?? $"Failure with status code: {(int)response.StatusCode}";
|
||||
|
||||
if (response.Headers.TryGetValues("Retry-After", out var values))
|
||||
{
|
||||
@ -52,7 +54,7 @@ public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
|
||||
break;
|
||||
default:
|
||||
result.Retryable = false;
|
||||
result.FailureReason = response.ReasonPhrase;
|
||||
result.FailureReason = response.ReasonPhrase ?? $"Failure with status code {(int)response.StatusCode}";
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -78,13 +78,14 @@ public class OrganizationBillingService(
|
||||
var isEligibleForSelfHost = await IsEligibleForSelfHostAsync(organization);
|
||||
|
||||
var isManaged = organization.Status == OrganizationStatusType.Managed;
|
||||
|
||||
var orgOccupiedSeats = await organizationUserRepository.GetOccupiedSeatCountByOrganizationIdAsync(organization.Id);
|
||||
if (string.IsNullOrWhiteSpace(organization.GatewaySubscriptionId))
|
||||
{
|
||||
return OrganizationMetadata.Default with
|
||||
{
|
||||
IsEligibleForSelfHost = isEligibleForSelfHost,
|
||||
IsManaged = isManaged
|
||||
IsManaged = isManaged,
|
||||
OrganizationOccupiedSeats = orgOccupiedSeats
|
||||
};
|
||||
}
|
||||
|
||||
@ -108,8 +109,6 @@ public class OrganizationBillingService(
|
||||
? await stripeAdapter.InvoiceGetAsync(subscription.LatestInvoiceId, new InvoiceGetOptions())
|
||||
: null;
|
||||
|
||||
var orgOccupiedSeats = await organizationUserRepository.GetOccupiedSeatCountByOrganizationIdAsync(organization.Id);
|
||||
|
||||
return new OrganizationMetadata(
|
||||
isEligibleForSelfHost,
|
||||
isManaged,
|
||||
@ -420,7 +419,7 @@ public class OrganizationBillingService(
|
||||
var setNonUSBusinessUseToReverseCharge =
|
||||
featureService.IsEnabled(FeatureFlagKeys.PM21092_SetNonUSBusinessUseToReverseCharge);
|
||||
|
||||
if (setNonUSBusinessUseToReverseCharge)
|
||||
if (setNonUSBusinessUseToReverseCharge && customer.HasBillingLocation())
|
||||
{
|
||||
subscriptionCreateOptions.AutomaticTax = new SubscriptionAutomaticTaxOptions { Enabled = true };
|
||||
}
|
||||
|
@ -145,7 +145,6 @@ public static class FeatureFlagKeys
|
||||
public const string PM17772_AdminInitiatedSponsorships = "pm-17772-admin-initiated-sponsorships";
|
||||
public const string UsePricingService = "use-pricing-service";
|
||||
public const string PM12276Breadcrumbing = "pm-12276-breadcrumbing-for-business-features";
|
||||
public const string PM18794_ProviderPaymentMethod = "pm-18794-provider-payment-method";
|
||||
public const string PM19422_AllowAutomaticTaxUpdates = "pm-19422-allow-automatic-tax-updates";
|
||||
public const string PM199566_UpdateMSPToChargeAutomatically = "pm-199566-update-msp-to-charge-automatically";
|
||||
public const string PM19956_RequireProviderPaymentMethodDuringSetup = "pm-19956-require-provider-payment-method-during-setup";
|
||||
|
@ -3,6 +3,8 @@ using Microsoft.AspNetCore.Mvc.ModelBinding;
|
||||
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class BadRequestException : Exception
|
||||
{
|
||||
public BadRequestException() : base()
|
||||
@ -41,5 +43,5 @@ public class BadRequestException : Exception
|
||||
}
|
||||
}
|
||||
|
||||
public ModelStateDictionary ModelState { get; set; }
|
||||
public ModelStateDictionary? ModelState { get; set; }
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class ConflictException : Exception
|
||||
{
|
||||
public ConflictException() : base("Conflict.") { }
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class DnsQueryException : Exception
|
||||
{
|
||||
public DnsQueryException(string message)
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class DomainClaimedException : Exception
|
||||
{
|
||||
public DomainClaimedException()
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class DomainVerifiedException : Exception
|
||||
{
|
||||
public DomainVerifiedException()
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class DuplicateDomainException : Exception
|
||||
{
|
||||
public DuplicateDomainException()
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
/// <summary>
|
||||
/// Exception to throw when a requested feature is not yet enabled/available for the requesting context.
|
||||
/// </summary>
|
||||
|
@ -1,8 +1,10 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class GatewayException : Exception
|
||||
{
|
||||
public GatewayException(string message, Exception innerException = null)
|
||||
public GatewayException(string message, Exception? innerException = null)
|
||||
: base(message, innerException)
|
||||
{ }
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class InvalidEmailException : Exception
|
||||
{
|
||||
public InvalidEmailException()
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class InvalidGatewayCustomerIdException : Exception
|
||||
{
|
||||
public InvalidGatewayCustomerIdException()
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.Exceptions;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class NotFoundException : Exception
|
||||
{
|
||||
public NotFoundException() : base()
|
||||
|
@ -10,9 +10,11 @@ using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.HostedServices;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
{
|
||||
private readonly InMemoryServiceBusApplicationCacheService _applicationCacheService;
|
||||
private readonly InMemoryServiceBusApplicationCacheService? _applicationCacheService;
|
||||
private readonly IOrganizationRepository _organizationRepository;
|
||||
protected readonly ILogger<ApplicationCacheHostedService> _logger;
|
||||
private readonly ServiceBusClient _serviceBusClient;
|
||||
@ -20,8 +22,8 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient;
|
||||
private readonly string _subName;
|
||||
private readonly string _topicName;
|
||||
private CancellationTokenSource _cts;
|
||||
private Task _executingTask;
|
||||
private CancellationTokenSource? _cts;
|
||||
private Task? _executingTask;
|
||||
|
||||
|
||||
public ApplicationCacheHostedService(
|
||||
@ -67,13 +69,17 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
{
|
||||
await _subscriptionReceiver.CloseAsync(cancellationToken);
|
||||
await _serviceBusClient.DisposeAsync();
|
||||
_cts.Cancel();
|
||||
_cts?.Cancel();
|
||||
try
|
||||
{
|
||||
await _serviceBusAdministrationClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
|
||||
}
|
||||
catch { }
|
||||
await _executingTask;
|
||||
|
||||
if (_executingTask != null)
|
||||
{
|
||||
await _executingTask;
|
||||
}
|
||||
}
|
||||
|
||||
public virtual void Dispose()
|
||||
|
@ -3,6 +3,8 @@ using Microsoft.Extensions.Hosting;
|
||||
|
||||
namespace Bit.Core.HostedServices;
|
||||
|
||||
#nullable enable
|
||||
|
||||
/// <summary>
|
||||
/// A startup service that will seed the IP rate limiting stores with any values in the
|
||||
/// GlobalSettings configuration.
|
||||
|
@ -3,6 +3,8 @@ using Quartz;
|
||||
|
||||
namespace Bit.Core.Jobs;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public abstract class BaseJob : IJob
|
||||
{
|
||||
protected readonly ILogger _logger;
|
||||
|
@ -8,6 +8,8 @@ using Quartz.Impl.Matchers;
|
||||
|
||||
namespace Bit.Core.Jobs;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
{
|
||||
private const int MaximumJobRetries = 10;
|
||||
@ -16,7 +18,7 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
private readonly ILogger<JobListener> _listenerLogger;
|
||||
protected readonly ILogger _logger;
|
||||
|
||||
private IScheduler _scheduler;
|
||||
private IScheduler? _scheduler;
|
||||
protected GlobalSettings _globalSettings;
|
||||
|
||||
public BaseJobsHostedService(
|
||||
@ -31,7 +33,7 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
_globalSettings = globalSettings;
|
||||
}
|
||||
|
||||
public IEnumerable<Tuple<Type, ITrigger>> Jobs { get; protected set; }
|
||||
public IEnumerable<Tuple<Type, ITrigger>>? Jobs { get; protected set; }
|
||||
|
||||
public virtual async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
@ -61,10 +63,19 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
_scheduler.ListenerManager.AddJobListener(new JobListener(_listenerLogger),
|
||||
GroupMatcher<JobKey>.AnyGroup());
|
||||
await _scheduler.Start(cancellationToken);
|
||||
|
||||
var jobKeys = new List<JobKey>();
|
||||
var triggerKeys = new List<TriggerKey>();
|
||||
|
||||
if (Jobs != null)
|
||||
{
|
||||
foreach (var (job, trigger) in Jobs)
|
||||
{
|
||||
jobKeys.Add(JobBuilder.Create(job)
|
||||
.WithIdentity(job.FullName!)
|
||||
.Build().Key);
|
||||
triggerKeys.Add(trigger.Key);
|
||||
|
||||
for (var retry = 0; retry < MaximumJobRetries; retry++)
|
||||
{
|
||||
// There's a race condition when starting multiple containers simultaneously, retry until it succeeds..
|
||||
@ -77,7 +88,7 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
}
|
||||
|
||||
var jobDetail = JobBuilder.Create(job)
|
||||
.WithIdentity(job.FullName)
|
||||
.WithIdentity(job.FullName!)
|
||||
.Build();
|
||||
|
||||
var dupeJ = await _scheduler.GetJobDetail(jobDetail.Key);
|
||||
@ -106,13 +117,6 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
|
||||
// Delete old Jobs and Triggers
|
||||
var existingJobKeys = await _scheduler.GetJobKeys(GroupMatcher<JobKey>.AnyGroup());
|
||||
var jobKeys = Jobs.Select(j =>
|
||||
{
|
||||
var job = j.Item1;
|
||||
return JobBuilder.Create(job)
|
||||
.WithIdentity(job.FullName)
|
||||
.Build().Key;
|
||||
});
|
||||
|
||||
foreach (var key in existingJobKeys)
|
||||
{
|
||||
@ -126,7 +130,6 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
}
|
||||
|
||||
var existingTriggerKeys = await _scheduler.GetTriggerKeys(GroupMatcher<TriggerKey>.AnyGroup());
|
||||
var triggerKeys = Jobs.Select(j => j.Item2.Key);
|
||||
|
||||
foreach (var key in existingTriggerKeys)
|
||||
{
|
||||
@ -142,7 +145,10 @@ public abstract class BaseJobsHostedService : IHostedService, IDisposable
|
||||
|
||||
public virtual async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _scheduler?.Shutdown(cancellationToken);
|
||||
if (_scheduler is not null)
|
||||
{
|
||||
await _scheduler.Shutdown(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public virtual void Dispose()
|
||||
|
@ -4,6 +4,8 @@ using Quartz.Spi;
|
||||
|
||||
namespace Bit.Core.Jobs;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class JobFactory : IJobFactory
|
||||
{
|
||||
private readonly IServiceProvider _container;
|
||||
@ -16,7 +18,7 @@ public class JobFactory : IJobFactory
|
||||
public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
|
||||
{
|
||||
var scope = _container.CreateScope();
|
||||
return scope.ServiceProvider.GetService(bundle.JobDetail.JobType) as IJob;
|
||||
return (scope.ServiceProvider.GetService(bundle.JobDetail.JobType) as IJob)!;
|
||||
}
|
||||
|
||||
public void ReturnJob(IJob job)
|
||||
|
@ -3,6 +3,8 @@ using Quartz;
|
||||
|
||||
namespace Bit.Core.Jobs;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class JobListener : IJobListener
|
||||
{
|
||||
private readonly ILogger<JobListener> _logger;
|
||||
@ -28,7 +30,7 @@ public class JobListener : IJobListener
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
|
||||
public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException,
|
||||
public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException,
|
||||
CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
_logger.LogInformation(Constants.BypassFiltersEventId, null, "Finished job {0} at {1}.",
|
||||
|
@ -1,6 +1,6 @@
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using Bit.Core.Entities;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.KeyManagement.Enums;
|
||||
using Bit.Core.KeyManagement.Models.Data;
|
||||
using Bit.Core.Utilities;
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
namespace Bit.Core.Enums;
|
||||
namespace Bit.Core.KeyManagement.Enums;
|
||||
|
||||
// <summary>
|
||||
// Represents the algorithm / digital signature scheme used for a signature key pair.
|
@ -1,7 +1,7 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json.Serialization;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.KeyManagement.Enums;
|
||||
|
||||
namespace Bit.Core.KeyManagement.Models.Data;
|
||||
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public interface INotificationHubProxy
|
||||
{
|
||||
Task<(INotificationHubClient Client, NotificationOutcome Outcome)[]> SendTemplateNotificationAsync(IDictionary<string, string> properties, string tagExpression);
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public interface INotificationHubPool
|
||||
{
|
||||
NotificationHubConnection ConnectionFor(Guid comb);
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class NotificationHubClientProxy : INotificationHubProxy
|
||||
{
|
||||
private readonly IEnumerable<INotificationHubClient> _clients;
|
||||
|
@ -1,4 +1,5 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Web;
|
||||
using Bit.Core.Settings;
|
||||
@ -7,21 +8,23 @@ using Microsoft.Azure.NotificationHubs;
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class NotificationHubConnection
|
||||
{
|
||||
public string HubName { get; init; }
|
||||
public string ConnectionString { get; init; }
|
||||
public string? HubName { get; init; }
|
||||
public string? ConnectionString { get; init; }
|
||||
private Lazy<NotificationHubConnectionStringBuilder> _parsedConnectionString;
|
||||
public Uri Endpoint => _parsedConnectionString.Value.Endpoint;
|
||||
private string SasKey => _parsedConnectionString.Value.SharedAccessKey;
|
||||
private string SasKeyName => _parsedConnectionString.Value.SharedAccessKeyName;
|
||||
public bool EnableSendTracing { get; init; }
|
||||
private NotificationHubClient _hubClient;
|
||||
private NotificationHubClient? _hubClient;
|
||||
/// <summary>
|
||||
/// Gets the NotificationHubClient for this connection.
|
||||
///
|
||||
///
|
||||
/// If the client is null, it will be initialized.
|
||||
///
|
||||
///
|
||||
/// <throws>Exception</throws> if the connection is invalid.
|
||||
/// </summary>
|
||||
public NotificationHubClient HubClient
|
||||
@ -45,13 +48,13 @@ public class NotificationHubConnection
|
||||
}
|
||||
/// <summary>
|
||||
/// Gets the start date for registration.
|
||||
///
|
||||
///
|
||||
/// If null, registration is always disabled.
|
||||
/// </summary>
|
||||
public DateTime? RegistrationStartDate { get; init; }
|
||||
/// <summary>
|
||||
/// Gets the end date for registration.
|
||||
///
|
||||
///
|
||||
/// If null, registration has no end date.
|
||||
/// </summary>
|
||||
public DateTime? RegistrationEndDate { get; init; }
|
||||
@ -155,9 +158,10 @@ public class NotificationHubConnection
|
||||
};
|
||||
}
|
||||
|
||||
[MemberNotNull(nameof(_hubClient))]
|
||||
private NotificationHubConnection Init()
|
||||
{
|
||||
HubClient = NotificationHubClient.CreateClientFromConnectionString(ConnectionString, HubName, EnableSendTracing);
|
||||
_hubClient = NotificationHubClient.CreateClientFromConnectionString(ConnectionString, HubName, EnableSendTracing);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,8 @@ using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class NotificationHubPool : INotificationHubPool
|
||||
{
|
||||
private List<NotificationHubConnection> _connections { get; }
|
||||
|
@ -19,6 +19,8 @@ using Notification = Bit.Core.NotificationCenter.Entities.Notification;
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
/// <summary>
|
||||
/// Sends mobile push notifications to the Azure Notification Hub.
|
||||
/// Used by Cloud-Hosted environments.
|
||||
|
@ -13,6 +13,8 @@ using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public class NotificationHubPushRegistrationService : IPushRegistrationService
|
||||
{
|
||||
private static readonly JsonSerializerOptions webPushSerializationOptions = new()
|
||||
@ -37,7 +39,7 @@ public class NotificationHubPushRegistrationService : IPushRegistrationService
|
||||
}
|
||||
|
||||
public async Task CreateOrUpdateRegistrationAsync(PushRegistrationData data, string deviceId, string userId,
|
||||
string identifier, DeviceType type, IEnumerable<string> organizationIds, Guid installationId)
|
||||
string? identifier, DeviceType type, IEnumerable<string> organizationIds, Guid installationId)
|
||||
{
|
||||
var orgIds = organizationIds.ToList();
|
||||
var clientType = DeviceTypes.ToClientType(type);
|
||||
@ -79,7 +81,7 @@ public class NotificationHubPushRegistrationService : IPushRegistrationService
|
||||
}
|
||||
|
||||
private async Task CreateOrUpdateMobileRegistrationAsync(Installation installation, string userId,
|
||||
string identifier, ClientType clientType, List<string> organizationIds, DeviceType type, Guid installationId)
|
||||
string? identifier, ClientType clientType, List<string> organizationIds, DeviceType type, Guid installationId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(installation.PushChannel))
|
||||
{
|
||||
@ -137,7 +139,7 @@ public class NotificationHubPushRegistrationService : IPushRegistrationService
|
||||
}
|
||||
|
||||
private async Task CreateOrUpdateWebRegistrationAsync(string endpoint, string p256dh, string auth, Installation installation, string userId,
|
||||
string identifier, ClientType clientType, List<string> organizationIds, Guid installationId)
|
||||
string? identifier, ClientType clientType, List<string> organizationIds, Guid installationId)
|
||||
{
|
||||
// The Azure SDK is currently lacking support for web push registrations.
|
||||
// We need to use the REST API directly.
|
||||
@ -187,7 +189,7 @@ public class NotificationHubPushRegistrationService : IPushRegistrationService
|
||||
}
|
||||
|
||||
private static KeyValuePair<string, InstallationTemplate> BuildInstallationTemplate(string templateId, [StringSyntax(StringSyntaxAttribute.Json)] string templateBody,
|
||||
string userId, string identifier, ClientType clientType, List<string> organizationIds, Guid installationId)
|
||||
string userId, string? identifier, ClientType clientType, List<string> organizationIds, Guid installationId)
|
||||
{
|
||||
var fullTemplateId = $"template:{templateId}";
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
namespace Bit.Core.NotificationHub;
|
||||
|
||||
#nullable enable
|
||||
|
||||
public record struct WebPushRegistrationData
|
||||
{
|
||||
public string Endpoint { get; init; }
|
||||
@ -9,9 +11,9 @@ public record struct WebPushRegistrationData
|
||||
|
||||
public record class PushRegistrationData
|
||||
{
|
||||
public string Token { get; set; }
|
||||
public string? Token { get; set; }
|
||||
public WebPushRegistrationData? WebPush { get; set; }
|
||||
public PushRegistrationData(string token)
|
||||
public PushRegistrationData(string? token)
|
||||
{
|
||||
Token = token;
|
||||
}
|
||||
|
@ -327,6 +327,7 @@ public class GlobalSettings : IGlobalSettings
|
||||
|
||||
public int MaxRetries { get; set; } = 3;
|
||||
public int RetryTiming { get; set; } = 30000; // 30s
|
||||
public bool UseDelayPlugin { get; set; } = false;
|
||||
public virtual string EventRepositoryQueueName { get; set; } = "events-write-queue";
|
||||
public virtual string IntegrationDeadLetterQueueName { get; set; } = "integration-dead-letter-queue";
|
||||
public virtual string SlackEventsQueueName { get; set; } = "events-slack-queue";
|
||||
|
@ -24,7 +24,7 @@ public interface ICipherService
|
||||
Task DeleteFolderAsync(Folder folder);
|
||||
Task ShareAsync(Cipher originalCipher, Cipher cipher, Guid organizationId, IEnumerable<Guid> collectionIds,
|
||||
Guid userId, DateTime? lastKnownRevisionDate);
|
||||
Task<IEnumerable<Cipher>> ShareManyAsync(IEnumerable<(Cipher cipher, DateTime? lastKnownRevisionDate)> ciphers, Guid organizationId,
|
||||
Task<IEnumerable<CipherDetails>> ShareManyAsync(IEnumerable<(CipherDetails cipher, DateTime? lastKnownRevisionDate)> ciphers, Guid organizationId,
|
||||
IEnumerable<Guid> collectionIds, Guid sharingUserId);
|
||||
Task SaveCollectionsAsync(Cipher cipher, IEnumerable<Guid> collectionIds, Guid savingUserId, bool orgAdmin);
|
||||
Task SoftDeleteAsync(CipherDetails cipherDetails, Guid deletingUserId, bool orgAdmin = false);
|
||||
|
@ -625,7 +625,7 @@ public class CipherService : ICipherService
|
||||
await _pushService.PushSyncCipherUpdateAsync(cipher, collectionIds);
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<Cipher>> ShareManyAsync(IEnumerable<(Cipher cipher, DateTime? lastKnownRevisionDate)> cipherInfos,
|
||||
public async Task<IEnumerable<CipherDetails>> ShareManyAsync(IEnumerable<(CipherDetails cipher, DateTime? lastKnownRevisionDate)> cipherInfos,
|
||||
Guid organizationId, IEnumerable<Guid> collectionIds, Guid sharingUserId)
|
||||
{
|
||||
var cipherIds = new List<Guid>();
|
||||
|
@ -8,16 +8,6 @@ namespace Bit.Icons.Controllers;
|
||||
[Route("")]
|
||||
public class IconsController : Controller
|
||||
{
|
||||
// Basic bwi-globe icon
|
||||
private static readonly byte[] _notFoundImage = Convert.FromBase64String("iVBORw0KGgoAAAANSUhEUg" +
|
||||
"AAABMAAAATCAQAAADYWf5HAAABu0lEQVR42nXSvWuTURTH8R+t0heI9Y04aJycdBLNJNrBFBU7OFgUER3q21I0bXK+JwZ" +
|
||||
"pXISm/QdcRB3EgqBBsNihsUbbgODQQSKCuKSDOApJuuhj8tCYQj/jvYfD795z1MZ+nBKrNKhSwrMxbZTrtRnqlEjZkB/x" +
|
||||
"C/xmhZrlc71qS0Up8yVzTCGucFNKD1JhORVd70SZNU4okNx5d4+U2UXRIpJFWLClsR79YzN88wQvLWNzzPKEeS/wkQGpW" +
|
||||
"VhhqhW8TtDJD3Mm1x/23zLSrZCdpBY8BueTNjHSbc+8wC9HlHgU5Aj5AW5zPdcVdpq0UcknWBSr/pjixO4gfp899Kd23p" +
|
||||
"M2qQCH7LkCnqAqGh73OK/8NPOcaibr90LrW/yWAnaUhqjaOSl9nFR2r5rsqo22ypn1B5IN8VOUMHVgOnNQIX+d62plcz6" +
|
||||
"rg1/jskK8CMb4we4pG6OWHtR/LBJkC2E4a7ZPkuX5ntumAOM2xxveclEhLvGH6XCmLPs735Eetrw63NnOgr9P9q1viC3x" +
|
||||
"lRUGOjImqFDuOBvrYYoaZU9z1uPpYae5NfdvbNVG2ZjDIlXq/oMi46lo++4vjjPBl2Dlg00AAAAASUVORK5CYII=");
|
||||
|
||||
private readonly IMemoryCache _memoryCache;
|
||||
private readonly IDomainMappingService _domainMappingService;
|
||||
private readonly IIconFetchingService _iconFetchingService;
|
||||
@ -99,7 +89,7 @@ public class IconsController : Controller
|
||||
|
||||
if (icon == null)
|
||||
{
|
||||
return new FileContentResult(_notFoundImage, "image/png");
|
||||
return new NotFoundResult();
|
||||
}
|
||||
|
||||
return new FileContentResult(icon.Image, icon.Format);
|
||||
|
@ -5,6 +5,7 @@ using Bit.Core.KeyManagement.Models.Data;
|
||||
using Bit.Core.KeyManagement.Repositories;
|
||||
using Bit.Core.KeyManagement.UserKey;
|
||||
using Bit.Core.Settings;
|
||||
using Bit.Core.Utilities;
|
||||
using Bit.Infrastructure.Dapper.Repositories;
|
||||
using Dapper;
|
||||
using Microsoft.Data.SqlClient;
|
||||
@ -45,7 +46,7 @@ public class UserSignatureKeyPairRepository : Repository<UserSignatureKeyPair, G
|
||||
"[dbo].[UserSignatureKeyPair_SetForRotation]",
|
||||
new
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
Id = CoreHelpers.GenerateComb(),
|
||||
UserId = userId,
|
||||
SignatureAlgorithm = (byte)signingKeys.SignatureAlgorithm,
|
||||
SigningKey = signingKeys.WrappedSigningKey,
|
||||
|
@ -4,6 +4,7 @@ using AutoMapper;
|
||||
using Bit.Core.KeyManagement.Models.Data;
|
||||
using Bit.Core.KeyManagement.Repositories;
|
||||
using Bit.Core.KeyManagement.UserKey;
|
||||
using Bit.Core.Utilities;
|
||||
using Bit.Infrastructure.EntityFramework.Repositories;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@ -37,7 +38,7 @@ public class UserSignatureKeyPairRepository : Repository<Core.KeyManagement.Enti
|
||||
var dbContext = GetDatabaseContext(scope);
|
||||
var entity = new Models.UserSignatureKeyPair
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
Id = CoreHelpers.GenerateComb(),
|
||||
UserId = userId,
|
||||
SignatureAlgorithm = signingKeys.SignatureAlgorithm,
|
||||
SigningKey = signingKeys.WrappedSigningKey,
|
||||
|
@ -550,7 +550,8 @@ public static class ServiceCollectionExtensions
|
||||
if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.ConnectionString) &&
|
||||
CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
|
||||
{
|
||||
services.AddKeyedSingleton<IEventWriteService, AzureServiceBusEventWriteService>("broadcast");
|
||||
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
|
||||
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -563,7 +564,8 @@ public static class ServiceCollectionExtensions
|
||||
|
||||
if (IsRabbitMqEnabled(globalSettings))
|
||||
{
|
||||
services.AddKeyedSingleton<IEventWriteService, RabbitMqEventWriteService>("broadcast");
|
||||
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
|
||||
services.AddKeyedSingleton<IEventWriteService, EventIntegrationEventWriteService>("broadcast");
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -585,13 +587,15 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IEventRepository, TableStorageRepos.EventRepository>();
|
||||
services.AddSingleton<AzureTableStorageEventHandler>();
|
||||
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
|
||||
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
new AzureServiceBusEventListenerService(
|
||||
handler: provider.GetRequiredService<AzureTableStorageEventHandler>(),
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
|
||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||
subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName,
|
||||
globalSettings: globalSettings,
|
||||
subscriptionName: globalSettings.EventLogging.AzureServiceBus.EventRepositorySubscriptionName));
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
|
||||
)
|
||||
);
|
||||
|
||||
return services;
|
||||
}
|
||||
@ -607,12 +611,10 @@ public static class ServiceCollectionExtensions
|
||||
{
|
||||
var routingKey = integrationType.ToRoutingKey();
|
||||
|
||||
services.AddSingleton<IIntegrationPublisher, AzureServiceBusIntegrationPublisher>();
|
||||
|
||||
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
|
||||
new EventIntegrationHandler<TConfig>(
|
||||
integrationType,
|
||||
provider.GetRequiredService<IIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IEventIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
|
||||
provider.GetRequiredService<IUserRepository>(),
|
||||
provider.GetRequiredService<IOrganizationRepository>()));
|
||||
@ -620,18 +622,22 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
new AzureServiceBusEventListenerService(
|
||||
handler: provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>(),
|
||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||
subscriptionName: eventSubscriptionName,
|
||||
globalSettings: globalSettings,
|
||||
subscriptionName: eventSubscriptionName));
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusEventListenerService>>()
|
||||
)
|
||||
);
|
||||
|
||||
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
|
||||
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
new AzureServiceBusIntegrationListenerService(
|
||||
handler: provider.GetRequiredService<IIntegrationHandler<TConfig>>(),
|
||||
topicName: globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName,
|
||||
subscriptionName: integrationSubscriptionName,
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService>>(),
|
||||
globalSettings: globalSettings));
|
||||
maxRetries: globalSettings.EventLogging.AzureServiceBus.MaxRetries,
|
||||
serviceBusService: provider.GetRequiredService<IAzureServiceBusService>(),
|
||||
logger: provider.GetRequiredService<ILogger<AzureServiceBusIntegrationListenerService>>()));
|
||||
|
||||
return services;
|
||||
}
|
||||
@ -642,6 +648,8 @@ public static class ServiceCollectionExtensions
|
||||
!CoreHelpers.SettingHasValue(globalSettings.EventLogging.AzureServiceBus.EventTopicName))
|
||||
return services;
|
||||
|
||||
services.AddSingleton<IAzureServiceBusService, AzureServiceBusService>();
|
||||
services.AddSingleton<IEventIntegrationPublisher, AzureServiceBusService>();
|
||||
services.AddAzureServiceBusEventRepositoryListener(globalSettings);
|
||||
|
||||
services.AddSlackService(globalSettings);
|
||||
@ -668,9 +676,9 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
new RabbitMqEventListenerService(
|
||||
provider.GetRequiredService<EventRepositoryHandler>(),
|
||||
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
|
||||
globalSettings,
|
||||
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName));
|
||||
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName,
|
||||
provider.GetRequiredService<IRabbitMqService>(),
|
||||
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
|
||||
|
||||
return services;
|
||||
}
|
||||
@ -679,19 +687,17 @@ public static class ServiceCollectionExtensions
|
||||
string eventQueueName,
|
||||
string integrationQueueName,
|
||||
string integrationRetryQueueName,
|
||||
string integrationDeadLetterQueueName,
|
||||
IntegrationType integrationType,
|
||||
GlobalSettings globalSettings)
|
||||
int maxRetries,
|
||||
IntegrationType integrationType)
|
||||
where TConfig : class
|
||||
where THandler : class, IIntegrationHandler<TConfig>
|
||||
{
|
||||
var routingKey = integrationType.ToRoutingKey();
|
||||
|
||||
services.AddSingleton<IIntegrationPublisher, RabbitMqIntegrationPublisher>();
|
||||
services.AddKeyedSingleton<IEventMessageHandler>(routingKey, (provider, _) =>
|
||||
new EventIntegrationHandler<TConfig>(
|
||||
integrationType,
|
||||
provider.GetRequiredService<IIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IEventIntegrationPublisher>(),
|
||||
provider.GetRequiredService<IOrganizationIntegrationConfigurationRepository>(),
|
||||
provider.GetRequiredService<IUserRepository>(),
|
||||
provider.GetRequiredService<IOrganizationRepository>()));
|
||||
@ -699,9 +705,9 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
new RabbitMqEventListenerService(
|
||||
provider.GetRequiredKeyedService<IEventMessageHandler>(routingKey),
|
||||
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
|
||||
globalSettings,
|
||||
eventQueueName));
|
||||
eventQueueName,
|
||||
provider.GetRequiredService<IRabbitMqService>(),
|
||||
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>()));
|
||||
|
||||
services.AddSingleton<IIntegrationHandler<TConfig>, THandler>();
|
||||
services.AddSingleton<IHostedService>(provider =>
|
||||
@ -710,8 +716,8 @@ public static class ServiceCollectionExtensions
|
||||
routingKey: routingKey,
|
||||
queueName: integrationQueueName,
|
||||
retryQueueName: integrationRetryQueueName,
|
||||
deadLetterQueueName: integrationDeadLetterQueueName,
|
||||
globalSettings: globalSettings,
|
||||
maxRetries: maxRetries,
|
||||
rabbitMqService: provider.GetRequiredService<IRabbitMqService>(),
|
||||
logger: provider.GetRequiredService<ILogger<RabbitMqIntegrationListenerService>>()));
|
||||
|
||||
return services;
|
||||
@ -724,6 +730,8 @@ public static class ServiceCollectionExtensions
|
||||
return services;
|
||||
}
|
||||
|
||||
services.AddSingleton<IRabbitMqService, RabbitMqService>();
|
||||
services.AddSingleton<IEventIntegrationPublisher, RabbitMqService>();
|
||||
services.AddRabbitMqEventRepositoryListener(globalSettings);
|
||||
|
||||
services.AddSlackService(globalSettings);
|
||||
@ -731,18 +739,16 @@ public static class ServiceCollectionExtensions
|
||||
globalSettings.EventLogging.RabbitMq.SlackEventsQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.SlackIntegrationQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.SlackIntegrationRetryQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName,
|
||||
IntegrationType.Slack,
|
||||
globalSettings);
|
||||
globalSettings.EventLogging.RabbitMq.MaxRetries,
|
||||
IntegrationType.Slack);
|
||||
|
||||
services.AddHttpClient(WebhookIntegrationHandler.HttpClientName);
|
||||
services.AddRabbitMqIntegration<WebhookIntegrationConfigurationDetails, WebhookIntegrationHandler>(
|
||||
globalSettings.EventLogging.RabbitMq.WebhookEventsQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.WebhookIntegrationQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.WebhookIntegrationRetryQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName,
|
||||
IntegrationType.Webhook,
|
||||
globalSettings);
|
||||
globalSettings.EventLogging.RabbitMq.MaxRetries,
|
||||
IntegrationType.Webhook);
|
||||
|
||||
return services;
|
||||
}
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user