How to Migrate Data from MongoDB to Elasticsearch in Python


At an old job, we wanted to migrate our Mongo backend to Elasticsearch. My first task was to find a way to do this elegantly.

I decided to use the official Python clients for both: PyMongo for Mongo and elasticsearch-py for Elasticsearch.

First, we initialize our connections to Mongo and Elasticsearch, reading the connection details from environment variables.

from pymongo import MongoClient
from elasticsearch import Elasticsearch
import os

# Mongo Config
client = MongoClient(os.environ['MONGO_URI'])
db = client[os.environ['MONGO_DB']]
collection = db[os.environ['MONGO_COLLECTION']]

# Elasticsearch Config
es_host = os.environ['ELASTICSEARCH_URI']
es = Elasticsearch([es_host])
es_index = os.environ['ELASTICSEARCH_INDEX']

Next, we create a migrate() function that uses the Elasticsearch helpers module.

We iterate through the documents in the collection and append one action per document to an actions list.

Each document's remaining key-value pairs are serialized with json.dumps() and stored in Elasticsearch's _source field.

Each document's _id must be removed from the body and moved into Elasticsearch's _id field, because _id is a metadata field in Elasticsearch and can't appear inside a document's _source.
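The per-document transformation described above can also be written as a generator. This is a minimal sketch, assuming documents arrive as plain dicts (which is how PyMongo returns them); the name iter_actions is hypothetical. Because helpers.bulk() accepts any iterable of actions, a generator avoids holding every document in memory at once.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_actions(docs: Iterable[Dict[str, Any]], index: str) -> Iterator[Dict[str, Any]]:
    """Yield one bulk "index" action per Mongo document (hypothetical helper)."""
    for doc in docs:
        # Copy so the caller's document is left untouched
        body = dict(doc)
        # _id is metadata in Elasticsearch, so it can't stay in the source;
        # str() turns a BSON ObjectId into its hex string form
        doc_id = str(body.pop("_id"))
        yield {
            "_index": index,
            "_id": doc_id,
            "_source": json.dumps(body),
        }


# Usage with the objects from the setup code (not run here):
# helpers.bulk(es, iter_actions(collection.find(), es_index))
```

Because each action reuses the Mongo _id, re-running the migration overwrites the existing Elasticsearch documents instead of creating duplicates.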

from elasticsearch import helpers
import json

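def migrate():
    # Number of docs to migrate; drop .limit() to migrate the whole collection
    num_docs = 2000
    # Iterate the cursor directly: indexing it (res[i]) runs a new query per document
    cursor = collection.find().limit(num_docs)
    actions = []
    for doc in cursor:
        # Move Mongo's _id out of the body; str() makes the ObjectId serializable
        mongo_id = str(doc.pop('_id'))
        actions.append({
            "_index": es_index,
            "_id": mongo_id,
            "_source": json.dumps(doc)
        })
    helpers.bulk(es, actions)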

helpers.bulk() then sends the actions to Elasticsearch through the Bulk API, so documents are indexed in batches rather than with one request per document.
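The Bulk API's request body is newline-delimited JSON: an action line followed by the document source, repeated for each document. The sketch below (a hypothetical to_bulk_body function) renders actions in the format built by migrate() into that body by hand, purely to show what goes over the wire. In practice helpers.bulk() does this for you and also splits large inputs into chunks (its chunk_size parameter, 500 actions by default).

```python
import json
from typing import Any, Dict, Iterable


def to_bulk_body(actions: Iterable[Dict[str, Any]]) -> str:
    """Render actions as a Bulk API request body (illustration only)."""
    lines = []
    for action in actions:
        # Metadata line: helpers.bulk() uses the "index" operation by default
        meta = {"index": {"_index": action["_index"], "_id": action["_id"]}}
        lines.append(json.dumps(meta))
        source = action["_source"]
        # A pre-serialized JSON string is sent as-is; a dict is serialized here
        lines.append(source if isinstance(source, str) else json.dumps(source))
    # The Bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"
```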

If any documents in Mongo contain values that json.dumps() can't serialize, such as datetime objects, we can pass a default converter function to json.dumps().

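from datetime import datetime

def defaultconverter(o):
    # ISO 8601 strings can be picked up by Elasticsearch's date detection
    if isinstance(o, datetime):
        return o.isoformat()
    raise TypeError(f"{type(o).__name__} is not JSON serializable")

json.dumps(doc, default=defaultconverter)  # e.g. inside migrate()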
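json.dumps() calls the default function for every value it can't serialize, including values nested inside subdocuments and arrays. If a collection also holds other BSON types, such as ObjectId references to other documents, a more permissive converter may be convenient. The sketch below is an assumption about what such data looks like, and permissive_converter is a hypothetical name; note that its str() fallback can quietly turn types you would rather map explicitly into plain strings.

```python
import json
from datetime import datetime
from typing import Any


def permissive_converter(o: Any) -> str:
    """json.dumps() fallback: ISO datetimes, str() for anything else (hypothetical)."""
    if isinstance(o, datetime):
        return o.isoformat()
    # Fallback for other unserializable values, e.g. ObjectId references
    return str(o)
```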