WCF Duplex to gRPC Streaming

One of the useful message exchange patterns in WCF is the Duplex Service, which creates a persistent connection between client and server and allows both sides to send messages independently. gRPC provides a similar capability with its Streaming services, which allow either the client, the server or both sides to asynchronously send messages over a persistent connection. Let’s take a look at how gRPC Streaming compares to WCF Duplex Services, and how to migrate to one from the other.

WCF Duplex Service

We create a Duplex service by defining two interfaces: the regular ServiceContract interface for the server, and a callback interface to represent the client. For example, a stock ticker service might be defined like this:

[ServiceContract(
    SessionMode=SessionMode.Required,
    CallbackContract=typeof(IStockTickerDuplexCallback))]
public interface IStockTickerDuplex
{
    [OperationContract(IsOneWay=true)]
    void Subscribe(string[] stockSymbols);
}

public interface IStockTickerDuplexCallback
{
    [OperationContract(IsOneWay=true)]
    void Update(StockInfo info);
}

In a very naive implementation, we might make that work something like this:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class StockTickerDuplex : IStockTickerDuplex
{
    private readonly TimeSpan _interval = TimeSpan.FromSeconds(10);
    private Task _task;

    public void Subscribe(string[] stockSymbols)
    {
        var tokenSource = new CancellationTokenSource();
        _task = RunAsync(stockSymbols, tokenSource.Token);
        OperationContext.Current.OperationCompleted += (o,a) => tokenSource.Cancel();
    }

    private async Task RunAsync(string[] symbols, CancellationToken token)
    {
        DateTimeOffset lastCheck = default;
        while (!token.IsCancellationRequested)
        {
            var infos = await StockDataService.GetLatestAsync(_symbols, lastCheck);
            lastCheck = DateTimeOffset.UtcNow;
            var callback = Callback;
            foreach (var info in infos)
            {
                callback.Update(info);
            }
            await Task.Delay(_interval, token);
        }
    }

    private IStockTickerDuplexCallback Callback =>
        OperationContext.Current.GetCallbackChannel<IStockTickerDuplexCallback>();
}

On the client side, we implement the IStockTickerDuplexCallback interface and register it with the generated StockTickerDuplexClient via an InstanceContext. For as long as the client exists, the callback object can receive messages from the server.

gRPC Streaming Service

To create an equivalent service in gRPC, we can use the bi-directional streaming feature to create a persistent connection between client and server, send registration actions from the client and a steady stream of stock updates from the server. First of all we need to define the service in our .proto file:

syntax = "proto3";

option csharp_namespace = "TraderSys.Stocks";

package Stocks;

message StockInfo {
	string symbol = 1;
	float price = 2;
}

message SubscribeRequest {
	repeated string symbols = 1;
}

service StockTicker {
	rpc Subscribe(SubscribeRequest) returns (stream StockInfo);
}

Now we implement this service by deriving from the generated StockTicker.StockTickerBase class in our ASP.NET Core 3.0 application:

public class StockTickerGrpcService : StockTicker.StockTickerBase
{
    private static readonly TimeSpan Interval = TimeSpan.FromSeconds(10);

    public override async Task Subscribe(SubscribeRequest request,
        IServerStreamWriter<StockInfo> responseStream,
        ServerCallContext context)
    {
        var symbols = request.Symbols.ToArray();
        var token = context.CancellationToken;
        DateTimeOffset lastCheck = default;

        while (!token.IsCancellationRequested)
        {
            var infos = await StockDataService
                .GetLatestAsync(symbols, lastCheck, token);

            foreach (var info in infos)
            {
                await responseStream.WriteAsync(info);
            }

            lastCheck = DateTimeOffset.UtcNow;
            await Task.Delay(Interval, token);
        }
    }
}

gRPC streaming service methods are different from WCF duplex methods in that the service method is asynchronous, and the Task it returns should not complete until the stream is closed. In our implementation, we use the CancellationToken from the context and run a loop with a delay, getting updates from our StockDataService and writing them to the response stream.

On the client, we can consume this service by using the brand new .NET Core 3.0 and C# 8.0 IAsyncEnumerable<T> and await foreach feature.

class Program
{
    static Task Main(string[] args)
    {
        using var httpClient = new HttpClient {
            BaseAddress = new Uri("https://localhost:5001")
        };
        var client = GrpcClient.Create<StockTicker.StockTickerClient>(httpClient);

        string[] symbols = {"MSFT", "INTC", "AMD"};

        var request = new SubscribeRequest();
        request.Symbols.AddRange(new[] {"MSFT", "INTC", "AMD"});
        var subscription = client.Subscribe(request);

        var tokenSource = new CancellationTokenSource();
        var listenTask = ListenAsync(subscription.ResponseStream, tokenSource.Token);

        Console.Write("Press a key to exit...");
        Console.ReadKey();

        subscription.Dispose();

        return listenTask;
    }

    static async Task ListenAsync(IAsyncStreamReader<StockInfo> stream, CancellationToken token)
    {
        await foreach (var info in stream.AsAsyncEnumerable(token))
        {
            Console.WriteLine($"{info.Symbol}: {info.Price}");
        }
    }
}

The AsAsyncEnumerable method on the stream is a simple extension method that uses C# 8.0’s new language features to wrap gRPC’s IAsyncStreamReader<T> type in an IAsyncEnumerable<T> that can be used with await foreach. Here’s the extension method:

// For Preview 8, install version 2.23.0-pre1 of Grpc.Core.Api
// to get this working. Earlier versions conflict with System.Interactive.Async
internal static class StreamExtensions
{
    public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(
        this IAsyncStreamReader<T> stream,
        [EnumeratorCancellation] CancellationToken token)
    {
        while (await stream.MoveNext(token))
        {
            yield return stream.Current;
        }
    }
}

Note that the current ASP.NET Core 3.0 Preview 8 (at the time of writing) uses version 1.22.0-pre1 of the Grpc.Core.Api NuGet package, which in turn uses the System.Interactive.Async package from the Reactive Extensions libraries. Unfortunately, this package declares an interface System.Collections.Generic.IAsyncEnumerable<T> that conflicts with the .NET Core 3.0 type. If you try to use this in a gRPC project, you’ll get conflict errors because the compiler doesn’t know which type to use.

This has been “fixed” in version 2.23.0-pre1 of Grpc.Core.Api, so if you add a direct package reference to that version in your project, everything should work properly. The next release of the Grpc.Net.* packages will use that version by default so you won’t have to worry about this.

If you are using Reactive Extensions in your projects, it is similarly easy to create an IObservable<T> wrapper around an IAsyncStreamReader<T>.

Visual RecCde will provide extension methods for both IAsyncEnumerable<T> and IObservable<T> in all generated gRPC code.

Streaming directions

gRPC supports streams in either direction, and full bi-directional streaming.

service Example {
    rpc OneWayClientToServer(stream Request) returns (Empty);
    rpc OneWayServerToClient(Empty) returns (stream Response);
    rpc TwoWay(stream Request) returns (stream Response);
}

It’s possible to build very sophisticated distributed applications over these fairly simple primitives:

  • You might use a gRPC stream from client to server as a channel for logging messages or telemetry, or sending data from a sensor on an IoT device, more efficiently than using HTTP requests and more easily than writing your own raw network protocol, with framing and other issues.

  • A stream from server to client could be used to send notifications, updates and other frequently-changing information.

  • Bi-directional streams could be used for a chat app, to synchronize data between client and server, or for long-running request/response communications where server-side processing takes a long time but the client still wants to be made aware of the success or failure of the operation.

Comparison

Although they differ greatly in the way they’re implemented, WCF Duplex Services and gRPC Streams both work well. gRPC Streams feel more modern, in the way that they can be treated as enumerables or observables; it’s a more functional approach compared to WCF’s very object-oriented paradigm.

WCF’s duplex callback interfaces provide an “easy” way for the server to send different message types or invoke different methods on the client, and this is more complex with gRPC Streams, although not impossible. gRPC messages support a oneof field type that can be used to send different message types along the same stream. You can then detect the type of message in the consumer and process it accordingly. Although this is more complicated than a simple interface with multiple methods, the fact that the consumer(s) and producer(s) can be implemented in different programming languages on different platforms is a big advantage. WCF Duplex services are restricted to .NET alone.

The job of converting from WCF Duplex to gRPC Streaming is not straight-forward, but we are working to make Visual Recode provide as much assistance as possible.

The No-Hassle Code Upgrade Tool for .NET

Individual licenses are just $295 $100 or license your team and deploy worldwide starting at $1,495 $995