Exploring ETL Options for Compaction on AWS

Dan | Feb 20, 2021

I have been scraping millions of court records for the past year. Each record is saved as an individual JSON file in Amazon S3, a cheap and durable storage solution on AWS. The records are then catalogued in AWS Glue so analysts and data scientists can analyze the data with SQL queries in AWS Athena. We also plan to build web applications on top of this data, such as dashboards visualizing aggregations updated on a weekly basis.

This solution worked for a number of months as we began to explore the data, but as the project expanded, we started to see issues with this architecture. Some of the tables in the data lake contain over a million files and upwards of 30 gigabytes of data. This leads to long query times, as long as ten minutes for simple queries, because the distributed query engine underlying AWS Athena incurs overhead reading so many small files. Running regular jobs to aggregate this data would be time-intensive and would increase costs, because the raw JSON data would be scanned multiple times for each downstream application.

In an ideal scenario, we would run scheduled jobs to compress the JSON data into a smaller number of larger files in columnar format to ultimately reduce query times and cost.

Requirements:

  • Works for frequently updated objects: We often re-scrape files, and the new version overwrites the previous one in S3. Any solution would need to ensure we did not end up with duplicates in the compacted data, and the de-duplication step should not be laborious or expensive.
  • Low-cost: We currently spend less than $20 a day on AWS; we do not want to break the bank on this compaction job. Many of the solutions we will explore would have increased our bill substantially.
  • Easy to maintain: We are a small shop and value simplicity over almost everything else; minimizing the number of services and future maintenance requirements is critical.
  • Scalable: With each new data ingestion pipeline, our data engineering workflows need to pick up new tables as they become available, without engineers having to update the service.

Possible Solutions

There are a number of potential solutions for this simple compaction problem. We could buffer the files upstream so they are compacted before they are written to S3, using a streaming solution such as AWS Kinesis or Apache Kafka. But maintaining that infrastructure would be expensive relative to our data size (less than 200 GB at this time) and would add technical debt by reworking our architecture. Our other options leverage distributed processing to compact the data on a regular basis; the main AWS services for this are Glue (open-source alternative: Airflow/Spark), EMR (open-source alternative: Spark), and Athena (open-source alternatives: Spark SQL, Apache Hive, Presto).

AWS recommends Glue as their all-purpose ETL service because it is entirely managed and optimized for quick start-up. In comparison, EMR is not fully managed, so I would be required to manage some of the underlying cluster technology, and Athena is largely marketed as an ad hoc interactive querying service. I created a Glue job that compressed the JSON into Parquet files and could leverage the Bookmarks feature to process only new files during each run. This worked for a handful of Glue tables, but then the errors started to creep up, and, in traditional AWS fashion, they were not very helpful.
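A minimal sketch of that kind of Glue job looks something like the following; the database, table, and bucket names here are hypothetical, and Job Bookmarks must be enabled on the job for the transformation_ctx values to have any effect:

```python
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read only the files added since the last run (requires Job Bookmarks to be
# enabled on the job and a transformation_ctx on the source).
source = glue_context.create_dynamic_frame.from_catalog(
    database="court_records",      # hypothetical Glue database
    table_name="cases",            # hypothetical Glue table
    transformation_ctx="source",
)

# Write the records back out as Parquet in a compacted location.
glue_context.write_dynamic_frame.from_options(
    frame=source,
    connection_type="s3",
    connection_options={"path": "s3://my-data-lake/compacted/cases/"},  # hypothetical
    format="parquet",
    transformation_ctx="sink",
)

job.commit()  # persists the bookmark state for the next run
```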

It started with org.apache.hadoop.mapred.InvalidInputException: Input path does not exist for a partition. I investigated the issue and determined that there was no S3 object at that input path, nor did the table include a partition for that S3 path. The error would reliably occur for the same tables, and those tables did not appear to share anything in how they were configured. Ultimately, deleting the tables and re-running the crawlers to repopulate them fixed the problem. I still do not know the exact cause, but it must have had something to do with how the tables were configured.

Then I began receiving errors about the output Parquet format. PySpark was unable to write the DataFrames to file because some fields contained only null values, such as empty arrays. In some cases, a field was a null array in every file, so it could be dropped and the scrapers adjusted upstream to remove it. I edited the Spark script to drop these fields before writing out, and that worked fine. But other fields were sparsely populated; Spark inferred they were null, yet some records did include values that needed to be preserved. Additionally, dropping these fields would have created different table schemas for the JSON and Parquet tables, which would make it harder to deduplicate the data: I would no longer be able to easily join the DataFrames from the JSON and Parquet tables to ensure only the most recent version of each file was included.
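As a rough sketch of the workaround for the all-null fields (the input and output paths here are placeholders), the idea was simply to drop any column Spark inferred as a null type before writing Parquet:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, NullType

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("s3://my-data-lake/raw/cases/")  # hypothetical input path

def is_null_typed(data_type):
    # True for columns Spark inferred as null, including arrays of nulls.
    if isinstance(data_type, NullType):
        return True
    if isinstance(data_type, ArrayType):
        return isinstance(data_type.elementType, NullType)
    return False

null_cols = [f.name for f in df.schema.fields if is_null_typed(f.dataType)]

# Parquet cannot represent a column with no concrete type, so drop the
# all-null fields before writing out.
df.drop(*null_cols).write.mode("overwrite").parquet(
    "s3://my-data-lake/compacted/cases/"  # hypothetical output path
)
```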

As I was working through this problem, I started to notice the runtime for these Spark jobs. For a table containing about 225,000 objects, each of which is less than 50 kilobytes, the Glue job took about 20 minutes. For a table of 350,000 objects, the job took 40 minutes. This was getting way too expensive; we could not afford to run these compaction jobs every week. Even if Glue provided the perfect solution on the other fronts, which it had not yet, it did not seem like it would scale well to much larger tables. If I could make the successful jobs run quicker, I could have hacked this solution together, but I would need to figure out why they were so slow.

Hadoop’s Small Files Problem

I started to google why Spark was having such long runtimes and discovered Hadoop's small files problem. Spark is notoriously slow at processing a large number of small files: S3 is not a true filesystem, so list operations take a long time, and the associated metadata grows incredibly large when there are many small files. Data architectures built around small files added incrementally, like our court records, are a classic scenario where Spark struggles. I did not want to implement a streaming compaction solution upstream, so this realization forced me to reassess the problem and consider solutions that did not use Spark.

Rolling My Own Metadata Index

I briefly considered building my own metadata index for all files in the data lake using DynamoDB. Such an index seems to be a common feature in many data lake platforms, but we had not yet explored it for our architecture. The index would record when files are added, updated, or deleted in the data lake, including the S3 object key and when the object was last modified. This timestamp would allow us to run jobs periodically to compact all files added or updated since the last job run. These jobs might run daily, and the number of new objects in a table on a given day would usually be less than 2,000. Compacting up to 2,000 JSON files seemed suitable for EC2, so we might be able to avoid the costs of running EMR clusters on a regular basis. While this would technically work, it would not be as simple and would incur more maintenance costs.
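To make the idea concrete, the ingest side might be a Lambda function subscribed to S3 event notifications that writes into a hypothetical DynamoDB table named data-lake-index; this is only a sketch of the approach I was considering, not something we built:

```python
import boto3

dynamodb = boto3.resource("dynamodb")
index_table = dynamodb.Table("data-lake-index")  # hypothetical table name

def handler(event, context):
    """Lambda handler triggered by S3 ObjectCreated / ObjectRemoved notifications."""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        index_table.put_item(
            Item={
                "table_name": key.split("/")[0],   # assumes the table name is the key prefix
                "object_key": f"{bucket}/{key}",
                "last_modified": record["eventTime"],
                "event_name": record["eventName"],
            }
        )
```

A daily compaction job would then query this table (likely via a secondary index on last_modified) for every object added or updated since its previous run.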

Leading with Simplicity

Before diving into the metadata index solution, I looked back at the full list of potential solutions. Athena still stuck out as one that would address all my requirements. Athena is cheap: at $5 per terabyte scanned, each run would cost less than a dollar. Using Athena queries as the ETL mechanism would also be easy to change as requirements evolved. Athena does not track which objects have already been processed the way Glue Bookmarks does, but at this price point we could afford to process all objects every time, eliminating the technical headache of deduplicating rows in the compacted data. While there are not many good examples of using Athena in an operational ETL capacity, I decided to use it.

The only remaining consideration is S3 throttling: some Athena queries fail with an error saying "unable to communicate with downstream services", which suggests that Athena is attempting to read too many objects in a short period of time. We would need to run the queries one at a time. Running this series of Athena queries at night or over the weekend would also reduce the chance that someone else is using Athena at the same time and interferes with the job's success.

Athena makes it easy to save the output of a SQL query to S3 in a compressed, columnar format using a CTAS (CREATE TABLE AS SELECT) query, where you can specify the location, partitioning, and approximate file sizing of the output. We are still unable to use Parquet for the output format because of the null fields, but we can use ORC. In my initial tests, ORC achieved a 90% reduction in dataset size and cut some queries from 7 minutes to 10 seconds.
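As a sketch of what a run might look like (the database, table, bucket names, and bucketing column are all hypothetical), a small script can submit the CTAS query through boto3 and wait for it to finish before starting the next one, which also handles the one-query-at-a-time constraint mentioned above:

```python
import time

import boto3

athena = boto3.client("athena")

# CTAS: rewrite the raw JSON table as compressed ORC at a new location.
# bucket_count influences how many output files are produced.
CTAS_QUERY = """
CREATE TABLE court_records.cases_compacted
WITH (
    format = 'ORC',
    orc_compression = 'ZLIB',
    external_location = 's3://my-data-lake/compacted/cases/',
    bucketed_by = ARRAY['case_id'],
    bucket_count = 10
) AS
SELECT * FROM court_records.cases
"""

def run_query_and_wait(query: str) -> None:
    """Start an Athena query and block until it finishes, so queries run one at a time."""
    execution = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},  # hypothetical
    )
    query_id = execution["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(10)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} finished with state {state}")

run_query_and_wait(CTAS_QUERY)
```

Dropping and recreating the compacted table on each scheduled run keeps things simple, since every object is reprocessed anyway.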

I could also run Presto on EMR if I wanted greater control over the underlying technology or wanted to orchestrate additional steps beyond running the Presto query and saving the output. For this workload, the SQL query and saving to ORC are sufficient: the compacted dataset will be fanned out to a variety of downstream applications, and we will push any further processing to the maintainers of those applications.

Runtime: Presto vs Hadoop

But one question bothered me: why is Athena, running the Presto query engine, able to query hundreds of thousands of objects in a matter of minutes while Glue, running Spark, takes so much longer? Both are distributed processing systems that use a coordinator node to distribute tasks to worker nodes; the workers process chunks of the data and return results to the coordinator, which combines the outputs and returns them to the user. Many of the answers online note that Spark is a more general-purpose ETL tool that includes Spark SQL, streaming, and machine learning capabilities, while Presto is limited to SQL, but that does not explain why Presto is quicker than Spark SQL. Spark and Hadoop are tightly linked to the HDFS file system, whereas Presto is simply a query engine that is not tied to a particular storage system. One blog post suggests that Presto's "push" model allows a SQL query to execute concurrently across multiple stages, leading to faster queries than MapReduce's "pull" model. Despite this advantage, Presto lacks the fault tolerance that makes Glue and Spark ideal for orchestrating ETL jobs.