Scala Hadoop Shred
Scala Hadoop Shred is a Hadoop job, written in Scalding (the Scala API for Cascading), which allows you to split (shred) a Snowplow enriched event, produced by Scala Hadoop Enrich, into separate entities. Scala Hadoop Shred uses the scala-common-enrich Scala project to load enriched events.
You will typically run the Scala Hadoop Shred jar as part of an EMR jobflow started by EmrEtlRunner. It is designed to be used downstream of Scala Hadoop Enrich and upstream of StorageLoader.
Scala Hadoop Shred has two primary tasks:
- Shred each enriched event into an `atomic-events` TSV line and its associated JSONs
- Make `event_id`s unique across all events
A Snowplow enriched event is a 131-column TSV line produced by Scala Hadoop Enrich. Each line contains all information about a specific event, including its id, timestamps, custom and derived contexts and much more.
Shredding is the process of splitting an `EnrichedEvent` TSV into the following parts:

- Atomic event. A TSV line very similar to `EnrichedEvent` but not containing the JSON fields (`contexts`, `derived_contexts` and `unstruct_event`). The result will be stored in a path similar to `shredded/good/run=2016-11-26-21-48-42/atomic-events/part-00000` and will be available to load via StorageLoader or directly via Redshift COPY.
- Contexts. This part consists of the two JSON fields extracted above, `contexts` and `derived_contexts`, which are self-describing JSONs validated during the enrichment step. But unlike usual self-describing JSONs, which consist of a `schema` string and a `data` object, these consist of a `schema` object (as in JSON Schema), the usual `data` object and a `hierarchy` object. This `hierarchy` contains the data needed to later join your context SQL tables with the `atomic.events` table. The result will be stored in a path similar to `shredded/good/run=2016-11-26-21-48-42/com.acme/mycontext/jsonschema/1-0-1/part-00000`, where files like `part-00000` are valid NDJSON and will be available to load via StorageLoader or directly via Redshift COPY.
- Self-describing (unstructured) event. Much like contexts, this is the same JSON with `schema`, `data` and `hierarchy` fields. The only difference is that it has a one-to-one relation to `atomic.events`, whereas contexts have a many-to-one relation.
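To make the shape of a shredded JSON more concrete, here is a minimal Scala sketch of the three fields described above. The field names inside `hierarchy` (such as `rootId` and `rootTstamp`) are illustrative assumptions, not the authoritative format:

```scala
// Minimal model of a shredded self-describing JSON, for illustration only.
// The exact hierarchy field names are assumptions, not the definitive specification.
case class SchemaKey(vendor: String, name: String, format: String, version: String)

case class Hierarchy(
  rootId: String,     // event_id of the parent atomic event (assumption)
  rootTstamp: String, // collector timestamp of the parent event (assumption)
  refRoot: String     // name of the root entity, e.g. "events" (assumption)
)

case class ShreddedJson(
  schema: SchemaKey,         // schema as an object, not a URI string
  data: Map[String, String], // the original context/unstruct_event payload (simplified here)
  hierarchy: Hierarchy       // join information back to atomic.events
)
```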
Shredding is a classic example of a Hadoop mapper: each line (event) is independent of every other, so shredding is a function with a single input and a single output.
More details on what shredding is can be found on the dedicated shredding page.
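As a rough illustration of this single-input, single-output shape, the sketch below shreds each enriched TSV line independently. This is not the actual Scalding job; the column indices and helper names are assumptions for illustration only:

```scala
// Hypothetical sketch of shredding as a per-line function; the real job is a
// Scalding pipeline, and the column indices below are illustrative assumptions.
object ShredSketch {
  // Assumed positions of the JSON columns within the 131-column TSV (assumption).
  val JsonColumnIndices: Set[Int] = Set(52, 58, 122) // contexts, unstruct_event, derived_contexts

  /** One enriched TSV line in, one atomic TSV line plus its extracted JSONs out. */
  def shred(enrichedEvent: String): (String, List[String]) = {
    val columns = enrichedEvent.split("\t", -1).toList
    val atomicEvent = columns.zipWithIndex
      .collect { case (value, idx) if !JsonColumnIndices(idx) => value }
      .mkString("\t")
    val jsons = columns.zipWithIndex
      .collect { case (value, idx) if JsonColumnIndices(idx) && value.nonEmpty => value }
    (atomicEvent, jsons)
  }

  // Because each line is independent, the whole job is conceptually just a map:
  def shredAll(lines: Iterator[String]): Iterator[(String, List[String])] =
    lines.map(shred)
}
```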
Duplicates are a common problem in event pipelines and have been described many times. The basic problem is that we cannot guarantee that every event has a unique UUID, because:

- we have no exactly-once-delivery guarantee
- some user-side software sends events more than once
- some event ID generation algorithms are flawed
There are four strategies planned for Scala Hadoop Shred's de-duplication:

| Strategy | Batch? | Same event ID? | Same event fingerprint? | Availability |
|---|---|---|---|---|
| In-batch natural de-duplication | In-batch | Yes | Yes | R76 Changeable Hawk-Eagle |
| In-batch synthetic de-duplication | In-batch | Yes | No | R86 Petra |
| Cross-batch natural de-duplication | Cross-batch | Yes | Yes | R88 Angkor Wat |
| Cross-batch synthetic de-duplication | Cross-batch | Yes | No | Planned |
We will cover these in turn:
As of the R76 Changeable Hawk-Eagle release, Hadoop Shred de-duplicates "natural duplicates" - i.e. events which share the same event ID (`event_id`) and the same event payload (based on `event_fingerprint`), meaning that they are semantically identical to each other. For a given ETL run (batch) of events being processed, Hadoop Shred keeps only the first out of each group of natural duplicates; all others are discarded.
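A minimal sketch of this keep-the-first-per-group logic is shown below. It uses plain Scala collections rather than the actual Scalding pipeline, and `EnrichedEvent` here is a simplified stand-in:

```scala
// Hedged sketch: in-batch natural de-duplication over an in-memory batch.
// The real job does this inside a Scalding pipeline; EnrichedEvent is a stand-in type.
case class EnrichedEvent(eventId: String, eventFingerprint: String, payload: String)

object NaturalDedupSketch {
  def dedupeNatural(batch: List[EnrichedEvent]): List[EnrichedEvent] =
    batch
      .groupBy(e => (e.eventId, e.eventFingerprint)) // natural duplicates share both values
      .values
      .map(_.head)                                   // keep only the first of each group
      .toList
}
```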
To enable this functionality you need to have the Event Fingerprint Enrichment enabled in order to correctly populate the `event_fingerprint` property.
As of R86 Petra, Hadoop Shred de-duplicates "synthetic duplicates" - i.e. events which share the same event ID (`event_id`) but have different event payloads (based on `event_fingerprint`), meaning that they can be either semantically independent events (caused by flawed algorithms) or the same event with a slightly different payload (caused by third-party software). For a given ETL run (batch) of events being processed, Hadoop Shred uses the following strategy:
- Collect all events with an identical `event_id` left after natural de-duplication
- Generate a new random `event_id` for each of them
- Attach a `duplicate` context containing the original `event_id` to each event where a duplicated `event_id` was found
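The sketch below illustrates this strategy with plain Scala; names such as `DuplicateContext` and `ShreddedEvent` are hypothetical stand-ins, not the actual context schema or job code:

```scala
import java.util.UUID

// Hedged sketch of in-batch synthetic de-duplication; not the actual Scalding code.
case class DuplicateContext(originalEventId: String) // hypothetical context payload
case class ShreddedEvent(eventId: String, fingerprint: String,
                         contexts: List[DuplicateContext] = Nil)

object SyntheticDedupSketch {
  def dedupeSynthetic(afterNaturalDedup: List[ShreddedEvent]): List[ShreddedEvent] =
    afterNaturalDedup
      .groupBy(_.eventId)
      .values
      .flatMap {
        case List(unique) => List(unique)                          // event_id already unique
        case clashing     =>                                       // synthetic duplicates found
          clashing.map { e =>
            e.copy(
              eventId  = UUID.randomUUID.toString,                 // generate a fresh event_id
              contexts = DuplicateContext(e.eventId) :: e.contexts // record the original event_id
            )
          }
      }
      .toList
}
```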
There is no configuration required for this functionality - de-duplication is performed automatically in Hadoop Shred, but it is highly recommended to use the Event Fingerprint Enrichment in order to correctly populate the `event_fingerprint` property.
With cross-batch natural de-duplication, we have a challenge: we need to track events across multiple ETL processing batches to detect duplicates.
We don't need to store the whole event - just the `event_id` and `event_fingerprint` metadata.
And we need to store these in a database that allows fast random access - we chose Amazon DynamoDB, a fully managed NoSQL database service.
We store the event metadata in a DynamoDB table with the following attributes:
- `eventId`, a String
- `fingerprint`, a String
- `etlTime`, a Date
- `ttl`, a Date
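A hedged sketch of how such a table could be created from Scala with the AWS SDK for Java is shown below. The key schema (hash key `eventId`, range key `fingerprint`), the table name and the read capacity are assumptions for illustration, not the exact setup performed via the EmrEtlRunner configuration:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model._

// Hedged sketch: an event-manifest table keyed on eventId + fingerprint.
// The real table is managed via the EmrEtlRunner configuration; this is illustrative only.
object CreateManifestTableSketch {
  def main(args: Array[String]): Unit = {
    val dynamodb = AmazonDynamoDBClientBuilder.defaultClient()

    val request = new CreateTableRequest()
      .withTableName("snowplow-event-manifest")              // hypothetical table name
      .withKeySchema(
        new KeySchemaElement("eventId", KeyType.HASH),       // partition key
        new KeySchemaElement("fingerprint", KeyType.RANGE))  // sort key
      .withAttributeDefinitions(
        new AttributeDefinition("eventId", ScalarAttributeType.S),
        new AttributeDefinition("fingerprint", ScalarAttributeType.S))
      // Read capacity of 10 is an assumption; write capacity of 100 is the default mentioned below.
      .withProvisionedThroughput(new ProvisionedThroughput(10L, 100L))

    dynamodb.createTable(request)
  }
}
```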
A lookup into this table will tell us if the event we are looking at has been seen before, based on its `event_id` and `event_fingerprint`.
We store the `etl_timestamp` to prevent issues in the case of a failed run.
If a run fails during Hadoop Shred and is then rerun, we don't want the rerun to consider rows in the DynamoDB table which were written as part of the prior failed run; otherwise all events in the rerun would be rejected as dupes!
It is clear when we read the event metadata from DynamoDB: during the Hadoop Shred process. But when do we write the event metadata for this run back to DynamoDB? Instead of doing all the reads and then doing all the writes, we decided to use DynamoDB's conditional update to perform a check-and-set operation inside Hadoop Shred, on a per-event basis.
The algorithm is simple:
- Attempt to write the `event_id`-`event_fingerprint`-`etl_timestamp` triple to DynamoDB, but only if the `event_id`-`event_fingerprint` pair cannot be found with an earlier `etl_timestamp` than the one provided
than the provided one - If the write fails, we have a natural duplicate
- If the write succeeds, we know we have an event which is not a natural duplicate (it could still be a synthetic duplicate however)
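A hedged sketch of this check-and-set using the AWS SDK's conditional `PutItem` is shown below. The condition expression, table name and attribute names mirror the table sketch above and are illustrative; they are not the exact expression used by Hadoop Shred:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ConditionalCheckFailedException, PutItemRequest}
import scala.collection.JavaConverters._

// Hedged sketch of the per-event check-and-set; not the actual Hadoop Shred code.
object CheckAndSetSketch {
  private val dynamodb = AmazonDynamoDBClientBuilder.defaultClient()

  /** Returns true if the event is new, false if it is a cross-batch natural duplicate. */
  def putIfAbsent(eventId: String, fingerprint: String, etlTime: Long, ttl: Long): Boolean = {
    val item = Map(
      "eventId"     -> new AttributeValue().withS(eventId),
      "fingerprint" -> new AttributeValue().withS(fingerprint),
      "etlTime"     -> new AttributeValue().withN(etlTime.toString),
      "ttl"         -> new AttributeValue().withN(ttl.toString)
    ).asJava

    val request = new PutItemRequest()
      .withTableName("snowplow-event-manifest")  // hypothetical table name
      .withItem(item)
      // Succeed only if this eventId/fingerprint pair was not written by an earlier run
      .withConditionExpression("attribute_not_exists(eventId) OR etlTime >= :etlTime")
      .withExpressionAttributeValues(
        Map(":etlTime" -> new AttributeValue().withN(etlTime.toString)).asJava)

    try {
      dynamodb.putItem(request)
      true                                       // write succeeded: not a natural duplicate
    } catch {
      case _: ConditionalCheckFailedException =>
        false                                    // write failed: natural duplicate, delete the event
    }
  }
}
```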
If we discover a natural duplicate, then we delete it. We know that we have an "original" of this event already safely in Redshift (because we have found it in DynamoDB).
In the code, we perform this check after we have grouped the batch by `event_id` and `event_fingerprint`; this ensures that all check-and-set requests for a specific `event_id`-`event_fingerprint` pair in DynamoDB will come from a single mapper.
To enable cross-batch natural de-duplication you must provide a DynamoDB table configuration to EmrEtlRunner and grant the necessary rights in IAM. If this is not provided, then cross-batch natural de-duplication will be disabled. In-batch de-duplication will still work, however.
To avoid the "cold start" problem you may want to use the Event-manifest-populator Spark job, which backpopulates the duplicate storage with events from a specified point in time.
To make sure the DynamoDB table does not become over-populated, we use the DynamoDB Time-to-Live feature, which provides automatic cleanup after a specified time. For event manifests this time is the ETL timestamp plus 180 days, stored in the `ttl` attribute.
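For example, the `ttl` value could be derived from the ETL timestamp as follows; this is a sketch assuming the value is stored as epoch seconds, which is the format DynamoDB's TTL feature expects:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Sketch: derive the ttl attribute as the ETL timestamp plus 180 days, in epoch seconds.
object TtlSketch {
  val etlTime: Instant      = Instant.parse("2016-11-26T21:48:42Z") // example run timestamp
  val ttlEpochSeconds: Long = etlTime.plus(180, ChronoUnit.DAYS).getEpochSecond
}
```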
Cross-batch de-duplication uses DynamoDB as transient storage and therefore has associated AWS costs. The default write capacity is 100 units, which means that no matter how powerful your EMR cluster is, the whole Hadoop Shred job can be throttled by DynamoDB. The rough cost of the default setup is 50 USD per month; however, the throughput can be tweaked according to your needs.
This section hasn't been written yet.