Streaming Data with ASP.NET Web API and PushStreamContent

This article explains how a client can subscribe to a data stream and receive data pushed by the server as it becomes available, using the ASP.NET Web API. It is intended as nothing more than a learning exercise; technologies such as WebSockets, SignalR, WCF or even plain sockets may be more suitable for this kind of thing.

Update 2015-03-14: Full source code for server and client applications is now available.

The Server

Our Web API is going to allow clients to subscribe to a service which will send the price of… something every two seconds. To start off, you will need to create a new Web project with a Web API component, and create a new controller (which I called PriceController).

Then, it is simply a matter of sending back a response whose content is a PushStreamContent:

        [HttpGet]
        public HttpResponseMessage Subscribe(HttpRequestMessage request)
        {
            var response = request.CreateResponse();
            response.Content = new PushStreamContent((a, b, c) =>
                { OnStreamAvailable(a, b, c); }, "text/event-stream");
            return response;
        }

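The odd arguments in the lambda are a workaround for an unfortunate ambiguity between PushStreamContent constructors in Web API 2: one overload takes an Action<Stream, HttpContent, TransportContext> and another takes a Func<Stream, HttpContent, TransportContext, Task>, so passing OnStreamAvailable directly as a method group can leave the compiler unable to choose between them.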

The implementation for OnStreamAvailable() is pretty simple:

        private void OnStreamAvailable(Stream stream, HttpContent content,
            TransportContext context)
        {
            var client = new StreamWriter(stream);
            clients.Add(client);
        }

We’re simply wrapping a StreamWriter around the stream, and then keeping it in a ConcurrentBag called “clients”.

The last thing we need is a timer to periodically send the price data. This is what the timer’s Elapsed event looks like:

        private async static void timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            var price = 1.0 + random.NextDouble(); // random number between 1 and 2

            foreach (var client in clients)
            {
                try
                {
                    var data = string.Format("data: {0}\n\n", price);
                    await client.WriteAsync(data);
                    await client.FlushAsync();
                }
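                catch(Exception)
                {
                    // Caution: ConcurrentBag.TryTake() removes an arbitrary
                    // item, which is not necessarily this failed client.
                    StreamWriter ignore;
                    clients.TryTake(out ignore);
                }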
            }
        }

Every 2 seconds, a random number between 1 and 2 is selected and sent to all subscribed clients. The data is sent in the Server-Sent Events format: each message is a "data:" line followed by a blank line.
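To make the message format a bit more concrete, here is a minimal sketch of a formatting helper. SseFormatter and its methods are hypothetical names of my own, not part of Web API; the sketch assumes you may want multi-line payloads (each line gets its own "data:" field), an optional "event:" field, and a decimal separator that does not depend on the server's culture.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper (not part of Web API): builds Server-Sent Events messages.
public static class SseFormatter
{
    // Formats a payload as one message, optionally tagged with an event name.
    public static string Format(string payload, string eventName = null)
    {
        var sb = new StringBuilder();

        if (!string.IsNullOrEmpty(eventName))
            sb.Append("event: ").Append(eventName).Append('\n');

        // Each line of the payload needs its own "data:" field.
        foreach (var line in payload.Replace("\r\n", "\n").Split('\n'))
            sb.Append("data: ").Append(line).Append('\n');

        // A blank line terminates the message.
        sb.Append('\n');
        return sb.ToString();
    }

    // Formats a price using the invariant culture, so the decimal
    // separator is always '.' regardless of the server's locale.
    public static string FormatPrice(double price)
    {
        return Format(price.ToString(CultureInfo.InvariantCulture));
    }
}

public static class SseFormatterDemo
{
    public static void Main()
    {
        Console.Write(SseFormatter.FormatPrice(1.5));
        Console.Write(SseFormatter.Format("line one\nline two", "price"));
    }
}
```

With something like this in place, the timer handler could write SseFormatter.FormatPrice(price) instead of building the string with string.Format(), which formats the number using the current culture.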

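If an exception occurs, the code tries to unsubscribe the client by taking an item out of the ConcurrentBag. Be aware that ConcurrentBag.TryTake() removes an arbitrary item, not necessarily the client that failed, so a keyed collection such as ConcurrentDictionary is a better fit if you need to remove a specific client. Cleaning up on failure is necessary because, as Henrik Nielsen states in this discussion: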

Detecting that the TCP connection has been reset is something that the Host (ASP, WCF, etc.) monitors but in .NET 4 neither ASP nor WCF tells us (the Web API layer) about it. This means that the only reliable manner to detect a broken connection is to actually write data to it. This is why we have the try/catch around the write operation in the sample. That is, responses will get cleaned up when they fail and not before.
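Since ConcurrentBag.TryTake() removes an arbitrary item rather than a specific one, one possible alternative is to keep the clients in a ConcurrentDictionary used as a set. The following is only a sketch under that assumption: ClientRegistry and Broadcast are hypothetical names, the writes are synchronous to keep the example short, and a disposed MemoryStream stands in for a dropped connection.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;

// Hypothetical registry (illustrative names): tracks subscribed clients
// and drops exactly the ones whose write fails.
public class ClientRegistry
{
    // The dictionary is used as a concurrent set; the byte value is unused.
    private readonly ConcurrentDictionary<StreamWriter, byte> clients =
        new ConcurrentDictionary<StreamWriter, byte>();

    public int Count { get { return clients.Count; } }

    public void Add(StreamWriter client)
    {
        clients.TryAdd(client, 0);
    }

    // Writes the message to every client. Keys returns a snapshot,
    // so removing entries while looping is safe.
    public void Broadcast(string message)
    {
        foreach (var client in clients.Keys)
        {
            try
            {
                client.Write(message);
                client.Flush(); // the write to the underlying stream happens here
            }
            catch (Exception)
            {
                // Remove this specific client, not an arbitrary one.
                byte ignore;
                clients.TryRemove(client, out ignore);
            }
        }
    }
}

public static class ClientRegistryDemo
{
    public static void Main()
    {
        var registry = new ClientRegistry();
        var healthy = new MemoryStream();
        var broken = new MemoryStream();

        registry.Add(new StreamWriter(healthy));
        registry.Add(new StreamWriter(broken));
        broken.Dispose(); // simulate a dropped connection

        registry.Broadcast("data: 1.5\n\n");
        Console.WriteLine(registry.Count); // only the healthy client remains
    }
}
```

The same idea should carry over to the async version in the controller: keep the try/catch around the write, but call TryRemove() with the writer that actually failed.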

PriceController – Full Code

All you need to get the PriceController working are the member variable declarations and the static constructor which initialises them. I’m providing the entire class below so that you can just copy it and get up and running.

    public class PriceController : ApiController
    {
        private static ConcurrentBag<StreamWriter> clients;
        private static Random random;
        private static Timer timer;

        static PriceController()
        {
            clients = new ConcurrentBag<StreamWriter>();

            timer = new Timer();
            timer.Interval = 2000;
            timer.AutoReset = true;
            timer.Elapsed += timer_Elapsed;
            timer.Start();

            random = new Random();
        }

        private async static void timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            var price = 1.0 + random.NextDouble(); // random number between 1 and 2

            foreach (var client in clients)
            {
                try
                {
                    var data = string.Format("data: {0}\n\n", price);
                    await client.WriteAsync(data);
                    await client.FlushAsync();
                }
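                catch(Exception)
                {
                    // Caution: ConcurrentBag.TryTake() removes an arbitrary
                    // item, which is not necessarily this failed client.
                    StreamWriter ignore;
                    clients.TryTake(out ignore);
                }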
            }
        }

        [HttpGet]
        public HttpResponseMessage Subscribe(HttpRequestMessage request)
        {
            var response = request.CreateResponse();
            response.Content = new PushStreamContent((a, b, c) =>
                { OnStreamAvailable(a, b, c); }, "text/event-stream");
            return response;
        }

        private void OnStreamAvailable(Stream stream, HttpContent content,
            TransportContext context)
        {
            var client = new StreamWriter(stream);
            clients.Add(client);
        }
    }

Browser Support

To test the PriceController, you could consider firing up a browser and having it do the subscription. However, I’ve found that browser support for this is pretty crap. For instance, Firefox thinks the event stream is something you can download:

[Screenshot: pushstreamcontent-firefox]

So does IE:

[Screenshot: pushstreamcontent-ie]

Chrome actually manages to display data; however, it sometimes displays only part of a message (the rest is buffered and displayed when the next message is received), and it has a habit of giving up:

[Screenshot: pushstreamcontent-chrome]

When I originally saw these results, I thought I had done something seriously wrong. However, I realised this was not the case when I wrote a client in C# and found that it worked correctly. In fact, let’s do that now.

The Client

This client requires the latest Web API Client Libraries from NuGet. Since writing async Console applications can be a bit messy (see a workaround if you still want to take the Console direction), I wrote a simple WPF application. This just has a “Subscribe” button, and a TextBox which I named “OutputField”.

This is the code that does the subscription and streams the prices from the PriceController:

        private async void SubscribeButton_Click(object sender, RoutedEventArgs e)
        {
            if (this.client == null)
            {
                this.client = new HttpClient();

                try
                {
                    var stream = await this.client.GetStreamAsync("http://localhost:1870/api/Price/Subscribe");

                    using (var reader = new StreamReader(stream))
                    {
                        string line;

                        // ReadLineAsync() returns null (it does not throw)
                        // once the end of the stream is reached
                        while ((line = await reader.ReadLineAsync()) != null)
                        {
                            this.OutputField.Text += line + Environment.NewLine;
                            this.OutputField.ScrollToEnd();
                        }
                    }

                    this.OutputField.Text += "Stream ended";
                }
                catch (Exception)
                {
                    // Connection failures and broken streams end up here
                    this.OutputField.Text += "Stream ended";
                }
            }
        }

Since we’re dealing with a stream, we’re using GetStreamAsync() to talk to the Web API rather than the usual GetAsync(). Then, we can read it as we would any other stream.
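The client above simply displays the raw lines, including the "data:" prefix and the blank separators. If you want just the payloads, a small parser can sit on top of the reader. This is a minimal sketch, not a complete Server-Sent Events implementation: SseReader is a hypothetical name, it only understands "data:" fields, and it ignores everything else (event names, ids, retry hints and comments).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: reads "data:" messages from any TextReader and
// hands each complete message to a callback as soon as it is available.
public static class SseReader
{
    public static async Task ReadAsync(TextReader reader, Action<string> onMessage)
    {
        var dataLines = new List<string>();
        string line;

        // ReadLineAsync() returns null once the stream has ended.
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (line.Length == 0)
            {
                // A blank line marks the end of a message.
                if (dataLines.Count > 0)
                {
                    onMessage(string.Join("\n", dataLines));
                    dataLines.Clear();
                }
            }
            else if (line.StartsWith("data:", StringComparison.Ordinal))
            {
                var value = line.Substring(5);

                // A single space after the colon is not part of the value.
                if (value.StartsWith(" ", StringComparison.Ordinal))
                    value = value.Substring(1);

                dataLines.Add(value);
            }
            // Other fields and comment lines are ignored in this sketch.
        }
    }
}

public static class SseReaderDemo
{
    public static void Main()
    {
        // A StringReader stands in for the StreamReader over the HTTP stream.
        var reader = new StringReader("data: 1.25\n\ndata: 1.75\n\n");
        SseReader.ReadAsync(reader, Console.WriteLine).GetAwaiter().GetResult();
    }
}
```

In the WPF client, the callback could append the message to OutputField instead of writing to the console, and the StreamReader wrapping the HTTP stream would take the place of the StringReader.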

And in fact, it works pretty nicely:

[Screenshot: pushstreamcontent-wpfclient]
