Drop messages in azure loggers if queue is full (#494)

This commit is contained in:
Pavel Krymets 2018-11-14 09:59:30 -08:00 committed by GitHub
parent fc4e64de38
commit ef4b214125
2 changed files with 28 additions and 16 deletions

View File

@ -18,6 +18,8 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Internal
private readonly int? _batchSize;
private readonly IDisposable _optionsChangeToken;
private int _messagesDropped;
private BlockingCollection<LogMessage> _messageQueue;
private Task _outputTask;
private CancellationTokenSource _cancellationTokenSource;
@ -85,6 +87,16 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Internal
limit--;
}
var messagesDropped = Interlocked.Exchange(ref _messagesDropped, 0);
if (messagesDropped != 0)
{
_currentBatch.Add(new LogMessage()
{
Message = $"{messagesDropped} message(s) dropped because of queue size limit. Increase the queue size or decrease logging verbosity to avoid this.{Environment.NewLine}",
Timestamp = DateTimeOffset.Now
});
}
if (_currentBatch.Count > 0)
{
try
@ -98,8 +110,10 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Internal
_currentBatch.Clear();
}
await IntervalAsync(_interval, _cancellationTokenSource.Token);
else
{
await IntervalAsync(_interval, _cancellationTokenSource.Token);
}
}
}
@ -114,7 +128,10 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Internal
{
try
{
_messageQueue.Add(new LogMessage { Message = message, Timestamp = timestamp }, _cancellationTokenSource.Token);
if (!_messageQueue.TryAdd(new LogMessage { Message = message, Timestamp = timestamp }, millisecondsTimeout: 0, cancellationToken: _cancellationTokenSource.Token))
{
Interlocked.Increment(ref _messagesDropped);
}
}
catch
{
@ -172,4 +189,4 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Internal
_scopeProvider = scopeProvider;
}
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -77,21 +77,16 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Test
provider.IntervalControl.Resume();
await provider.IntervalControl.Pause;
Assert.Single(provider.Batches);
Assert.Equal(2, provider.Batches.Count);
Assert.Single(provider.Batches[0]);
Assert.Equal("2016-05-04 03:02:01.000 +00:00 [Information] Cat: Info message" + _nl, provider.Batches[0][0].Message);
provider.IntervalControl.Resume();
await provider.IntervalControl.Pause;
Assert.Equal(2, provider.Batches.Count);
Assert.Single(provider.Batches[1]);
Assert.Equal("2016-05-04 04:02:01.000 +00:00 [Error] Cat: Error message" + _nl, provider.Batches[1][0].Message);
}
[Fact]
public async Task BlocksWhenReachingMaxQueue()
public async Task DropsMessagesWhenReachingMaxQueue()
{
var provider = new TestBatchingLoggingProvider(maxQueueSize: 1);
var logger = (BatchingLogger)provider.CreateLogger("Cat");
@ -99,14 +94,14 @@ namespace Microsoft.Extensions.Logging.AzureAppServices.Test
await provider.IntervalControl.Pause;
logger.Log(_timestampOne, LogLevel.Information, 0, "Info message", null, (state, ex) => state);
var task = Task.Run(() => logger.Log(_timestampOne.AddHours(1), LogLevel.Error, 0, "Error message", null, (state, ex) => state));
Assert.False(task.Wait(1000));
logger.Log(_timestampOne.AddHours(1), LogLevel.Error, 0, "Error message", null, (state, ex) => state);
provider.IntervalControl.Resume();
await provider.IntervalControl.Pause;
Assert.True(task.Wait(1000));
Assert.Equal(2, provider.Batches[0].Length);
Assert.Equal("2016-05-04 03:02:01.000 +00:00 [Information] Cat: Info message" + _nl, provider.Batches[0][0].Message);
Assert.Equal("1 message(s) dropped because of queue size limit. Increase the queue size or decrease logging verbosity to avoid this." + _nl, provider.Batches[0][1].Message);
}
private class TestBatchingLoggingProvider: BatchingLoggerProvider