MongoDB backups at scale - how one Ops guy got his "daily" backup job to run in under 24 hours.

There's more than one way to deal with dumping a lot of databases with slow tools.

A while back, I wrote another article outlining how my organization handles cluster-wide backup and restore of our MongoDB clusters. The tool/methodology we use is great for quickly backing up, and more importantly, quickly restoring an entire multi-terabyte cluster. It is quick and reliable enough that we're able to restore our production data into our staging and dev environments on a weekly basis without any human intervention. This type of restore takes just over an hour for our ~3 TB sharded cluster, or about the same amount of time for our ~350 GB unsharded cluster. However, that method has two very serious drawbacks - you can only restore a cluster in its entirety, and you have to take the cluster offline to do the restore. The main purpose of having this system in place is a true disaster scenario, where we lose enough of our production cluster that we can safely assume it is offline anyway...like Northern Virginia getting hit by a meteor. What if you need something more targeted?

One possible scenario is that a developer ran something that they shouldn't have against production data. Yes, this shouldn't ever happen, but the point of backups isn't just to guard against likely scenarios. It might be that you need a dataset from a previous date, for a specific database or collection. I'm sure you can think of others - the point is, you want to be able to restore an individual database or collection:

a) without also having to restore the cluster in its entirety, and
b) while the cluster remains operational.

To state the obvious (for those who are familiar with MongoDB tooling, anyway) - enter mongodump and mongorestore. This article focuses primarily on how we back up a large number of databases using the mongodump utility.

My organization's journey with mongodump started with a simple shell script (a rough sketch follows the list). All it did was:

  • Talk to MongoS or MongoD and get a list of databases (we have both sharded and non-sharded clusters)
  • Remove a set of arbitrary DBs from that list (there are some databases that we didn't want backed up)
  • Loop through each database, and for each one:
    • mongodump the database
    • tar and compress the dumped files
    • copy them to S3
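
For illustration, here's roughly what that flow looked like. This is a minimal Python rendering rather than the original shell script, and the host, exclusion list, and bucket name are placeholders, not our real values:

```python
import subprocess
from pymongo import MongoClient

# Placeholder values - substitute your own mongos/mongod host, exclusions, and bucket.
MONGOS_HOST = "localhost:27017"
EXCLUDED_DBS = {"admin", "config", "local"}
S3_BUCKET = "s3://example-mongo-backups"

client = MongoClient("mongodb://" + MONGOS_HOST)
databases = [db for db in client.list_database_names() if db not in EXCLUDED_DBS]

for db in databases:
    # Dump one database into its own directory
    subprocess.check_call(["mongodump", "--host", MONGOS_HOST, "--db", db, "--out", f"/backup/{db}"])
    # tar + gzip the dump directory
    subprocess.check_call(["tar", "-czf", f"/backup/{db}.tar.gz", "-C", "/backup", db])
    # Copy the archive to S3 (assumes the AWS CLI is installed and configured)
    subprocess.check_call(["aws", "s3", "cp", f"/backup/{db}.tar.gz", f"{S3_BUCKET}/{db}.tar.gz"])
```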

This had some distinct limitations. For one, each cluster needed its own copy of the script, run separately. It was also difficult to achieve concurrency - our script predated the introduction of the --numParallelCollections flag in mongodump 3.2. Even after that flag became available, we had problems using it with our sharded environments due to cursor timeouts, which caused our backups to fail on certain DBs that had particularly large collections. Essentially, we'd open cursors against several different collections, each spread out over several shards. We would then chew away at the data on a single shard for long enough that the (idle) cursors opened against other shards would time out and close, causing the dump to fail when it tried to use one of those cursors again. I was eventually tasked with finding a solution that we could scale without the use of --numParallelCollections.

My first attempt resulted in a ~300-line Python script. I wrote a nice class based on the multiprocessing module. It did effectively the same thing that the original shell script did, except that it used asynchronous multiprocessing to run multiple processes, each dumping one database apiece - which solved the cursor-timeout problem we'd been hitting when dumping multiple sharded collections in parallel. It also had much nicer log output and could dump all our environments with a single job. However, this again had a big drawback - some of our databases are so enormous (hundreds of gigabytes) that even with powerful hardware, our "daily" database backups could end up taking 30 hours or more...and of course, we're adding new databases all the time. We needed a way to scale horizontally!
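
A stripped-down sketch of that approach (the real class was much longer and handled logging, error handling, and multiple environments; dump_database here is a hypothetical stand-in for the per-database dump/compress/upload work described earlier):

```python
import subprocess
from multiprocessing import Pool

def dump_database(db_name):
    """Dump a single database; compression and the S3 upload would follow here."""
    subprocess.check_call([
        "mongodump", "--host", "localhost:27017",
        "--db", db_name, "--out", f"/backup/{db_name}",
    ])
    return db_name

if __name__ == "__main__":
    databases = ["app_db_1", "app_db_2", "reporting"]  # placeholder list from mongos
    # One database per worker process. Each mongodump runs its collections
    # serially, so no cursor sits idle against one shard while another shard
    # is being drained - which is what was killing us with --numParallelCollections.
    with Pool(processes=4) as pool:
        for finished in pool.imap_unordered(dump_database, databases):
            print(f"finished dumping {finished}")
```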

Enter Celery and RabbitMQ.

Our organization already uses Celery with either SQS or RabbitMQ (we're migrating from a mix of both to RabbitMQ for everything) to handle a lot of tasks, so I had devs available who already knew how to use them. I got one of them to help me convert my Python class into something that could be used by a Celery worker. The new solution:

  • A Python class talks to MongoS/MongoD and generates the list of databases to be dumped. This is actually run inside a Docker container by Jenkins - on a slave which has a MongoS available.
  • The class then takes that list and, for each database in it, creates a message in a queue on our RabbitMQ broker (rough sketches of this producer, the worker task, and the queue-depth metric follow the list)
  • The queue size is reported to AWS CloudWatch as a metric
  • A CloudWatch metric alarm changes its state to ALARM when the queue size is > 0, and changes to OK when the queue is empty
  • When the CloudWatch alarm is in the ALARM state, it triggers an autoscaling action which brings up worker instances. We set the ASG maximum to the number of worker instances we want - if we don't, EC2 will just keep bringing up worker instances!
  • These worker instances (m1.xlarge in this case) are preconfigured with Celery and SupervisorD. Supervisor controls the worker processes.
  • The worker instances come up and, using a Python script, call our Ansible Tower. Tower adds them to an inventory and a group, then launches a job to provision them based on a Tower job template. The inventory, group, and job template to use are specified as arguments in the Launch Configuration user data used by our Autoscaling Group.
  • The job template launches an Ansible playbook with select tags against the worker instances. This playbook has several tasks:
    • Because we're using an instance type (m1.xlarge) that relies on ephemeral storage, the playbook builds a RAID array out of its 4 disks. We need a fair bit of space to dump some of our larger databases to disk and compress them before uploading to S3.
    • We ensure that the Celery worker configs are up to date with the latest master
    • We ensure that MongoS is pointed at the correct environment and running
    • We ensure that the SupervisorD config is up to date and that all workers are running
  • Once the instances have been launched, they reach out to RabbitMQ and begin consuming the queue of databases, using essentially the same Python code I wrote for the multiprocessing module...only without the multiprocessing. These database backups range in duration from less than 10 seconds to ~18 hours.
  • As each dump completes, the Celery worker compresses the dumped DB and uploads it to S3. The bucket is replicated to another region for redundancy. The dump and archive are then erased from disk, and the worker grabs the next message in the queue.
  • Sumo Logic collects specific patterns from the log output. All exit codes and dump durations are logged and fed into Sumo, so we can graph how long any given database takes to dump, and any non-zero exit code triggers an email so that we know if a dump fails.
  • Once the last database has been uploaded to S3, the queue is empty and the CloudWatch alarm changes to OK. This triggers an autoscaling action which terminates the worker instances.
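
To make that flow concrete, here's a hedged sketch of the producer side. The broker URL, queue name, task name, and exclusion list are illustrative placeholders, not our actual code:

```python
from celery import Celery
from pymongo import MongoClient

# Placeholder broker URL, task name, and queue name.
app = Celery("mongo_backups", broker="amqp://rabbitmq.example.internal//")
EXCLUDED_DBS = {"admin", "config", "local"}

def enqueue_backups(mongos_uri="mongodb://localhost:27017"):
    """Ask mongos for the database list and queue one message per database."""
    client = MongoClient(mongos_uri)
    for db_name in client.list_database_names():
        if db_name in EXCLUDED_DBS:
            continue
        # Each message in the queue represents one database to be dumped by a worker.
        app.send_task("backups.dump_database", args=[db_name], queue="mongo-backups")

if __name__ == "__main__":
    enqueue_backups()
```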
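
The worker side is essentially the per-database dump logic wrapped in a Celery task - dump, compress, upload, clean up, repeat. Again, a sketch under assumed names (bucket, paths, broker) rather than our exact implementation:

```python
import os
import shutil
import subprocess
import tarfile

import boto3
from celery import Celery

app = Celery("mongo_backups", broker="amqp://rabbitmq.example.internal//")
s3 = boto3.client("s3")
BACKUP_BUCKET = "example-mongo-backups"  # placeholder; the real bucket is cross-region replicated
DUMP_ROOT = "/backup"                    # e.g. the RAID array built from ephemeral disks

@app.task(name="backups.dump_database")
def dump_database(db_name):
    dump_dir = os.path.join(DUMP_ROOT, db_name)
    archive = dump_dir + ".tar.gz"

    # Dump the single database via the local mongos.
    subprocess.check_call([
        "mongodump", "--host", "localhost:27017",
        "--db", db_name, "--out", dump_dir,
    ])

    # Compress the dump directory.
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(dump_dir, arcname=db_name)

    # Upload the archive to S3.
    s3.upload_file(archive, BACKUP_BUCKET, os.path.basename(archive))

    # Free the disk before grabbing the next message from the queue.
    shutil.rmtree(dump_dir)
    os.remove(archive)
    return db_name
```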
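
And the queue-depth metric that drives the scaling can be published with a few lines of pika and boto3 - a sketch, with the queue name and metric namespace made up (the CloudWatch alarm and the ASG scaling actions themselves are configured on the AWS side):

```python
import boto3
import pika

# Placeholder broker host, queue name, and metric namespace.
connection = pika.BlockingConnection(pika.ConnectionParameters("rabbitmq.example.internal"))
channel = connection.channel()

# passive=True only inspects the queue; it won't create or alter it.
queue_state = channel.queue_declare(queue="mongo-backups", passive=True)
depth = queue_state.method.message_count
connection.close()

cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_data(
    Namespace="Backups",
    MetricData=[{
        "MetricName": "MongoBackupQueueDepth",
        "Value": depth,
        "Unit": "Count",
    }],
)
```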

This system has been working well for us. If we need to increase our backup capacity due to newly added databases or the gradual growth that inevitably happens as we add more data, it's easy - we simply increase the number of worker instances the ASG allows, or alternatively, change the instance type to something with more cores and memory and adjust the number of Celery workers per instance.

Areas for improvement

There are a few ways in which our current solution could be made better. The autoscaling logic is very basic. For instance:

A common scenario for us is that our largest database, which is several hundred gigabytes in size, takes the entire duration of the backup job to dump. It is the first job to start and the last one to finish. This means we can have a lot of spare compute capacity sitting idle while one worker process chews away at that big database. This could be improved by adding autoscaling logic to scale down instances that are no longer under load. I think the proper solution is to run each worker process in its own ECS container so that there is no wasted capacity. I haven't revisited this yet, as the cost of the instances simply isn't enough to warrant the time, but I'd welcome thoughts on this problem (or on any other element of this system that you think could be improved!).