Upload to Azure in parallel (#952)

This commit is contained in:
Nate McMaster 2018-03-13 10:01:55 -07:00 committed by GitHub
parent b030623b11
commit a3f0260ee8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 75 additions and 31 deletions

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
@ -43,6 +44,11 @@ namespace RepoTasks
[Required]
public string ContainerName { get; set; }
/// <summary>
/// The maximum number of parallel pushes.
/// </summary>
public int MaxParallelism { get; set; } = 8;
public void Cancel() => _cts.Cancel();
public override bool Execute()
@ -57,44 +63,82 @@ namespace RepoTasks
var container = client.GetContainerReference(ContainerName);
var ctx = new OperationContext();
var tasks = new List<Task>();
foreach (var item in Files)
using (var throttler = new SemaphoreSlim(MaxParallelism))
{
// normalize slashes
var dest = item.GetMetadata("RelativeBlobPath")
.Replace('\\', '/')
.Replace("//", "/");
var contentType = item.GetMetadata("ContentType");
var cacheControl = item.GetMetadata("CacheControl");
if (string.IsNullOrEmpty(dest))
foreach (var item in Files)
{
Log.LogError($"Item {item.ItemSpec} is missing required metadata 'RelativeBlobPath'");
return false;
_cts.Token.ThrowIfCancellationRequested();
await throttler.WaitAsync( _cts.Token);
tasks.Add(
Task.Run(async () =>
{
try
{
await PushFileAsync(ctx, container, item, _cts.Token);
}
finally
{
throttler.Release();
}
}));
}
var blob = container.GetBlockBlobReference(dest);
if (!string.IsNullOrEmpty(cacheControl))
{
blob.Properties.CacheControl = cacheControl;
}
if (!string.IsNullOrEmpty(contentType))
{
blob.Properties.ContentType = contentType;
}
Log.LogMessage(MessageImportance.High, $"Publishing {item.ItemSpec} to https://{AccountName}.blob.core.windows.net/{ContainerName}/{dest}");
var accessCondition = bool.TryParse(item.GetMetadata("Overwrite"), out var overwrite) && overwrite
? AccessCondition.GenerateEmptyCondition()
: AccessCondition.GenerateIfNotExistsCondition();
await blob.UploadFromFileAsync(item.ItemSpec, accessCondition, new BlobRequestOptions(), ctx, _cts.Token);
await Task.WhenAll(tasks);
}
return true;
return !Log.HasLoggedErrors;
}
private async Task PushFileAsync(OperationContext ctx, CloudBlobContainer container, ITaskItem item, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
// normalize slashes
var dest = item.GetMetadata("RelativeBlobPath")
.Replace('\\', '/')
.Replace("//", "/");
var contentType = item.GetMetadata("ContentType");
var cacheControl = item.GetMetadata("CacheControl");
if (string.IsNullOrEmpty(dest))
{
Log.LogError($"Item {item.ItemSpec} is missing required metadata 'RelativeBlobPath'");
return;
}
var blob = container.GetBlockBlobReference(dest);
if (!string.IsNullOrEmpty(cacheControl))
{
blob.Properties.CacheControl = cacheControl;
}
if (!string.IsNullOrEmpty(contentType))
{
blob.Properties.ContentType = contentType;
}
Log.LogMessage(MessageImportance.High, $"Beginning push of {item.ItemSpec} to https://{AccountName}.blob.core.windows.net/{ContainerName}/{dest}");
var accessCondition = bool.TryParse(item.GetMetadata("Overwrite"), out var overwrite) && overwrite
? AccessCondition.GenerateEmptyCondition()
: AccessCondition.GenerateIfNotExistsCondition();
try
{
await blob.UploadFromFileAsync(item.ItemSpec, accessCondition, new BlobRequestOptions(), ctx, cancellationToken);
}
catch (Exception ex)
{
Log.LogError($"Error publishing {item.ItemSpec}: {ex}");
return;
}
finally
{
Log.LogMessage(MessageImportance.High, $"Done publishing {item.ItemSpec} to https://{AccountName}.blob.core.windows.net/{ContainerName}/{dest}");
}
}
}
}