How to use DataFrame registerTempTable in Spark SQL
Problem statement: Get the daily revenue per product from CLOSED and COMPLETE orders, and save the output as text, CSV, TSV and Parquet files.
1. Load all the JSON files into Spark DataFrames using sqlContext.
All the data files can be found on GitHub.
scala> val products = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/products/*")
scala> val orders = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/orders/*")
scala> val order_items = sqlContext.read.json("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db_json/order_items/*")
scala> products.limit(2).show
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|                  2|                   |         1|http://images.acm...|Quest Q64 10 FT. ...|        59.98|
|                  2|                   |         2|http://images.acm...|Under Armour Men'...|       129.99|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
scala> orders.limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+
scala> order_items.limit(2).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
2. Register each DataFrame as a temporary table.
scala> products.registerTempTable("products")
scala> orders.registerTempTable("orders")
scala> order_items.registerTempTable("order_items")
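registerTempTable ties a name to a DataFrame for the lifetime of the SQLContext, so that later SQL text can refer to the data by that name. The sketch below is a conceptual toy model in plain Scala, not Spark's implementation: the ToyCatalog class, its methods, and the sample row are hypothetical. It only illustrates that registering stores a reference under a name (no data is copied) and that registering the same name again replaces the earlier table.

```scala
import scala.collection.mutable

// Toy model only: a name -> rows registry standing in for Spark's temp-table catalog.
final class ToyCatalog {
  // Each "table" is just a reference to an existing row collection.
  private val tables = mutable.Map.empty[String, Seq[Map[String, Any]]]

  // Analogous to df.registerTempTable(name): stores a reference, copies nothing,
  // and replaces any table already registered under the same name.
  def registerTempTable(name: String, rows: Seq[Map[String, Any]]): Unit =
    tables(name) = rows

  // Analogous to resolving a table name in a SQL FROM clause.
  def lookup(name: String): Option[Seq[Map[String, Any]]] = tables.get(name)
}

// Hypothetical row modelled on the first order shown by orders.limit(2).show.
val catalog = new ToyCatalog
val orderRows = Seq(Map[String, Any]("order_id" -> 1L, "order_status" -> "CLOSED"))
catalog.registerTempTable("orders", orderRows)
```

In Spark itself the registered name resolves to the DataFrame's logical plan, so the JSON files are only read when a query over the table is actually executed.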
3. Get the final output using the sqlContext.sql function.
scala> val finalOutput = sqlContext.sql("""
  select product_name, order_date, order_status,
         sum(order_item_subtotal) as revenue
  from orders
  join order_items on (order_id = order_item_order_id)
  join products on (order_item_product_id = product_id)
  where order_status = 'CLOSED' or order_status = 'COMPLETE'
  group by product_name, order_date, order_status
""")
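To make the semantics of this query concrete, here is a plain-Scala sketch (no Spark required) that performs the same two inner joins, the status filter, and the GROUP BY on product name, order date and order status with a SUM over order_item_subtotal, using in-memory collections. The case classes, the dailyRevenue function and the sample rows are hypothetical: orders 1 and 2 and their items echo the rows printed by show above, while order 3 and the product names are invented for illustration.

```scala
// Hypothetical, trimmed-down stand-ins for the three temp tables.
case class OrderRow(orderId: Long, orderDate: String, orderStatus: String)
case class OrderItemRow(orderItemOrderId: Long, orderItemProductId: Long, orderItemSubtotal: Double)
case class ProductRow(productId: Long, productName: String)

def dailyRevenue(
    orders: Seq[OrderRow],
    items: Seq[OrderItemRow],
    products: Seq[ProductRow]
): Map[(String, String, String), Double] = {
  // Index orders and products by their (assumed unique) ids for the joins.
  val ordersById = orders.map(o => o.orderId -> o).toMap
  val productsById = products.map(p => p.productId -> p).toMap
  val keptStatuses = Set("CLOSED", "COMPLETE")

  // Inner joins: an item without a matching order or product is dropped.
  val joined = for {
    item <- items
    order <- ordersById.get(item.orderItemOrderId).toSeq
    if keptStatuses.contains(order.orderStatus) // the WHERE clause
    product <- productsById.get(item.orderItemProductId).toSeq
  } yield ((product.productName, order.orderDate, order.orderStatus), item.orderItemSubtotal)

  // GROUP BY the three-column key and SUM the subtotals in each group.
  joined.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2).sum }
}

val result = dailyRevenue(
  Seq(
    OrderRow(1L, "2013-07-25", "CLOSED"),
    OrderRow(2L, "2013-07-25", "PENDING_PAYMENT"),
    OrderRow(3L, "2013-07-25", "COMPLETE")
  ),
  Seq(
    OrderItemRow(1L, 957L, 299.98),
    OrderItemRow(2L, 1073L, 199.99),
    OrderItemRow(3L, 957L, 100.0),
    OrderItemRow(3L, 1073L, 50.0)
  ),
  Seq(ProductRow(957L, "Product 957"), ProductRow(1073L, "Product 1073"))
)
```

Because order_status is part of the grouping key, a product sold on the same day in both a CLOSED and a COMPLETE order produces two rows. Dropping order_status from the select list and the GROUP BY would give a single daily total per product.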
4. Save the DataFrame in different file formats.
scala> finalOutput.write.parquet("file:///home/cloudera/workspace/scala/orderRevenue/parquet")
scala> finalOutput.rdd.map(_.mkString(",")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/csv/")
scala> finalOutput.rdd.map(_.mkString("\t")).saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/tsv/")
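The CSV and TSV commands above join field values with a bare separator, so a line becomes malformed whenever a value (a product name, for example) itself contains the separator or a double quote. Below is a minimal sketch of RFC 4180-style quoting in plain Scala. encodeField and toDelimitedLine are hypothetical helpers, not Spark APIs, and applying the same quoting rule to TSV is a common convention rather than part of RFC 4180.

```scala
// Hypothetical helper (not a Spark API): encode one value for a delimited
// text line using RFC 4180-style quoting.
def encodeField(value: Any, delim: Char): String = {
  // Render SQL NULL as an empty field rather than the string "null".
  val s = if (value == null) "" else value.toString
  val needsQuoting = s.exists(c => c == delim || c == '"' || c == '\n' || c == '\r')
  // Wrap in double quotes and double any embedded quote characters.
  if (needsQuoting) "\"" + s.replace("\"", "\"\"") + "\"" else s
}

// Join a row's values into one line: ',' gives CSV, '\t' gives TSV.
def toDelimitedLine(fields: Seq[Any], delim: Char = ','): String =
  fields.map(f => encodeField(f, delim)).mkString(delim.toString)
```

In spark-shell the helper could be applied as finalOutput.rdd.map(r => toDelimitedLine(r.toSeq)) before saveAsTextFile, since Row.toSeq returns the row's values as a Seq[Any]. Spark 2.0 and later also ship a built-in CSV writer, DataFrame.write.csv.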
scala> finalOutput.rdd.saveAsTextFile("file:///home/cloudera/workspace/scala/orderRevenue/text")