Sample generic host using MSMQ (#1381)

This commit is contained in:
Marcin Polewski 2018-04-30 13:08:20 -05:00 committed by Chris Ross
parent efcb8d4a44
commit e9b84a298a
7 changed files with 302 additions and 0 deletions

View File

@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CreateWebHostBuilderInvalid
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BuildWebHostInvalidSignature", "test\TestAssets\BuildWebHostInvalidSignature\BuildWebHostInvalidSignature.csproj", "{79D0E344-71C4-4D63-9632-01CC041C8788}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SampleMsmqHost", "samples\SampleMsmqHost\SampleMsmqHost.csproj", "{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -342,6 +344,18 @@ Global
{79D0E344-71C4-4D63-9632-01CC041C8788}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{79D0E344-71C4-4D63-9632-01CC041C8788}.Release|x86.ActiveCfg = Release|Any CPU
{79D0E344-71C4-4D63-9632-01CC041C8788}.Release|x86.Build.0 = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|x86.ActiveCfg = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Debug|x86.Build.0 = Debug|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Any CPU.Build.0 = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|x86.ActiveCfg = Release|Any CPU
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -371,6 +385,7 @@ Global
{F9074486-EAE4-4171-BC9E-1557C2A56DDE} = {A7270417-6BC6-4E3F-A96A-79193D16BB82}
{ACB63E80-375C-4A8F-9210-8FD509148F31} = {FA7D2012-C1B4-4AF7-9ADD-381B2004EA16}
{79D0E344-71C4-4D63-9632-01CC041C8788} = {FA7D2012-C1B4-4AF7-9ADD-381B2004EA16}
{C082BF14-2F9A-4D48-8539-AEF3A1B2043C} = {9C7520A0-F2EB-411C-8BB2-80B39C937217}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AABD536D-E05F-409B-A716-535E0C478076}

View File

@ -0,0 +1,80 @@
using System;
using System.IO;
using System.Messaging;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace SampleMsmqHost
{
public interface IMsmqConnection
{
void SendText(string text);
Task<Message> ReceiveAsync(CancellationToken cancellationToken);
}
public class MsmqConnection : IMsmqConnection, IDisposable
{
private readonly MessageQueue _queue;
public MsmqOptions Options { get; }
public ILogger<MsmqConnection> Logger { get; }
public MsmqConnection(IOptions<MsmqOptions> options, ILogger<MsmqConnection> logger)
{
Options = options?.Value ?? throw new ArgumentNullException(nameof(options));
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
_queue = OpenQueue();
}
private MessageQueue OpenQueue()
{
Logger.LogInformation("Opening Queue: Path={0}; AccessMode={1};", Options.Path, Options.AccessMode);
return new MessageQueue(Options.Path, Options.SharedModeDenyReceive, Options.EnableCache, Options.AccessMode);
}
public void Dispose()
{
Logger.LogInformation("Closing Queue");
_queue?.Dispose();
}
public void SendText(string text)
{
// send the text message as UTF7
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream, Encoding.UTF7))
using (var message = new Message())
{
writer.Write(text);
writer.Flush();
message.BodyStream = stream;
_queue.Send(message);
}
}
public async Task<Message> ReceiveAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<Message>();
using (cancellationToken.Register(obj => ((TaskCompletionSource<Message>)obj).TrySetCanceled(), tcs))
{
// wait for a message to arrive or cancellation
var receiveTask = Task.Factory.FromAsync(_queue.BeginReceive(), _queue.EndReceive);
if (receiveTask != await Task.WhenAny(receiveTask, tcs.Task))
throw new OperationCanceledException(cancellationToken);
return receiveTask.Result;
}
}
}
}

View File

@ -0,0 +1,15 @@
using System.Messaging;
namespace SampleMsmqHost
{
public class MsmqOptions
{
public string Path { get; set; }
public bool SharedModeDenyReceive { get; set; } = false;
public bool EnableCache { get; set; } = false;
public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive;
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.IO;
using System.Messaging;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace SampleMsmqHost
{
public interface IMsmqProcessor
{
Task ProcessMessageAsync(Message message, CancellationToken cancellationToken);
}
public class MsmqProcessor : IMsmqProcessor
{
private readonly ILogger<MsmqProcessor> _logger;
public MsmqProcessor(ILogger<MsmqProcessor> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
{
// we assume the message contains text encoded as UTF7
using (var reader = new StreamReader(message.BodyStream, Encoding.UTF7))
{
var text = await reader.ReadToEndAsync();
_logger.LogInformation("Received Message: {0}", text);
}
}
}
}

View File

@ -0,0 +1,44 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace SampleMsmqHost
{
public class MsmqService : BackgroundService
{
public ILogger<MsmqService> Logger { get; }
public IMsmqConnection Connection { get; }
public IMsmqProcessor Processor { get; }
public MsmqService(ILogger<MsmqService> logger, IMsmqConnection connection, IMsmqProcessor processor)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
Connection = connection ?? throw new ArgumentNullException(nameof(connection));
Processor = processor ?? throw new ArgumentNullException(nameof(processor));
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("Begin Receive Loop");
try
{
while (!cancellationToken.IsCancellationRequested)
{
using (var message = await Connection.ReceiveAsync(cancellationToken))
{
await Processor.ProcessMessageAsync(message, cancellationToken);
}
}
}
finally
{
Logger.LogInformation("End Receive Loop");
}
}
}
}

View File

@ -0,0 +1,86 @@
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace SampleMsmqHost
{
public static class Program
{
// Before running this program, please make sure to install MSMQ
// and create the ".\private$\SampleQueue" queue on your local machine.
public static async Task Main(string[] args)
{
var host = new HostBuilder()
.ConfigureAppConfiguration(config =>
{
config.AddEnvironmentVariables();
config.AddJsonFile("appsettings.json", optional: true);
config.AddCommandLine(args);
})
.ConfigureLogging(factory =>
{
factory.AddConsole();
})
.ConfigureServices(services =>
{
services.AddOptions();
services.Configure<MsmqOptions>(options =>
{
options.Path = @".\private$\SampleQueue";
options.AccessMode = QueueAccessMode.SendAndReceive;
});
services.AddSingleton<IMsmqConnection, MsmqConnection>();
services.AddTransient<IMsmqProcessor, MsmqProcessor>();
services.AddTransient<IHostedService, MsmqService>();
})
.Build();
using (host)
{
// start the MSMQ host
await host.StartAsync();
// read and dispatch messages to the MSMQ queue
StartReadLoop(host);
// wait for the MSMQ host to shutdown
await host.WaitForShutdownAsync();
}
}
private static void StartReadLoop(IHost host)
{
var connection = host.Services.GetRequiredService<IMsmqConnection>();
var applicationLifetime = host.Services.GetRequiredService<IApplicationLifetime>();
// run the read loop in a background thread so that it can be stopped with CTRL+C
Task.Run(() => ReadLoop(connection, applicationLifetime.ApplicationStopping));
}
private static void ReadLoop(IMsmqConnection connection, CancellationToken cancellationToken)
{
Console.WriteLine("Enter your text message and press ENTER...");
while (!cancellationToken.IsCancellationRequested)
{
// read a text message from the user
cancellationToken.ThrowIfCancellationRequested();
var text = Console.ReadLine();
// send the text message to the queue
cancellationToken.ThrowIfCancellationRequested();
if (!string.IsNullOrEmpty(text))
connection.SendText(text);
}
}
}
}

View File

@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<!-- System.Messaging is only available in the full .NET Frameworks -->
<TargetFramework>net461</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.Extensions.Hosting\Microsoft.Extensions.Hosting.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="$(MicrosoftExtensionsConfigurationCommandLinePackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="$(MicrosoftExtensionsConfigurationEnvironmentVariablesPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="$(MicrosoftExtensionsConfigurationJsonPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="$(MicrosoftExtensionsLoggingConsolePackageVersion)" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.Messaging" />
</ItemGroup>
</Project>