How to Migrate Data from MongoDB to Elasticsearch in Python
At an old job, we wanted to migrate our Mongo backend to Elasticsearch. My first task was to find a way to do this elegantly.
I decided to use Mongo and Elasticsearch’s Python clients to achieve this.
First, we have to initialize our connections to Mongo and Elasticsearch.
from pymongo import MongoClient
from elasticsearch import Elasticsearch
import os
# Mongo Config
client = MongoClient(os.environ['MONGO_URI'])
db = client[os.environ['MONGO_DB']]
collection = db[os.environ['MONGO_COLLECTION']]
# Elasticsearch Config
es_host = os.environ['ELASTICSEARCH_URI']
es = Elasticsearch([es_host])
es_index = os.environ['ELASTICSEARCH_INDEX']
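Before moving anything, it can be worth confirming that both connections actually work. The snippet below is just a sketch using pymongo’s count_documents() and the Elasticsearch client’s ping(); it isn’t part of the migration itself.
# Optional sanity check: make sure both services are reachable
print("Documents in Mongo collection:", collection.count_documents({}))
print("Elasticsearch reachable:", es.ping())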
We can then create this migrate() function that uses Elasticsearch’s helpers API. We iterate through the collection documents and add them to this actions list. The key-value pairs in each document will be dumped into Elasticsearch’s _source field. The _id of each document needs to be removed and moved to Elasticsearch’s _id field to avoid _id conflicts.
from elasticsearch import helpers
import json

def migrate():
    res = collection.find()
    # number of docs to migrate
    num_docs = 2000
    actions = []
    for i in range(num_docs):
        doc = res[i]
        # reuse Mongo's _id as the Elasticsearch _id, converted to a string
        # since ObjectId is not JSON serializable, then drop it from the body
        mongo_id = doc['_id']
        doc.pop('_id', None)
        actions.append({
            "_index": es_index,
            "_id": str(mongo_id),
            "_source": json.dumps(doc)
        })
    helpers.bulk(es, actions)
helpers.bulk() is what performs the bulk store into Elasticsearch.
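Once the function is defined, running it and spot-checking the result might look like the sketch below; the refresh and count calls are my addition, not part of the original script.
migrate()
# refresh so the newly indexed documents are searchable, then check the count
es.indices.refresh(index=es_index)
print(es.count(index=es_index)['count'])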
If any of the documents in Mongo contain an unserializable object like datetime, we can use a default converter in json.dumps().
import datetime

def defaultconverter(o):
    # render datetimes as strings so json.dumps() can serialize them
    if isinstance(o, datetime.datetime):
        return o.__str__()

json.dumps(doc, default=defaultconverter)
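Inside migrate(), wiring this in just means passing the converter to the existing json.dumps() call on the _source line, roughly like this:
actions.append({
    "_index": es_index,
    "_id": str(mongo_id),
    "_source": json.dumps(doc, default=defaultconverter)
})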