// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
|
|
using System.IO;
|
|
using System.Text;
|
|
using System.Text.RegularExpressions;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace Microsoft.AspNetCore.NodeServices.Util
{
    /// <summary>
    /// Wraps a <see cref="StreamReader"/> to expose an evented API, issuing notifications
    /// when the stream emits chunks of text, completed lines, or finally closes.
    /// </summary>
    /// <remarks>
    /// Reading starts as soon as the instance is constructed and runs on a background task,
    /// so all events are raised from that task rather than from the caller's thread.
    /// </remarks>
    internal class EventedStreamReader
    {
        /// <summary>Handles a raw chunk of characters read from the stream.</summary>
        /// <param name="chunk">
        /// A segment over the reader's internal buffer. The buffer is reused for the next read,
        /// so the contents must be copied if they are needed after the handler returns.
        /// </param>
        public delegate void OnReceivedChunkHandler(ArraySegment<char> chunk);

        /// <summary>Handles one line of text read from the stream.</summary>
        /// <param name="line">
        /// The line, including its terminating <c>'\n'</c>. The only line delivered without a
        /// terminator is trailing text that was still buffered when the stream closed.
        /// </param>
        public delegate void OnReceivedLineHandler(string line);

        /// <summary>Handles the end of the stream.</summary>
        public delegate void OnStreamClosedHandler();

        /// <summary>Raised once for every non-empty read, before any line events for that read.</summary>
        public event OnReceivedChunkHandler OnReceivedChunk;

        /// <summary>Raised once for every line found in the stream.</summary>
        public event OnReceivedLineHandler OnReceivedLine;

        /// <summary>Raised once, when a read returns zero characters (end of stream).</summary>
        public event OnStreamClosedHandler OnStreamClosed;

        private readonly StreamReader _streamReader;

        // Accumulates the text of a line that has not yet been terminated by '\n',
        // possibly across several reads.
        private readonly StringBuilder _linesBuffer;

        /// <summary>
        /// Creates the reader and immediately starts the background read loop.
        /// </summary>
        /// <param name="streamReader">The reader to consume. Must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="streamReader"/> is null.</exception>
        public EventedStreamReader(StreamReader streamReader)
        {
            _streamReader = streamReader ?? throw new ArgumentNullException(nameof(streamReader));
            _linesBuffer = new StringBuilder();

            // Fire-and-forget: the returned task is intentionally not observed here.
            Task.Factory.StartNew(Run);
        }

        /// <summary>
        /// Waits for the first line, received after this call, that matches <paramref name="regex"/>.
        /// </summary>
        /// <param name="regex">The pattern to test each received line against. Must not be null.</param>
        /// <returns>
        /// A task that completes with the successful <see cref="Match"/>, or faults with
        /// <see cref="EndOfStreamException"/> if the stream closes before any line matches.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="regex"/> is null.</exception>
        public Task<Match> WaitForMatch(Regex regex)
        {
            // Fail fast on the caller's thread; otherwise a null regex would only surface as a
            // NullReferenceException inside the background read loop.
            if (regex == null)
            {
                throw new ArgumentNullException(nameof(regex));
            }

            var tcs = new TaskCompletionSource<Match>();
            var completionLock = new object();

            OnReceivedLineHandler onReceivedLineHandler = null;
            OnStreamClosedHandler onStreamClosedHandler = null;

            // Ensures the task is resolved at most once and that both handlers are
            // unsubscribed at the moment of resolution.
            void ResolveIfStillPending(Action applyResolution)
            {
                lock (completionLock)
                {
                    if (!tcs.Task.IsCompleted)
                    {
                        OnReceivedLine -= onReceivedLineHandler;
                        OnStreamClosed -= onStreamClosedHandler;
                        applyResolution();
                    }
                }
            }

            onReceivedLineHandler = line =>
            {
                var match = regex.Match(line);
                if (match.Success)
                {
                    ResolveIfStillPending(() => tcs.SetResult(match));
                }
            };

            onStreamClosedHandler = () =>
            {
                ResolveIfStillPending(() => tcs.SetException(new EndOfStreamException()));
            };

            OnReceivedLine += onReceivedLineHandler;
            OnStreamClosed += onStreamClosedHandler;

            return tcs.Task;
        }

        /// <summary>
        /// The read loop: reads the stream in chunks, raises <see cref="OnReceivedChunk"/> for each
        /// chunk, splits the text on <c>'\n'</c> to raise <see cref="OnReceivedLine"/>, and raises
        /// <see cref="OnStreamClosed"/> when the stream ends.
        /// </summary>
        private async Task Run()
        {
            var buf = new char[8 * 1024];
            while (true)
            {
                var chunkLength = await _streamReader.ReadAsync(buf, 0, buf.Length);
                if (chunkLength == 0)
                {
                    // End of stream. Deliver any trailing text that never received a
                    // terminating newline so that the final line is not silently dropped.
                    if (_linesBuffer.Length > 0)
                    {
                        OnCompleteLine(_linesBuffer.ToString());
                        _linesBuffer.Clear();
                    }

                    OnClosed();
                    break;
                }

                OnChunk(new ArraySegment<char>(buf, 0, chunkLength));

                var startPos = 0;
                int lineBreakPos;

                // Emit every line that is completed within this chunk. The first one may
                // begin with text buffered from earlier chunks.
                while (startPos < chunkLength
                    && (lineBreakPos = Array.IndexOf(buf, '\n', startPos, chunkLength - startPos)) >= 0)
                {
                    var length = (lineBreakPos + 1) - startPos;
                    _linesBuffer.Append(buf, startPos, length);
                    OnCompleteLine(_linesBuffer.ToString());
                    _linesBuffer.Clear();
                    startPos = lineBreakPos + 1;
                }

                // Keep the unterminated remainder for the next chunk. The third argument of
                // Append is a character count, not an end index, so it must be the number of
                // characters left after startPos.
                if (startPos < chunkLength)
                {
                    _linesBuffer.Append(buf, startPos, chunkLength - startPos);
                }
            }
        }

        /// <summary>Raises <see cref="OnReceivedChunk"/> if there are subscribers.</summary>
        private void OnChunk(ArraySegment<char> chunk)
        {
            var dlg = OnReceivedChunk;
            dlg?.Invoke(chunk);
        }

        /// <summary>Raises <see cref="OnReceivedLine"/> if there are subscribers.</summary>
        private void OnCompleteLine(string line)
        {
            var dlg = OnReceivedLine;
            dlg?.Invoke(line);
        }

        /// <summary>Raises <see cref="OnStreamClosed"/> if there are subscribers.</summary>
        private void OnClosed()
        {
            var dlg = OnStreamClosed;
            dlg?.Invoke();
        }
    }
}
|