Remove SocketInputStream (#753).
This commit is contained in:
parent
23090e68dd
commit
179b57b01f
|
|
@ -14,7 +14,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
{
|
||||
private readonly string _connectionId;
|
||||
private readonly Stream _filteredStream;
|
||||
private readonly Stream _socketInputStream;
|
||||
private readonly IKestrelTrace _log;
|
||||
private readonly MemoryPool _memory;
|
||||
private MemoryPoolBlock _block;
|
||||
|
|
@ -33,7 +32,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
_connectionId = connectionId;
|
||||
_log = logger;
|
||||
_filteredStream = filteredStream;
|
||||
_socketInputStream = new SocketInputStream(SocketInput);
|
||||
_memory = memory;
|
||||
}
|
||||
|
||||
|
|
@ -45,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
{
|
||||
_block = _memory.Lease();
|
||||
// Use pooled block for copy
|
||||
return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
|
||||
return FilterInputAsync(_block).ContinueWith((task, state) =>
|
||||
{
|
||||
((FilteredStreamAdapter)state).OnStreamClose(task);
|
||||
}, this);
|
||||
|
|
@ -60,6 +58,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
{
|
||||
SocketInput.Dispose();
|
||||
}
|
||||
|
||||
private async Task FilterInputAsync(MemoryPoolBlock block)
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await _filteredStream.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0)
|
||||
{
|
||||
SocketInput.IncomingData(block.Array, block.Data.Offset, bytesRead);
|
||||
}
|
||||
}
|
||||
|
||||
private void OnStreamClose(Task copyAsyncTask)
|
||||
{
|
||||
|
|
@ -82,7 +89,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
|||
|
||||
try
|
||||
{
|
||||
_socketInputStream.Dispose();
|
||||
SocketInput.IncomingFin();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1,93 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
||||
{
|
||||
/// <summary>
|
||||
/// This is a write-only stream that copies what is written into a
|
||||
/// <see cref="SocketInput"/> object. This is used as an argument to
|
||||
/// <see cref="Stream.CopyToAsync(Stream)" /> so input filtered by a
|
||||
/// ConnectionFilter (e.g. SslStream) can be consumed by <see cref="Frame"/>.
|
||||
/// </summary>
|
||||
public class SocketInputStream : Stream
|
||||
{
|
||||
private readonly SocketInput _socketInput;
|
||||
|
||||
public SocketInputStream(SocketInput socketInput)
|
||||
{
|
||||
_socketInput = socketInput;
|
||||
}
|
||||
|
||||
public override bool CanRead => false;
|
||||
|
||||
public override bool CanSeek => false;
|
||||
|
||||
public override bool CanWrite => true;
|
||||
|
||||
public override long Length
|
||||
{
|
||||
get
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
set
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
// No-op
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
_socketInput.IncomingData(buffer, offset, count);
|
||||
}
|
||||
|
||||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
|
||||
{
|
||||
Write(buffer, offset, count);
|
||||
return TaskUtilities.CompletedTask;
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
// Close _socketInput with a fake zero-length write that will result in a zero-length read.
|
||||
_socketInput.IncomingFin();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,21 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Filter
|
||||
{
|
||||
public static class StreamExtensions
|
||||
{
|
||||
public static async Task CopyToAsync(this Stream source, Stream destination, MemoryPoolBlock block)
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await source.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0)
|
||||
{
|
||||
await destination.WriteAsync(block.Array, block.Data.Offset, bytesRead);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
private ConnectionFilterContext _filterContext;
|
||||
private LibuvStream _libuvStream;
|
||||
private FilteredStreamAdapter _filteredStreamAdapter;
|
||||
private Task _readInputContinuation;
|
||||
private Task _readInputTask;
|
||||
|
||||
private readonly SocketInput _rawSocketInput;
|
||||
private readonly SocketOutput _rawSocketOutput;
|
||||
|
|
@ -181,7 +181,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
{
|
||||
_filteredStreamAdapter.Abort();
|
||||
_rawSocketInput.IncomingFin();
|
||||
_readInputContinuation.ContinueWith((task, state) =>
|
||||
_readInputTask.ContinueWith((task, state) =>
|
||||
{
|
||||
((Connection)state)._filterContext.Connection.Dispose();
|
||||
((Connection)state)._filteredStreamAdapter.Dispose();
|
||||
|
|
@ -221,7 +221,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
|
|||
SocketInput = _filteredStreamAdapter.SocketInput;
|
||||
SocketOutput = _filteredStreamAdapter.SocketOutput;
|
||||
|
||||
_readInputContinuation = _filteredStreamAdapter.ReadInputAsync();
|
||||
_readInputTask = _filteredStreamAdapter.ReadInputAsync();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue