mirror of
https://github.com/bitwarden/server.git
synced 2025-05-22 12:04:27 -05:00

We are interested in the rate at which signalR notifications are pushed to clients. This enables tracking only of that rate and the type of notification, nothing more identifiable. Data will be used to determine feasibility of transferring to web push
96 lines
3.4 KiB
C#
96 lines
3.4 KiB
C#
using Azure.Storage.Queues;
|
|
using Bit.Core.Settings;
|
|
using Bit.Core.Utilities;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
|
|
namespace Bit.Notifications;
|
|
|
|
public class AzureQueueHostedService : IHostedService, IDisposable
|
|
{
|
|
private readonly ILogger _logger;
|
|
private readonly IHubContext<NotificationsHub> _hubContext;
|
|
private readonly IHubContext<AnonymousNotificationsHub> _anonymousHubContext;
|
|
private readonly GlobalSettings _globalSettings;
|
|
|
|
private Task _executingTask;
|
|
private CancellationTokenSource _cts;
|
|
private QueueClient _queueClient;
|
|
|
|
public AzureQueueHostedService(
|
|
ILogger<AzureQueueHostedService> logger,
|
|
IHubContext<NotificationsHub> hubContext,
|
|
IHubContext<AnonymousNotificationsHub> anonymousHubContext,
|
|
GlobalSettings globalSettings)
|
|
{
|
|
_logger = logger;
|
|
_hubContext = hubContext;
|
|
_globalSettings = globalSettings;
|
|
_anonymousHubContext = anonymousHubContext;
|
|
}
|
|
|
|
public Task StartAsync(CancellationToken cancellationToken)
|
|
{
|
|
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
_executingTask = ExecuteAsync(_cts.Token);
|
|
return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
|
|
}
|
|
|
|
public async Task StopAsync(CancellationToken cancellationToken)
|
|
{
|
|
if (_executingTask == null)
|
|
{
|
|
return;
|
|
}
|
|
_logger.LogWarning("Stopping service.");
|
|
_cts.Cancel();
|
|
await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
}
|
|
|
|
public void Dispose()
|
|
{ }
|
|
|
|
private async Task ExecuteAsync(CancellationToken cancellationToken)
|
|
{
|
|
_queueClient = new QueueClient(_globalSettings.Notifications.ConnectionString, "notifications");
|
|
while (!cancellationToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
var messages = await _queueClient.ReceiveMessagesAsync(32);
|
|
if (messages.Value?.Any() ?? false)
|
|
{
|
|
foreach (var message in messages.Value)
|
|
{
|
|
try
|
|
{
|
|
await HubHelpers.SendNotificationToHubAsync(
|
|
message.DecodeMessageText(), _hubContext, _anonymousHubContext, _logger, cancellationToken);
|
|
await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_logger.LogError(e, "Error processing dequeued message: {MessageId} x{DequeueCount}.",
|
|
message.MessageId, message.DequeueCount);
|
|
if (message.DequeueCount > 2)
|
|
{
|
|
await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
_logger.LogError(e, "Error processing messages.");
|
|
}
|
|
}
|
|
|
|
_logger.LogWarning("Done processing.");
|
|
}
|
|
}
|