mirror of
https://github.com/bitwarden/server.git
synced 2025-07-03 00:52:49 -05:00
Added cipher service with bulk import to account controller
This commit is contained in:
@ -4,6 +4,7 @@ using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Documents.Client;
|
||||
using Bit.Core.Domains;
|
||||
using Bit.Core.Repositories.DocumentDB.Utilities;
|
||||
|
||||
namespace Bit.Core.Repositories.DocumentDB
|
||||
{
|
||||
@ -15,26 +16,53 @@ namespace Bit.Core.Repositories.DocumentDB
|
||||
|
||||
public async Task UpdateDirtyCiphersAsync(IEnumerable<dynamic> ciphers)
|
||||
{
|
||||
// Make sure we are dealing with cipher types since we accept any via dynamic.
|
||||
var cleanedCiphers = ciphers.Where(c => c is Cipher);
|
||||
if(cleanedCiphers.Count() == 0)
|
||||
await DocumentDBHelpers.QueryWithRetryAsync(async () =>
|
||||
{
|
||||
return;
|
||||
}
|
||||
// Make sure we are dealing with cipher types since we accept any via dynamic.
|
||||
var cleanedCiphers = ciphers.Where(c => c is Cipher);
|
||||
if(cleanedCiphers.Count() == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var userId = ((Cipher)cleanedCiphers.First()).UserId;
|
||||
StoredProcedureResponse<int> sprocResponse = await Client.ExecuteStoredProcedureAsync<int>(
|
||||
ResolveSprocIdLink(userId, "bulkUpdateDirtyCiphers"),
|
||||
// Do sets of 50. Recursion will handle the rest below.
|
||||
cleanedCiphers.Take(50),
|
||||
userId,
|
||||
Cipher.TypeValue);
|
||||
var userId = ((Cipher)cleanedCiphers.First()).UserId;
|
||||
StoredProcedureResponse<int> sprocResponse = await Client.ExecuteStoredProcedureAsync<int>(
|
||||
ResolveSprocIdLink(userId, "bulkUpdateDirtyCiphers"),
|
||||
// Do sets of 50. Recursion will handle the rest below.
|
||||
cleanedCiphers.Take(50),
|
||||
userId);
|
||||
|
||||
var replacedCount = sprocResponse.Response;
|
||||
if(replacedCount != cleanedCiphers.Count())
|
||||
var replacedCount = sprocResponse.Response;
|
||||
if(replacedCount != cleanedCiphers.Count())
|
||||
{
|
||||
await UpdateDirtyCiphersAsync(cleanedCiphers.Skip(replacedCount));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEnumerable<dynamic> ciphers)
|
||||
{
|
||||
await DocumentDBHelpers.QueryWithRetryAsync(async () =>
|
||||
{
|
||||
await UpdateDirtyCiphersAsync(cleanedCiphers.Skip(replacedCount));
|
||||
}
|
||||
// Make sure we are dealing with cipher types since we accept any via dynamic.
|
||||
var cleanedCiphers = ciphers.Where(c => c is Cipher);
|
||||
if(cleanedCiphers.Count() == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var userId = ((Cipher)cleanedCiphers.First()).UserId;
|
||||
StoredProcedureResponse<int> sprocResponse = await Client.ExecuteStoredProcedureAsync<int>(
|
||||
ResolveSprocIdLink(userId, "bulkCreate"),
|
||||
// Do sets of 50. Recursion will handle the rest below.
|
||||
cleanedCiphers.Take(50));
|
||||
|
||||
var createdCount = sprocResponse.Response;
|
||||
if(createdCount != cleanedCiphers.Count())
|
||||
{
|
||||
await CreateAsync(cleanedCiphers.Skip(createdCount));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* This script called as stored procedure to import lots of documents in one batch.
|
||||
* The script sets response body to the number of docs imported and is called multiple times
|
||||
* by the client until total number of docs desired by the client is imported.
|
||||
* @param {Object[]} docs - Array of documents to import.
|
||||
*/
|
||||
|
||||
function bulkCreate(docs) {
|
||||
var collection = getContext().getCollection();
|
||||
var collectionLink = collection.getSelfLink();
|
||||
|
||||
// The count of imported docs, also used as current doc index.
|
||||
var count = 0;
|
||||
|
||||
// Validate input.
|
||||
if (!docs) throw new Error('The array is undefined or null.');
|
||||
|
||||
var docsLength = docs.length;
|
||||
if (docsLength == 0) {
|
||||
getContext().getResponse().setBody(0);
|
||||
return;
|
||||
}
|
||||
|
||||
// Call the CRUD API to create a document.
|
||||
tryCreate(docs[count], callback);
|
||||
|
||||
// Note that there are 2 exit conditions:
|
||||
// 1) The createDocument request was not accepted.
|
||||
// In this case the callback will not be called, we just call setBody and we are done.
|
||||
// 2) The callback was called docs.length times.
|
||||
// In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
|
||||
function tryCreate(doc, callback) {
|
||||
var isAccepted = collection.createDocument(collectionLink, doc, callback);
|
||||
|
||||
// If the request was accepted, callback will be called.
|
||||
// Otherwise report current count back to the client,
|
||||
// which will call the script again with remaining set of docs.
|
||||
// This condition will happen when this stored procedure has been running too long
|
||||
// and is about to get cancelled by the server. This will allow the calling client
|
||||
// to resume this batch from the point we got to before isAccepted was set to false
|
||||
if (!isAccepted) getContext().getResponse().setBody(count);
|
||||
}
|
||||
|
||||
// This is called when collection.createDocument is done and the document has been persisted.
|
||||
function callback(err, doc, options) {
|
||||
if (err) throw err;
|
||||
|
||||
// One more document has been inserted, increment the count.
|
||||
count++;
|
||||
|
||||
if (count >= docsLength) {
|
||||
// If we have created all documents, we are done. Just set the response.
|
||||
getContext().getResponse().setBody(count);
|
||||
}
|
||||
else {
|
||||
// Create next document.
|
||||
tryCreate(docs[count], callback);
|
||||
}
|
||||
}
|
||||
}
|
@ -10,7 +10,7 @@ function bulkUpdateDirtyCiphers(ciphers, userId) {
|
||||
|
||||
// Validate input.
|
||||
if (!ciphers) {
|
||||
throw new Error("The ciphers array is undefined or null.");
|
||||
throw new Error('The ciphers array is undefined or null.');
|
||||
}
|
||||
|
||||
var ciphersLength = ciphers.length;
|
||||
|
@ -1,6 +1,8 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Bit.Core.Repositories.DocumentDB.Utilities;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.Client;
|
||||
|
||||
namespace Bit.Core.Repositories.DocumentDB
|
||||
@ -26,7 +28,10 @@ namespace Bit.Core.Repositories.DocumentDB
|
||||
|
||||
public async Task ReplaceAndDirtyCiphersAsync(Domains.User user)
|
||||
{
|
||||
await Client.ExecuteStoredProcedureAsync<Domains.User>(ResolveSprocIdLink(user, "replaceUserAndDirtyCiphers"), user);
|
||||
await DocumentDBHelpers.QueryWithRetryAsync(async () =>
|
||||
{
|
||||
await Client.ExecuteStoredProcedureAsync<Domains.User>(ResolveSprocIdLink(user, "replaceUserAndDirtyCiphers"), user);
|
||||
});
|
||||
}
|
||||
|
||||
public override async Task DeleteByIdAsync(string id)
|
||||
|
@ -1,9 +1,11 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Azure.Documents;
|
||||
using Microsoft.Azure.Documents.Client;
|
||||
|
||||
namespace Bit.Core.Repositories.DocumentDB.Utilities
|
||||
{
|
||||
public class DocumentClientHelpers
|
||||
public class DocumentDBHelpers
|
||||
{
|
||||
public static DocumentClient InitClient(GlobalSettings.DocumentDBSettings settings)
|
||||
{
|
||||
@ -29,6 +31,43 @@ namespace Bit.Core.Repositories.DocumentDB.Utilities
|
||||
return client;
|
||||
}
|
||||
|
||||
public static async Task QueryWithRetryAsync(Func<Task> func)
|
||||
{
|
||||
var queryComplete = false;
|
||||
while(!queryComplete)
|
||||
{
|
||||
try
|
||||
{
|
||||
await func();
|
||||
queryComplete = true;
|
||||
}
|
||||
catch(DocumentClientException e)
|
||||
{
|
||||
await HandleDocumentClientExceptionAsync(e);
|
||||
}
|
||||
catch(AggregateException e)
|
||||
{
|
||||
var docEx = e.InnerException as DocumentClientException;
|
||||
if(docEx != null)
|
||||
{
|
||||
await HandleDocumentClientExceptionAsync(docEx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task HandleDocumentClientExceptionAsync(DocumentClientException e)
|
||||
{
|
||||
var statusCode = (int)e.StatusCode;
|
||||
if(statusCode == 429 || statusCode == 503)
|
||||
{
|
||||
await Task.Delay(e.RetryAfter);
|
||||
}
|
||||
else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private static Func<object, string> GetPartitionKeyExtractor()
|
||||
{
|
||||
return doc =>
|
@ -6,5 +6,6 @@ namespace Bit.Core.Repositories
|
||||
public interface ICipherRepository
|
||||
{
|
||||
Task UpdateDirtyCiphersAsync(IEnumerable<dynamic> ciphers);
|
||||
Task CreateAsync(IEnumerable<dynamic> ciphers);
|
||||
}
|
||||
}
|
||||
|
54
src/Core/Services/CipherService.cs
Normal file
54
src/Core/Services/CipherService.cs
Normal file
@ -0,0 +1,54 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Bit.Core.Domains;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
namespace Bit.Core.Services
|
||||
{
|
||||
public class CipherService : ICipherService
|
||||
{
|
||||
private readonly IFolderRepository _folderRepository;
|
||||
private readonly ICipherRepository _cipherRepository;
|
||||
|
||||
public CipherService(
|
||||
IFolderRepository folderRepository,
|
||||
ICipherRepository cipherRepository)
|
||||
{
|
||||
_folderRepository = folderRepository;
|
||||
_cipherRepository = cipherRepository;
|
||||
}
|
||||
|
||||
public async Task ImportCiphersAsync(
|
||||
List<Folder> folders,
|
||||
List<Site> sites,
|
||||
IEnumerable<KeyValuePair<int, int>> siteRelationships)
|
||||
{
|
||||
// create all the folders
|
||||
var folderTasks = new List<Task>();
|
||||
foreach(var folder in folders)
|
||||
{
|
||||
folderTasks.Add(_folderRepository.CreateAsync(folder));
|
||||
}
|
||||
await Task.WhenAll(folderTasks);
|
||||
|
||||
// associate the newly created folders to the sites
|
||||
foreach(var relationship in siteRelationships)
|
||||
{
|
||||
var site = sites.ElementAtOrDefault(relationship.Key);
|
||||
var folder = folders.ElementAtOrDefault(relationship.Value);
|
||||
|
||||
if(site == null || folder == null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
site.FolderId = folder.Id;
|
||||
}
|
||||
|
||||
// create all the sites
|
||||
await _cipherRepository.CreateAsync(sites);
|
||||
}
|
||||
}
|
||||
}
|
11
src/Core/Services/ICipherService.cs
Normal file
11
src/Core/Services/ICipherService.cs
Normal file
@ -0,0 +1,11 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Bit.Core.Domains;
|
||||
|
||||
namespace Bit.Core.Services
|
||||
{
|
||||
public interface ICipherService
|
||||
{
|
||||
Task ImportCiphersAsync(List<Folder> folders, List<Site> sites, IEnumerable<KeyValuePair<int, int>> siteRelationships);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user