How to use map, filter, groupByKey

Problem statement: Get daily revenue by product, considering only orders with status CLOSED or COMPLETE. Save the output to text, CSV, TSV and Parquet files.

This example uses spark-shell to execute all the steps.

1. Read all the files from the local file system. The same files can be read from HDFS as well; for simplicity I am reading them from the local file system.
val orders = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders")
val order_items = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items")
val products = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products")

All the data files can be found on GitHub.

2. Check the content of each RDD. This helps in understanding the data structure and the position of each element.
orders.take(3).foreach(println)
order_items.take(3).foreach(println)
products.take(3).foreach(println)

3. The orders RDD has data in the following sequence (comma-separated):

order_id int,
order_date timestamp,
order_customer_id int,
order_status string

scala> orders.take(3).foreach(println)
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE

4. The order_items RDD has data in the following sequence (comma-separated):

order_item_id int
order_item_order_id int
order_item_product_id int
order_item_quantity int
order_item_subtotal float
order_item_product_price float

scala> order_items.take(3).foreach(println)
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0

5. The products RDD has data in the following sequence (comma-separated):
product_id int,
product_category_id int,
product_name String,
product_description String,
product_price float,
product_url String

scala> products.take(3).foreach(println)
1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat


6. Now let's first filter out only the required records (i.e. CLOSED and COMPLETE) from the orders RDD. This may help to improve performance, as less data will need to be processed.
val closedAndCompletedFilteredOrders = orders.filter(order => {
   val orderStatus = order.split(",")(3)
   orderStatus == "CLOSED" || orderStatus == "COMPLETE"
})


7. Map the closedAndCompletedFilteredOrders RDD. Since we need to join orders with order_items, we create key-value pairs with order_id as the key (which is a foreign key in order_items). From the orders RDD we only keep order_date, as the other columns are not used in the final output.

val closedAndCompletedOrdersMap = closedAndCompletedFilteredOrders.map(order => {
   val splittedRow = order.split(",")
   val orderId = splittedRow(0).toInt
   val orderDate = splittedRow(1)
   (orderId, orderDate)
})

Check the content of the new RDD:
scala> closedAndCompletedOrdersMap.take(2)
res54: Array[(Int, String)] = Array((1,2013-07-25 00:00:00.0), (3,2013-07-25 00:00:00.0))

8. Similarly, map order_items with order_id as the key.

val orderItemsMap = order_items.map(orderItems => {
   val splittedRow = orderItems.split(",")
   val orderId = splittedRow(1).toInt
   val orderProductId = splittedRow(2)
   val orderItemsubTotal= splittedRow(4)
   (orderId, (orderProductId,orderItemsubTotal))
})

The orderItemsMap RDD, as echoed by spark-shell:
orderItemsMap: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[247] at map at <console>:29

9. Join both RDDs (orders and order_items).

scala> val orderOrderItemMap = closedAndCompletedOrdersMap.join(orderItemsMap)
orderOrderItemMap: org.apache.spark.rdd.RDD[(Int, (String, (String, String)))] = MapPartitionsRDD[250] at join at <console>:37

10. Check the content of the joined RDD. One needs to understand its structure in order to process the data further.

scala> orderOrderItemMap.take(5).foreach(println)

(65722,(2014-05-23 00:00:00.0,(365,119.98)))
(65722,(2014-05-23 00:00:00.0,(730,400.0)))
(65722,(2014-05-23 00:00:00.0,(1004,399.98)))
(65722,(2014-05-23 00:00:00.0,(627,199.95)))
(65722,(2014-05-23 00:00:00.0,(191,199.98)))

11. The step below is just to test how to access the content of the joined RDD.

scala> orderOrderItemMap.map(ooim => ooim._2).take(5).foreach(println)
18/01/01 16:53:43 WARN memory.TaskMemoryManager: leak 36.8 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@7fbc5c70
18/01/01 16:53:43 ERROR executor.Executor: Managed memory leak detected; size = 38574974 bytes, TID = 2446
(2014-05-23 00:00:00.0,(365,119.98))
(2014-05-23 00:00:00.0,(730,400.0))
(2014-05-23 00:00:00.0,(1004,399.98))
(2014-05-23 00:00:00.0,(627,199.95))
(2014-05-23 00:00:00.0,(191,199.98))


12. Now we need to join orderOrderItemMap with the products RDD. However, orderOrderItemMap has order_id as the key, not product_id, so we need to remap the RDD.

val orderProd = orderOrderItemMap.map(ooim => {
   val orderDate = ooim._2._1
   val orderProductId = ooim._2._2._1
   val orderSubTotal = ooim._2._2._2
   (orderProductId, (orderDate, orderSubTotal))
  })

13. Now on one side we have the order/order_item pairs keyed by product_id. Similarly, we need to create a products map keyed by product_id.

val productMap = products.map(product => {
   val splittedRow = product.split(",")
   val productId = splittedRow(0)
   val productName = splittedRow(2)
   (productId, productName)
})

Check the content of the productMap.

scala> productMap.take(3).foreach(println)
(1,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U)
(2,Under Armour Men's Highlight MC Football Clea)
(3,Under Armour Men's Renegade D Mid Football Cl)

14. Now join both RDDs to get (what may be) the final result.

val orderAndProdJoin = orderProd.join(productMap)

scala> val orderAndProdJoin = orderProd.join(productMap)
orderAndProdJoin: org.apache.spark.rdd.RDD[(String, ((String, String), String))] = MapPartitionsRDD[256] at join at <console>:45

Check the content of the orderAndProdJoin RDD. We need to rearrange the data to group on order_date and product_name.

scala> orderAndProdJoin.take(3).foreach(println)
18/01/01 17:03:42 WARN memory.TaskMemoryManager: leak 5.4 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@6f987b8
(273,((2014-04-07 00:00:00.0,139.95),Under Armour Kids' Mercenary Slide))
(273,((2013-12-30 00:00:00.0,27.99),Under Armour Kids' Mercenary Slide))
(273,((2014-01-16 00:00:00.0,55.98),Under Armour Kids' Mercenary Slide))

15. Let's rearrange the content with (product_name, order_date) as the key.

val orderAndProdMapped = orderAndProdJoin.map(row => {
   val orderDate = row._2._1._1
   val orderSubTotal = row._2._1._2
   val productName = row._2._2
   ((productName, orderDate), orderSubTotal)})

Execution:

scala> val orderAndProdMapped = orderAndProdJoin.map(row => {
     |    val orderDate = row._2._1._1
     |    val orderSubTotal = row._2._1._2
     |    val productName = row._2._2
     |    ((productName, orderDate), orderSubTotal)})
orderAndProdMapped: org.apache.spark.rdd.RDD[((String, String), String)] = MapPartitionsRDD[262] at map at <console>:47


16. Let's group the data by the key (productName, orderDate).

val orderAndProdGrouped = orderAndProdMapped.groupByKey()

Execution:
scala> val orderAndProdGrouped = orderAndProdMapped.groupByKey()
orderAndProdGrouped: org.apache.spark.rdd.RDD[((String, String), Iterable[String])] = ShuffledRDD[263] at groupByKey at <console>:49

17. Check the content of our final grouped RDD.

orderAndProdGrouped.take(1).foreach(println)
((Nike Men's Comfort 2 Slide,2014-03-20 00:00:00.0),CompactBuffer(224.95))

As we can see, the value of the RDD is a CompactBuffer, which needs to be iterated and totalled to get the final aggregate for a given productName and order_date.

val productWiseRevenueMap = orderAndProdGrouped.map(row => {
   val productName = row._1._1
   val orderDate = row._1._2
   // Sum up all the subtotals in the CompactBuffer for this (productName, orderDate) key
   var total = 0.0
   val it = row._2.toIterator
   while (it.hasNext) {
     total += it.next.toDouble
   }
   ((productName, orderDate), total)
})

18. Phew... check the final output.
scala> productWiseRevenueMap.take(10).foreach(println)

((Nike Men's Comfort 2 Slide,2014-03-20 00:00:00.0),224.95)
((Nike Men's CJ Elite 2 TD Football Cleat,2013-09-04 00:00:00.0),1819.8600000000001)
((Under Armour Men's Tech II T-Shirt,2014-05-17 00:00:00.0),124.95)
((Diamondback Women's Serene Classic Comfort Bi,2013-08-21 00:00:00.0),1799.88)
((LIJA Women's Eyelet Sleeveless Golf Polo,2014-04-25 00:00:00.0),130.0)
((Pelican Sunstream 100 Kayak,2014-05-03 00:00:00.0),5399.729999999997)
((Perfect Fitness Perfect Rip Deck,2013-09-24 00:00:00.0),5459.089999999998)
((Perfect Fitness Perfect Rip Deck,2014-06-16 00:00:00.0),6238.959999999998)
((Under Armour Hustle Storm Medium Duffle Bag,2014-07-13 00:00:00.0),209.94)
((Perfect Fitness Perfect Rip Deck,2014-02-06 00:00:00.0),5939.0099999999975)

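Side note: groupByKey shuffles every individual subtotal across the network before we sum them. When only the per-key total is needed, reduceByKey combines partial sums on each partition first and is usually cheaper. A minimal sketch of that alternative, reusing the orderAndProdMapped RDD from step 15:

val productWiseRevenueReduced = orderAndProdMapped
   .map(row => (row._1, row._2.toDouble))   // parse the subtotal once, before the shuffle
   .reduceByKey(_ + _)                      // partial sums are combined per partition, then merged

productWiseRevenueReduced.take(3).foreach(println)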

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
There is an easier way to get the same result.
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
How to use the sqlContext.sql function to query data from a text file.

Our problem statement remains the same, i.e. we need to find the order revenue per product and order date.

At a high level, we are going to use the following steps to achieve the requirement.

Read text file into RDD --> Create case classes --> Map the RDD using the case classes --> Convert the mapped RDD into a DataFrame --> Register the DataFrame as a temp table.

1. Create an RDD of the text data using the Spark context.

scala> val orders = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders")
orders: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders MapPartitionsRDD[150] at textFile at <console>:27

Check the content.

scala> orders.collect.take(3).foreach(println)
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE

2. Since the content of the RDD is all String, we need to parse the input strings to Int and Double. For this we create two helper functions.

def toInt(s: String): Int = {
  try {
    s.toInt
  } catch {
    case e: Exception => 0
  }
}


def toDouble(s: String): Double = {
  try {
    s.toDouble
  } catch {
    case e: Exception => 0.0
  }
}
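These helpers simply fall back to a default value when parsing fails; for example (illustrative values, not taken from the dataset):

toInt("42")        // 42
toInt("bad data")  // parsing fails, returns 0
toDouble("59.98")  // 59.98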

3. Create a case class for mapping the RDD.

scala> case class Order (order_customer_id: Int, order_date:String, order_id: Int, order_status:String)
defined class Order

4. Map the orders RDD to the Order case class.

val ordersMapped = orders.map(row => {
    val cell = row.split(",")
    val order_id = toInt(cell(0))
    val order_date = cell(1)
    val order_customer_id = toInt(cell(2))
    val order_status = cell(3)
    Order(order_customer_id, order_date, order_id, order_status)
})


5. Convert the RDD into a DataFrame.

scala> val ordersDataFrame = ordersMapped.toDF
ordersDataFrame: org.apache.spark.sql.DataFrame = [order_customer_id: int, order_date: string, order_id: int, order_status: string]

6. Register Orders DataFrame as a temp table ORDERS.

scala> ordersDataFrame.registerTempTable("ORDERS")

7. Check the content of the ORDERS table.

scala> sqlContext.sql("SELECT * FROM ORDERS").limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|                1|2013-07-25 00:00:...|   11599|         CLOSED|
|                2|2013-07-25 00:00:...|     256|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+

8. Similarly, we need to create temp tables for the other RDDs. Let's create one for order_items.

scala> val order_items = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items", 2)
order_items: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items MapPartitionsRDD[159] at textFile at <console>:27

9. Create a case class for order_items.

scala> case class OrderItems(order_item_id: Int,order_item_order_id: Int,order_item_product_id: Int,order_item_product_price: Double,order_item_quantity: Int,order_item_subtotal: Double)
defined class OrderItems

10. Create a map of order_items using the OrderItems case class.

val OrderItemsMapped = order_items.map(row => {
    val cell = row.split(",")
    val order_item_id = toInt(cell(0))
    val order_item_order_id = toInt(cell(1))
    val order_item_product_id = toInt(cell(2))
    val order_item_quantity = toInt(cell(3))
    val order_item_subtotal = toDouble(cell(4))
    val order_item_product_price = toDouble(cell(5))
    OrderItems(order_item_id, order_item_order_id, order_item_product_id, order_item_product_price, order_item_quantity, order_item_subtotal)
})

11. Convert the RDD into a DataFrame.

scala> val OrderItemsDataFrame = OrderItemsMapped.toDF
OrderItemsDataFrame: org.apache.spark.sql.DataFrame = [order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_product_price: double, order_item_quantity: int, order_item_subtotal: double]

12. Register DataFrame into a temp table.

scala> OrderItemsDataFrame.registerTempTable("ORDER_ITEMS")

scala> sqlContext.sql("SELECT * FROM ORDER_ITEMS").limit(3).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|                  1|             199.99|
|            3|                  2|                  502|                    50.0|                  5|              250.0|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+

13. Similarly, read the products text file and create the RDD and DataFrame.

scala> val products = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products", 2)
products: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products MapPartitionsRDD[172] at textFile at <console>:27

14. Check the content of the products.

scala> products.collect.take(3).foreach(println)
1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat

15. Create a case class for mapping the products RDD.

case class Product(product_id:Int, product_category_id:Int, product_name:String, product_price:Double)

16. Create a products map.

val productsMapped = products.map(row => {
    val cell = row.split(",")
    val product_id = toInt(cell(0))
    val product_category_id = toInt(cell(1))
    val product_name = cell(2)
    val product_price = toDouble(cell(4))
    Product(product_id, product_category_id, product_name, product_price)
})

17. Create DataFrame of products.

scala> val productsDataFrame = productsMapped.toDF
productsDataFrame: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int, product_name: string, product_price: double]

18. Register DataFrame as a temp table.
scala> productsDataFrame.registerTempTable("PRODUCTS")

19. Check the content of the PRODUCTS table.

scala> sqlContext.sql("SELECT * FROM PRODUCTS").limit(3).show

20. The final output can be obtained using the sqlContext.sql function.

val finalOutput = sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, ORDER_STATUS, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE ORDER_STATUS='CLOSED' OR  ORDER_STATUS='COMPLETE' GROUP BY PRODUCT_NAME,ORDER_DATE, ORDER_STATUS")

scala> finalOutput.limit(10).show

+--------------------+--------------------+------------+------------------+    
|        PRODUCT_NAME|          ORDER_DATE|ORDER_STATUS|               SUM|
+--------------------+--------------------+------------+------------------+
|adidas Kids' F5 M...|2014-07-12 00:00:...|      CLOSED|             34.99|
|Under Armour Kids...|2014-07-19 00:00:...|    COMPLETE|             27.99|
|Under Armour Kids...|2013-08-02 00:00:...|    COMPLETE|             27.99|
|Pelican Sunstream...|2013-12-04 00:00:...|    COMPLETE|2199.8900000000003|
|Pelican Sunstream...|2013-09-15 00:00:...|    COMPLETE|           2399.88|
|Pelican Sunstream...|2014-03-16 00:00:...|    COMPLETE|2999.8499999999995|
|Pelican Sunstream...|2013-10-01 00:00:...|      CLOSED|            399.98|
|Pelican Sunstream...|2014-07-11 00:00:...|      CLOSED|            399.98|
|Under Armour Wome...|2014-07-20 00:00:...|    COMPLETE|             31.99|
|Under Armour Wome...|2013-11-28 00:00:...|    COMPLETE|             31.99|
+--------------------+--------------------+------------+------------------+

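The problem statement also asks to save the output to text, CSV, TSV and Parquet files. One possible sketch of that, starting from the finalOutput DataFrame; the output paths are placeholders, and the CSV/TSV writes assume Spark 1.4+ with the spark-csv package (com.databricks:spark-csv) available in the shell:

// Plain text: format each Row by hand, one line per record
finalOutput.rdd.map(row => row.mkString(",")).saveAsTextFile("file:///tmp/daily_revenue_text")

// CSV and TSV via the spark-csv data source (assumed to be on the classpath)
finalOutput.write.format("com.databricks.spark.csv").option("header", "true").save("file:///tmp/daily_revenue_csv")
finalOutput.write.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").save("file:///tmp/daily_revenue_tsv")

// Parquet is supported natively by the DataFrame writer
finalOutput.write.parquet("file:///tmp/daily_revenue_parquet")
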
Conclusion: The second approach is much easier compared to the first one if you know SQL.

Now let's check whether both approaches give the same result.

Let's find the output from both methods for one product, say "Under Armour Hustle Storm Medium Duffle Bag".

scala> productWiseRevenueMap.filter(x => x._1._1=="Under Armour Hustle Storm Medium Duffle Bag").collect.take(10).foreach(println)
((Under Armour Hustle Storm Medium Duffle Bag,2014-07-13 00:00:00.0),209.94)
((Under Armour Hustle Storm Medium Duffle Bag,2014-02-06 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-08-01 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-10-15 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2014-01-01 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-17 00:00:00.0),34.99)
((Under Armour Hustle Storm Medium Duffle Bag,2013-10-26 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-14 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-08-02 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-10 00:00:00.0),174.95)


scala> sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE (ORDER_STATUS='CLOSED' OR  ORDER_STATUS='COMPLETE') AND PRODUCT_NAME='Under Armour Hustle Storm Medium Duffle Bag' and ORDER_DATE='2014-07-13 00:00:00.0' GROUP BY PRODUCT_NAME,ORDER_DATE ").limit(10).show
+--------------------+--------------------+------+                             
|        PRODUCT_NAME|          ORDER_DATE|   SUM|
+--------------------+--------------------+------+
|Under Armour Hust...|2014-07-13 00:00:...|209.94|
+--------------------+--------------------+------+


scala> sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE (ORDER_STATUS='CLOSED' OR  ORDER_STATUS='COMPLETE') AND PRODUCT_NAME='Under Armour Hustle Storm Medium Duffle Bag' and ORDER_DATE='2014-02-06 00:00:00.0' GROUP BY PRODUCT_NAME,ORDER_DATE ").limit(10).show
+--------------------+--------------------+------+                             
|        PRODUCT_NAME|          ORDER_DATE|   SUM|
+--------------------+--------------------+------+
|Under Armour Hust...|2014-02-06 00:00:...|139.96|
+--------------------+--------------------+------+

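Rather than spot-checking individual rows, both results can also be compared in one go. A rough sketch (assuming the dataset is small enough to collect on the driver, and rounding totals to cents to avoid floating-point noise); the SQL below is the same query as above but grouped only by product and date, without the per-row filters:

val rddResult = productWiseRevenueMap
   .map(row => (row._1._1, row._1._2, math.round(row._2 * 100) / 100.0))
   .collect.toSet

val sqlResult = sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE ORDER_STATUS='CLOSED' OR ORDER_STATUS='COMPLETE' GROUP BY PRODUCT_NAME, ORDER_DATE")
   .rdd
   .map(r => (r.getString(0), r.getString(1), math.round(r.getDouble(2) * 100) / 100.0))
   .collect.toSet

println(rddResult == sqlResult)   // expected to print true if both pipelines agree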