How to use DataFrame registerTempTable in Spark SQL.

Problem statement: Get daily revenue per product from CLOSED and COMPLETE orders, and save the output as text, CSV, TSV and Parquet files.

1. Load all the JSON files into Spark DataFrames using sqlContext.
All the data files can be found on GitHub.

scala> val products = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")

scala> val orders = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/orders/*")

scala> val order_items = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/order_items/*")


scala> products.limit(2).show
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|                  2|                   |         1|http://images.acm...|Quest Q64 10 FT. ...|        59.98|
|                  2|                   |         2|http://images.acm...|Under Armour Men'...|       129.99|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+


scala> orders.limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+


scala> order_items.limit(2).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
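
Because the schema is inferred from the JSON, it is worth confirming the column types before joining and aggregating, in particular that order_item_subtotal came in as a numeric type. A quick check:

scala> order_items.printSchema
scala> orders.printSchema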


2. Register all the DataFrames as temp tables.

scala> products.registerTempTable("products")

scala> orders.registerTempTable("orders")

scala> order_items.registerTempTable("order_items")
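
As a quick sanity check, the registered names should now be visible to the SQLContext (tableNames is available from Spark 1.3 onwards). Note that temp tables only live for the lifetime of this sqlContext:

scala> sqlContext.tableNames()
scala> sqlContext.sql("select count(*) from orders").show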

3. Get the final output using the sqlContext.sql function.

scala> val finalOutput = sqlContext.sql("""
         select product_name, order_date, order_status, sum(order_item_subtotal) as revenue
         from orders
         join order_items on (order_id = order_item_order_id)
         join products on (order_item_product_id = product_id)
         where order_status in ('CLOSED', 'COMPLETE')
         group by product_name, order_date, order_status
       """)
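
For reference, the same result can be produced without an SQL string. The sketch below uses the equivalent DataFrame API calls (it assumes Spark 1.5+ for Column.isin; on 1.4 the method was called in):

scala> import org.apache.spark.sql.functions.sum
scala> val finalOutput2 = orders.
         join(order_items, orders("order_id") === order_items("order_item_order_id")).
         join(products, order_items("order_item_product_id") === products("product_id")).
         filter(orders("order_status").isin("CLOSED", "COMPLETE")).
         groupBy("product_name", "order_date", "order_status").
         agg(sum("order_item_subtotal").alias("revenue"))

Either way, finalOutput.limit(5).show gives a quick preview of the result before saving it.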

4. Save the DataFrame in different file formats.

scala> finalOutput.saveAsParquetFile("file:///home/cloudera/workspace/scala/orderRevenue/parquet")

scala> finalOutput.rdd.map(x => x.mkString(",")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv/")
scala> finalOutput.rdd.map(x => x.mkString("\t")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/tsv/")


scala> finalOutput.rdd.saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/text")
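
Two optional refinements: saveAsParquetFile is deprecated from Spark 1.4 onwards in favour of the DataFrameWriter API, and coalesce(1) collapses the output into a single part file, which is convenient for small results. The parquet2 and csv_single directory names below are just illustrative:

scala> finalOutput.write.parquet("file:///home/cloudera/workspace/scala/orderRevenue/parquet2")
scala> finalOutput.rdd.map(_.mkString(",")).coalesce(1).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv_single/")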
