How to use (inner) JOIN and GROUP BY in Apache Spark SQL


Problem statement: Get daily revenue by product, considering only CLOSED and COMPLETE orders. Save the output to text, CSV, TSV, and Parquet files.

1. Load all the JSON files into Spark DataFrames using sqlContext.
All the data files can be found on GitHub.

scala> val products = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")

scala> val orders = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/orders/*")

scala> val order_items = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/order_items/*")
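
Note: on Spark 2.x, the SparkSession entry point (conventionally bound to the variable spark) replaces sqlContext; a minimal equivalent sketch for the first load, with the other two changing the same way:

val products = spark.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")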


scala> products.limit(2).show
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|                  2|                   |         1|http://images.acm...|Quest Q64 10 FT. ...|        59.98|
|                  2|                   |         2|http://images.acm...|Under Armour Men'...|       129.99|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+


scala> orders.limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+


scala> order_items.limit(2).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+

2. Join all the above DataFrames

First, create Column references from the DataFrames; these will be used in the join conditions.

val order_id = orders.col("order_id")
val order_item_order_id = order_items.col("order_item_order_id")
val order_item_product_id = order_items.col("order_item_product_id")
val product_id = products.col("product_id")

3. Create a DataFrame of CLOSED and COMPLETE orders using the columns created in step 2.

val closedAndCompletedOrders = orders.join(order_items, order_id === order_item_order_id).
  join(products, order_item_product_id === product_id).
  where("order_status = 'CLOSED' or order_status = 'COMPLETE'")

scala> closedAndCompletedOrders.limit(2).show
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|order_customer_id|          order_date|order_id|order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|            11599|2013-07-25 00:00:...|       1|      CLOSED|            1|                  1|                  957|                  299.98|                  1|             299.98|                 43|                   |       957|http://images.acm...|Diamondback Women...|       299.98|
|             8827|2013-07-25 00:00:...|       4|      CLOSED|            5|                  4|                  897|                   24.99|                  2|              49.98|                 40|                   |       897|http://images.acm...|Team Golf New Eng...|        24.99|
+-----------------+--------------------+--------+------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------------+-------------------+----------+--------------------+--------------------+-------------+
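
An equivalent variant filters orders down to the two statuses before joining, so less data flows through the joins; a sketch assuming the same DataFrames (Column.isin is available from Spark 1.5; the variable name closedAndCompletedOrders2 is hypothetical):

val closedAndCompletedOrders2 = orders.filter(orders("order_status").isin("CLOSED", "COMPLETE")).
  join(order_items, order_id === order_item_order_id).
  join(products, order_item_product_id === product_id)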


4. Group the data by "order_date", "product_name", and "order_status" to get the sum of all the order_item_subtotal values.
scala> val finalOutput = closedAndCompletedOrders.groupBy("order_date","product_name", "order_status").sum("order_item_subtotal")
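
A variant worth knowing: agg with an explicit alias yields a clean column name up front, which avoids the rename step needed before saving in step 6. A sketch using sum from org.apache.spark.sql.functions (the variable name finalOutputAliased is hypothetical):

import org.apache.spark.sql.functions.sum
val finalOutputAliased = closedAndCompletedOrders.
  groupBy("order_date", "product_name", "order_status").
  agg(sum("order_item_subtotal").alias("sum_subtotal"))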



5. Check the final output. Using the above approach, we can find the total revenue by order date, product name, and order status.

scala> finalOutput.show

+--------------------+--------------------+------------+------------------------+
|          order_date|        product_name|order_status|sum(order_item_subtotal)|
+--------------------+--------------------+------------+------------------------+
|2013-07-27 00:00:...|Team Golf San Fra...|    COMPLETE|                   74.97|
|2013-08-01 00:00:...|Nike Men's Dri-FI...|    COMPLETE|                  4500.0|
|2013-08-05 00:00:...|Glove It Women's ...|    COMPLETE|                   59.97|
|2013-08-06 00:00:...|adidas Men's F10 ...|    COMPLETE|                  299.95|
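
The same join and aggregation can also be written as a plain SQL string after registering the DataFrames as temporary tables; a sketch (registerTempTable is the Spark 1.x API; Spark 2.x uses createOrReplaceTempView, and the variable name sqlOutput is hypothetical):

orders.registerTempTable("orders")
order_items.registerTempTable("order_items")
products.registerTempTable("products")

val sqlOutput = sqlContext.sql("""
  SELECT o.order_date, p.product_name, o.order_status,
         SUM(oi.order_item_subtotal) AS sum_subtotal
  FROM orders o
  JOIN order_items oi ON o.order_id = oi.order_item_order_id
  JOIN products p ON oi.order_item_product_id = p.product_id
  WHERE o.order_status IN ('CLOSED', 'COMPLETE')
  GROUP BY o.order_date, p.product_name, o.order_status
""")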


6. Save the data to text, CSV, TSV, and Parquet files.

When we try to save the DataFrame directly to a text file, it throws an error.

scala> finalOutput.save("file:///home/cloudera/workspace/scala/orderRevenue/")
org.apache.spark.sql.AnalysisException: Attribute name "sum(order_item_subtotal)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.        ;

The column sum(order_item_subtotal) contains special characters ("(" and ")"), so it must be renamed.

Let's rename the column and save the output in a new DataFrame.

scala> val saveToFile = finalOutput.select($"order_date",$"product_name",$"order_status",  $"sum(order_item_subtotal)".alias("sum_subtotal"))
saveToFile: org.apache.spark.sql.DataFrame = [order_date: string, product_name: string, order_status: string, sum_subtotal: double]
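
Alternatively, withColumnRenamed achieves the same rename without re-listing every column (a sketch; the variable name saveToFile2 is hypothetical):

val saveToFile2 = finalOutput.withColumnRenamed("sum(order_item_subtotal)", "sum_subtotal")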

Save the DataFrame to a text file by converting it into an RDD.
scala> saveToFile.rdd.saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/text")

Save the DataFrame as a Parquet file (saveAsParquetFile is the Spark 1.x API):

scala> saveToFile.saveAsParquetFile("file:///home/cloudera/workspace/scala/orderRevenue/parquet/")

Save CSV and TSV outputs by mapping each Row to a delimited string:

scala> saveToFile.rdd.map(x => x(0) + "," + x(1)).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv/")
scala> saveToFile.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/tsv/")
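
Note that the two maps above keep only the first two fields of each Row, so the CSV and TSV files contain just the date and product name. To emit every column, Row.mkString with a separator works; a sketch writing to a hypothetical csv_full directory:

scala> saveToFile.rdd.map(x => x.mkString(",")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv_full/")

(On Spark 2.x, saveToFile.write.csv(path) writes CSV directly.)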

[root@quickstart orderRevenue]# pwd
/home/cloudera/workspace/scala/orderRevenue
[root@quickstart orderRevenue]# ls
csv  parquet  text  tsv

[root@quickstart orderRevenue]# tail csv/part-00000
2014-07-02 00:00:00.0,Nike Men's Dri-FIT Victory Golf Polo
2014-07-05 00:00:00.0,Team Golf New England Patriots Putter Grip
2014-07-13 00:00:00.0,Field & Stream Sportsman 16 Gun Fire Safe


[root@quickstart orderRevenue]# tail text/part-00000
[2014-07-02 00:00:00.0,Nike Men's Dri-FIT Victory Golf Polo,CLOSED,900.0]
[2014-07-05 00:00:00.0,Team Golf New England Patriots Putter Grip,COMPLETE,124.95]
[2014-07-13 00:00:00.0,Field & Stream Sportsman 16 Gun Fire Safe,COMPLETE,4799.76]
