3. Consumers - aegisql/conveyor GitHub Wiki

Table of Contents

Consumers

Conveyor provides two functional interfaces for post-processing pipeline events.

@FunctionalInterface
public interface ResultConsumer<K,V> extends SerializableConsumer<ProductBin<K,V>> {
    default ResultConsumer<K,V> andThen(ResultConsumer<K,V> other)

    default ResultConsumer<K,V> filter(SerializablePredicate<ProductBin<K,V>> filter)
    default ResultConsumer<K,V> filterKey(SerializablePredicate<K> filter)
    default ResultConsumer<K,V> filterResult(SerializablePredicate<V> filter)
    default ResultConsumer<K,V> filterStatus(SerializablePredicate<Status> filter)
    default ResultConsumer<K,V> filterProperty(String property, SerializablePredicate<Object> filter)
    default ResultConsumer<K,V> propertyEquals(String property, Object value)

    default ResultConsumer<K,V> async(ExecutorService pool)
    default ResultConsumer<K,V> async()
}
@FunctionalInterface
public interface ScrapConsumer<K,V> extends Consumer<ScrapBin<K,V>> {
    default ScrapConsumer<K,V> andThen(ScrapConsumer<K,V> other)

    default ScrapConsumer<K,V> filter(SerializablePredicate<ScrapBin<K,V>> filter)
    default ScrapConsumer<K,V> filterKey(SerializablePredicate<K> filter)
    default ScrapConsumer<K,V> filterScrap(SerializablePredicate<V> filter)
    default ScrapConsumer<K,V> filterScrapType(SerializablePredicate<Class<?>> filter)
    default ScrapConsumer<K,V> filterFailureType(SerializablePredicate<FailureType> filter)
    default ScrapConsumer<K,V> filterError(SerializablePredicate<Throwable> filter)
    default ScrapConsumer<K,V> filterProperty(String property, SerializablePredicate<Object> filter)
    default ScrapConsumer<K,V> propertyEquals(String property, Object value)

    default ScrapConsumer<K,V> async(ExecutorService pool)
    default ScrapConsumer<K,V> async()
}

Default Methods

andThen - creates consumer chains.

ResultConsumer<Integer, String> rc = bin -> System.out.println(bin.key);
rc = rc.andThen(bin -> System.out.println(bin.product));

Filters - narrow events by key, status, payload type, failure type, or bin properties.

rc = rc.filterStatus(status -> status == Status.READY)
       .propertyEquals("channel", "primary");

async - dispatch consumer execution to a pool (default: ForkJoinPool.commonPool()).

rc = rc.async();

Nonblocking Consumers Collection

Package: com.aegisql.conveyor.consumers

Highlights:

  • IgnoreResult / IgnoreScrap
  • LogResult / LogScrap
  • LastResultReference / LastScrapReference
  • LastResults / LastScraps
  • FirstResults / FirstScraps
  • ResultCounter / ScrapCounter
  • ResultQueue / ScrapQueue
  • ResultMap / ScrapMap
  • StreamResult / StreamScrap
  • PrintStreamResult / PrintStreamScrap

JDBC Result Consumers

Conveyor provides JDBC-backed result consumers for writing result data to a database using plain JDBC.

Classes:

Support utilities:

Connection behavior:

  • Shared variants (JdbcProductResultConsumer, JdbcBinResultConsumer) open one connection and reuse it for all operations.
  • Pooled variants (PooledJdbcProductResultConsumer, PooledJdbcBinResultConsumer) acquire and close a connection for each execute/batch call (returning it to the pool).

Mapping model:

  • SQL placeholder order is defined by the mapper list order.
  • Product consumers map from product (OUT) only.
  • Bin consumers map from ProductBin<K,OUT> and can use key/status/properties/timestamps.

List handling:

  • If product is scalar, one SQL operation is executed.
  • If product is Iterable, consumers execute a JDBC batch.
  • For bin consumers with list products, each element is repackaged into a new ProductBin preserving original bin metadata.

Transaction behavior:

  • If Connection#getAutoCommit() is false, the executor calls commit() after successful execute/batch.
  • On JDBC failure, executor attempts rollback() (when auto-commit is disabled) and throws JdbcExecutionException.

Example: shared connection, mapping from product fields.

ResultConsumer<Integer, User> rc = new JdbcProductResultConsumer<>(
    connectionSupplier,
    "insert into users(id, email) values (?, ?)",
    User::id,
    User::email
);
conveyor.resultConsumer().first(rc).set();

Example: pooled connection, mapping from full ProductBin metadata.

ResultConsumer<Integer, User> rc = new PooledJdbcBinResultConsumer<>(
    connectionSupplier,
    "insert into results(conv_key, status, source, email) values (?, ?, ?, ?)",
    bin -> bin.key,
    bin -> bin.status.name(),
    bin -> bin.properties.get("source"),
    bin -> bin.product.email()
);
conveyor.resultConsumer().first(rc).set();

Loaders

ResultConsumerLoader

Prefer obtaining via conveyor:

ResultConsumerLoader<Integer, User> rcl = conveyor.resultConsumer();
// or
ResultConsumerLoader<Integer, User> rcl = conveyor.resultConsumer(resultConsumer);

Supported operations:

  • chain composition: first(...), andThen(...), before(...)
  • scoping: id(...), foreach(), foreach(predicate)
  • timing/priority: creationTime(...), expirationTime(...), ttl(...), priority(...)
  • properties: addProperty(...), addProperties(...), clearProperty(...), clearProperties()
  • apply: set()

ScrapConsumerLoader

Obtained via conveyor.scrapConsumer().

Supported operations:

  • first(...)
  • andThen(...)
  • set()

Scrap consumer customization is conveyor-level.

Examples

Build ID=1 to stdout (default), even IDs to stderr:

AssemblingConveyor<Integer, UserBuilderEvents, User> c = new AssemblingConveyor<>();

c.resultConsumer().first(LogResult.stdOut(c)).set();

c.resultConsumer()
 .foreach(k -> k % 2 == 0)
 .first(LogResult.stdErr(c))
 .set();
⚠️ **GitHub.com Fallback** ⚠️