Reindexer - olivere/elastic GitHub Wiki
Reindexing is necessary, for example, when you want to apply mapping changes to existing data.
There are currently two ways to reindex: a home-grown Reindexer that is specific to Elastic (and available in all recent versions of Elastic), and the native Reindex API that was introduced in Elasticsearch 2.3.0.
Reindexing with Elasticsearch Reindex API (2.3.0 or later)
The Reindex API in Elasticsearch 2.3.0+ is documented in the Elasticsearch reference. Elastic supports it in version 3.0.29 or later.
Here's an example of using the Reindex API with Elastic:
Using elastic.v3:
```go
client, err := elastic.NewClient()
if err != nil {
	panic(err) // handle the error appropriately
}
src := elastic.NewReindexSource().Index("source_index")
dst := elastic.NewReindexDestination().Index("target_index")
res, err := client.ReindexTask().Source(src).Destination(dst).Refresh(true).Do()
if err != nil {
	panic(err)
}
fmt.Printf("Reindexed a total of %d documents\n", res.Total)
```
Using elastic.v5 and later:
```go
client, err := elastic.NewClient()
if err != nil {
	panic(err) // handle the error appropriately
}
src := elastic.NewReindexSource().Index("source_index")
dst := elastic.NewReindexDestination().Index("target_index")
res, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Do(context.Background())
if err != nil {
	panic(err)
}
fmt.Printf("Reindexed a total of %d documents\n", res.Total)
```
The tests in `reindex_test.go` illustrate how to use the API in detail.
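To see what the Reindex API does on the server side, here is a minimal sketch of the JSON body for `POST /_reindex` that copies `source_index` into `target_index`. It is built with plain `encoding/json` rather than Elastic; the `reindexBody` and `reindexIndex` types are hypothetical and model only the two fields used above. We assume that `Refresh` is sent as the `refresh` URL parameter rather than as a body field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical types mirroring the minimal body of POST /_reindex.
// They are illustrative only and are not part of Elastic.
type reindexIndex struct {
	Index string `json:"index"`
}

type reindexBody struct {
	Source reindexIndex `json:"source"`
	Dest   reindexIndex `json:"dest"`
}

// buildReindexBody returns the JSON body that copies all documents
// from the src index into the dst index.
func buildReindexBody(src, dst string) (string, error) {
	b, err := json.Marshal(reindexBody{
		Source: reindexIndex{Index: src},
		Dest:   reindexIndex{Index: dst},
	})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := buildReindexBody("source_index", "target_index")
	if err != nil {
		panic(err)
	}
	// Assumption: refresh travels as a URL parameter, not in the body.
	fmt.Println("POST /_reindex?refresh=true")
	fmt.Println(body)
}
```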
Reindexing with Elastic's Reindexer (pre-2.3.0)
Before Elasticsearch 2.3.0, reindexing could only be done as a client-side operation. To make things easier for application programmers, Elastic contains a helper to reindex indices, even between clusters. You can still use this Reindexer with Elasticsearch 2.3.0 and later. The Elasticsearch documentation has a section about reindexing if you want more information.
Here's an example of the Reindexer with various options:
```go
client, err := elastic.NewClient()
if err != nil {
	panic(err) // handle the error appropriately
}
// Pre-v5 API (e.g. elastic.v3): client.Reindex(source, target) returns a Reindexer.
ix := client.Reindex("source", "target")
ix = ix.Progress(func(current, total int64) {
	fmt.Printf("%d of %d\r", current, total)
})
result, err := ix.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d operations succeeded, %d failed\n", result.Success, result.Failed)
```
As the example above shows, you can provide a progress callback function with `Progress(func(int64, int64))`.
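A progress callback is just a function taking `(current, total int64)`. The standalone sketch below runs without a cluster; `percentDone` and `printProgress` are hypothetical helpers, and the guard against `total == 0` is an assumption meant to cover an empty source index.

```go
package main

import "fmt"

// percentDone converts the (current, total) pair passed to a progress
// callback into a percentage. It returns 0 when total is not positive
// to avoid dividing by zero.
func percentDone(current, total int64) float64 {
	if total <= 0 {
		return 0
	}
	return float64(current) * 100 / float64(total)
}

// printProgress has the func(int64, int64) shape that Progress expects,
// so it could be passed as ix.Progress(printProgress).
func printProgress(current, total int64) {
	fmt.Printf("%d of %d (%.1f%%)\r", current, total, percentDone(current, total))
}

func main() {
	// Simulate a few callback invocations without a cluster.
	for _, c := range []int64{0, 250, 500} {
		printProgress(c, 500)
	}
	fmt.Println()
}
```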
If you need to know exactly which bulk item requests failed, use `StatsOnly(false)`. The result will then contain all failed items in `result.Errors` (`[]*elastic.BulkResponseItem`).
You can also specify the chunk size of bulk items sent to Elasticsearch with `BulkSize(int)`. The default is 500.
Also, a scroll timeout can be specified with `Scroll(string)`, e.g. `Scroll("15m")`. The default is 5 minutes (`5m`).
Transforming data while reindexing
The Reindexer has become more versatile with this PR (thanks to @nwolff). In fact, copying data from a source index to a target index is just a special case of the more general Reindexer.
If you create a Reindexer directly with `NewReindexer` (instead of via `client.Reindex(...)`), you can pass a handler that is called for each hit. Here's an example that uses such a handler to preserve the `_ttl` of each hit:
```go
// Carries over the source item's ttl to the reindexed item.
copyWithTTL := func(hit *elastic.SearchHit, bulkService *elastic.BulkService) error {
	source := make(map[string]interface{})
	if err := json.Unmarshal(*hit.Source, &source); err != nil {
		return err
	}
	req := elastic.NewBulkIndexRequest().Index(targetIndexName).Type(hit.Type).Id(hit.Id).Doc(source)
	if ttl, ok := hit.Fields["_ttl"].(float64); ok {
		req.Ttl(int64(ttl))
	}
	bulkService.Add(req)
	return nil
}
// sourceIndexName and targetIndexName hold the names of your indices.
r := elastic.NewReindexer(client, sourceIndexName, copyWithTTL).ScanFields("_source", "_ttl")
ret, err := r.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d operations succeeded, %d failed\n", ret.Success, ret.Failed)
```
Moving data to a different cluster
You can also use the Reindexer to copy data from one cluster to another. You do so by providing a different client for the target. The following code uses `sourceClient` to read `sourceIndexName` and writes the documents to `targetIndexName` via `targetClient`.
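```go
sourceClient, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
if err != nil {
	panic(err)
}
targetClient, err := elastic.NewClient(elastic.SetURL("http://localhost:8200"))
if err != nil {
	panic(err)
}
r := elastic.NewReindexer(sourceClient, sourceIndexName, elastic.CopyToTargetIndex(targetIndexName))
// Write the documents to the target cluster instead of the source cluster.
r = r.TargetClient(targetClient)
ret, err := r.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d operations succeeded, %d failed\n", ret.Success, ret.Failed)
```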
More examples are available in reindexer_test.go.