How to use (inner) JOIN and GROUP BY in Apache Spark SQL.
Problem statement: get the daily revenue by product for CLOSED and COMPLETE orders. Save the output to text, CSV, TSV, and Parquet files.
1. Load the JSON files into Spark DataFrames using sqlContext. All the data files can be found on GitHub.
scala> val products = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")
scala> val orders = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/orders/*")
scala> val order_items = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/order_items/*")
scala> products.limit(2).show
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|                  2|                   |         1|http://images.acm...|Quest Q64 10 FT. ...|        59.98|
|                  2|                   |         2|http://images.acm...|Under Armour Men'...|       129.99|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
scala> orders.limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id| order_date|order_id| order_status|
+-----------------+--------------------+--------+---------------+
| 11599|2013-07-25 00:00:...| 1| CLOSED|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+
scala> order_items.limit(2).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
| 1| 1| 957| 299.98| 1| 299.98|
| 2| 2| 1073| 199.99| 1| 199.99|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
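Note: this walkthrough uses the Spark 1.x sqlContext entry point. On Spark 2.x the same loads go through a SparkSession instead; a minimal sketch, assuming the same file layout:

import org.apache.spark.sql.SparkSession

// SparkSession replaces sqlContext as the entry point in Spark 2.x.
val spark = SparkSession.builder().appName("orderRevenue").getOrCreate()
val products = spark.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")
// orders and order_items load the same way, just with different paths.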
2. Join all the above DataFrames. First, create Column values from the DataFrames; these will be used in the join conditions.
val order_id = orders.col("order_id")
val order_item_order_id = order_items.col("order_item_order_id")
val order_item_product_id = order_items.col("order_item_product_id")
val product_id = products.col("product_id")
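Extracting these Column values is just a convenience; the same condition can be written inline by indexing the DataFrames directly (a sketch equivalent to the first join condition below):

// df("column") returns the same Column as df.col("column").
orders.join(order_items, orders("order_id") === order_items("order_item_order_id"))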
3. Create a DataFrame of CLOSED and COMPLETE orders using the columns created in step 2.
val closedAndCompletedOrders = orders.join(order_items, order_id === order_item_order_id).
  join(products, order_item_product_id === product_id).
  where("order_status = 'CLOSED' or order_status = 'COMPLETE'")
scala> closedAndCompletedOrders.limit(2).show
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|order_customer_id|          order_date|order_id|order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|            11599|2013-07-25 00:00:...|       1|      CLOSED|            1|                  1|                  957|                  299.98|                  1|             299.98|                 43|                   |       957|http://images.acm...|Diamondback Women...|       299.98|
|             8827|2013-07-25 00:00:...|       4|      CLOSED|            5|                  4|                  897|                   24.99|                  2|              49.98|                 40|                   |       897|http://images.acm...|Team Golf New Eng...|        24.99|
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
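The string predicate in where works, but the status filter can also be expressed with Column.isin (available from Spark 1.5 onwards; a sketch equivalent to the where clause above):

// isin builds an IN predicate over the given values.
val closedAndCompletedOrders2 = orders.join(order_items, order_id === order_item_order_id).
  join(products, order_item_product_id === product_id).
  where($"order_status".isin("CLOSED", "COMPLETE"))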
4. Group the data by "order_date", "product_name", and "order_status" to get the sum of order_item_subtotal.
scala> val finalOutput = closedAndCompletedOrders.groupBy("order_date", "product_name", "order_status").sum("order_item_subtotal")
5. Check the final output. Using the above approach we can find the total revenue by order date, product name, and order status.
scala> finalOutput.show
+--------------------+--------------------+------------+------------------------+
|          order_date|        product_name|order_status|sum(order_item_subtotal)|
+--------------------+--------------------+------------+------------------------+
|2013-07-27 00:00:...|Team Golf San Fra...|    COMPLETE|                   74.97|
|2013-08-01 00:00:...|Nike Men's Dri-FI...|    COMPLETE|                  4500.0|
|2013-08-05 00:00:...|Glove It Women's ...|    COMPLETE|                   59.97|
|2013-08-06 00:00:...|adidas Men's F10 ...|    COMPLETE|                  299.95|
+--------------------+--------------------+------------+------------------------+
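The same join and group by can also be written as a single Spark SQL query by registering the DataFrames as temporary tables (a sketch using the Spark 1.x registerTempTable API):

orders.registerTempTable("orders")
order_items.registerTempTable("order_items")
products.registerTempTable("products")

// The SQL form of the join, filter, and aggregation above.
val finalOutputSql = sqlContext.sql("""
  SELECT order_date, product_name, order_status,
         SUM(order_item_subtotal) AS sum_subtotal
  FROM orders o
  JOIN order_items oi ON o.order_id = oi.order_item_order_id
  JOIN products p ON oi.order_item_product_id = p.product_id
  WHERE order_status IN ('CLOSED', 'COMPLETE')
  GROUP BY order_date, product_name, order_status
""")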
6. Save the data to text, CSV, TSV, and Parquet files.
When we try to save the DataFrame directly to a text file, it throws an error.
scala> finalOutput.save("file:///home/cloudera/workspace/scala/orderRevenue/")
org.apache.spark.sql.AnalysisException: Attribute name "sum(order_item_subtotal)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
The column sum(order_item_subtotal) contains special characters ("(" and ")"), so it needs to be renamed.
Let's rename the column and save the output in a new DataFrame.
scala> val saveToFile = finalOutput.select($"order_date", $"product_name", $"order_status", $"sum(order_item_subtotal)".alias("sum_subtotal"))
saveToFile: org.apache.spark.sql.DataFrame = [order_date: string, product_name: string, order_status: string, sum_subtotal: double]
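An equivalent, slightly shorter fix is withColumnRenamed, which renames a single column in place (a sketch):

scala> val saveToFile2 = finalOutput.withColumnRenamed("sum(order_item_subtotal)", "sum_subtotal")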
Save the DataFrame to a text file by converting the DataFrame into an RDD. Parquet has its own save method, while CSV and TSV can be produced by mapping each Row to a delimited string.
scala> saveToFile.rdd.saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/text")
scala> saveToFile.saveAsParquetFile("file:///home/cloudera/workspace/scala/orderRevenue/parquet/")
scala> saveToFile.rdd.map(x => x(0) + "," + x(1)).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv/")
scala> saveToFile.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/tsv/")
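Note that the CSV and TSV maps above keep only x(0) and x(1), i.e. the order date and product name, and drop the status and revenue columns. To write all four columns, Row.mkString can be used instead (a sketch; the csv_full path is illustrative):

// mkString joins every field of the Row with the given separator.
scala> saveToFile.rdd.map(_.mkString(",")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv_full/")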
[root@quickstart orderRevenue]# pwd
/home/cloudera/workspace/scala/orderRevenue
[root@quickstart orderRevenue]# ls
csv  parquet  text  tsv
[root@quickstart orderRevenue]# tail csv/part-00000
2014-07-02 00:00:00.0,Nike Men's Dri-FIT Victory Golf Polo
2014-07-05 00:00:00.0,Team Golf New England Patriots Putter Grip
2014-07-13 00:00:00.0,Field & Stream Sportsman 16 Gun Fire Safe
[root@quickstart orderRevenue]# tail text/part-00000
[2014-07-02 00:00:00.0,Nike Men's Dri-FIT Victory Golf Polo,CLOSED,900.0]
[2014-07-05 00:00:00.0,Team Golf New England Patriots Putter Grip,COMPLETE,124.95]
[2014-07-13 00:00:00.0,Field & Stream Sportsman 16 Gun Fire Safe,COMPLETE,4799.76]
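To sanity-check the Parquet output, it can be read back into a DataFrame (a sketch, assuming Spark 1.4 or later where sqlContext.read.parquet is available):

scala> val revenueCheck = sqlContext.read.parquet("file:///home/cloudera/workspace/scala/orderRevenue/parquet/")
scala> revenueCheck.show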