diff --git a/src/EventsProcessor/AzureQueueHostedService.cs b/src/EventsProcessor/AzureQueueHostedService.cs new file mode 100644 index 0000000000..18e1b07ead --- /dev/null +++ b/src/EventsProcessor/AzureQueueHostedService.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Bit.Core.Models.Data; +using Bit.Core.Services; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Queue; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Bit.EventsProcessor +{ + public class AzureQueueHostedService : IHostedService, IDisposable + { + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + private Task _executingTask; + private CancellationTokenSource _cts; + private CloudQueue _queue; + private IEventWriteService _eventWriteService; + + public AzureQueueHostedService( + ILogger logger, + IConfiguration configuration) + { + _logger = logger; + _configuration = configuration; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _executingTask = ExecuteAsync(_cts.Token); + return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if(_executingTask == null) + { + return; + } + _cts.Cancel(); + await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken)); + cancellationToken.ThrowIfCancellationRequested(); + } + + public void Dispose() + { } + + private async Task ExecuteAsync(CancellationToken cancellationToken) + { + var storageConnectionString = _configuration["azureStorageConnectionString"]; + if(string.IsNullOrWhiteSpace(storageConnectionString)) + { + return; + } + + var repo = new Core.Repositories.TableStorage.EventRepository(storageConnectionString); + _eventWriteService = new RepositoryEventWriteService(repo); + + var storageAccount = CloudStorageAccount.Parse(storageConnectionString); + var queueClient = storageAccount.CreateCloudQueueClient(); + _queue = queueClient.GetQueueReference("event"); + + while(!cancellationToken.IsCancellationRequested) + { + var messages = await _queue.GetMessagesAsync(32, TimeSpan.FromMinutes(1), + null, null, cancellationToken); + if(messages.Any()) + { + foreach(var message in messages) + { + await ProcessQueueMessageAsync(message.AsString, cancellationToken); + await _queue.DeleteMessageAsync(message); + } + } + else + { + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + } + } + + public async Task ProcessQueueMessageAsync(string message, CancellationToken cancellationToken) + { + if(_eventWriteService == null || message == null || message.Length == 0) + { + return; + } + + try + { + _logger.LogInformation("Processing message."); + var events = new List(); + + var token = JToken.Parse(message); + if(token is JArray) + { + var indexedEntities = token.ToObject>() + .SelectMany(e => EventTableEntity.IndexEvent(e)); + events.AddRange(indexedEntities); + } + else if(token is JObject) + { + var eventMessage = token.ToObject(); + events.AddRange(EventTableEntity.IndexEvent(eventMessage)); + } + + await _eventWriteService.CreateManyAsync(events); + _logger.LogInformation("Processed message."); + } + catch(JsonReaderException) + { + _logger.LogError("JsonReaderException: Unable to parse message."); + } + catch(JsonSerializationException) + { + _logger.LogError("JsonSerializationException: Unable to serialize token."); + } + catch(Exception e) + { + _logger.LogError(e, "Exception occurred. " + e.Message); + throw e; + } + } + } +} diff --git a/src/EventsProcessor/EventsProcessor.csproj b/src/EventsProcessor/EventsProcessor.csproj index 3753e7fc3c..64afad1245 100644 --- a/src/EventsProcessor/EventsProcessor.csproj +++ b/src/EventsProcessor/EventsProcessor.csproj @@ -1,30 +1,18 @@ - + + - Exe + 1.30.0 netcoreapp2.1 Bit.EventsProcessor + bitwarden-EventsProcessor + - - - - + + - - - - - - PreserveNewest - PreserveNewest - - - - - Always - - - \ No newline at end of file + + diff --git a/src/EventsProcessor/Functions.cs b/src/EventsProcessor/Functions.cs deleted file mode 100644 index bd837a9693..0000000000 --- a/src/EventsProcessor/Functions.cs +++ /dev/null @@ -1,76 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Bit.Core.Models.Data; -using Bit.Core.Services; -using Microsoft.Azure.WebJobs; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; - -namespace Bit.EventsProcessor -{ - public class Functions - { - private readonly IEventWriteService _eventWriteService; - - public Functions(IConfiguration config) - { - var storageConnectionString = config["AzureWebJobsStorage"]; - if(string.IsNullOrWhiteSpace(storageConnectionString)) - { - return; - } - - var repo = new Core.Repositories.TableStorage.EventRepository(storageConnectionString); - _eventWriteService = new RepositoryEventWriteService(repo); - } - - public async Task ProcessQueueMessageAsync([QueueTrigger("event")] string message, - CancellationToken cancellationToken, ILogger logger) - { - if(_eventWriteService == null || message == null || message.Length == 0) - { - return; - } - - try - { - logger.LogInformation("Processing message."); - var events = new List(); - - var token = JToken.Parse(message); - if(token is JArray) - { - var indexedEntities = token.ToObject>() - .SelectMany(e => EventTableEntity.IndexEvent(e)); - events.AddRange(indexedEntities); - } - else if(token is JObject) - { - var eventMessage = token.ToObject(); - events.AddRange(EventTableEntity.IndexEvent(eventMessage)); - } - - await _eventWriteService.CreateManyAsync(events); - logger.LogInformation("Processed message."); - } - catch(JsonReaderException) - { - logger.LogError("JsonReaderException: Unable to parse message."); - } - catch(JsonSerializationException) - { - logger.LogError("JsonSerializationException: Unable to serialize token."); - } - catch(Exception e) - { - logger.LogError(e, "Exception occurred. " + e.Message); - throw e; - } - } - } -} diff --git a/src/EventsProcessor/Program.cs b/src/EventsProcessor/Program.cs index fb8266e509..cf7fb579b4 100644 --- a/src/EventsProcessor/Program.cs +++ b/src/EventsProcessor/Program.cs @@ -1,40 +1,17 @@ -using System; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; namespace Bit.EventsProcessor { - class Program + public class Program { - static void Main(string[] args) + public static void Main(string[] args) { - var builder = new HostBuilder(); - builder.ConfigureWebJobs(b => - { - b.AddAzureStorageCoreServices(); - b.AddAzureStorage(a => - { - a.BatchSize = 5; - }); - // Not working. ref: https://github.com/Azure/azure-webjobs-sdk/issues/1962 - b.AddDashboardLogging(); - }); - builder.ConfigureLogging((context, b) => - { - b.AddConsole(); - b.SetMinimumLevel(LogLevel.Warning); - }); - builder.ConfigureHostConfiguration(b => - { - b.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true); - b.AddEnvironmentVariables(); - }); - var host = builder.Build(); - using(host) - { - host.Run(); - } + WebHost + .CreateDefaultBuilder(args) + .UseStartup() + .Build() + .Run(); } } } diff --git a/src/EventsProcessor/Properties/launchSettings.json b/src/EventsProcessor/Properties/launchSettings.json new file mode 100644 index 0000000000..d7bb7a4129 --- /dev/null +++ b/src/EventsProcessor/Properties/launchSettings.json @@ -0,0 +1,27 @@ +{ + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:54103/", + "sslPort": 0 + } + }, + "profiles": { + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "EventsProcessor": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "http://localhost:54104/" + } + } +} \ No newline at end of file diff --git a/src/EventsProcessor/Settings.job b/src/EventsProcessor/Settings.job deleted file mode 100644 index e572f4eee0..0000000000 --- a/src/EventsProcessor/Settings.job +++ /dev/null @@ -1,22 +0,0 @@ -{ - - // Examples: - - // Runs every minute - // "schedule": "0 * * * * *" - - // Runs every 15 minutes - // "schedule": "0 */15 * * * *" - - // Runs every hour (i.e. whenever the count of minutes is 0) - // "schedule": "0 0 * * * *" - - // Runs every hour from 9 AM to 5 PM - // "schedule": "0 0 9-17 * * *" - - // Runs at 9:30 AM every day - // "schedule": "0 30 9 * * *" - - // Runs at 9:30 AM every week day - // "schedule": "0 30 9 * * 1-5" -} \ No newline at end of file diff --git a/src/EventsProcessor/Startup.cs b/src/EventsProcessor/Startup.cs new file mode 100644 index 0000000000..d52d9bc027 --- /dev/null +++ b/src/EventsProcessor/Startup.cs @@ -0,0 +1,34 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.IdentityModel.Logging; + +namespace Bit.EventsProcessor +{ + public class Startup + { + public Startup(IHostingEnvironment env, IConfiguration configuration) + { + Configuration = configuration; + Environment = env; + } + + public IConfiguration Configuration { get; } + public IHostingEnvironment Environment { get; set; } + + public void ConfigureServices(IServiceCollection services) + { + services.AddOptions(); + services.AddHostedService(); + } + + public void Configure(IApplicationBuilder app, IHostingEnvironment env) + { + if(env.IsDevelopment()) + { + IdentityModelEventSource.ShowPII = true; + } + } + } +} diff --git a/src/EventsProcessor/appsettings.Production.json b/src/EventsProcessor/appsettings.Production.json new file mode 100644 index 0000000000..0f530c147c --- /dev/null +++ b/src/EventsProcessor/appsettings.Production.json @@ -0,0 +1,2 @@ +{ +} diff --git a/src/EventsProcessor/appsettings.json b/src/EventsProcessor/appsettings.json index 32dd707c6f..a5abe9f788 100644 --- a/src/EventsProcessor/appsettings.json +++ b/src/EventsProcessor/appsettings.json @@ -1,3 +1,3 @@ { - "AzureWebJobsStorage": "" + "azureStorageConnectionString": "SECRET" }