// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
    /// <summary>
    /// A <see cref="ConnectionContext"/> for tests, backed by an in-memory duplex pipe pair.
    /// <see cref="Transport"/> is one end of the pair and <see cref="Application"/> is the other:
    /// tests read what was sent from <c>Application.Input</c> and inject data by writing to
    /// <c>Application.Output</c>.
    /// </summary>
    internal class TestConnection : ConnectionContext
    {
        private readonly bool _autoHandshake;
        private readonly TaskCompletionSource<object> _started = new TaskCompletionSource<object>();
        private readonly TaskCompletionSource<object> _disposed = new TaskCompletionSource<object>();

        private int _disposeCount = 0;

        /// <summary>Completes once <see cref="StartAsync"/> has been called.</summary>
        public Task Started => _started.Task;

        /// <summary>Completes once the connection has been disposed at least once.</summary>
        public Task Disposed => _disposed.Task;

        private readonly Func<Task> _onStart;
        private readonly Func<Task> _onDispose;

        public override string ConnectionId { get; set; }

        /// <summary>The test-facing end of the pipe pair.</summary>
        public IDuplexPipe Application { get; }

        public override IDuplexPipe Transport { get; set; }

        public override IFeatureCollection Features { get; } = new FeatureCollection();

        /// <summary>Number of times the connection has been disposed.</summary>
        public int DisposeCount => _disposeCount;

        public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();

        /// <summary>
        /// Creates the connection and its pipe pair.
        /// </summary>
        /// <param name="onStart">Callback awaited by <see cref="StartAsync"/>; defaults to a no-op.</param>
        /// <param name="onDispose">Callback awaited during disposal; defaults to a no-op.</param>
        /// <param name="autoHandshake">
        /// When <c>true</c>, <see cref="StartAsync"/> kicks off
        /// <see cref="ReadHandshakeAndSendResponseAsync"/> in the background.
        /// </param>
        public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoHandshake = true)
        {
            _autoHandshake = autoHandshake;
            _onStart = onStart ?? (() => Task.CompletedTask);
            _onDispose = onDispose ?? (() => Task.CompletedTask);

            var options = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);

            var pair = DuplexPipe.CreateConnectionPair(options, options);
            Application = pair.Application;
            Transport = pair.Transport;

            // When the writer feeding Application.Input completes, complete the opposite direction too.
            Application.Input.OnWriterCompleted((ex, _) =>
            {
                Application.Output.Complete();
            },
            null);
        }

        /// <summary>Disposes the connection; see <see cref="DisposeCoreAsync"/>.</summary>
        public Task DisposeAsync() => DisposeCoreAsync();

        /// <summary>
        /// Signals <see cref="Started"/>, awaits the start callback and, when auto-handshake is enabled,
        /// starts answering the handshake in the background.
        /// </summary>
        /// <param name="transferFormat">Not used by this implementation.</param>
        /// <returns>This connection.</returns>
        public async Task<ConnectionContext> StartAsync(TransferFormat transferFormat = TransferFormat.Binary)
        {
            _started.TrySetResult(null);

            await _onStart();

            if (_autoHandshake)
            {
                // We can't await this as it will block StartAsync which will block
                // HubConnection.StartAsync which sends the Handshake in the first place!
                _ = ReadHandshakeAndSendResponseAsync();
            }

            return this;
        }

        /// <summary>
        /// Reads one sent text message (expected to be the handshake request) and writes a handshake
        /// response to <c>Application.Output</c>.
        /// </summary>
        /// <param name="minorVersion">Minor version passed to the <see cref="HandshakeResponseMessage"/>.</param>
        /// <returns>The text of the message that was read, or <c>null</c> if the pipe completed first.</returns>
        public async Task<string> ReadHandshakeAndSendResponseAsync(int minorVersion = 0)
        {
            var s = await ReadSentTextMessageAsync();

            byte[] response;

            var output = MemoryBufferWriter.Get();
            try
            {
                HandshakeProtocol.WriteResponseMessage(new HandshakeResponseMessage(minorVersion), output);
                response = output.ToArray();
            }
            finally
            {
                // Always hand the writer back, even if serialization throws.
                MemoryBufferWriter.Return(output);
            }

            await Application.Output.WriteAsync(response);

            return s;
        }

        /// <summary>
        /// Serializes <paramref name="jsonObject"/> to JSON, appends the record separator and writes the
        /// result to <c>Application.Output</c>.
        /// </summary>
        public Task ReceiveJsonMessage(object jsonObject)
        {
            var json = JsonConvert.SerializeObject(jsonObject, Formatting.None);
            var bytes = FormatMessageToArray(Encoding.UTF8.GetBytes(json));

            return Application.Output.WriteAsync(bytes).AsTask();
        }

        /// <summary>Writes <paramref name="rawText"/> as UTF-8 to <c>Application.Output</c> with no framing added.</summary>
        public Task ReceiveTextAsync(string rawText)
        {
            return ReceiveBytesAsync(Encoding.UTF8.GetBytes(rawText));
        }

        /// <summary>Writes <paramref name="bytes"/> to <c>Application.Output</c> with no framing added.</summary>
        public Task ReceiveBytesAsync(byte[] bytes)
        {
            return Application.Output.WriteAsync(bytes).AsTask();
        }

        /// <summary>
        /// Reads a single text message from the Application Input pipe.
        /// </summary>
        /// <param name="ignorePings">When <c>true</c>, messages whose "type" is the ping message type are skipped.</param>
        /// <returns>The message text, or <c>null</c> when the pipe completed without another full message.</returns>
        public async Task<string> ReadSentTextMessageAsync(bool ignorePings = true)
        {
            while (true)
            {
                var result = await ReadSentTextMessageAsyncInner();
                if (result == null)
                {
                    // End of stream: there is nothing to parse. Surface null so that callers such as
                    // ReadAllSentMessagesAsync can stop instead of failing inside JObject.Parse.
                    return null;
                }

                var receivedMessageType = (int?)JObject.Parse(result)["type"];

                if (ignorePings && receivedMessageType == HubProtocolConstants.PingMessageType)
                {
                    continue;
                }

                return result;
            }
        }

        /// <summary>
        /// Reads from <c>Application.Input</c> until one complete text message can be parsed.
        /// </summary>
        /// <returns>The UTF-8 decoded payload, or <c>null</c> if the pipe completed without a full message.</returns>
        private async Task<string> ReadSentTextMessageAsyncInner()
        {
            while (true)
            {
                var result = await Application.Input.ReadAsync();
                var buffer = result.Buffer;

                // Nothing is consumed unless a message is successfully parsed below.
                var consumed = buffer.Start;

                try
                {
                    if (TextMessageParser.TryParseMessage(ref buffer, out var payload))
                    {
                        // On success the parser has moved 'buffer' past the message it extracted.
                        consumed = buffer.Start;
                        return Encoding.UTF8.GetString(payload.ToArray());
                    }
                    else if (result.IsCompleted)
                    {
                        return null;
                    }
                }
                finally
                {
                    Application.Input.AdvanceTo(consumed);
                }
            }
        }

        /// <summary>
        /// Reads every remaining sent text message until the pipe reports completion.
        /// </summary>
        /// <param name="ignorePings">Forwarded to <see cref="ReadSentTextMessageAsync"/>.</param>
        /// <returns>The messages in the order they were read.</returns>
        /// <exception cref="InvalidOperationException">The connection has not been disposed yet.</exception>
        public async Task<IList<string>> ReadAllSentMessagesAsync(bool ignorePings = true)
        {
            if (!Disposed.IsCompleted)
            {
                throw new InvalidOperationException("The connection must be stopped before this method can be used.");
            }

            var results = new List<string>();

            while (true)
            {
                var message = await ReadSentTextMessageAsync(ignorePings);
                if (message == null)
                {
                    break;
                }

                results.Add(message);
            }

            return results;
        }

        /// <summary>Completes <c>Application.Output</c>, optionally with an error.</summary>
        /// <param name="ex">Exception passed to the pipe writer's completion, or <c>null</c>.</param>
        public void CompleteFromTransport(Exception ex = null)
        {
            Application.Output.Complete(ex);
        }

        /// <summary>
        /// Increments the dispose count, signals <see cref="Disposed"/>, awaits the dispose callback and
        /// completes both directions of <see cref="Transport"/>.
        /// </summary>
        /// <param name="ex">Currently unused.</param>
        private async Task DisposeCoreAsync(Exception ex = null)
        {
            Interlocked.Increment(ref _disposeCount);
            _disposed.TrySetResult(null);
            await _onDispose();

            // Simulate HttpConnection's behavior by Completing the Transport pipe.
            Transport.Input.Complete();
            Transport.Output.Complete();
        }

        /// <summary>Returns a copy of <paramref name="message"/> followed by the text record separator byte.</summary>
        private byte[] FormatMessageToArray(byte[] message)
        {
            using (var output = new MemoryStream(message.Length + 1))
            {
                output.Write(message, 0, message.Length);
                output.WriteByte(TextMessageFormatter.RecordSeparator);
                return output.ToArray();
            }
        }
    }
}