Streaming Async Enumerable - lobodava/artisan-orm GitHub Wiki

For queries that return more rows than you want to buffer in memory, ReadToAsyncEnumerable<T> and ReadAsAsyncEnumerable<T> stream rows one at a time as IAsyncEnumerable<T>, with first-class CancellationToken support.

Added in v4 — see What's New in v4.

When to stream

Use streaming over ReadToList<T> when any of these is true:

  • The result set is large enough that holding it all in RAM is uncomfortable (CSV exports, batch processing pipelines, archival reads).
  • You want to start working on each row immediately without waiting for the whole list (latency-sensitive feeds, server-sent events, gRPC streaming responses).
  • You want the consumer to control how many rows to read (early break after finding what you need, take-N patterns).

Use ReadToList<T> when:

  • The result is small and bounded (a page of 50 users, a lookup table).
  • You will iterate the rows multiple times.
  • You need the total row count before you process any row.

Two flavours

Method                   | Mapping path
ReadToAsyncEnumerable<T> | Registered [MapperFor] mapper, or scalar conversion for simple types.
ReadAsAsyncEnumerable<T> | Auto-mapping (reflection-based, cached).

Both exist on SqlCommand and RepositoryBase.

Basic example

public class RecordRepository : RepositoryBase
{
    public RecordRepository(string connStr) : base(connStr) { }

    public IAsyncEnumerable<Record> GetAllRecords(CancellationToken ct = default) =>
        ReadToAsyncEnumerable<Record>("dbo.GetRecords", ct);
}

// Caller
await foreach (var record in repo.GetAllRecords(stoppingToken))
{
    await ProcessAsync(record, stoppingToken);
}

The underlying SqlDataReader and SqlConnection stay open for the lifetime of the iterator. They are disposed when:

  • the consumer's await foreach loop exits normally,
  • the consumer breaks out of the loop,
  • the consumer's IAsyncEnumerator is disposed (which await foreach does automatically),
  • the CancellationToken is canceled.
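The disposal rules above come from how C# async iterators work in general, so they can be observed without a database. The following is a minimal sketch, not Artisan ORM code: FakeRowsAsync is a hypothetical stand-in for a streaming read, and its finally block plays the role of closing the reader and connection.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposalDemo
{
    // Records the order of events so the behaviour can be inspected.
    public static readonly List<string> Log = new();

    // Hypothetical stand-in for a streaming read. A real implementation
    // would hold a data reader and a connection instead of a log entry.
    public static async IAsyncEnumerable<int> FakeRowsAsync()
    {
        Log.Add("open");
        try
        {
            for (var i = 1; i <= 1000; i++)
            {
                await Task.Yield();   // simulate an asynchronous row fetch
                yield return i;
            }
        }
        finally
        {
            Log.Add("closed");        // runs on completion, break, or exception
        }
    }

    // Reads rows until the given value is seen, then breaks out early.
    public static async Task<int> ConsumeUntilAsync(int stopAt)
    {
        var seen = 0;
        await foreach (var row in FakeRowsAsync())
        {
            seen++;
            if (row == stopAt) break; // await foreach calls DisposeAsync on exit
        }
        return seen;
    }

    public static async Task Main()
    {
        var seen = await ConsumeUntilAsync(3);
        Console.WriteLine($"{seen} rows, log: {string.Join(", ", Log)}");
    }
}
```

Breaking after the third row still runs the finally block, because await foreach disposes the enumerator on every exit path.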

Cancellation

There are two equivalent ways to pass the token:

// Pass at call time
await foreach (var r in repo.ReadToAsyncEnumerable<Record>("dbo.GetRecords", ct))
    ...

// Pass via WithCancellation when the producer call was not given a CancellationToken
await foreach (var r in cmd.ReadToAsyncEnumerable<Record>().WithCancellation(ct))
    ...

When the token is canceled, the next MoveNextAsync throws OperationCanceledException. The reader and connection are disposed by the iterator's finally block — no leak.
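A caller usually wants to treat cancellation as a normal way to stop rather than as a failure. The sketch below shows one way to do that with plain BCL types. FakeRowsAsync is a hypothetical endless stream standing in for a streaming read, and the CleanedUp flag stands in for closing the reader and connection.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    public static bool CleanedUp;

    // Hypothetical stand-in: an endless stream that honours the token.
    public static async IAsyncEnumerable<int> FakeRowsAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        try
        {
            for (var i = 1; ; i++)
            {
                await Task.Delay(10, ct); // throws once ct has been canceled
                yield return i;
            }
        }
        finally
        {
            CleanedUp = true;             // stand-in for closing reader and connection
        }
    }

    // Returns true if the loop ended because of cancellation.
    public static async Task<bool> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var count = 0;
        try
        {
            await foreach (var row in FakeRowsAsync(cts.Token))
            {
                if (++count == 3) cts.Cancel(); // request a stop after three rows
            }
            return false;
        }
        catch (OperationCanceledException)      // also catches TaskCanceledException
        {
            return true;
        }
    }

    public static async Task Main()
    {
        var canceled = await RunAsync();
        Console.WriteLine($"canceled: {canceled}, cleaned up: {CleanedUp}");
    }
}
```

Catching OperationCanceledException at the consumer keeps the cancellation path separate from genuine errors, and the cleanup has already happened by the time the catch block runs.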

Early break

Iteration is lazy — rows are pulled from SQL Server only as you consume them. Breaking out of the loop is cheap and clean:

await foreach (var record in repo.ReadToAsyncEnumerable<Record>("dbo.GetAllRecords"))
{
    if (record.Name == "STOP") break;   // remaining rows never travel over the wire
    await ProcessAsync(record);
}
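The same laziness makes take-N patterns straightforward. The sketch below uses a hypothetical TakeAsync helper, which is not part of Artisan ORM, and a counter on a stand-in source to show that only the requested rows are ever produced. If the project already references the System.Linq.Async package, its Take operator covers the same need.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TakeDemo
{
    // Counts how many rows the source actually produced.
    public static int Produced;

    // Hypothetical stand-in for a large streaming result set.
    public static async IAsyncEnumerable<int> FakeRowsAsync()
    {
        for (var i = 1; i <= 1_000_000; i++)
        {
            Produced++;
            await Task.Yield();
            yield return i;
        }
    }

    // Hypothetical helper: yields at most 'count' items, then stops pulling.
    public static async IAsyncEnumerable<T> TakeAsync<T>(
        IAsyncEnumerable<T> source,
        int count,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (count <= 0) yield break;

        var taken = 0;
        await foreach (var item in source.WithCancellation(ct))
        {
            yield return item;
            if (++taken >= count) yield break; // leaving the loop disposes the source
        }
    }

    // Collects the first 'count' rows into a list.
    public static async Task<List<int>> FirstRowsAsync(int count)
    {
        var result = new List<int>();
        await foreach (var row in TakeAsync(FakeRowsAsync(), count))
            result.Add(row);
        return result;
    }

    public static async Task Main()
    {
        var rows = await FirstRowsAsync(5);
        Console.WriteLine($"taken: {rows.Count}, produced by source: {Produced}");
    }
}
```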

Streaming on SqlCommand

When you need to configure parameters or share the command with other reads:

// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
public async IAsyncEnumerable<Record> StreamRecordsAsync(
    int minId,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    using var cmd = CreateCommand();
    cmd.UseProcedure("dbo.GetRecordsByMinId");
    cmd.AddIntParam("@MinId", minId);

    // Re-yield each row so the command stays alive until iteration ends.
    await foreach (var record in cmd.ReadToAsyncEnumerable<Record>(ct))
        yield return record;
}

The [EnumeratorCancellation] attribute lets the caller's WithCancellation(...) token flow into your inner streaming call.
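The effect of the attribute can be checked in isolation. In this sketch, two hypothetical iterators differ only in whether the parameter carries [EnumeratorCancellation], and each reports whether the token it received can be canceled. The variant without the attribute is expected to produce compiler warning CS8425, which is the compiler pointing out the same problem.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TokenFlowDemo
{
    // With the attribute: the token given to WithCancellation(...) arrives as ct.
    public static async IAsyncEnumerable<bool> WithAttributeAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await Task.Yield();
        yield return ct.CanBeCanceled;
    }

    // Without the attribute: ct keeps its default value and the
    // WithCancellation token never reaches the iterator body.
    public static async IAsyncEnumerable<bool> WithoutAttributeAsync(
        CancellationToken ct = default)
    {
        await Task.Yield();
        yield return ct.CanBeCanceled;
    }

    // Returns (token flowed with attribute, token flowed without attribute).
    public static async Task<(bool Flowed, bool FlowedWithoutAttribute)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var flowed = false;
        var flowedWithoutAttribute = true;

        await foreach (var v in WithAttributeAsync().WithCancellation(cts.Token))
            flowed = v;
        await foreach (var v in WithoutAttributeAsync().WithCancellation(cts.Token))
            flowedWithoutAttribute = v;

        return (flowed, flowedWithoutAttribute);
    }

    public static async Task Main()
    {
        var (flowed, flowedWithoutAttribute) = await RunAsync();
        Console.WriteLine($"with attribute: {flowed}, without: {flowedWithoutAttribute}");
    }
}
```

When a caller passes a token directly and also calls WithCancellation with a different one, the compiler-generated iterator links the two, so canceling either stops the stream.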

Auto-mapping flavour

ReadAsAsyncEnumerable<T> works the same way but builds the mapper via reflection on the first call and caches it (see Auto-Mapping):

public IAsyncEnumerable<UserSummary> StreamUsers(CancellationToken ct = default) =>
    ReadAsAsyncEnumerable<UserSummary>(
        "select Id, Login, Name, Email from dbo.Users",
        ct);

Connection lifetime — important

The underlying SqlConnection is held open for the entire iteration. Two implications:

  1. Don't issue other repository calls inside the loop on the same repo. The connection is busy with the open reader. If you need a second query mid-iteration, take that query's data first (into a list / dictionary) and use it during the iteration, or open a second repository instance.

  2. Always finish or abandon the iteration promptly. An enumerator that has been started but is neither run to completion nor disposed holds its connection until the iterator is finalized. await foreach does the disposal for you on completion, break, or exception; the trap is mainly driving the enumerator by hand (GetAsyncEnumerator and MoveNextAsync), or stashing a partially consumed one somewhere, and never disposing it.
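For the rare case where the enumerator has to be driven by hand instead of with await foreach, wrapping it in await using restores the same guarantee. This is a sketch with plain BCL types: FakeRowsAsync is a hypothetical stand-in for a streaming read, and the Closed flag stands in for releasing the connection.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualEnumerationDemo
{
    public static bool Closed;

    // Hypothetical stand-in for a streaming read that owns a connection.
    public static async IAsyncEnumerable<int> FakeRowsAsync()
    {
        try
        {
            for (var i = 1; i <= 100; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            Closed = true; // stand-in for disposing reader and connection
        }
    }

    // Reads only the first row by hand; await using guarantees DisposeAsync.
    public static async Task<int?> FirstOrNullAsync(IAsyncEnumerable<int> source)
    {
        await using var enumerator = source.GetAsyncEnumerator();
        return await enumerator.MoveNextAsync() ? (int?)enumerator.Current : null;
    }

    public static async Task Main()
    {
        var first = await FirstOrNullAsync(FakeRowsAsync());
        Console.WriteLine($"first: {first}, closed: {Closed}");
    }
}
```

Without the await using declaration, the iterator would stay suspended after the first row and its finally block would not run.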

Streaming scalars

ReadToAsyncEnumerable<int>, ReadToAsyncEnumerable<string>, ReadToAsyncEnumerable<DateTime> and other simple types work too, which is useful for "ids only" passes:

await foreach (var id in repo.ReadToAsyncEnumerable<int>(
    "select Id from dbo.Records order by Id"))
{
    // ...
}
