Fault Tolerant Indexing - nsoft/jesterj GitHub Wiki

Fault Tolerant Indexing

Fault Tolerance (WIP)

Fault tolerance is the property of gracefully recovering from a failure. In the case of search ingestion we primarily care about the case where some or all of the infrastructure suffers an ungraceful shutdown or disconnection. Additionally we will want to handle cases involving errors during processing.

Shutdown

When a document is being processed it is initially read or received from a source. After it is read/received it spends some amount of time progressing through steps that conduct a variety of transformations, and enrichments. At one or more points the document, or information derived from the document is transmitted to one or more external systems. In our case the primary focus is transmission to a search engine, but transmission to multiple search engines, or to secondary systems may also be important. For example a document in a a back end filesystem or CMS might be read, and processed by JesterJ. The plan with which JesterJ is configured might include a step to emit the extracted text from the document to a search engine for indexing, and another step to ftp the original document to a filesystem served by an http server that will be linked in the search results.

A Shutdown is an event in which a node or service/step in the system quits with notification of its departure. The node/step shutting down may wait to drain all, some or none of presently queued work. In the event of a shutdown with incomplete draining of processing tasks, a document from the source flowing through such a pipeline might have been:

  1. Undiscovered, and not (yet) processed in any way

  2. Discovered and read, but not emitted to either the search engine or the http server

  3. Indexed but not written to the http server

  4. Written to the http server but not indexed

  5. Fully processed and written to both the http server and the search engine

The key aspect that distinguishes shutdown however is that we are notified that there is a need to clean-up, and direct work elsewhere. There is potential for a shutdown to be temporary, but except in the case of very short temporary shutdowns, there is no advantage in treating this case specially.

Disconnection

Disconnection is similar to shutdown except the system that becomes disconnected has no opportunity to notify any other system of its unavailability. Such a situation may be limited to inability to accept or emit documents, but more frequently the system is entirely disconnected. Since latency is normal and in small quantities does not indicate a disconnection, there needs to be a response latency threshold, which when exceeded generates an event indicating the loss of the node. Once this event is generated the response will be similar to a shutdown with zero drain.

Imagine a processing system comprised of multiple nodes cooperating to index various documents. If one node looses network connection, its processing might be delayed indefinitely. After some time, it may be necessary to assume that the node has failed and ensure that documents are not sent to it. The document must be restarted on separate nodes IF there is a complete path to success remaining in the system.

A system that has lost a node has one of three possible states: 1. Incapacitated - the lost node was the only node capable of certain processing feats required for all documents. No documents can be processed. This is a simpler case and can (mostly) be treated as if a full system shut down occurred. 1. Disabled - the lost node was the only node capable of certain processing feats required for some documents but not all documents. Some documents may be processed and others need to be delayed until the node returns. 1. Diminished - the lost node did not contain unique capabilities, and the system may have lost capacity but all documents may still be properly processed. In this case the challenge is to time out the processing of documents that were "in flight" in the lost node and if the lost node re-joins invalidate any processing that has already been restarted.

Additionally, any of the above states might be temporary or "permanent" (here permanent refers to any condition where the system must await manual intervention by a human, i.e. until the missing node can be restarted). In the event that the capabilities are the result of the assignment of steps to particular nodes those steps could be reassigned to remaining nodes, and in the event that an auto-scaling scheme is in use, new nodes may be started to compensate. However if the steps require specific hardware or software that is not available, or for which additional licenses are not available, starting new nodes may not be an option (making the above states permanent).

Errors

The third type of fault that may be experienced is an error at the document level. This is distinguished by the fact that the problematic condition is not detectable until an attempt is made to process a document. Such errors may be "universal" or "document specific" meaning that in the former case the error will occur for a set of documents delimited exclusively by a time period in which processing is attempted, or in the latter case a set of documents defined by some other criteria, such as PDF documents that have been encrypted. Additionally, errors may be considered either transient, or permanent. Thus we can draw a set of quadrants like this:

             | Transient | Permanent |
-------------+-----------+-----------+
Universal    |     A     |    B      |
-------------+-----------+-----------+
Doc Specific |     C     |    D      |
-------------+-----------+-----------+

Some examples of each quadrent are:

  • A. Timeouts due to overloaded local processor; Failure to write to the search engine due to network outage; External database down or overloaded;

  • B. Incorrect configuration resulting in Exception or inability to connect.

  • C. External source for Geo Coding unavailable (if not all documents require geo-coding);

  • D. Encrypted PDFs for which text cannot be extracted; Documents missing a required field leading to an Exception; Documents with malfomred data i.e. unclosed tags in XML

Quadrants A and D are most common with C usually being essentially the same as A, but for a service only required by a subset of documents. Quadrant B usually represents a programmer or configuration error. Furthermore if the errors are universal and permanent then all documents fail to be written to one or more outputs.

Permanent errors generally require human interaction for resolution. A well designed system should identify such errors when possible, attempt to notify the relevant humans, and not waste resources re-attempting processing. On the other hand a document experiencing a transient error should be re-attempted after a delay.

Finally, to bring the discussion full circle, one can think of a document that entered a processing stage and never exited due to a shutdown or disconnection as having experienced a universal, transient error condition. (Quadrant A) Thus our key tasks in fault tolerant indexing are detecting/categorizing errors, marking/tracking documents for retry or preventing hopeless retries, and of course communicating errors to the humans in charge.

The net result of the foregoing is that from the perspective of the document we only need to handle two cases Transient errors (where the document should be processed again at the next available opportunity) and permanent errors where the document must not be processed again (at least not until a new version of it is provided).

From the perspective of the system and routing of documents we will want to (eventually) detect and adapt to shutdowns and unavailability.

Fatal vs Nonfatal Errors

There are perhaps cases where a processing step is unable to complete all it’s prospective tasks, but having the document arrive in the index partially processed is better than having the document not be searchable at all. For the purposes of this discussion we only consider fatal errors where processing was halted. A notification and monitoring infrastructure might be contemplated later to facilitate handling of these cases (i.e. queuing the document again). For the near term handling of non-fatal exceptional cases will be handled by the user’s custom processors, and not the FTI infrastructure.

Error Detection

Programer Detection

In the simplest cases, the nature of the error is anticipated and diagnosed in code by the author of a particular Document Processor. In this case the programer should be able to effortlessly identify whether or not the error is transient or permanent. By definition a document with a permanent error must never be re-processed unless it has changed in some fashion. The system might detect this change (by a change in the hash code) or an administrator may wish to signal this change directly with a command specifying that the document should be reprocessed.

Heuristic Detection - Repetition

In the face of an error that is not specifically anticipated by the programer, the error type may be initially assumed to be transient and the document automatically resubmitted, possibly after a delay. After the same document produces the same error for a certain number of attempts exponential back off may be required, but eventually it may be declared to have an unknown fatal flaw and marked as failed. Subsequent ingestion then requires a human to examine and resubmit the document.

Huristic Detection - Timeout

If a document starts processing in a stage, and does not exit the stage in a timely manner it may be declared failed. In this case processing must be interrupted before the status indicating failure is set for the document. Otherwise the document might be omitted just after the status was set creating a duplicate processing of the document.

Document Status Tracking

The notion that a document has a singular status is a reduction from the trivial case of a perfectly linear pipeline for a particular document. In truth, the document itself is entirely unimportant. What is important is the execution of steps that have an possible external side effect, and ensuring that when routers clone documents each path to which a clone might be routed is completed. Therefore a status relates to the tuple <docId,source,cloning_step_list,destination>

External Side Effects

Destinations always involve some sort of persistent external side effect in a system external to the JesterJ. There is no point in running a step that has no persistent side effect when it is not followed by a subsequent step that produces a side effect. Common examples would be:

  • Indexing the document into a search engine

  • Placing an event on a queue notifying external listeners of the document’s availability

  • Archiving a copy of the original document to a file system or other store.

  • Initiating an incremental update to corpus statistics every Nth document.

ℹ️
I am of course excluding side effects like running up the energy bill at the hosting company, or a DOS attack vs other processes on the machine. These are not supported use cases.

Types of Side Effecting Steps

There are 3 possible levels of side effect which we will refer to as SAFE, IDEMPOTENT and POTENT.

  • SAFE steps are ones that have no side effects. These steps do not need to be tracked for fault tolerance.

  • IDEMPOTENT steps are ones that do have a side effect but can be repeated any number of times. The order of a set of IDEMPOTENT steps is important but if the same idempotent steps are performed in the same order the result should also be IDEMPOTENT. In JesterJ step order is only guaranteed within a particular path through the plan’s DAG, so there’s no need to track order of execution for the purposes of fault tolerance. Designing a processing plan with execution order dependencies across disparate paths through the DAG is a failed plan design.

  • POTENT is a term I am coining for this discussion and for use in the code to take the place of the cumbersome and verbally confusing "NON-IDEMPOTENT". This refers to any step which must only be executed once for a particular document, such as a step that invokes an API that incurs a usage fee, or increments a persistent value.

It is important to note that the handling of POTENT steps is very closely tied to the notion of document identity and the built in data derived hash codes that drive our default fault tolerance indexing implementation will consider versions of a document to be distinct documents (i.e. if you update a word doc with new formatting elements, it would change the hash code and therefore be handled as a novel document.

Constraints on step ordering

It is impossible to guarantee proper handling of a document in a fault tolerant manner in any case where a non-safe step follows a POTENT step unless the POTENT step can be skipped. Therefore it is also important to track whether or not the step is

  • MUTATING - the document is altered and the output of the step differs from the input, including steps that produce children

  • PRESERVING - the document is unaltered and the output of the step that will be handed to the next step exactly matches the input, drop or otherwise ignore the document.

Fault tolerance is not possible if a step that is both POTENT and MUTATING step is also emitting the document for further processing. The use of any such step might be considered an anti-pattern since there is no point in causing mutations that will not be recorded somewhere, and if the mutations are made in advance of persistence, they could have been a separate step. Good step design will choose between either POTENT or MUTATING, but not all systems need fault tolerance so this will only result in an error if fault tolerance is enabled and a step that is both POTENT and MUTATING is has successor steps.

Delivery Guarantees

For reasons discussed very well elsewhere the highly desirable ideal of only once delivery is theoretically impossible if one does not control both the sender and receiver. This leaves us with the options of "at least once" delivery and "at most once" delivery. These guarantees are technically easy to meet, but of course the goal is to keep the result as close to "only once" delivery as possible. In JesterJ the strategy is to write a system that would give only once delivery except for the "two generals problem" and then write the status to Cassandra and conduct the send in close proximity. The order of the operations would determine which guarantee we provide. If we write the status first, and then conduct the send the result is no more than once delivery. If we conduct the send and then write the status the result will be at least once delivery. This will imply that our divergence from only once delivery will be exacerbated by the overloading of the system running JesterJ since that will raise the likelihood and duration of thread pausing between the two critical operations. The goal is to diverge in a predictable manner. The first option JesterJ will provide in 1.0 is "at most once" and then subsequently the option to use "at least once" for some or all outputs will be added.

Child Documents

ℹ️
Child documents are only partially supported until issue #177 is complete. As such, some details of this section might change.

Another common aspect of document processing is the need to split documents into child documents. A parent document cannot be considered complete unless all children (or children’s children) have completed. It is necessary to track both the child document id (to ensure children are not processed twice by POTENT steps) and the parent ID (to ensure that parent documents are re-ingested if some children have not been processed). This becomes even more complex if children can be further split int sub-children. This type of design could easily crop up if single large XML, CSV or JSON documents need to be represented as a complex set of documents in an index.

Identifying children uniquely

To handle arbitrary layers of children it is necessary to to track:

  • id (the current doc)

  • parent id (the most immediate parent document)

  • original id (the source document)

  • The hash of the original document to distinguish children generated by other versions of the same original.

The original ID can be used to fetch the original source for reprocessing, and coordinate the reprocessing of sibling or otherwise related documents. This coordination is important because if a million line CSV is meant to create a million documents in a search index, and processing fails after only 100k docs are in the index, we want to neither reprocess those 100k nor do we want to trigger a reprocessing of the entire CSV for every inflight document that was dropped (we should only reprocess the CSV once, not hundreds or thousands of times!). To avoid wasted reprocessing, upon detecting a failed child document we will need to gather all records for children of the original and ensure that the re-feed of the original marks all related failures as in progress.

Completing the parent document.

This is a really thorny problem involving collecting the statuses across many children (possibly across multiple generations of document splitting). To handle our million line CSV hypothetical above, and imagining one column of the CSV contained a JSON structure that implied multiple levels of sub-children we would need to find the set of successful and unsuccessful children, and the parent record. To succeed we need to consider the following: - We must mark all failed children as processing in a single atomic operation. - We need to ensure that children already processed are skipped. - For correct end results all POTENT steps must identify and exclude previously successful children. - Though not necessary for correctness, to reduce load we also likely want to skip all SAFE and IDEMPOTENT steps for previously successful documents.

So the basic step infrastructure will want to filter previously successful children as soon as they are detected.

Routing Complications

The above complexity in knowing if all child documents have been processed is complicated by the possibility that the hypothetical million line csv produces a variety of children that require different processing steps. JesterJ allows for a DAG of processing steps for just this reason, but if there are POTENT steps for which some child docs (or even some parent docs) do not reach by design, then we need to avoid marking the document and the child document incomplete when they have not reached that step. To identify cases where a document had correctly skipped a branch that has a POTENT step, we need to also account for routing decisions.

Conceptually there are two possible attributes of a router, determinism and distribution.

Determinism

An example of a deterministic router is RouteByStepName which looks at a field value and matches it with the name of a down stream step. An example of a non-deterministic router is RoundRobinRouter which balances load across downstream steps.

Distribution

A fully distributive router is one that sends an identical copy of a document to every downstream steps. A partially distributive router sends a copy to only a portion of the downstream steps. RouteByStepName is partially distributive and DuplicateToAll is fully distributive

Example

The following image shows an example plan with routers that vary in both dimensions

complex routing
ℹ️
An example plan with complicated, hard to track routing. The blue step is a scanner (document source), Red steps are document outputs (POTENT steps), Green shading indicates a deterministic router and blue shading indicates a non-deterministic router. In a real document processing plan the "ProcessTypeA" and "ProcessTypeB" would normally a path of several steps, but are shown as a single step here for brevity.
⚠️
Though the above example uses writes to solr as a POTENT step, this would indicate an abnormal (and possibly flawed) design for your Solr index. Typically writes to Solr are IDEMPOTENT.

Discussion

Fault tolerance can only be achieved to the extent we are able to predict the destination of the document. When faced with a non-deterministic routing scenario any valid downstream path represents a completion. Plan authors should particularly understand that because document order is not guaranteed round robin routing may be non-deterministic, especially in plans that have multiple sources feeding into the router. Futhermore, any cases where documents experience transient errors and are re-attempted will have non-deterministic destinations. Therefore the fundamental concept expressed by authoring a plan with round-robin (or any other routing that does not depend deterministically on the content of the document) is that any destination down stream is acceptable. Futhermore, the existence of branching that leads to multiple destinations down stream of non-deterministic routing means that partial failure AFTER routing can lead to multiple deliveries to the same destination. Below are some minimalist images of potential scenarios.

NonDetRoutingCases

S1, S2 and S4 are generally safe because the elements down stream of the router are not complex. S3 is dangerous because the paths are not equivalent. In the S3 case, each of the two paths to get to R3 are considered destinations (R3←via→DL3 and R3←via→DR3). If only one of the duplicates created by DAR3 arrives at R3 before a power failure, the document will be re-processed, and on the next processing attempt that document might be routed to either L3 or DAR3. If it is routed to DAR3, then duplicates will be created again and BOTH duplicates will reach R3, (and thus a total of 3 copies of the document will have reached destinations instead of 2).

Note that in the event of repeated failures with only one document reaching R3 (i.e. OOM after transmission every time one copy reaches R3) the number of possible documents arriving at destinations is theoretically unbounded (though in this diagram increasingly unlikely). If such a scenario is not acceptable then S3 it is a bug in the plan design.

Out of Scope Issues

Merging and Sorting

We will not attempt to handle merging and Sorting of documents across the DAG. This type of design is a major anti-pattern because it implies a retention of arbitrary numbers of documents. Such tasks must be handled outside of JesterJ in a DBMS or other system designed to hold large sets of data and perform calculations on sets of data efficiently. JesterJ is not a database and will not try to support database operations. Systems like this will likely require custom code to stash intermediate states in a store that can perform these operations, and then initiate a second plan for further processing, or at least be read by a second input/source step (scanner) in the plan.

Transmission of full version history.

JesterJ will not endeavor to solve what I will call the "Transient Version" problem. Specifically FTI will not ensure that all versions of a document are sent to all destinations for the following case:

  1. Document X is scanned

  2. Document X traverses the configured plan

  3. Document X is emitted to destinations A and B

  4. Document X is updated to v2

  5. On a subsequent scan Document X(v2) is scanned.

  6. Document X(v2) traverses the configured plan

  7. Document X(v2) is emitted to destination A

  8. Document X(v2) is NOT emitted to destination B because of a JesterJ shutdown/crash/power issue

  9. Document X is updated to v3 before JesterJ is restarted

  10. JesterJ is restarted

  11. On startup, JesterJ will re-read X and send it for processing again.

  12. Document X(v3) is emitted to destinations A and B

⚠️
X(v2) will never reach B

This is because JesterJ does not store the content of documents in a persistent store while processing. Such a feature requires a lot of extra disk IO or network traffic. Cassandra is used for FTI and massive numbers of deletes of full document content are…​ "worrisome". If at some time JesterJ does solve this issue it will be as a configurable option, and turning on that option may require configuration of a secondary data store.

ℹ️
If such a use case is truly important to you there’s nothing to stop you from writing a processor node that records what was sent into the plan in the first step, and then adding a callback in Solr as a custom update processor that can mark it as "received". Your custom processor would then load the missed version(s) and emit a document for each version, and the current version. Assuming these were all destined for a constant path they should then arrive in correct order (though we don’t have any tests for this right now). Obviously any randomizing routers or versions that take different paths could lead to out of order delivery

Ordered Delivery

Since JesterJ allows for routing documents through paths that may have a different number of steps, and may perform differing operations that take arbitrary amounts of time, there is no way to guarantee that sending documents A, B, C will result in them being indexed in the same order.

Pseudo Code

The below code is meant to represent what needs to happen for a single node system that does not need to worry about the possibility of other nodes being up. Additional logic will be required when clustering is introduced for 2.0.

System Shutdown

There is no special code to run on system shutdown. Since it is not acceptable to loose anything if we experience a loss of power at the physical hardware level, taking any precautions on shutdown can only mask the flaws we want to fix. Every shutdown should be a kill -9.

System Startup

Based on the foregoing shutdown goals we must not make assumptions regarding the state when we shut down.

parseArgs()
if (argsIndicateStartProcessing()) {
  initializePersistentStore()
  plan = loadPlanFromUserSuppliedJar()
  if(newPlanVersion(plan)) {  // based on serialVersionUID of plan
    doPlanMigration() // (future feature)
  }
  plan.start()
}

Within the plan the scanners are in charge of handling the discovery of their own incomplete documents.

  incompleteDocIds = queryPersistentStoreForIncompleteDocs() // in progress and transient errors
  processIncompleteDocuments(incompleteDocIds)
  initiateNormalScanning()
void  processIncompleteDocuments(incompleteDocIds) {
  for (incompleteDocIds) {
    doc = makeDoc(docId)
    doc.setIncompleteDestinations(determineIncompleteDestinations())
  }
}

Step Processing

doc = queue.takeDoc()
if (this.isSafe() || doc.getIncompleteDestinations().contains(this) {
  processDoc(doc)
} else {
  // skip any destinations not listed on the doc
  sendToNext(doc)
}

ChildCreation

ℹ️
this is subject to change until issue #177 is complete.

The JesterJ system will give authors of Processor implementations the freedom to create arbitrary child documents, but to have those children properly participate in FTI a few steps will be necessary, and a utility method to facilitate these steps should be added.

Document createChild(parent,step) {
  child = new Document()
  child.setParentId(parent.getId())
  child.setParentVersion(parent.getContentHash())
  child.setId(calculateChildId(parent))
  child.setDownstreamPotentSteps(step.downStreamPotentStepsAndSelfIfPotent())
  return child
}

Document Identity

The following factors contribute to the identity of a document and make it unique.

  • The source URI - this URI must locate the document such that the running instance of JesterJ can use the plan to read it without further configuration. This URI must encode a URL that can be used to retrieve the original source document.

  • The scanner that read the document. Note that it is entirely valid for two scanners to read the same source documents.

  • The plan containing the scanner that read the document so that if a new plan is run, old documents will not interfere.

  • The identity of the original parent document (if child).

  • A childDoc identifier (if child) - by default a monotonically increasing integer in processing order, but also possibly a value from the dat if that value can be assured to be unique among all siblings. Note UUIDs created in processing steps are discouraged unless previously generated due to their impact on performance due to the use of globally synchronized methods and the potential for entropy exhaustion.

Finally, it’s important to note that the identification of child documents must be strictly deterministic such that we never create a child doc with a different id on a subsequent re-process.

Persistence for Processing

There are some persistence requirements that are absolutely required for correct processing of data. This section enumerates the bare minimum that can be stored to ensure proper processing.

Requirement

The above pseudo-code contains one query (at [1]) and one write (used twice at [2] and [3]). The query needs to pull back data that tells us what documents require reprocessing. The selection criteria in the single node case is fairly simple we want to identify the documents that:

  • Relate to this plan, or a prior version of it

  • Are in a status that is non-terminal

For those documents we need to know:

  • Which POTENT steps have or have not been processed.

Eventually (to resolve issue #177) we will also want to know

  • Which documents are children

  • The top level parent of any child doc

Implementation

Cassandra is best used with a heavy write, occasional read, rarely update/delete pattern. Status changes for documents can be thought of as events which naturally lend them to a time series data design. Thus we hope to write document status events ordered by timestamp, and when we have a question about a document, we will query the most recent events for that document and inspect the result.

Thus we need to write a record for the intention to produce an output (when we scan), and write a new event when we produce that output. Documents to be restarted are the set of documents that have not been updated with a terminal status. In other-words, if the most recent event for a document is not a terminal status, that document needs to be reprocessed.

Keys

We would LIKE to have a key for each event that contains all the elements of document identity above plus a time stamp. There are two problems.

First, since this is meant to be an event stream, and also because Cassandra is much better at writes than updates, we want to be sure we never overwrite an event that is previously written. The goal is to have our system processing documents very fast and we would want to be able to eventually support up to a million documents per second across many cooperating instances of JesterJ and thus a simple milisecond precision time stamp might well be insufficient for distinguishing consecutive writes. Thus two additional columns are added, one with a nano second precision, and another with a random number, because disparate machines will have differing start points for nano-time and might still overlap. Of these two columns, we could potentially eliminate the nano time one but within a single machine it provides some additional ordering that can be useful, so we won’t optimize it out until that seems necessary.

Secondarily we need to access only the documents for a particular scanner, plan and plan version, which implies a set of filter, but since we will also be sorting via time (to find the most recent status) and possibly also querying for events relating to a particular document (another filter) such a design does not play well with Cassandra. Cassandra is not designed for high numbers of filters restricting the results unless those filters are all partition keys and then all keys are required, so "all docs for a scanner" or the "last three events for a destination" is not efficient if docId is part of the key for example. However, since we always know the scanner, plan, plan version and destination, and we never need to read those columns from the results, there is a way to "cheat" and get some free filtering. To do this we create a separate keyspace for each scanner/plan/version and remove those values from the primary key. Thus every time we do a query we calculate the name of the keyspace from the scanner/plan/version thereby restricting the results as desired. An additional wrinkle is that the keyspace names in Cassandra have a limited length and therefore the arbitrary length of plan names and step names (and thus destination names) becomes an issue. To work around this the components that identify the keyspace are hashed into an MD5 hash which is then used as the ultimate keyspace name.

All of this eventually leaves us with a table that looks like:

  public static final String CREATE_FT_TABLE =
      "CREATE TABLE IF NOT EXISTS %s.jj_output_step_status (" +
          "docId varchar, " + // k1
          "docHash varchar, " +
          "parentId varchar, " +
          "origParentId varchar, " +
          "outputStepName varchar, " +
          "status varchar, " +
          "message varchar, " +
          "antiCollision int, " + // C3 avoid collisions on systems with poor time resolution
          "created timestamp, " + // C1
          "createdNanos int, " + // C2 best effort for ordering ties in timestamp, just the nanos
          "PRIMARY KEY (docId, created,createdNanos,outputStepName,antiCollision)) " +
          "WITH CLUSTERING ORDER BY (created DESC, createdNanos DESC);";

The above however is not suitable for detecting changes in documents, because there would be multiple tables to check per document so a second table is used for that which does not include outputStepName:

  public static final String CREATE_DOC_HASH =
      "CREATE TABLE IF NOT EXISTS %s.jj_scanner_doc_hash (" +
          "docId varchar, " +       // k1
          "created timestamp, " +   // C1
          "createdNanos int, " +    // C2 best effort for ordering ties in timestamp, just the nanos
          "antiCollision int, " +   // C3 avoid collisions on systems with poor time resolution
          "hashAlg varchar, " +     //
          "docHash varchar, " +
          "PRIMARY KEY ((docId),created,createdNanos,antiCollision)) " +
          "WITH CLUSTERING ORDER BY (created DESC, createdNanos DESC);";

The %s is replace by the calculated keyspace name which might be something like: jj_95d6881b327e0463dba77016ce1901b8

Queries

These tables support the following queries

When we need to find a list of documents for which processing is incomplete for a given status/scanner/plan/version we use the following query once for each non-terminal status we are interested in:

  static final String FIND_STRANDED_STATUS =
      "SELECT docid FROM %s.jj_output_step_status " +
          "WHERE status = ?" +
          " PER PARTITION LIMIT 1";

Since we wish to allow for a "heuristic" permanent failure scenario we need to check for non-errors back into the the event history and declare the document dead if there are no recent examples of success related statuses for that document. The per partition limit determines how far back in time we are looking. For the latest status we can set this to one.

  static final String FIND_HIST =
      "SELECT docid, status, created FROM %s.jj_output_step_status " +
          "WHERE docid = ? " +
          " PER PARTITION LIMIT ?";

Finally, to determine if a document has been seen before we use this query. Notably in this case the calculated keyspace omits the destination step portion, and so this table spans all destinations for a given scanner. This is important to avoid repeat processing in partial success interrupted by shutdown scenarios.

  static String FTI_CHECK_DOC_HASH = "SELECT docHash from %s.jj_scanner_doc_hash " +
      "WHERE docid = ? " +
      "LIMIT 1";

Indexes

Indexes are not great for write performance, so finding a way to eliminate this might be desirable in the future, but for expedience we support our need to filter on status with a single secondary index:

  public static final String CREATE_INDEX_STATUS =
      "CREATE INDEX IF NOT EXISTS jj_ft_idx_step_status ON %s.jj_output_step_status (status);";
⚠️ **GitHub.com Fallback** ⚠️