Implement Begin/End Read/Write methods in LibuvStream

This commit is contained in:
Stephen Halter 2016-07-28 16:02:30 -07:00
parent 8836eec7d8
commit 4337d2a4a7
2 changed files with 90 additions and 35 deletions

View File

@ -132,5 +132,83 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
{
return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count);
}
#if NET451
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override int EndRead(IAsyncResult asyncResult)
{
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(task2.Result);
}
}, tcs, cancellationToken);
return tcs.Task;
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default(CancellationToken), state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
#endif
}
}

View File

@ -24,8 +24,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
private bool _canWrite = true;
private object _writeLock = new object();
public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger)
{
_connectionId = connectionId;
@ -36,54 +34,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
public void Write(ArraySegment<byte> buffer, bool chunk)
{
lock (_writeLock)
if (buffer.Count == 0 )
{
if (buffer.Count == 0 )
{
return;
}
return;
}
try
{
if (!_canWrite)
{
return;
}
if (chunk && buffer.Array != null)
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
}
if (chunk && buffer.Array != null)
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
}
_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
if (chunk && buffer.Array != null)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
}
}
catch (Exception ex)
{
_canWrite = false;
_logger.ConnectionError(_connectionId, ex);
}
if (chunk && buffer.Array != null)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
}
}
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
#if NET451
Write(buffer, chunk);
return TaskUtilities.CompletedTask;
#else
if (chunk && buffer.Array != null)
{
return WriteAsyncChunked(buffer, cancellationToken);
}
return _outputStream.WriteAsync(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count, cancellationToken);
#endif
}
private async Task WriteAsyncChunked(ArraySegment<byte> buffer, CancellationToken cancellationToken)