Bulk Loading
There are a number of configuration options and tools that make ingesting large amounts of graph data into Titan more efficient. Such ingestion is referred to as bulk loading in contrast to the default transactional loading where small amounts of data are added through individual transactions.
There are a number of use cases for bulk loading data into Titan, including:
- Introducing Titan into an existing environment with existing data and migrating or duplicating this data into a new Titan cluster.
- Using Titan as an end point of an ETL process.
- Adding existing or external graph datasets (e.g. publicly available RDF datasets) to a running Titan cluster.
- Updating a Titan graph with results from a graph analytics job.
This page describes configuration options and tools that make bulk loading more efficient in Titan. Please observe the limitations and assumptions for each option carefully before proceeding to avoid data loss or data corruption.
This documentation focuses on Titan specific optimization. In addition, consider improving the chosen storage backend and (optional) index backend for high write performance. Please refer to the documentation of the respective backend for more information.
Enabling the `storage.batch-loading` configuration option will have the biggest positive impact on bulk loading times for most applications. Enabling batch loading disables Titan internal consistency checks in a number of places. Most importantly, it disables locking. In other words, Titan assumes that the data to be loaded into Titan is consistent with the graph and hence disables its own checks in the interest of performance.
In many bulk loading scenarios it is significantly cheaper to ensure data consistency prior to loading the data than to ensure data consistency while loading it into the database. The `storage.batch-loading` configuration option exists because of this observation.
For example, consider the use case of bulk loading existing user profiles into Titan. Furthermore, assume that the username property key is defined as in-unique, i.e. usernames must be unique across the entire graph. If the user profiles are imported from another database, username uniqueness might already be guaranteed. If not, it is simple to sort the profiles by name and filter out duplicates, or to write a Hadoop job that does such filtering. Now, we can enable `storage.batch-loading`, which significantly reduces the bulk loading time because Titan does not have to check for every added user whether the name already exists in the database.
Important: Enabling `storage.batch-loading` requires the user to ensure that the loaded data is internally consistent and consistent with any data already in the graph. In particular, concurrent type creation can lead to severe data integrity issues when batch loading is enabled. Hence, we strongly encourage disabling automatic type creation by setting `autotype = none` in the graph configuration.
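As a minimal sketch, one way to open a graph with batch loading enabled and automatic type creation disabled is shown below. The storage backend, hostname, and property file layout are assumptions for the sake of the example and should be adapted to your cluster.

```java
import org.apache.commons.configuration.BaseConfiguration;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;

public class BatchLoadingConfig {
    public static void main(String[] args) {
        BaseConfiguration conf = new BaseConfiguration();
        // Storage backend and host are placeholders; adjust to your cluster.
        conf.setProperty("storage.backend", "cassandra");
        conf.setProperty("storage.hostname", "127.0.0.1");
        // Disable Titan's internal consistency checks (including locking).
        // Only do this if the data has been de-duplicated and validated beforehand.
        conf.setProperty("storage.batch-loading", true);
        // Disable automatic type creation; all property keys and edge labels
        // must then be defined explicitly before loading starts.
        conf.setProperty("autotype", "none");

        TitanGraph graph = TitanFactory.open(conf);
        // ... perform the bulk load ...
        graph.shutdown();
    }
}
```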
Each newly added vertex or edge is assigned a unique id. Titan's id pool manager acquires ids in blocks for a particular Titan instance. The id block acquisition process is expensive because it needs to carefully guarantee unique assignment of blocks. Increasing `ids.block-size` reduces the number of acquisitions but potentially leaves many ids unassigned and hence wasted. For transactional workloads the default block size is reasonable, but during bulk loading vertices and edges are added much more frequently and in rapid succession. Hence, it is generally advisable to increase the block size by a factor of 10 or more depending on the number of vertices to be added per machine.
Rule of thumb: Set `ids.block-size` to the number of vertices you expect to add per Titan instance per hour.

Important: All Titan instances MUST be configured with the same value for `ids.block-size` to ensure proper id allocation. Hence, be careful to shut down all Titan instances prior to changing this value.
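Continuing the configuration sketch above, and assuming (purely for illustration) roughly 1,000,000 vertices added per instance per hour, the block size could be set as follows:

```java
// Assumed rate of ~1,000,000 new vertices per Titan instance per hour.
// Every Titan instance must use the same ids.block-size value, and all
// instances should be shut down before this value is changed.
conf.setProperty("ids.block-size", 1000000);
```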
When id blocks are frequently allocated by many Titan instances in parallel, allocation conflicts between instances will inevitably arise and slow down the allocation process. In addition, the increased write load due to bulk loading may further slow down the process to the point where Titan considers it failed and throws an exception. There are three configuration options that can be tuned to avoid this.
1) `storage.idauthority-wait-time` configures the time in milliseconds the id pool manager waits for an id block application to be acknowledged by the storage backend. The shorter this time, the more likely it is that an application will fail on a congested storage cluster.
Rule of thumb: Set this to the sum of the 95th percentile read and write times measured on the storage backend cluster under load.
Important: This value should be the same across all Titan instances.
2) `storage.idauthority-retries` configures the number of times the id pool manager attempts to acquire an id block before giving up and throwing an exception.
Rule of thumb: Increase this value. The only downside of increasing it is that Titan will try for a long time on an unavailable storage backend cluster.
3) `ids.renew-timeout` configures the number of milliseconds Titan's id pool manager will wait in total while attempting to acquire a new id block before failing.

Rule of thumb: Set this value to be larger than 2 x (`storage.idauthority-retries`) x (`storage.idauthority-wait-time`). The only downside of increasing it is that Titan will try for a long time on an unavailable storage backend cluster.
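Continuing the configuration sketch above, the three options might be tuned together as in the following fragment. The concrete numbers are placeholders and should be derived from measurements on your own cluster:

```java
// Placeholder values for illustration only; derive them from measurements
// taken on your own storage backend cluster under load.
conf.setProperty("storage.idauthority-wait-time", 1000); // ms; ~95th percentile read + write time
conf.setProperty("storage.idauthority-retries", 10);     // more retries = more patience with a stressed cluster
// Must exceed 2 x retries x wait-time = 2 x 10 x 1000 ms = 20000 ms.
conf.setProperty("ids.renew-timeout", 30000);            // ms
```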
Titan buffers writes and executes them in small batches to reduce the number of requests against the storage backend. The size of these batches is controlled by `storage.buffer-size`. When executing many writes in a short period of time, the storage backend can become overloaded with write requests. In that case, increasing `storage.buffer-size` can avoid failure by increasing the number of writes per request and thereby lowering the number of requests.
However, increasing the buffer size increases the latency of the write request and its likelihood of failure. Hence, it is not advisable to increase this setting for transactional loads and one should carefully experiment with this setting during bulk loading.
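As an illustration, continuing the configuration sketch above, the buffer size could be raised for the duration of a bulk load. The value shown is an assumption, not a recommendation:

```java
// A larger buffer sends fewer, bigger requests to the storage backend.
// Larger batches also increase per-request latency and the impact of a
// failed request, so experiment with this value rather than copying it.
conf.setProperty("storage.buffer-size", 4096);
```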
During bulk loading, the load on the cluster typically increases, making it more likely for read and write operations to fail (in particular if the buffer size is increased as described above).
`storage.read-attempts` and `storage.write-attempts` configure how many times Titan will attempt to execute a read or write operation against the storage backend before giving up. If a high load on the backend is expected during bulk loading, it is generally advisable to increase these configuration options.
`storage.attempt-wait` specifies the number of milliseconds that Titan will wait before re-attempting a failed backend operation. A higher value can ensure that operation retries do not further increase the load on the backend.
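Continuing the configuration sketch above, the robustness options might be set as follows. These values are illustrative and should be tuned to the observed failure rate of the backend:

```java
// Illustrative values only; tune them to the behavior of your backend.
conf.setProperty("storage.read-attempts", 10);  // attempts per read operation
conf.setProperty("storage.write-attempts", 10); // attempts per write operation
conf.setProperty("storage.attempt-wait", 500);  // ms to back off between attempts
```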
Faunus is a Hadoop-based graph analytics engine that can operate on a Titan cluster. It can be used to pre-process and bulk load data into Titan. Faunus effectively parallelizes the loading process across multiple mappers. Since Faunus can use Titan as both a sink and a source, it is also a useful tool for incremental data loading into Titan.
For medium-sized graph datasets (up to hundreds of millions of edges), Blueprints' BatchGraph is a useful tool for bulk loading data into Titan from a single machine through Titan's native Blueprints interface. BatchGraph caches the externally provided vertex ids to eliminate reads against Titan, which allows bulk loading with minimal read load.
BatchGraph is limited to single-machine bulk loading use cases and requires enough local RAM to hold the entire vertex id cache in memory. BatchGraph supports id compression to reduce the memory requirements. Please refer to the BatchGraph documentation for more information on how to use BatchGraph most effectively.
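A minimal sketch of loading an edge list through BatchGraph is shown below. The `titan-batch.properties` file, the `name` id key, the `knows` edge label, and the hard-coded input are assumptions made for illustration; in practice the edges would be streamed from a file and the graph would be configured for batch loading as described above.

```java
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.wrappers.batch.BatchGraph;
import com.tinkerpop.blueprints.util.wrappers.batch.VertexIDType;

public class BatchGraphLoader {
    public static void main(String[] args) {
        // Assumes titan-batch.properties enables storage.batch-loading etc.
        TitanGraph graph = TitanFactory.open("titan-batch.properties");

        // Buffer 10,000 mutations per commit and cache string vertex ids.
        BatchGraph<TitanGraph> bgraph =
                new BatchGraph<TitanGraph>(graph, VertexIDType.STRING, 10000);
        // Store the external id under the (hypothetical) "name" property key.
        bgraph.setVertexIdKey("name");

        // Hypothetical edge list; in practice this would be streamed from a file.
        String[][] edges = { {"alice", "bob"}, {"bob", "carol"} };
        for (String[] edge : edges) {
            Vertex out = getOrCreate(bgraph, edge[0]);
            Vertex in = getOrCreate(bgraph, edge[1]);
            bgraph.addEdge(null, out, in, "knows");
        }

        bgraph.shutdown(); // flushes remaining buffered writes and shuts down Titan
    }

    private static Vertex getOrCreate(BatchGraph<TitanGraph> bgraph, String id) {
        Vertex v = bgraph.getVertex(id); // served from the in-memory id cache
        return (v == null) ? bgraph.addVertex(id) : v;
    }
}
```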
By parallelizing the bulk loading across multiple machines, the load time can be greatly reduced if Titan’s storage backend cluster is large enough to serve the additional requests. This is the approach Faunus takes to bulk loading data into Titan using Hadoop parallelization mechanisms.
If Faunus cannot be used for parallelizing the bulk loading process, here are some high level guidelines for effectively parallelizing the loading process:
- In some cases, the graph data can be decomposed into multiple disconnected subgraphs. Those subgraphs can be loaded independently in parallel across multiple machines (for instance, using BatchGraph as described above).
- If the graph cannot be decomposed, it is often beneficial to load it in multiple steps, where the last two steps can be parallelized across multiple machines (see the sketch after this list):
  - Make sure the vertex and edge data sets are de-duplicated and consistent.
  - Set `storage.batch-loading=true`. Possibly optimize additional configuration settings described above.
  - Add all the vertices with their properties to the graph (but no edges). Maintain a (distributed) map from vertex id (as defined by the loaded data) to Titan's internal vertex id (i.e. `vertex.getId()`), which is a 64-bit long id.
  - Add all the edges using the map to look up Titan's vertex id and retrieve the vertices by that id.
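The following sketch illustrates the last two steps under simple assumptions: the hard-coded input records, the `name` property key, the `knows` edge label, and the plain `HashMap` (standing in for a distributed map) are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Vertex;

public class TwoPhaseLoad {
    public static void main(String[] args) {
        // Assumes the configuration file enables storage.batch-loading; with
        // autotype=none, "name" and "knows" must be defined before loading.
        TitanGraph graph = TitanFactory.open("titan-batch.properties");

        // Phase 1: add all vertices and remember external id -> Titan id.
        // A HashMap stands in for the distributed map mentioned above.
        Map<String, Object> idMap = new HashMap<String, Object>();
        String[] externalIds = { "alice", "bob", "carol" }; // hypothetical input
        for (String externalId : externalIds) {
            Vertex v = graph.addVertex(null);
            v.setProperty("name", externalId);
            idMap.put(externalId, v.getId()); // Titan's internal 64-bit id
        }
        graph.commit();

        // Phase 2: add all edges, resolving endpoints via the id map.
        String[][] edges = { {"alice", "bob"}, {"bob", "carol"} }; // hypothetical input
        for (String[] e : edges) {
            Vertex out = graph.getVertex(idMap.get(e[0]));
            Vertex in  = graph.getVertex(idMap.get(e[1]));
            graph.addEdge(null, out, in, "knows");
        }
        graph.commit();

        graph.shutdown();
    }
}
```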
Presorting the data to be bulk loaded can significantly improve loading performance through BatchGraph. The BatchGraph documentation describes this strategy in more detail. It has been reported that loading times decreased by a factor of 2 or more when the bulk loaded data was presorted.
- What should I do to avoid the following exception during batch loading: `java.io.IOException: ID renewal thread on partition [X] did not complete in time.`?
This exception is most likely caused by repeated time-outs during the id allocation phase due to a highly stressed storage backend. Refer to the section on ID Allocation Optimization above.