Copying Millions of Files Between S3 Buckets Without Managing Servers

Dan | Oct 16, 2020

Issue: I need to copy thousands or millions of files between S3 buckets. This might be because you regularly move data between test and production environments, or because your data architecture has changed and the new bucket names make more sense. Whether you are doing it once or regularly, there are multiple options for moving a large number of files between buckets, and the choice might come down to your familiarity with different AWS computing services, such as Lambda, EC2, and EMR.

Solution: If there is no need to change the S3 keys, I would recommend using EMR (Elastic MapReduce) to copy a large number of files, because the work can be distributed across multiple worker nodes. If you are familiar with Java, you could also use EMR to change the S3 keys during the copy operation. However, if you are more comfortable with scripting languages like Python and JavaScript, you can use a single Lambda function and leverage concurrency to scale up to as many as 1,000 invocations running in parallel. The weakness of this second option is that you need to send a list of S3 objects to the Lambda function; you will either have to list the objects locally, which can be time-consuming, or prepare a manifest file, which is an added step.
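
If you did go the Lambda route, building that object listing is the first hurdle. Below is a minimal sketch of what it might look like with boto3's list_objects_v2 paginator; the bucket and prefix names are hypothetical placeholders.

import boto3

s3 = boto3.client('s3')

# Hypothetical source location; swap in your own bucket and prefix.
bucket = 'my-source-bucket'
prefix = 'data/'

# Paginate through the listing; for millions of objects this loop is the
# slow part, which is the main drawback of the Lambda approach.
keys = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        keys.append(obj['Key'])

# The key list (or a manifest file built from it) would then be split into
# batches and passed to the Lambda invocations that do the copying.
print(f'{len(keys)} objects to copy')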

Here I provide a simple command-line script that leverages EMR to accomplish this task without having to worry about managing the servers that run the distributed computation.

EMR provides a helpful tool, s3-dist-cp, that lists, copies, and (optionally) deletes all files under a single S3 path in one command:

s3-dist-cp --src s3://<source-bucket>/<prefix> --dest s3://<destination-bucket>/<prefix> --deleteOnSuccess

EMR provides ‘job flows’ as a serverless-style option: EMR will start a cluster, run your code, and terminate the cluster for you. This means you only pay for the time the cluster is running, and you do not need to worry about managing the underlying computing resources.

Using the Python SDK, we can write a simple script that accepts the source and destination S3 paths and the AWS profile you would like to use for permissions. This script is adapted from a great blog post.

I have adapted the code to be more accessible to anyone. This code should work without adjustment because:

* I did not prepare a custom JAR file, a compressed archive (JAR stands for “Java ARchive”) that contains the compiled Java code to run. Instead, I use a JAR file that comes pre-installed on all EMR instances, ‘command-runner.jar’, which bundles a handful of common commands that users might execute on an EMR cluster, including S3DistCp.
* I added the ‘--deleteOnSuccess’ option, so that all the files are deleted from the source bucket after they are successfully copied.
* I removed parameters from the run_job_flow() call that are specific to a user, such as the EC2 key pair and the subnet ID.

"""
Script to run an EMR job to copy s3 files with a given prefix between buckets.

Adapted from: https://github.com/Kulasangar/CopyFile-Between-S3Buckets
Blog Post: https://medium.com/@kulasangar/create-an-emr-cluster-and-submit-a-job-using-boto3-c34134ef68a0
"""


import boto3
import argparse

copy_args = argparse.ArgumentParser(prog='copy',
                                    description='copy S3 objects between buckets with an EMR s3-dist-cp job')

copy_args.add_argument('--src', action='store', required=True,
                       help='source path')
copy_args.add_argument('--dest', action='store', required=True,
                       help='destination path')
copy_args.add_argument('--profile', action='store', required=True,
                       help='aws profile')
copy_args.add_argument('--name', action='store', required=False,
                       help='job name')
copy_args.add_argument('--worker_nodes', action='store', required=False,
                       help='the number of worker nodes to distribute the work across')
copy_args.add_argument('--log_uri', action='store', required=False,
                       help='log URI to store logs')

args = copy_args.parse_args()

session = boto3.Session(profile_name=args.profile, region_name='us-east-1')

# Fall back to sensible defaults for the optional arguments.
if not args.name:
    args.name = 'copy_job'

if args.worker_nodes:
    worker_nodes = int(args.worker_nodes)
else:
    worker_nodes = 5

emr_client = session.client('emr')

# Build the job flow configuration as a dict so that LogUri is only
# included when it is provided (boto3 rejects a value of None).
job_flow_config = dict(
    Name=args.name,
    ReleaseLabel='emr-5.30.1',
    Applications=[
        {'Name': 'Hadoop'},
        {'Name': 'Hive'},
        {'Name': 'Hue'},
        {'Name': 'Mahout'},
        {'Name': 'Pig'},
        {'Name': 'Tez'},
    ],
    Instances={
        'InstanceGroups': [
            {
                'Name': "Main nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': "Worker nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': worker_nodes,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    Steps=[
        {
            'Name': 'file-copy-step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    's3-dist-cp',
                    '--src',
                    args.src,
                    '--dest',
                    args.dest,
                    '--deleteOnSuccess',
                ],
            },
        },
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)

if args.log_uri:
    job_flow_config['LogUri'] = args.log_uri

cluster_id = emr_client.run_job_flow(**job_flow_config)

print('cluster created with the step...', cluster_id['JobFlowId'])
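
You can run the script with something like python copy_s3_files.py --src s3://source-bucket/prefix/ --dest s3://dest-bucket/prefix/ --profile my-profile (the script filename, bucket names, and profile here are placeholders). The cluster will shut itself down once the step finishes, but if you want to block until the copy is done, the sketch below is one way to do it. It is not part of the original script; it assumes the JobFlowId printed above and the same profile and region, and uses boto3's built-in ‘step_complete’ waiter.

import boto3

# Hypothetical values: reuse the profile you passed to the copy script and
# the JobFlowId it printed (for example 'j-ABC123DEF456').
session = boto3.Session(profile_name='my-profile', region_name='us-east-1')
emr_client = session.client('emr')
cluster_id = 'j-ABC123DEF456'

# Find the s3-dist-cp step that the copy script submitted.
step_id = emr_client.list_steps(ClusterId=cluster_id)['Steps'][0]['Id']

# Block until the step completes (polls every 60 seconds, up to 2 hours).
waiter = emr_client.get_waiter('step_complete')
waiter.wait(
    ClusterId=cluster_id,
    StepId=step_id,
    WaiterConfig={'Delay': 60, 'MaxAttempts': 120},
)

print('s3-dist-cp step finished; the cluster will terminate on its own.')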