How to Paginate/Scroll Elasticsearch Data using Python
I’m pretty new to Elasticsearch’s Python client, so I ran across what seemed to be a simple error.
I ran a simple match all query like the one below.
body = {"query": {"match_all": {}}}
res = es.search(index='test', body=body)
But the response boggled my mind.
Issue: Search API Defaults
The correct number of hits could be found in res['hits']['total'] (around 5000 hits). But res['hits']['hits'] only returned 10 documents.
This isn’t actually an issue at all, but rather something I just missed in the documentation.
By default, the search API returns the top 10 matching documents.
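A quick way to see this, using the res object from the search above (note that on Elasticsearch 7+ clients, res['hits']['total'] comes back as a dict rather than a plain number):
print(res['hits']['total'])       # total number of matching documents
print(len(res['hits']['hits']))   # only 10 documents are returned by default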
How could I access all my data in Elasticsearch?
Solution: Scrolling
We want to paginate through our dataset. In our search call, we can specify a scroll and a size input.
es.search(index='test', body=body, scroll='2m', size=50)
For instance, in the above request, we want each request to return 50 documents, as specified by the size parameter, until we have no more data to cycle through. We also want to keep the search context alive for 2m, or 2 minutes. This scroll value needs to be long enough for us to process the previous batch of results; it's essentially an expiry time for the search context.
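Under the hood, the response to a scrolled search includes a _scroll_id, and each subsequent batch is fetched with the client's scroll method rather than another search. A minimal sketch of those two steps, assuming the same es client and test index:
page = es.search(index='test', body=body, scroll='2m', size=50)
scroll_id = page['_scroll_id']   # handle to the open search context

next_page = es.scroll(scroll_id=scroll_id, scroll='2m')   # next 50 hits; resets the 2m expiry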
Now, let’s see how we can use this in a generic function to scroll through our results.
def scroll(es, index, body, scroll, size, **kw):
    # Run the initial search, which opens the scroll context.
    page = es.search(index=index, body=body, scroll=scroll, size=size, **kw)
    scroll_id = page['_scroll_id']
    hits = page['hits']['hits']
    # Keep yielding batches until a page comes back empty.
    while len(hits):
        yield hits
        page = es.scroll(scroll_id=scroll_id, scroll=scroll)
        scroll_id = page['_scroll_id']
        hits = page['hits']['hits']
Here, we use a generator to stream our hits in small batches, so each batch can be processed (or written to disk) as it arrives. Each response page contains a _scroll_id, which we use to paginate through the results.
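One optional refinement: the search context is released automatically once the scroll timeout expires, but if we finish early we can free it explicitly. A sketch, assuming scroll_id holds the last _scroll_id we received:
# Release the scroll context immediately instead of waiting out the '2m' timeout.
es.clear_scroll(scroll_id=scroll_id)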
Lastly, we use **kw to pass an arbitrary number of keyword arguments into scroll(). This allows us to use parameters other than scroll and size in the es.search() function.
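For instance, any other keyword argument that es.search() accepts can be forwarded through **kw; the field name below is purely hypothetical:
# Only fetch the (hypothetical) 'title' field of each document,
# passed through **kw as a _source filter on es.search().
for hits in scroll(es, 'test', body, '2m', 50, _source=['title']):
    print(len(hits))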
Example Usage
Let’s use the same match all query from earlier.
body = {"query": {"match_all": {}}}
Suppose we want to pull all the query results, but we want them in chunks of 20 entries. This would require size=20 in scroll().
import json

for hits in scroll(es, 'test', body, '2m', 20):
    # Do something with hits here
    print(json.dumps(hits, indent=4))
Suppose now we only want the first 100 entries from the previous example. This would be 5 chunks of 20 entries, which means we want our generator to iterate 5 times. We can use zip(generator(), range()) to achieve this: zip() returns an iterator that ends once the shortest input is exhausted, so range(i) limits our generator() to i iterations.
for hits, i in zip(scroll(es, 'test', body, '2m', 20), range(5)):
    # Do something with hits here
    print(i, json.dumps(hits, indent=4))
If we want to pull all the query results into one list, we can simply use a nested loop. We can experiment with the size parameter to see what works best in different scenarios.
res = []
for hits in scroll(es, 'test', body, '2m', 20):
    for hit in hits:
        res.append(hit)
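Equivalently, the nested loop collapses into a single list comprehension:
res = [hit for hits in scroll(es, 'test', body, '2m', 20) for hit in hits]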
Reason: Why do we do this?
Why do we paginate instead of pulling all our records at once?
It’s in our best interest to pull smaller batches of data when we’re working with a large dataset.
Each request we make to the Elasticsearch server has to hold its entire result set in heap memory before returning it, so memory use grows with the size of the request. As a result, large requests don't scale the way we'd expect, and we may encounter out-of-memory (OOM) issues.
Because of this scaling issue, it’s advised to request smaller amounts of records and paginate (scroll) through the responses, treating the data as a stream.
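To make the streaming idea concrete, here is a sketch that writes each batch out as soon as it arrives, so memory stays flat no matter how large the result set is; the output path is just a placeholder:
import json

# Append each hit to a newline-delimited JSON file ('hits.ndjson' is hypothetical)
# so that only one batch is ever held in memory at a time.
with open('hits.ndjson', 'a') as f:
    for hits in scroll(es, 'test', body, '2m', 100):
        for hit in hits:
            f.write(json.dumps(hit) + '\n')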