Core.ForEach - Dmitry-Bychenko/Amphisbaena GitHub Wiki

ForEach

Similar to Select, ForEach applies action to each item in reader, running the calls in parallel according to the supplied ChannelParallelOptions. Note that the order of execution is not guaranteed.

Declaration (has overloads)

public static ChannelReader<T> ForEach<S, T>(this ChannelReader<S> reader,
                                                  Func<S, T> action,
                                                  ChannelParallelOptions options)
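
To make the shape of this operator and the ordering caveat more concrete, here is a minimal sketch of how an extension with a similar signature could be built on System.Threading.Channels alone. It is not the Amphisbaena implementation: the names ForEachSketch and ForEachSketched are hypothetical, and a plain int degreeOfParallelism parameter stands in for ChannelParallelOptions.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ForEachSketch {
  // Hypothetical stand-in, NOT the library's ForEach:
  // degreeOfParallelism replaces ChannelParallelOptions for illustration only
  public static ChannelReader<T> ForEachSketched<S, T>(this ChannelReader<S> reader,
                                                       Func<S, T> action,
                                                       int degreeOfParallelism) {
    Channel<T> output = Channel.CreateUnbounded<T>();

    // Every worker pulls from the same reader and writes as soon as it is done,
    // so results can appear in a different order than the input items
    Task[] workers = Enumerable
      .Range(0, degreeOfParallelism)
      .Select(_ => Task.Run(async () => {
        await foreach (S item in reader.ReadAllAsync())
          await output.Writer.WriteAsync(action(item));
      }))
      .ToArray();

    // Complete the output (passing on any failure) once all workers have drained the input
    Task.WhenAll(workers)
      .ContinueWith(t => output.Writer.TryComplete(t.Exception));

    return output.Reader;
  }
}
```

Because several workers compete for the same input, a consumer of the returned reader should only rely on operations that do not depend on item order, such as a sum.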

Example

int[] data = Enumerable
  .Range(1, 1000)
  .ToArray();

// Here we split the initial data into batches of 100 items: 1, 2, ..., 1000 => {1, 2, ..., 100}, {101 ... 200}, ...
// Then we sum the items within each batch in parallel:      {1, 2, ..., 100}, {101 ... 200}, ... => 5050, 15050, ...
// Finally we aggregate (sum) all partial sums:              5050 + 15050 + ... => 500500
int sum = await data
  .ToChannelReader()
  .ToBatch((batch, item, index) => batch.Count < 100)
  .ForEach(batch => batch.Sum(), new ChannelParallelOptions() { DegreeOfParallelism = 4 })
  .Aggregate((s, a) => s + a);
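
The arithmetic in the comments above can be cross-checked without the library. The sketch below uses only standard LINQ and PLINQ (Enumerable.Chunk requires .NET 6 or later) and makes no Amphisbaena calls; the class name BatchSumCheck is hypothetical.

```csharp
using System;
using System.Linq;

public static class BatchSumCheck {
  // Plain LINQ/PLINQ version of the same computation, for verification only
  public static int Compute() {
    return Enumerable
      .Range(1, 1000)                  // 1, 2, ..., 1000
      .Chunk(100)                      // {1..100}, {101..200}, ...
      .AsParallel()
      .WithDegreeOfParallelism(4)      // partial sums may complete in any order
      .Select(batch => batch.Sum())    // 5050, 15050, ...
      .Aggregate((s, a) => s + a);     // addition does not depend on that order
  }
}
```

BatchSumCheck.Compute() returns 500500. The result is the same regardless of which batch finishes first, which is why an unordered parallel step is safe in this pipeline.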