Stream HTTP Response Content in ASP.NET Core Web API - Part 2, infinite ♾️ data stream
Advanced/Infinite (♾️) HTTP data streaming use case and example using ASP.NET Core Web API as server processor + both a .NET Console App consumer and a Browser based JavaScript/JS consumer sample
This post is a follow-up of a previous one, that presented a super fast and easy way of streaming some data in ASP.NET Core Web API by serializing the payload in a MemoryStream
and passing it forward to the HTTP Response.
So, the previous post showcased that if you pass an arbitrary Stream
to the HTTP Response of ASP.NET WebAPI, it will be streamed to the client in buffered chunks. Basically it will read buffers from the given stream and it will flush them to the HTTP Response stream.
This is perfect if you have a <<well-known>> data stream, such as a Memory Stream or a File Stream.
But what if you have a <<dynamic>> data stream? Like say you need to fetch each element via some asynchronous operation before sending it to the stream (I know, I know... IAsyncEnumerable
, but let's think .NET Standard...); or you don't know the end of the stream; or you simply want to stream something continuously (or for a specific time period).
This is the scenario covered by this article, with the complete working code on my GitHub Repo: https://github.com/hinteadan/net-http-stream-playground
What it does, is dinamically generates and streams a continuous datastream (DateTime, JSON or number) for a given amount of time. Preview below:
Server-side HTTP API Implementation
The main thing that makes the stuff work here, is flushing data bytes directly onto the HTTP Response Stream of .NET's WebAPI. But, read below!
- Content Type must be text/plain, otherwise browsers will not process the data stream as it comes, but only after the stream closes! However, you can append a custom format info. For example:
Response.ContentType = "text/plain; charset=utf-8; x-subtype=json";
- Make sure you always flush complete entries onto the stream. This means, no partial JSONs, no partial DateTimes, etc. Otherwise it's extremly hard for the consuming client to process the stream on the go.
So, first of all, in preparation of the continuous data stream, I implemented an EndlessEnumerable<TElement>
that enumarates forever and sleeps a bit on MoveNext()
to artificially simulate some workload and better exemplify the streaming process.
We iterate over such an enumaration for a given period of time and flush each element onto the HTTP Response stream. This is implemented in the StreamProvider
.
The StreamProvider
is called by the WebAPI controller and sends the Response Body Stream as parameter so that the StreamProvider
can flush bytes onto it.
And that's it on the server side, I'll put the imeplemtations below, as well, but again, the full working solution is on GitHub.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace H.Playground.Streaming.Core
{
public class EndlessEnumerable<TElement> : IEnumerable<TElement>, IEnumerator<TElement>, IDisposable
{
static readonly TimeSpan refreshRate = TimeSpan.FromSeconds(.25);
readonly Func<TElement> elementFactory;
public EndlessEnumerable(Func<TElement> elementFactory)
{
this.elementFactory = elementFactory ?? throw new ArgumentNullException(nameof(elementFactory));
}
public TElement Current => elementFactory();
public bool MoveNext()
{
Thread.Sleep(refreshRate);
return true;
}
public void Reset() { }
public void Dispose() { }
object IEnumerator.Current => Current;
public IEnumerator<TElement> GetEnumerator() => this;
IEnumerator IEnumerable.GetEnumerator() => this;
}
}
using H.Necessaire.Serialization;
using H.Playground.Streaming.Core.Model;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace H.Playground.Streaming.Core
{
public class StreamProvider
{
static readonly Random random = new Random();
static readonly TimeSpan defaultDuration = TimeSpan.FromSeconds(30);
static readonly TimeSpan maxDuration = TimeSpan.FromMinutes(5);
readonly DataEntryProvider dataEntryProvider = new DataEntryProvider();
public async Task StreamNumbersTo(Stream stream, TimeSpan? desiredDuration = null)
{
TimeSpan duration = GetActualDuration(desiredDuration);
DateTime startedAt = DateTime.UtcNow;
foreach (int number in new EndlessEnumerable<int>(() => random.Next(int.MinValue, int.MaxValue)))
{
if (DateTime.UtcNow > startedAt + duration)
break;
await WriteValueToStream(stream, $"{number}{Environment.NewLine}");
}
}
public async Task StreamTimestampTo(Stream stream, TimeSpan? desiredDuration = null)
{
TimeSpan duration = GetActualDuration(desiredDuration);
DateTime startedAt = DateTime.UtcNow;
foreach (DateTime dateTime in new EndlessEnumerable<DateTime>(() => DateTime.UtcNow))
{
if (DateTime.UtcNow > startedAt + duration)
break;
await WriteValueToStream(stream, $"{dateTime.ToString("O")}{Environment.NewLine}");
}
}
public async Task StreamDataEntriesTo(Stream stream, TimeSpan? desiredDuration = null)
{
TimeSpan duration = GetActualDuration(desiredDuration);
DateTime startedAt = DateTime.UtcNow;
await WriteValueToStream(stream, $"[{Environment.NewLine}");
foreach (DataEntry dataEntry in new EndlessEnumerable<DataEntry>(dataEntryProvider.NewRandomEntry))
{
if (DateTime.UtcNow > startedAt + duration)
break;
await WriteValueToStream(stream, $"{dataEntry.ToJsonObject()}{Environment.NewLine},{Environment.NewLine}");
}
await WriteValueToStream(stream, $"null{Environment.NewLine}]");
}
static async Task WriteValueToStream(Stream stream, string value)
{
byte[] valueAsBytes = Encoding.UTF8.GetBytes(value);
await stream.WriteAsync(valueAsBytes, 0, valueAsBytes.Length);
await stream.FlushAsync();
}
static TimeSpan GetActualDuration(TimeSpan? desiredDuration = null)
{
TimeSpan duration = desiredDuration ?? defaultDuration;
duration = duration < TimeSpan.Zero ? -duration : duration;
duration = duration > maxDuration ? maxDuration : duration;
return duration;
}
}
}
using H.Necessaire;
using H.Playground.Streaming.Core;
using Microsoft.AspNetCore.Mvc;
namespace H.Playground.Streaming.API.HTTP.NetCoreWebApi.Controllers
{
[ApiController]
[Route("[controller]")]
public class StreamController : ControllerBase
{
StreamProvider streamProvider;
public StreamController()
{
streamProvider = new StreamProvider();
}
[HttpGet]
[Route(nameof(Timestamp))]
public async Task Timestamp([FromQuery] string? t)
{
Response.ContentType = "text/plain; charset=utf-8";
double? desiredDurationInSeconds = t?.ParseToDoubleOrFallbackTo(null);
await streamProvider.StreamTimestampTo(Response.Body, desiredDuration: desiredDurationInSeconds == null ? null : TimeSpan.FromSeconds(desiredDurationInSeconds.Value));
}
[HttpGet]
[Route(nameof(DataEntries))]
public async Task DataEntries([FromQuery] string? t)
{
Response.ContentType = "text/plain; charset=utf-8; x-subtype=json";
double? desiredDurationInSeconds = t?.ParseToDoubleOrFallbackTo(null);
await streamProvider.StreamDataEntriesTo(Response.Body, desiredDuration: desiredDurationInSeconds == null ? null : TimeSpan.FromSeconds(desiredDurationInSeconds.Value));
}
[HttpGet]
[Route(nameof(Numbers))]
public async Task Numbers([FromQuery] string? t)
{
Response.ContentType = "text/plain; charset=utf-8; x-subtype=json";
double? desiredDurationInSeconds = t?.ParseToDoubleOrFallbackTo(null);
await streamProvider.StreamNumbersTo(Response.Body, desiredDuration: desiredDurationInSeconds == null ? null : TimeSpan.FromSeconds(desiredDurationInSeconds.Value));
}
}
}
The Consuming Client Implementation
From JS
In JavaScript, consuming the stream starts with:
(await fetch(streamUrl)).body.getReader()
And then, you keep reading from the stream reader via:
let streamReadResult = await streamReader.read();
await processStreamChunk(streamReadResult.value);
Until:
streamReadResult.done === true;
Full streamer.js
implemention on GitHub and below:
(async function (window, console, fetch) {
function Streamer(streamUrl, chunkProcessor) {
const refreshRateInMilliseconds = 500;
const textDecoder = new TextDecoder();
let streamReader = null;
let isDone = false;
async function processStream() {
if (!streamReader) {
isDone = true;
return;
}
if (isDone !== false)
return;
let streamReadResult = await streamReader.read();
isDone = streamReadResult.done;
await processStreamChunk(streamReadResult.value);
}
async function processStreamChunk(chunkValue) {
let valueAsString = textDecoder.decode(chunkValue);
console.debug(valueAsString);
if (chunkProcessor)
await chunkProcessor(valueAsString);
}
async function processStreamAndQueueAnother() {
await processStream();
if (isDone !== false)
return;
setTimeout(async () => {
await processStreamAndQueueAnother();
}, refreshRateInMilliseconds);
}
this.Start = async function () {
streamReader = (await fetch(streamUrl)).body.getReader();
await processStreamAndQueueAnother();
}
}
window.Streamer = Streamer;
})(this.window, this.console, fetch);
And concrete streamer.js
usage on GitHub and below:
(async function (document, Streamer, JSON) {
const defaultSeparator = '\r\n';
const jsonStringSeparator = '\r\n,\r\n';
const stringStreamUrl = `/stream/timestamp`;
const jsonsStreamUrl = `/stream/DataEntries`;
const numbersStreamUrl = `/stream/Numbers`;
const buttonStreamStrings = document.getElementById("ButtonStreamStrings");
const buttonStreamJsons = document.getElementById("ButtonStreamJsons");
const buttonStreamNumbers = document.getElementById("ButtonStreamNumbers");
const printCanvas = document.getElementById("PrintCanvas");
const inputSecondsToRun = document.getElementById("InputSecondsToRun");
async function stringChunkProcessor(chunkValue) {
let values = chunkValue.split(defaultSeparator);
for (let i in values) {
let value = values[i];
if (!value) continue;
console.debug(value);
printCanvas.append(`${value}\n`);
}
await Promise.resolve(true);
}
async function jsonChunkProcessor(chunkValue) {
if (chunkValue.startsWith('['))
return;
if (chunkValue.endsWith(']'))
return;
let jsons = chunkValue.split(jsonStringSeparator);
for (let i in jsons) {
try {
let dataEntry = JSON.parse(jsons[i]);
console.debug(dataEntry);
printCanvas.append(`${JSON.stringify(dataEntry, null, 2)}\n`);
}
catch (err) {
}
}
await Promise.resolve(true);
}
async function numberChunkProcessor(chunkValue) {
let values = chunkValue.split(defaultSeparator);
for (let i in values) {
let value = values[i];
if (!value) continue;
console.debug(value);
printCanvas.append(`${value}\n`);
}
await Promise.resolve(true);
}
buttonStreamStrings.addEventListener("click", async () => {
printCanvas.innerText = '';
await new Streamer(`${stringStreamUrl}?t=${inputSecondsToRun.value}`, stringChunkProcessor).Start();
});
buttonStreamJsons.addEventListener("click", async () => {
printCanvas.innerText = '';
await new Streamer(`${jsonsStreamUrl}?t=${inputSecondsToRun.value}`, jsonChunkProcessor).Start();
});
buttonStreamNumbers.addEventListener("click", async () => {
printCanvas.innerText = '';
await new Streamer(`${numbersStreamUrl}?t=${inputSecondsToRun.value}`, numberChunkProcessor).Start();
});
})(this.document, this.window.Streamer, this.JSON);
From .NET
In .NET, consuming the stream starts with HttpCompletionOption.ResponseHeadersRead
:
await httpClient.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead);
Full sample, GetHttpStream
, on GitHub and below:
using H.Necessaire;
using H.Necessaire.Runtime.CLI.Commands;
using System.Text;
namespace H.Playground.Streaming.Client.CLI
{
internal abstract class CliCommandBase : CommandBase
{
protected const string baseUrl = "http://localhost:5261";
protected static readonly HttpClient httpClient = new HttpClient();
static readonly TimeSpan processingRefreshRate = TimeSpan.FromSeconds(.5);
static readonly TimeSpan warmupDuration = TimeSpan.FromSeconds(3);
static readonly string defaultEndMarker = Environment.NewLine;
const uint blankStreamReadCountTolerance = 3;
const uint streamBufferSize = 512;
protected async Task Warmup()
{
Log("Warming up...");
Log($"0% warmed up. {Math.Round(warmupDuration.TotalSeconds, 0)} second(s) remaining.");
for (double second = warmupDuration.TotalSeconds; second > 0; second -= processingRefreshRate.TotalSeconds)
{
await Task.Delay(processingRefreshRate);
Log($"{Math.Round((warmupDuration.TotalSeconds - second) / warmupDuration.TotalSeconds * 100, 0)}% warmed up. {Math.Round(second, 1)} second(s) remaining.");
}
Log("100% All warmed up. 0 second(s) remaining.");
}
protected Task<string?> WaitForUserInput()
{
string? userInput = Console.ReadLine();
return userInput.AsTask();
}
protected async Task ProcessEndlessStream(Stream stream, Func<string, Task> chunkProcessor, TimeSpan? timeout = null)
{
DateTime startedAt = DateTime.Now;
uint blankReadCount = 0;
while ((blankReadCount <= blankStreamReadCountTolerance) && (timeout is null ? true : DateTime.Now <= startedAt + timeout.Value))
{
string chunkValue = await ReadChunckFromStream(stream);
if (chunkValue.Length == 0)
blankReadCount++;
await chunkProcessor(chunkValue);
await Task.Delay(processingRefreshRate);
};
}
protected async Task<Stream> GetHttpStream(HttpMethod httpMethod, string url)
{
using (HttpRequestMessage httpRequest = new HttpRequestMessage(httpMethod, url))
{
HttpResponseMessage httpResponse = await httpClient.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead);
httpResponse.EnsureSuccessStatusCode();
return await httpResponse.Content.ReadAsStreamAsync();
}
}
private static async Task<string> ReadChunckFromStream(Stream stream, params string?[] endMarkers)
{
StringBuilder resultBuilder = new StringBuilder();
byte[] buffer = new byte[streamBufferSize];
string? readString = null;
int readLength = -1;
do
{
readLength = await stream.ReadAsync(buffer, 0, buffer.Length);
readString = Encoding.UTF8.GetString(buffer).TrimEnd('\0');
resultBuilder.Append(readString);
Array.Clear(buffer, 0, readLength);
}
while (readLength > 0 && (endMarkers?.Any() == true ? endMarkers : defaultEndMarker.AsArray()).All(endMarker => readString?.EndsWith(endMarker ?? defaultEndMarker) != true));
return resultBuilder.ToString();
}
}
}
And usage sample on GitHub and below:
using H.Necessaire;
namespace H.Playground.Streaming.Client.CLI
{
internal class StreamTimestampCommand : CliCommandBase
{
static readonly TimeSpan timeout = TimeSpan.FromSeconds(5);
public override async Task<OperationResult> Run()
{
await Warmup();
using (Stream contentStream = await GetHttpStream(HttpMethod.Get, $"{baseUrl}/stream/timestamp?t={(int)Math.Floor(timeout.TotalSeconds)}"))
{
await ProcessEndlessStream(contentStream, streamChunk =>
{
if (string.IsNullOrEmpty(streamChunk))
return false.AsTask();
string[] rawValues = streamChunk.Split(Environment.NewLine, StringSplitOptions.RemoveEmptyEntries);
foreach (string rawValue in rawValues)
{
if (DateTime.TryParse(rawValue.ReplaceLineEndings(string.Empty), out DateTime parseResult))
{
Log($"Server streamed timestamp: {parseResult.ToString("O")}");
}
}
return true.AsTask();
});
}
Log("Press <ENTER> to exit");
await WaitForUserInput();
return
true
.ToWinResult()
;
}
}
}