Data Engineering

Spark SQL Aggregations – Gotcha!


A few months ago we were writing a Spark job to process AWS billing data.

The idea was that every day we’d automatically spin up an Amazon EMR cluster which would do a whole bunch of ETL on the latest AWS billing dataset and write it back out to S3 for ingestion into our Amazon Redshift cluster.

Spark SQL was the logical choice here, as the daily dataset was generally about 50 million rows spread over a large number of gzipped archives in S3 buckets.

The job would categorise usage, generate some statistics, clean up some fields, drop some others and, most importantly, join each billing data row with other datasets from various internal data sources such as our internal CRM.

There were a number of parts to this job that aren’t really that interesting (at least for this post), but one of the subtasks was to create a summary row and append it to the original dataset.

And it is here that I became unstuck…

A simple task unravels…

Below is a sample of the data – four timestamp columns and a numeric one for the cost (note I’ve removed a whole bunch of columns for simplicity):

| BillingStartDate   | BillingEndDate     | UsageStartDate     | UsageEndDate       |UnblendedCost|
|2015-09-01 00:00:...|2015-09-30 23:59:...|2015-09-01 00:00:...|2015-09-30 23:59:...| 1077.622948 |
|2015-03-01 00:00:...|2015-03-31 23:59:...|2015-03-01 00:00:...|2015-03-31 23:59:...| 3294.874    |
|2015-03-01 00:00:...|2015-03-31 23:59:...|2015-03-01 00:00:...|2015-03-31 23:59:...| 2373.442    |
|2015-04-01 00:00:...|2015-04-30 23:59:...|2015-04-01 00:00:...|2015-04-30 23:59:...| 3188.59     |
|2015-04-01 00:00:...|2015-04-30 23:59:...|2015-04-01 00:00:...|2015-04-19 00:00:...| 1379.978    |

The task was to add a single summary row consisting of the minimum start dates, maximum end dates and the total cost. Pretty straightforward, you’d think, as aggregates in Spark SQL are performed by constructing a mapping from column names to aggregate function names and passing it to the agg method of the data frame, as shown below:

1  // load raw billing data
2  val billingDF = ...
3  // define aggregate functions for summary row
4  val aggregates = Map("BillingStartDate" -> "min",
5                       "BillingEndDate" -> "max",
6                       "UsageStartDate" -> "min",
7                       "UsageEndDate" -> "max",
8                       "UnblendedCost" -> "sum")
9  // calculate summary
10 val summaryDF = billingDF.agg(aggregates)
11   .toDF("BillingStartDate", "BillingEndDate",
12         "UsageStartDate", "UsageEndDate",
13         "UnblendedCost")
14 // add back to original
15 val finalDF = billingDF.unionAll(summaryDF)

The results, however, were somewhat uninspiring:

org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
 at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:203)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
 at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)


Obsessive diagnosis…

This one stopped me dead for a few days. It was one of those times when I wished I’d stuck with civil engineering at uni all those years ago instead of transferring into comp sci.

Only people who’ve worked deeply with code know the soulless emptiness of the unresolved exception and the effect it has on their nearest and dearest, who can’t engage them in even simple conversation for days while they track it down…

I’ve seen many esoteric stack traces over the last 20 years but I think we have a new winner here – with all due respect to the Spark crowd, “unresolved operator ‘Union'” is almost guaranteed to send you down the wrong path. Then again, at least you know the unionAll call is where things are going astray.

Now as with ANSI SQL, Spark SQL’s unionAll operator requires that the schemas of the two data frames are compatible – columns are matched by position and must have matching data types. So the schema was the obvious thing to check first.

When Spark SQL performs an aggregation operation on a data frame it generates new columns naming them using an operation(columnName) format. So in this case for example, the summaryDF data frame on line 10 would contain columns called things like “sum(UnblendedCost)” and “min(BillingStartDate)”.

For the unionAll to work cleanly, those columns have to be renamed. This is done by calling the toDF method on the result of the agg call, as in lines 11-13 above.

Many unpleasant hours later it occurred to me to go back and look at the raw aggregate data frame before I renamed it all and check the aggregate map itself:

scala> billingDF.agg(aggregates).show()
|  sum(UnblendedCost)| min(UsageStartDate)|   max(UsageEndDate)| max(BillingEndDate)|min(BillingStartDate)|
|1.2648269541634992E7|2013-06-01 00:00:...|2018-07-03 12:56:...|2016-12-31 23:59:...| 2013-06-01 00:00:...|
scala> aggregates.keys.foreach(println)
UnblendedCost
UsageStartDate
UsageEndDate
BillingEndDate
BillingStartDate

And boom. There it was.

A solution is found…

When you use a map for aggregations in Spark SQL, the columns of the resulting data frame appear in the default traversal order of the keys in the map instance itself. For a standard scala.collection.immutable.Map of this size, the backing implementation is a HashMap, so the final order is determined by the hash function as applied to the column names.
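You can see the difference with plain Scala collections, no Spark required. This sketch just reuses the column-to-function pairs from the listing above; the exact hash-driven order printed for the plain Map may vary between Scala versions:

```scala
import scala.collection.immutable.ListMap

// The same column-to-function pairs as the aggregate map in the post.
val cols = Seq("BillingStartDate" -> "min",
               "BillingEndDate"   -> "max",
               "UsageStartDate"   -> "min",
               "UsageEndDate"     -> "max",
               "UnblendedCost"    -> "sum")

// A standard immutable Map with five entries is backed by a HashMap,
// so key traversal order follows the hashes of the keys...
println(Map(cols: _*).keys.mkString(", "))

// ...while a ListMap traverses its keys in insertion order.
println(ListMap(cols: _*).keys.mkString(", "))
```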

In addition, I’d renamed the columns after the aggregation in the order I expected them to be in (i.e. the same order as the Map instantiation on lines 4-8), showing once again that our brains often help us see what we want to see rather than what’s actually there.

So the names were right, but the data types were wrong (the BillingStartDate column now held the sum of the UnblendedCost column).

The solution was simply to change line 4 to use a scala.collection.immutable.ListMap for the aggregation definition, which maintains insertion order because its keys are backed by a linked list.
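Applied to the original listing, it’s a one-line change. This is a sketch only – billingDF stands in for the real billing data frame loaded earlier:

```scala
import scala.collection.immutable.ListMap

// ListMap preserves insertion order, so the aggregate columns
// come out in the order they are declared here.
val aggregates = ListMap("BillingStartDate" -> "min",
                         "BillingEndDate" -> "max",
                         "UsageStartDate" -> "min",
                         "UsageEndDate" -> "max",
                         "UnblendedCost" -> "sum")

val summaryDF = billingDF.agg(aggregates)
  .toDF("BillingStartDate", "BillingEndDate",
        "UsageStartDate", "UsageEndDate",
        "UnblendedCost")

val finalDF = billingDF.unionAll(summaryDF)
```

ListMap is a subtype of Map, so agg accepts it unchanged; only the key traversal order differs.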

It wasn’t until I’d solved this that I noticed that in addition to the unionAll failing, the re-ordering/renaming had also mixed up the min and max timestamps for the billing and usage columns.

That would have been, um…very hard to find.

So here’s the summary data frame using a standard Map instance:

|    BillingStartDate|      BillingEndDate|      UsageStartDate|        UsageEndDate|       UnblendedCost|
|1.2648269541634992E7|2013-06-01 00:00:...|2018-07-03 12:56:...|2016-12-31 23:59:...|2013-06-01 00:00:...|

And here it is after swapping out the Map for a ListMap:

|    BillingStartDate|      BillingEndDate|      UsageStartDate|        UsageEndDate|       UnblendedCost|
|2013-06-01 00:00:...|2016-12-31 23:59:...|2013-06-01 00:00:...|2018-07-03 12:56:...|1.2648269541634992E7|


With a standard Map, it was the hash ordering of the column names that determined the order of the final aggregate columns.

Even if this had worked originally (i.e. the hash order happened to match the expected order), someone could later have renamed a column elsewhere in the code base, changing the hash order and causing the whole job to collapse in a screaming heap hours into the ETL process. Using a ListMap avoids this issue entirely by maintaining key insertion order.

Well, there it is. Like many tech problems they seem obvious in retrospect and you can’t help but feel a bit sheepish that you missed it. But it’s on these harsh lessons that real world experience is built.

It won’t happen again though. I’ll tell you that for nothing….
