Implement APM methods in streams (#1335).

This commit is contained in:
Cesar Blum Silveira 2017-02-08 16:39:14 -08:00
parent aacb7d7453
commit 21be33023c
4 changed files with 133 additions and 42 deletions

View File

@ -132,5 +132,83 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
{
return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count);
}
#if NET451
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override int EndRead(IAsyncResult asyncResult)
{
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(task2.Result);
}
}, tcs, cancellationToken);
return tcs.Task;
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
#endif
}
}

View File

@ -24,8 +24,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
private bool _canWrite = true;
private object _writeLock = new object();
public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger)
{
_connectionId = connectionId;
@ -36,54 +34,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
public void Write(ArraySegment<byte> buffer, bool chunk)
{
lock (_writeLock)
if (buffer.Count == 0 )
{
if (buffer.Count == 0 )
{
return;
}
return;
}
try
{
if (!_canWrite)
{
return;
}
if (chunk && buffer.Array != null)
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
}
if (chunk && buffer.Array != null)
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
}
_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
if (chunk && buffer.Array != null)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
}
}
catch (Exception ex)
{
_canWrite = false;
_logger.ConnectionError(_connectionId, ex);
}
if (chunk && buffer.Array != null)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
}
}
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
#if NET451
Write(buffer, chunk);
return TaskUtilities.CompletedTask;
#else
if (chunk && buffer.Array != null)
{
return WriteAsyncChunked(buffer, cancellationToken);
}
return _outputStream.WriteAsync(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count, cancellationToken);
#endif
}
private async Task WriteAsyncChunked(ArraySegment<byte> buffer, CancellationToken cancellationToken)
@ -124,7 +101,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
block = block.Next;
returnBlock.Pool.Return(returnBlock);
}
if (_canWrite)
{
try

View File

@ -75,8 +75,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
#if NET451
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
ValidateState(default(CancellationToken));
var task = ReadAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
@ -92,11 +90,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
ValidateState(cancellationToken);
var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
task.AsTask().ContinueWith((task2, state2) =>
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)

View File

@ -85,6 +85,46 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_frameControl.Write(new ArraySegment<byte>(buffer, offset, count));
}
#if NET451
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
#endif
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var task = ValidateState(cancellationToken);