Pusher - Titousensei/sisyphus GitHub Wiki
This is the main processing class.
The Pusher is used to declare the actions to perform on each row. The actions can be chained in script-like declarations. Finally, the push(input) command starts the execution. Multiple inputs can be specified; they are processed one after the other.
Contract:
- The actions will be performed in the order of declaration for each row.
- The order of the rows from the input is preserved in the outputs.
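The two guarantees above can be sketched with a small stand-in class. This is only an illustration of the contract, not the sisyphus implementation: OrderedPipelineSketch and its methods are hypothetical, and rows are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in used only to illustrate the contract; not the real Pusher.
class OrderedPipelineSketch {
    private final List<UnaryOperator<String>> actions = new ArrayList<>();

    // Register an action and return this, so declarations can be chained.
    OrderedPipelineSketch always(UnaryOperator<String> action) {
        actions.add(action);
        return this;
    }

    // Rows are visited in input order; for each row, the actions run in declaration order.
    List<String> push(List<String> rows) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            String current = row;
            for (UnaryOperator<String> action : actions) {
                current = action.apply(current);
            }
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> result = new OrderedPipelineSketch()
            .always(s -> s + "-a")
            .always(s -> s + "-b")
            .push(Arrays.asList("r1", "r2", "r3"));
        System.out.println(result); // [r1-a-b, r2-a-b, r3-a-b]
    }
}
```

Each output row carries "-a" before "-b" (action order), and the rows come out as r1, r2, r3 (row order).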
Pusher runs the actions sequentially in a single thread, because in most cases the I/O is the bottleneck and the processing does not benefit from being multithreaded. If the CPU is the bottleneck (because of some custom modifier, for instance), there is an option to run rows in multiple threads, with the command parallel(num_threads). However, running in parallel will not preserve the row order between the input and the output. (The order of the actions is still guaranteed.)
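The trade-off of parallel mode can be sketched with a plain thread pool. This is a minimal illustration under assumptions, not how parallel(num_threads) is implemented: ParallelRowsSketch and pushParallel are hypothetical names, and rows are plain strings. Each row is handled by one task that applies the actions in order, but the tasks may finish in any order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical sketch; not the sisyphus implementation of parallel().
class ParallelRowsSketch {
    static List<String> pushParallel(List<String> rows,
                                     List<UnaryOperator<String>> actions,
                                     int numThreads) {
        // Thread-safe list: rows are appended in completion order, not input order.
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (String row : rows) {
            pool.execute(() -> {
                String current = row;
                // Within one row, the actions still run in declaration order.
                for (UnaryOperator<String> action : actions) {
                    current = action.apply(current);
                }
                out.add(current);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("row processing timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for rows", e);
        }
        return out;
    }
}
```

Every row appears exactly once in the result with its actions applied in order, but the position of a row in the result is not guaranteed to match its position in the input.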
Typical speed (depending on the hardware):
- 2M rpm (million rows per minute) for file inputs and file outputs
- 10M rpm for file inputs and memory outputs
- 100M rpm for memory-only processing
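To turn these figures into a rough run-time estimate, divide the number of rows by the rate. The helper below is a hypothetical convenience, not part of sisyphus, and the 500M-row input size is an arbitrary example; actual speed depends on the hardware.

```java
// Hypothetical helper for back-of-the-envelope estimates; not part of sisyphus.
class ThroughputEstimateSketch {
    // Minutes needed to push rowCount rows at the given rows-per-minute rate.
    static double estimateMinutes(long rowCount, long rowsPerMinute) {
        return (double) rowCount / rowsPerMinute;
    }

    public static void main(String[] args) {
        long rows = 500_000_000L; // arbitrary example input size
        System.out.println(estimateMinutes(rows, 2_000_000L));   // files to files: 250.0
        System.out.println(estimateMinutes(rows, 10_000_000L));  // files to memory: 50.0
        System.out.println(estimateMinutes(rows, 100_000_000L)); // memory only: 5.0
    }
}
```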
A Pusher can be given a label to distinguish its log messages. It can also be invoked with a limit (to process only the first N rows), or set to process only a random sample of the rows.
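The difference between a limit and a random sample can be sketched as follows. The class and method names are hypothetical, and keeping each row independently with a fixed probability is an assumption made for illustration; this page does not say how Pusher selects its sample.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the two row-selection modes; not the sisyphus API.
class LimitAndSampleSketch {
    // Limit: keep only the first `limit` rows, in input order.
    static <T> List<T> firstN(List<T> rows, int limit) {
        int end = Math.max(0, Math.min(limit, rows.size()));
        return new ArrayList<>(rows.subList(0, end));
    }

    // Sample: keep each row with probability `fraction` (assumed strategy),
    // preserving the relative order of the rows that are kept.
    static <T> List<T> sample(List<T> rows, double fraction, Random rng) {
        List<T> kept = new ArrayList<>();
        for (T row : rows) {
            if (rng.nextDouble() < fraction) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

A limit is deterministic and only sees the head of the input, which is handy for a quick smoke test. A sample draws from the whole input, so it is more representative when the rows are not uniformly distributed across the files.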
Example:
// Schemas are often used multiple times and can be declared final static
String[] data_schema = new String[] { "guid", "category", "url", "title" };
// Preparing the Inputs, Modifiers, Keys and Outputs
Input in_data = new InputFileGroup(DIR_INDEX, "newdata.*.gz", data_schema);
KeyBinding bind_prod_hash = KeyMap.load(FILE_PROD_HASH, "guid", "#prod");
Modifier prod_to_hash = new ColumnsHashLong("#prod", "category", "title");
Output out_new = new OutputFile(FILE_NEW, data_schema);
Output out_changed = new OutputFile(FILE_CHANGED, data_schema);
Output out_same = new OutputFile(FILE_SAME, "guid");
// Declaring the Pusher actions and starting the push()
new Pusher()
    .always(prod_to_hash) // populate #prod
    .ifMiss( bind_prod_hash, new BreakAfter(out_new))
    .ifDiff( bind_prod_hash, new BreakAfter(out_changed))
    .ifMatch(bind_prod_hash, new BreakAfter(out_same))
    .push(in_data);
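One way to read the example: each row is routed to exactly one of the three outputs, depending on how its freshly computed #prod hash compares with the hash previously stored for its guid. The sketch below reproduces that routing with a plain HashMap. The semantics are inferred from the names ifMiss, ifDiff, ifMatch and BreakAfter and should be treated as an assumption; MissDiffMatchSketch is a hypothetical class, not part of sisyphus.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the miss / diff / match routing; not the sisyphus API.
class MissDiffMatchSketch {
    final List<String> outNew = new ArrayList<>();
    final List<String> outChanged = new ArrayList<>();
    final List<String> outSame = new ArrayList<>();

    // storedHashes plays the role of the loaded key map (guid -> previous #prod).
    // Each early return stands in for the assumed BreakAfter behavior:
    // write the row to one output, then stop processing that row.
    void route(Map<String, Long> storedHashes, String guid, long prodHash) {
        Long previous = storedHashes.get(guid);
        if (previous == null) {                  // miss: guid never seen before
            outNew.add(guid);
            return;
        }
        if (previous.longValue() != prodHash) {  // diff: guid known, content changed
            outChanged.add(guid);
            return;
        }
        outSame.add(guid);                       // match: guid known, content unchanged
    }

    public static void main(String[] args) {
        Map<String, Long> stored = new HashMap<>();
        stored.put("g1", 111L);
        stored.put("g2", 222L);

        MissDiffMatchSketch sketch = new MissDiffMatchSketch();
        sketch.route(stored, "g1", 111L); // same
        sketch.route(stored, "g2", 999L); // changed
        sketch.route(stored, "g3", 333L); // new
        System.out.println(sketch.outNew + " " + sketch.outChanged + " " + sketch.outSame);
        // [g3] [g2] [g1]
    }
}
```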