From e9b84a298aa8db641295dc977e05430a3ded3bf8 Mon Sep 17 00:00:00 2001 From: Marcin Polewski Date: Mon, 30 Apr 2018 13:08:20 -0500 Subject: [PATCH] Sample generic host using MSMQ (#1381) --- Hosting.sln | 15 ++++ samples/SampleMsmqHost/MsmqConnection.cs | 80 ++++++++++++++++++ samples/SampleMsmqHost/MsmqOptions.cs | 15 ++++ samples/SampleMsmqHost/MsmqProcessor.cs | 37 +++++++++ samples/SampleMsmqHost/MsmqService.cs | 44 ++++++++++ samples/SampleMsmqHost/Program.cs | 86 ++++++++++++++++++++ samples/SampleMsmqHost/SampleMsmqHost.csproj | 25 ++++++ 7 files changed, 302 insertions(+) create mode 100644 samples/SampleMsmqHost/MsmqConnection.cs create mode 100644 samples/SampleMsmqHost/MsmqOptions.cs create mode 100644 samples/SampleMsmqHost/MsmqProcessor.cs create mode 100644 samples/SampleMsmqHost/MsmqService.cs create mode 100644 samples/SampleMsmqHost/Program.cs create mode 100644 samples/SampleMsmqHost/SampleMsmqHost.csproj diff --git a/Hosting.sln b/Hosting.sln index 3ec084ca0b..7c15df66f9 100644 --- a/Hosting.sln +++ b/Hosting.sln @@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CreateWebHostBuilderInvalid EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BuildWebHostInvalidSignature", "test\TestAssets\BuildWebHostInvalidSignature\BuildWebHostInvalidSignature.csproj", "{79D0E344-71C4-4D63-9632-01CC041C8788}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SampleMsmqHost", "samples\SampleMsmqHost\SampleMsmqHost.csproj", "{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -342,6 +344,18 @@ Global {79D0E344-71C4-4D63-9632-01CC041C8788}.Release|Mixed Platforms.Build.0 = Release|Any CPU {79D0E344-71C4-4D63-9632-01CC041C8788}.Release|x86.ActiveCfg = Release|Any CPU {79D0E344-71C4-4D63-9632-01CC041C8788}.Release|x86.Build.0 = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|x86.ActiveCfg = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|x86.Build.0 = Debug|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Any CPU.Build.0 = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|x86.ActiveCfg = Release|Any CPU + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -371,6 +385,7 @@ Global {F9074486-EAE4-4171-BC9E-1557C2A56DDE} = {A7270417-6BC6-4E3F-A96A-79193D16BB82} {ACB63E80-375C-4A8F-9210-8FD509148F31} = {FA7D2012-C1B4-4AF7-9ADD-381B2004EA16} {79D0E344-71C4-4D63-9632-01CC041C8788} = {FA7D2012-C1B4-4AF7-9ADD-381B2004EA16} + {C082BF14-2F9A-4D48-8539-AEF3A1B2043C} = {9C7520A0-F2EB-411C-8BB2-80B39C937217} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AABD536D-E05F-409B-A716-535E0C478076} diff --git a/samples/SampleMsmqHost/MsmqConnection.cs b/samples/SampleMsmqHost/MsmqConnection.cs new file mode 100644 index 0000000000..d959c1a272 --- /dev/null +++ b/samples/SampleMsmqHost/MsmqConnection.cs @@ -0,0 +1,80 @@ +using System; +using System.IO; +using System.Messaging; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace SampleMsmqHost +{ + public interface IMsmqConnection + { + void SendText(string text); + + Task ReceiveAsync(CancellationToken cancellationToken); + } + + public class MsmqConnection : IMsmqConnection, IDisposable + { + private readonly MessageQueue _queue; + + public MsmqOptions Options { get; } + + public ILogger Logger { get; } + + public MsmqConnection(IOptions options, ILogger logger) + { + Options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + Logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _queue = OpenQueue(); + } + + private MessageQueue OpenQueue() + { + Logger.LogInformation("Opening Queue: Path={0}; AccessMode={1};", Options.Path, Options.AccessMode); + + return new MessageQueue(Options.Path, Options.SharedModeDenyReceive, Options.EnableCache, Options.AccessMode); + } + + public void Dispose() + { + Logger.LogInformation("Closing Queue"); + + _queue?.Dispose(); + } + + public void SendText(string text) + { + // send the text message as UTF7 + using (var stream = new MemoryStream()) + using (var writer = new StreamWriter(stream, Encoding.UTF7)) + using (var message = new Message()) + { + writer.Write(text); + writer.Flush(); + + message.BodyStream = stream; + + _queue.Send(message); + } + } + + public async Task ReceiveAsync(CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(); + using (cancellationToken.Register(obj => ((TaskCompletionSource)obj).TrySetCanceled(), tcs)) + { + // wait for a message to arrive or cancellation + var receiveTask = Task.Factory.FromAsync(_queue.BeginReceive(), _queue.EndReceive); + if (receiveTask != await Task.WhenAny(receiveTask, tcs.Task)) + throw new OperationCanceledException(cancellationToken); + + return receiveTask.Result; + } + } + + } +} \ No newline at end of file diff --git a/samples/SampleMsmqHost/MsmqOptions.cs b/samples/SampleMsmqHost/MsmqOptions.cs new file mode 100644 index 0000000000..f34fefb986 --- /dev/null +++ b/samples/SampleMsmqHost/MsmqOptions.cs @@ -0,0 +1,15 @@ +using System.Messaging; + +namespace SampleMsmqHost +{ + public class MsmqOptions + { + public string Path { get; set; } + + public bool SharedModeDenyReceive { get; set; } = false; + + public bool EnableCache { get; set; } = false; + + public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive; + } +} \ No newline at end of file diff --git a/samples/SampleMsmqHost/MsmqProcessor.cs b/samples/SampleMsmqHost/MsmqProcessor.cs new file mode 100644 index 0000000000..f99c2b8757 --- /dev/null +++ b/samples/SampleMsmqHost/MsmqProcessor.cs @@ -0,0 +1,37 @@ +using System; +using System.IO; +using System.Messaging; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace SampleMsmqHost +{ + public interface IMsmqProcessor + { + Task ProcessMessageAsync(Message message, CancellationToken cancellationToken); + } + + public class MsmqProcessor : IMsmqProcessor + { + private readonly ILogger _logger; + + public MsmqProcessor(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken) + { + // we assume the message contains text encoded as UTF7 + using (var reader = new StreamReader(message.BodyStream, Encoding.UTF7)) + { + var text = await reader.ReadToEndAsync(); + + _logger.LogInformation("Received Message: {0}", text); + } + } + + } +} \ No newline at end of file diff --git a/samples/SampleMsmqHost/MsmqService.cs b/samples/SampleMsmqHost/MsmqService.cs new file mode 100644 index 0000000000..6c5c86a444 --- /dev/null +++ b/samples/SampleMsmqHost/MsmqService.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace SampleMsmqHost +{ + public class MsmqService : BackgroundService + { + public ILogger Logger { get; } + + public IMsmqConnection Connection { get; } + + public IMsmqProcessor Processor { get; } + + public MsmqService(ILogger logger, IMsmqConnection connection, IMsmqProcessor processor) + { + Logger = logger ?? throw new ArgumentNullException(nameof(logger)); + Connection = connection ?? throw new ArgumentNullException(nameof(connection)); + Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("Begin Receive Loop"); + try + { + while (!cancellationToken.IsCancellationRequested) + { + using (var message = await Connection.ReceiveAsync(cancellationToken)) + { + await Processor.ProcessMessageAsync(message, cancellationToken); + } + } + } + finally + { + Logger.LogInformation("End Receive Loop"); + } + } + + } +} \ No newline at end of file diff --git a/samples/SampleMsmqHost/Program.cs b/samples/SampleMsmqHost/Program.cs new file mode 100644 index 0000000000..2c6989be26 --- /dev/null +++ b/samples/SampleMsmqHost/Program.cs @@ -0,0 +1,86 @@ +using System; +using System.Messaging; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace SampleMsmqHost +{ + public static class Program + { + // Before running this program, please make sure to install MSMQ + // and create the ".\private$\SampleQueue" queue on your local machine. + + public static async Task Main(string[] args) + { + var host = new HostBuilder() + .ConfigureAppConfiguration(config => + { + config.AddEnvironmentVariables(); + config.AddJsonFile("appsettings.json", optional: true); + config.AddCommandLine(args); + }) + .ConfigureLogging(factory => + { + factory.AddConsole(); + }) + .ConfigureServices(services => + { + services.AddOptions(); + + services.Configure(options => + { + options.Path = @".\private$\SampleQueue"; + options.AccessMode = QueueAccessMode.SendAndReceive; + }); + + services.AddSingleton(); + services.AddTransient(); + services.AddTransient(); + }) + .Build(); + + using (host) + { + // start the MSMQ host + await host.StartAsync(); + + // read and dispatch messages to the MSMQ queue + StartReadLoop(host); + + // wait for the MSMQ host to shutdown + await host.WaitForShutdownAsync(); + } + } + + private static void StartReadLoop(IHost host) + { + var connection = host.Services.GetRequiredService(); + var applicationLifetime = host.Services.GetRequiredService(); + + // run the read loop in a background thread so that it can be stopped with CTRL+C + Task.Run(() => ReadLoop(connection, applicationLifetime.ApplicationStopping)); + } + + private static void ReadLoop(IMsmqConnection connection, CancellationToken cancellationToken) + { + Console.WriteLine("Enter your text message and press ENTER..."); + + while (!cancellationToken.IsCancellationRequested) + { + // read a text message from the user + cancellationToken.ThrowIfCancellationRequested(); + var text = Console.ReadLine(); + + // send the text message to the queue + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrEmpty(text)) + connection.SendText(text); + } + } + + } +} \ No newline at end of file diff --git a/samples/SampleMsmqHost/SampleMsmqHost.csproj b/samples/SampleMsmqHost/SampleMsmqHost.csproj new file mode 100644 index 0000000000..108db8d554 --- /dev/null +++ b/samples/SampleMsmqHost/SampleMsmqHost.csproj @@ -0,0 +1,25 @@ + + + + Exe + + net461 + latest + + + + + + + + + + + + + + + + + +