mirror of
https://github.com/bitwarden/server.git
synced 2025-07-03 00:52:49 -05:00
Support large organization sync (#1311)
* Increase organization max seat size from 30k to 2b (#1274) * Increase organization max seat size from 30k to 2b * PR review. Do not modify unless state matches expected * Organization sync simultaneous event reporting (#1275) * Split up azure messages according to max size * Allow simultaneous login of organization user events * Early resolve small event lists * Clarify logic Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com> * Improve readability This comes at the cost of multiple serializations, but the improvement in wire-time should more than make up for this on message where serialization time matters Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com> * Queue emails (#1286) * Extract common Azure queue methods * Do not use internal entity framework namespace * Prefer IEnumerable to IList unless needed All of these implementations were just using `Count == 1`, which is easily replicated. This will be used when abstracting Azure queues * Add model for azure queue message * Abstract Azure queue for reuse * Creat service to enqueue mail messages for later processing Azure queue mail service uses Azure queues. Blocking just blocks until all the work is done -- This is how emailing works today * Provide mail queue service to DI * Queue organization invite emails for later processing All emails can later be added to this queue * Create Admin hosted service to process enqueued mail messages * Prefer constructors to static generators * Mass delete organization users (#1287) * Add delete many to Organization Users * Correct formatting * Remove erroneous migration * Clarify parameter name * Formatting fixes * Simplify bump account revision sproc * Formatting fixes * Match file names to objects * Indicate if large import is expected * Early pull all existing users we were planning on inviting (#1290) * Early pull all existing users we were planning on inviting * Improve sproc name * Batch upsert org users (#1289) * Add UpsertMany sprocs to OrganizationUser * Add method to create TVPs from any object. Uses DbOrder attribute to generate. Sproc will fail unless TVP column order matches that of the db type * Combine migrations * Correct formatting * Include sql objects in sql project * Keep consisten parameter names * Batch deletes for performance * Correct formatting * consolidate migrations * Use batch methods in OrganizationImport * Declare @BatchSize * Transaction names limited to 32 chars Drop sproc before creating it if it exists * Update import tests * Allow for more users in org upgrades * Fix formatting * Improve class hierarchy structure * Use name tuple types * Fix formatting * Front load all reflection * Format constructor * Simplify ToTvp as class-specific extension Co-authored-by: Chad Scharf <3904944+cscharf@users.noreply.github.com>
This commit is contained in:
107
src/Admin/HostedServices/AzureQueueMailHostedService.cs
Normal file
107
src/Admin/HostedServices/AzureQueueMailHostedService.cs
Normal file
@ -0,0 +1,107 @@
|
||||
using System;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Azure.Storage.Queues;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Bit.Core.Settings;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading;
|
||||
using Bit.Core.Services;
|
||||
using Newtonsoft.Json;
|
||||
using Bit.Core.Models.Mail;
|
||||
using Azure.Storage.Queues.Models;
|
||||
using System.Linq;
|
||||
using System.Collections.Generic;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
||||
namespace Bit.Admin.HostedServices
|
||||
{
|
||||
public class AzureQueueMailHostedService : IHostedService
|
||||
{
|
||||
private readonly ILogger<AzureQueueMailHostedService> _logger;
|
||||
private readonly GlobalSettings _globalSettings;
|
||||
private readonly IMailService _mailService;
|
||||
private CancellationTokenSource _cts;
|
||||
private Task _executingTask;
|
||||
|
||||
private QueueClient _mailQueueClient;
|
||||
|
||||
public AzureQueueMailHostedService(
|
||||
ILogger<AzureQueueMailHostedService> logger,
|
||||
IMailService mailService,
|
||||
GlobalSettings globalSettings)
|
||||
{
|
||||
_logger = logger;
|
||||
_mailService = mailService;
|
||||
_globalSettings = globalSettings;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
_executingTask = ExecuteAsync(_cts.Token);
|
||||
return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_executingTask == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
_cts.Cancel();
|
||||
await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
private async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_mailQueueClient = new QueueClient(_globalSettings.Mail.ConnectionString, "mail");
|
||||
|
||||
QueueMessage[] mailMessages;
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
if (!(mailMessages = await RetrieveMessagesAsync()).Any())
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(15));
|
||||
}
|
||||
|
||||
foreach (var message in mailMessages)
|
||||
{
|
||||
try
|
||||
{
|
||||
var token = JToken.Parse(message.MessageText);
|
||||
if (token is JArray)
|
||||
{
|
||||
foreach (var mailQueueMessage in token.ToObject<List<MailQueueMessage>>())
|
||||
{
|
||||
await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage);
|
||||
}
|
||||
}
|
||||
else if (token is JObject)
|
||||
{
|
||||
var mailQueueMessage = token.ToObject<MailQueueMessage>();
|
||||
await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Failed to send email");
|
||||
// TODO: retries?
|
||||
}
|
||||
|
||||
await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
|
||||
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<QueueMessage[]> RetrieveMessagesAsync()
|
||||
{
|
||||
return (await _mailQueueClient.ReceiveMessagesAsync(maxMessages: 32))?.Value ?? new QueueMessage[] { };
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user