1
0
mirror of https://github.com/bitwarden/server.git synced 2025-04-06 05:28:15 -05:00

Event processor tuning (#3945)

This commit is contained in:
Matt Bishop 2024-04-02 15:45:18 -04:00 committed by GitHub
parent a048d6d9e3
commit 88f34836f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -30,6 +30,7 @@ public class AzureQueueHostedService : IHostedService, IDisposable
_logger.LogInformation(Constants.BypassFiltersEventId, "Starting service."); _logger.LogInformation(Constants.BypassFiltersEventId, "Starting service.");
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_executingTask = ExecuteAsync(_cts.Token); _executingTask = ExecuteAsync(_cts.Token);
return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask; return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
} }
@ -39,8 +40,10 @@ public class AzureQueueHostedService : IHostedService, IDisposable
{ {
return; return;
} }
_logger.LogWarning("Stopping service."); _logger.LogWarning("Stopping service.");
_cts.Cancel();
await _cts.CancelAsync();
await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken)); await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
} }
@ -64,13 +67,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable
{ {
try try
{ {
var messages = await _queueClient.ReceiveMessagesAsync(32); var messages = await _queueClient.ReceiveMessagesAsync(32,
cancellationToken: cancellationToken);
if (messages.Value?.Any() ?? false) if (messages.Value?.Any() ?? false)
{ {
foreach (var message in messages.Value) foreach (var message in messages.Value)
{ {
await ProcessQueueMessageAsync(message.DecodeMessageText(), cancellationToken); await ProcessQueueMessageAsync(message.DecodeMessageText(), cancellationToken);
await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt,
cancellationToken);
} }
} }
else else
@ -78,14 +83,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
} }
} }
catch (Exception e) catch (Exception ex)
{ {
_logger.LogError(e, "Exception occurred: " + e.Message); _logger.LogError(ex, "Error occurred processing message block.");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
} }
} }
_logger.LogWarning("Done processing."); _logger.LogWarning("Done processing messages.");
} }
public async Task ProcessQueueMessageAsync(string message, CancellationToken cancellationToken) public async Task ProcessQueueMessageAsync(string message, CancellationToken cancellationToken)
@ -98,14 +104,14 @@ public class AzureQueueHostedService : IHostedService, IDisposable
try try
{ {
_logger.LogInformation("Processing message."); _logger.LogInformation("Processing message.");
var events = new List<IEvent>();
var events = new List<IEvent>();
using var jsonDocument = JsonDocument.Parse(message); using var jsonDocument = JsonDocument.Parse(message);
var root = jsonDocument.RootElement; var root = jsonDocument.RootElement;
if (root.ValueKind == JsonValueKind.Array) if (root.ValueKind == JsonValueKind.Array)
{ {
var indexedEntities = root.Deserialize<List<EventMessage>>() var indexedEntities = root.Deserialize<List<EventMessage>>()
.SelectMany(e => EventTableEntity.IndexEvent(e)); .SelectMany(EventTableEntity.IndexEvent);
events.AddRange(indexedEntities); events.AddRange(indexedEntities);
} }
else if (root.ValueKind == JsonValueKind.Object) else if (root.ValueKind == JsonValueKind.Object)
@ -114,12 +120,15 @@ public class AzureQueueHostedService : IHostedService, IDisposable
events.AddRange(EventTableEntity.IndexEvent(eventMessage)); events.AddRange(EventTableEntity.IndexEvent(eventMessage));
} }
cancellationToken.ThrowIfCancellationRequested();
await _eventWriteService.CreateManyAsync(events); await _eventWriteService.CreateManyAsync(events);
_logger.LogInformation("Processed message."); _logger.LogInformation("Processed message.");
} }
catch (JsonException) catch (JsonException ex)
{ {
_logger.LogError("JsonReaderException: Unable to parse message."); _logger.LogError(ex, "Unable to parse message.");
} }
} }
} }