Parallel - cywongg/2025 GitHub Wiki

In C#, you can use the Parallel.ForEach method to process a collection of items in parallel. It is a clean and efficient choice when you want to run a synchronous, CPU-bound function on each item concurrently. Here's how to do it idiomatically:

Example Code

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // Sample collection of items
        var items = new List<string> { "Item1", "Item2", "Item3", "Item4" };

        // Process each item in parallel
        Parallel.ForEach(items, item =>
        {
            ProcessItem(item);
        });
    }

    // Function to process each item
    static void ProcessItem(string item)
    {
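        // Task.CurrentId is a task ID, not a thread ID, so log the managed thread ID instead
        Console.WriteLine($"Processing {item} on thread {Environment.CurrentManagedThreadId}");
        // Simulate some blocking, synchronous work
        System.Threading.Thread.Sleep(1000);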
    }
}

Explanation

  1. Parallel.ForEach:

    • This method takes a collection and a delegate (a lambda function, method group, etc.) to process each item in parallel.
    • By default, iterations run on thread-pool threads, and the runtime adjusts how many run concurrently based on the available threads and resources.
  2. Thread-Safe Operations:

    • Ensure any shared resources accessed within ProcessItem are thread-safe. Use synchronization mechanisms like locks or thread-safe collections if needed.
  3. Void Return Type:

    • Since the function you are applying (ProcessItem) has a void return type, Parallel.ForEach is a natural fit, as it doesn't expect a return value.
  4. Logging or Debugging:

    • To see which thread processes which item, log Thread.CurrentThread.ManagedThreadId (or Environment.CurrentManagedThreadId). Note that Task.CurrentId identifies the current task, not the thread.
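
As a minimal sketch of the thread-safety point above, the following helper totals item lengths from several threads with Interlocked.Add and records processed items in a ConcurrentBag. The class and method names (ThreadSafeAggregation, SumLengths) are hypothetical and chosen only for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original example.
public static class ThreadSafeAggregation
{
    // Returns the total length of all items and the items that were processed.
    public static (int TotalLength, ConcurrentBag<string> Processed) SumLengths(IEnumerable<string> items)
    {
        int totalLength = 0;                          // Shared counter, updated atomically
        var processed = new ConcurrentBag<string>();  // Thread-safe collection

        Parallel.ForEach(items, item =>
        {
            // Interlocked.Add avoids the lost updates that "totalLength += ..." could cause
            Interlocked.Add(ref totalLength, item.Length);
            processed.Add(item);
        });

        return (totalLength, processed);
    }
}
```

A plain List<string> and an unsynchronized counter in the same position could lose updates or corrupt the list, which is why the sketch uses the concurrent types instead of a lock.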

Clean Code Enhancements

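  • Error Handling: Wrap the processing logic in a try-catch block to handle exceptions gracefully. Without it, exceptions thrown by the loop body are rethrown from Parallel.ForEach wrapped in an AggregateException.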
Parallel.ForEach(items, item =>
{
    try
    {
        ProcessItem(item);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error processing {item}: {ex.Message}");
    }
});
  • Degree of Parallelism: To cap how many iterations run at the same time, pass a ParallelOptions instance.
var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 4 // At most 4 concurrent iterations
};

Parallel.ForEach(items, parallelOptions, item =>
{
    ProcessItem(item);
});
  • Readability: Extract the parallel processing logic into a separate method to improve readability.
static void ProcessItemsInParallel(IEnumerable<string> items)
{
    Parallel.ForEach(items, item =>
    {
        ProcessItem(item);
    });
}
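
Building on the error-handling bullet above, the following sketch collects failures instead of only printing them, so the caller can decide what to retry or report once the loop finishes. ParallelErrors and ProcessAll are hypothetical names, and the process delegate stands in for ProcessItem.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original example.
public static class ParallelErrors
{
    // Runs 'process' on every item and returns the items that failed with their exceptions.
    public static IReadOnlyCollection<(string Item, Exception Error)> ProcessAll(
        IEnumerable<string> items, Action<string> process)
    {
        var failures = new ConcurrentQueue<(string Item, Exception Error)>();

        Parallel.ForEach(items, item =>
        {
            try
            {
                process(item);
            }
            catch (Exception ex)
            {
                // Record the failure and let the remaining items continue
                failures.Enqueue((item, ex));
            }
        });

        return failures.ToArray();
    }
}
```

Calling ParallelErrors.ProcessAll(items, ProcessItem) would then return an empty collection when every item succeeds.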

When to Avoid Parallel.ForEach

  • I/O-Bound Workloads: For tasks like file I/O or database calls, prefer async/await with tasks instead of Parallel.ForEach.
  • Small Collections: For very small collections, the overhead of partitioning and scheduling the work might outweigh the benefit of parallelism.
  • High Contention: Be cautious if your items require heavy locking or synchronization, as it might degrade performance.

Alternative: Using Task.WhenAll for Asynchronous Work

If the function you apply is asynchronous (e.g., returns a Task), use Task.WhenAll instead:

static async Task ProcessItemsInParallelAsync(IEnumerable<string> items)
{
    var tasks = items.Select(item => ProcessItemAsync(item));
    await Task.WhenAll(tasks);
}

static async Task ProcessItemAsync(string item)
{
    Console.WriteLine($"Processing {item} on thread {Environment.CurrentManagedThreadId}");
    await Task.Delay(1000); // Simulate async work
}

This approach is better suited for I/O-bound tasks and integrates seamlessly with async/await.
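
When the asynchronous function returns a value, Task.WhenAll can also gather the results. The sketch below assumes a hypothetical AsyncResults class whose ProcessItemAsync simply upper-cases its input after a short delay; the returned array follows the order of the input tasks.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original example.
public static class AsyncResults
{
    // Starts one task per item and returns the results in input order.
    public static async Task<string[]> ProcessAllAsync(IEnumerable<string> items)
    {
        // ToList() starts every task exactly once; a lazy Select would
        // start the work again if the sequence were enumerated a second time
        List<Task<string>> tasks = items.Select(item => ProcessItemAsync(item)).ToList();
        return await Task.WhenAll(tasks);
    }

    private static async Task<string> ProcessItemAsync(string item)
    {
        await Task.Delay(100); // Simulate async I/O
        return item.ToUpperInvariant();
    }
}
```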

When performing database-related operations in parallel, robustness becomes critical because database operations often involve shared resources, potential race conditions, transaction management, and error handling. Below is a guide to making your parallel processing of database-related tasks more robust in C#:


Key Considerations for Robustness:

  1. Transaction Management:

    • Each operation should use its own database transaction if needed. This ensures isolation and prevents partial changes in case of failures.
  2. Error Handling:

    • Implement proper exception handling to handle database-related errors (e.g., deadlocks, timeouts).
    • Use retry logic for transient failures (e.g., network issues, deadlocks).
  3. Concurrency Control:

    • Avoid overloading the database. Use a controlled degree of parallelism.
  4. Thread Safety:

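    • Do not share a connection or command object across threads; open a separate connection per operation and let the provider's connection pool handle reuse. Ensure any shared in-memory state is thread-safe.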
  5. Idempotency:

    • Design your database operations to be idempotent where possible, so retries don't cause duplicate or inconsistent data.
  6. Logging and Monitoring:

    • Log all errors and outcomes for traceability.
    • Use performance monitoring to detect bottlenecks in parallel execution or database contention.

Implementation in C#

Here’s how you can implement robust parallel database processing:

1. Using Parallel.ForEach With Database Operations

using System;
using System.Collections.Generic;
using System.Data.SqlClient; // Or your database provider
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var items = new List<string> { "Item1", "Item2", "Item3", "Item4" };

        // Use controlled degree of parallelism
        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4 // Adjust based on database capacity
        };

        Parallel.ForEach(items, parallelOptions, item =>
        {
            try
            {
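                // Block until the async method completes. Unlike Wait(), GetAwaiter().GetResult()
                // rethrows the original exception instead of wrapping it in an AggregateException
                ProcessDatabaseOperation(item).GetAwaiter().GetResult();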
            }
            catch (Exception ex)
            {
                // Log the error and continue processing other items
                Console.WriteLine($"Error processing {item}: {ex.Message}");
            }
        });
    }

    static async Task ProcessDatabaseOperation(string item)
    {
        // Example: Using ADO.NET for database interaction
        const string connectionString = "YourConnectionStringHere";

        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();
            using (var transaction = connection.BeginTransaction())
            {
                try
                {
                    // Example: Insert or update operation
                    using (var command = new SqlCommand("INSERT INTO YourTable (ColumnName) VALUES (@Value)", connection, transaction))
                    {
                        command.Parameters.AddWithValue("@Value", item);
                        await command.ExecuteNonQueryAsync();
                    }

                    // Commit the transaction
                    transaction.Commit();
                }
                catch
                {
                    // Rollback transaction on failure
                    transaction.Rollback();
                    throw; // Rethrow the exception to be handled at a higher level
                }
            }
        }

        Console.WriteLine($"Successfully processed {item}");
    }
}

2. Using Task.WhenAll for Asynchronous Database Processing

If your database operations are asynchronous, prefer Task.WhenAll over Parallel.ForEach: awaiting the tasks frees threads while the database works instead of blocking one thread per operation, which scales better.

using System;
using System.Collections.Generic;
using System.Data.SqlClient; // Or your database provider
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var items = new List<string> { "Item1", "Item2", "Item3", "Item4" };

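        // Start all operations; ToList() ensures each task is created exactly once
        var tasks = items.Select(item => ProcessDatabaseOperationAsync(item)).ToList();
        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception)
        {
            // await rethrows only the first failure; inspect each task to log them all
            foreach (var task in tasks.Where(t => t.IsFaulted))
            {
                Console.WriteLine($"Error during processing: {task.Exception.GetBaseException().Message}");
            }
        }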
    }

    static async Task ProcessDatabaseOperationAsync(string item)
    {
        const string connectionString = "YourConnectionStringHere";

        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();
            using (var transaction = connection.BeginTransaction())
            {
                try
                {
                    // Example: Insert or update operation
                    using (var command = new SqlCommand("INSERT INTO YourTable (ColumnName) VALUES (@Value)", connection, transaction))
                    {
                        command.Parameters.AddWithValue("@Value", item);
                        await command.ExecuteNonQueryAsync();
                    }

                    // Commit transaction
                    transaction.Commit();
                }
                catch
                {
                    // Rollback transaction on failure
                    transaction.Rollback();
                    throw; // Rethrow to propagate the error to Task.WhenAll
                }
            }
        }

        Console.WriteLine($"Successfully processed {item}");
    }
}

Enhancements for Robustness

1. Retry Logic for Transient Failures

Use a library like Polly to implement exponential backoff for retries:

using System;
using System.Data.SqlClient; // Or your database provider
using System.Threading.Tasks;
using Polly;

static async Task ProcessDatabaseOperationWithRetryAsync(string item)
{
    var retryPolicy = Policy
        .Handle<SqlException>() // Retries every SQL error; in practice, filter to transient ones (e.g., deadlocks)
        .Or<TimeoutException>() // Retry on timeout exceptions
        .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))); // 3 retries, waiting 2, 4, then 8 seconds

    await retryPolicy.ExecuteAsync(async () =>
    {
        await ProcessDatabaseOperationAsync(item);
    });
}
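
To make the behaviour of such a policy concrete, here is a dependency-free sketch of retry with exponential backoff. RetryHelper and RetryAsync are hypothetical names, and this is not how Polly is implemented; it only mirrors the idea of a fixed number of retries, a doubling delay, and a filter for transient errors.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration; this is not Polly's implementation.
public static class RetryHelper
{
    // Runs 'operation', retrying up to 'maxRetries' times when 'isTransient' returns true.
    // The delay doubles on each retry: baseDelay * 2, baseDelay * 4, baseDelay * 8, ...
    public static async Task RetryAsync(
        Func<Task> operation,
        Func<Exception, bool> isTransient,
        int maxRetries,
        TimeSpan baseDelay)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                await operation();
                return; // Success
            }
            catch (Exception ex) when (attempt < maxRetries && isTransient(ex))
            {
                // Exponential backoff before the next attempt
                var delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt + 1));
                await Task.Delay(delay);
            }
        }
    }
}
```

Once the retries are exhausted, or when the error is not transient, the exception filter no longer matches and the original exception propagates to the caller. A call such as RetryHelper.RetryAsync(() => ProcessDatabaseOperationAsync(item), ex => ex is TimeoutException, 3, TimeSpan.FromSeconds(1)) would wait 2, 4, and then 8 seconds between attempts.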

2. Controlling Parallelism

If you're using Task.WhenAll, you can limit the degree of parallelism with a semaphore:

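static async Task ProcessItemsWithLimitedConcurrency(IEnumerable<string> items, int maxDegreeOfParallelism)
{
    // Requires: using System.Linq; using System.Threading; using System.Threading.Tasks;
    using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
    {
        var tasks = items.Select(async item =>
        {
            // Wait for a free slot before starting the operation
            await semaphore.WaitAsync();
            try
            {
                await ProcessDatabaseOperationAsync(item);
            }
            finally
            {
                // Always free the slot, even if the operation throws
                semaphore.Release();
            }
        }).ToList(); // Materialize so every task is started exactly once

        await Task.WhenAll(tasks);
    }
}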

Call this with:

await ProcessItemsWithLimitedConcurrency(items, 4); // Limit to 4 concurrent operations
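
As an alternative sketch, assuming .NET 6 or later, Parallel.ForEachAsync accepts asynchronous bodies and honors MaxDegreeOfParallelism, so the semaphore is not needed. BoundedAsync and ProcessAsync are hypothetical names, and the processAsync delegate stands in for ProcessDatabaseOperationAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch assuming .NET 6 or later, where Parallel.ForEachAsync is available.
public static class BoundedAsync
{
    // Runs 'processAsync' for each item with at most 'maxDegreeOfParallelism' operations in flight.
    public static Task ProcessAsync(
        IEnumerable<string> items,
        int maxDegreeOfParallelism,
        Func<string, Task> processAsync)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        return Parallel.ForEachAsync(items, options, async (item, cancellationToken) =>
        {
            // The cancellation token is unused here; pass it on if the operation supports it
            await processAsync(item);
        });
    }
}
```

It could be called as await BoundedAsync.ProcessAsync(items, 4, ProcessDatabaseOperationAsync). On older frameworks, the semaphore approach above remains the way to bound concurrency.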

3. Centralized Logging

Use a centralized logging framework like Serilog or NLog to log errors, retries, and successes for better observability.


4. Idempotency

Design your database operations to be idempotent. For example:

  • Use MERGE statements (UPSERT) in SQL to avoid inserting duplicates.
  • Use unique constraints or deduplication logic.

Example (SQL with MERGE):

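-- HOLDLOCK (SQL Server) keeps two concurrent MERGE statements from both inserting the same value
MERGE INTO YourTable WITH (HOLDLOCK) AS Target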
USING (VALUES (@Value)) AS Source (Value)
ON Target.ColumnName = Source.Value
WHEN NOT MATCHED THEN
    INSERT (ColumnName) VALUES (Source.Value);

Summary

  • Use Parallel.ForEach for synchronous, CPU-bound work and Task.WhenAll for asynchronous, I/O-bound work.
  • Ensure proper transaction management and handle transient errors with retry logic.
  • Limit concurrency to avoid overwhelming the database.
  • Leverage idempotent operations to make retries safe.
  • Use centralized logging and monitoring for better observability.

Together, these practices help keep your parallel database operations robust, scalable, and maintainable.
