Example implementation - Titousensei/sisyphus GitHub Wiki

Here's the code corresponding to the DVDs example from the README.

// first push: read rented.txt into a hashtable
// The KeyMap is declared with "#title" as its key column and "new_rented"
// as its value column.
KeyMap rented = new KeyMap("#title", "new_rented"); // long->int hashtable
// rented.txt is read with a single column, "title".
Input in_rented = new InputFile("rented.txt", "title");
new Pusher()
    // long=hash(string): "#title" is derived from "title"
    .always(new ColumnHashLong("#title", "title"))
    .always(new KeyMapIncrement(rented, 1))
    .push(in_rented);

// second push
// Column names shared by the daily input file and the output file below.
static final String[] SCHEMA = {
    "title",
    "category",
    "msrp",
    "price",
    "num_rented"
};

// Daily DVD rows, read using SCHEMA.
Input in_daily = new InputFile("dvd.tsv.gz", SCHEMA);
// Sale price modifier: output column "price", input column "msrp", coefficient 0.9.
Modifier sale_mod = new SalePrice("price", "msrp", 0.9);
// Output file, declared with the same SCHEMA as the input.
Output out_daily = new OutputFile("dvd_new.tsv.gz", SCHEMA);

// NOTE(review): out_daily is declared above but never handed to this Pusher,
// so nothing visible here writes dvd_new.tsv.gz; presumably an output step is
// missing from the chain. Confirm against the library's API.
new Pusher()
    // run the sale price modifier only on rows whose "category" equals "comedy"
    .onlyIf(new IfEquals("category", "comedy"), sale_mod)
    .always(new ColumnHashLong("#title", "title"))  // long=hash(string)
    .ifMiss(rented, BreakAfter.NO_OP) // skip the rest if not in KeyMap rented
    .always(new KeyMapGetter(rented)) // populate new_rented
    // "num_rented" = "num_rented" + "new_rented"
    .always(new Add("num_rented", "num_rented", "new_rented"))
    .push(in_daily);

Implementation of the sale price computation:

/**
 * Modifier that computes a sale price by multiplying a list price by a fixed
 * coefficient.
 *
 * <p>It reads one input column and writes one output column. The result is
 * formatted with two decimals and always uses '.' as the decimal separator,
 * so the value can be parsed back with {@link Double#parseDouble(String)}
 * regardless of the JVM's default locale.
 */
public class SalePrice extends Modifier
{
  /** Factor applied to the parsed input value (e.g. 0.9 for 10% off). */
  protected final double price_coef_;

  /**
   * @param outcol     name of the column receiving the computed price
   * @param incol      name of the column holding the price to discount
   * @param price_coef multiplier applied to the input value
   */
  public SalePrice(String outcol, String incol, double price_coef)
  {
    super(new String[] { incol }, new String[] { outcol });
    price_coef_ = price_coef;
  }

  /**
   * Writes {@code input[0] * price_coef_} into {@code result[0]}, formatted
   * with two decimals. When the input is null or not a parsable number,
   * {@code result[0]} is set to null and a warning is emitted.
   *
   * @param input  values of the input columns; only index 0 is read
   * @param result values of the output columns; only index 0 is written
   */
  public void compute(String[] input, String[] result)
  {
    String raw = input[0];
    String formatted = null;

    // Double.parseDouble(null) throws NullPointerException rather than
    // NumberFormatException, so a missing value is checked for explicitly.
    if (raw != null) {
      try {
        double msrp = Double.parseDouble(raw);
        // Locale.ROOT keeps '.' as the separator; the default locale could
        // produce "9,00", which parseDouble cannot read back.
        formatted = String.format(java.util.Locale.ROOT, "%.2f", msrp * price_coef_);
      }
      catch (NumberFormatException ignored) {
        // Deliberately ignored: reported through the warning below.
      }
    }

    result[0] = formatted;
    if (formatted == null) {
      warning("Unparsable msrp");
    }
  }
}

Previous: Common Use Case