How to use map, filter, groupByKey
Problem statement: Get daily revenue by product for CLOSED and COMPLETE orders. Save the output to text, CSV, TSV and parquet files.
This example uses spark-shell to execute all the steps.
1. Read all the files from the local file system. The same can be read from HDFS as well; for simplicity, this example reads from the local file system.
val orders = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders")
val order_items = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items")
val products = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products")
All the data files can be found on GitHub.
2. Check the content of each RDD. This will help to understand the data structure and the position of each element.
orders.take(3).foreach(println)
order_items.take(3).foreach(println)
products.take(3).foreach(println)
3. The orders RDD has data in the following sequence (comma separated):
order_id int,
order_date timestamp,
order_customer_id int,
order_status string

scala> orders.take(3).foreach(println)
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4. The order_items RDD has data in the following sequence (comma separated):
order_item_id int,
order_item_order_id int,
order_item_product_id int,
order_item_quantity int,
order_item_subtotal float,
order_item_product_price float

scala> order_items.take(3).foreach(println)
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
5. The products RDD has data in the following sequence (comma separated):
product_id int,
product_category_id int,
product_name string,
product_description string,
product_price float,
product_url string

scala> products.take(3).foreach(println)
1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
6. Now let's first filter out the required records (i.e. CLOSED and COMPLETE) from the orders RDD. This may help performance, since less data needs to be processed.
val closedAndCompletedFilteredOrders = orders.filter(order =>
  order.split(",")(3) == "CLOSED" || order.split(",")(3) == "COMPLETE")
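As a side note, the filter above splits each line twice. A minimal variant with the same logic, splitting each line only once, could look like this:

val closedAndCompletedFilteredOrders = orders.filter(order => {
  val orderStatus = order.split(",")(3)   // order_status is the 4th comma-separated field
  orderStatus == "CLOSED" || orderStatus == "COMPLETE"
})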
7. Map the closedAndCompletedFilteredOrders RDD. Since we need to join orders with order_items, we need to create a pair RDD of orders with order_id as the key (which is a foreign key in order_items). We are also only interested in order_date from the orders RDD, as the other columns are not used in the final output.
val closedAndCompletedOrdersMap = closedAndCompletedFilteredOrders.map(order => {
  val splittedRow = order.split(",")
  val orderId = splittedRow(0).toInt
  val orderDate = splittedRow(1)
  (orderId, orderDate)
})
Check the content of the new map (RDD):
scala> closedAndCompletedOrdersMap.take(2)
res54: Array[(Int, String)] = Array((1,2013-07-25 00:00:00.0), (3,2013-07-25 00:00:00.0))
8. Similarly, map order_items with order_id as the key.
val orderItemsMap = order_items.map(orderItems => {
  val splittedRow = orderItems.split(",")
  val orderId = splittedRow(1).toInt
  val orderProductId = splittedRow(2)
  val orderItemsubTotal = splittedRow(4)
  (orderId, (orderProductId, orderItemsubTotal))
})
orderItemsMap RDD content:
orderItemsMap: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[247] at map at <console>:29
9. Join both RDDs (orders and order_items).
scala> val orderOrderItemMap = closedAndCompletedOrdersMap.join(orderItemsMap)
orderOrderItemMap: org.apache.spark.rdd.RDD[(Int, (String, (String, String)))] = MapPartitionsRDD[250] at join at <console>:37
10. Check the content of the joined RDD. One needs to understand the content in order to process the data further.
scala> orderOrderItemMap.take(5).foreach(println)
(65722,(2014-05-23 00:00:00.0,(365,119.98)))
(65722,(2014-05-23 00:00:00.0,(730,400.0)))
(65722,(2014-05-23 00:00:00.0,(1004,399.98)))
(65722,(2014-05-23 00:00:00.0,(627,199.95)))
(65722,(2014-05-23 00:00:00.0,(191,199.98)))
11. The step below is just to test how to access the content of the joined RDD.
scala> orderOrderItemMap.map(ooim => ooim._2).take(5).foreach(println)
18/01/01 16:53:43 WARN memory.TaskMemoryManager: leak 36.8 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@7fbc5c70
18/01/01 16:53:43 ERROR executor.Executor: Managed memory leak detected; size = 38574974 bytes, TID = 2446
(2014-05-23 00:00:00.0,(365,119.98))
(2014-05-23 00:00:00.0,(730,400.0))
(2014-05-23 00:00:00.0,(1004,399.98))
(2014-05-23 00:00:00.0,(627,199.95))
(2014-05-23 00:00:00.0,(191,199.98))
12. Now we need to join orderOrderItemMap with the products RDD. However, orderOrderItemMap has order_id as the key, not product_id, so we need to remap the RDD.
val orderProd = orderOrderItemMap.map(ooim => {
  val orderDate = ooim._2._1
  val orderProductId = ooim._2._2._1
  val orderSubTotal = ooim._2._2._2
  (orderProductId, (orderDate, orderSubTotal))
})
13. Now on one side we have an order/order_item RDD keyed by product_id. Similarly, we need to create a products map.
val productMap = products.map(product => {
  val splittedRow = product.split(",")
  val productId = splittedRow(0)
  val productName = splittedRow(2)
  (productId, productName)
})
Check the content of the productMap.
scala> productMap.take(3).foreach(println)
(1,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U)
(2,Under Armour Men's Highlight MC Football Clea)
(3,Under Armour Men's Renegade D Mid Football Cl)
14. Now join both maps to get (maybe) the final result.
scala> val orderAndProdJoin = orderProd.join(productMap)
orderAndProdJoin: org.apache.spark.rdd.RDD[(String, ((String, String), String))] = MapPartitionsRDD[256] at join at <console>:45
Check the content of the orderAndProdJoin RDD. We need to rearrange the data to group on order_date and product_name.
scala> orderAndProdJoin.take(3).foreach(println)
18/01/01 17:03:42 WARN memory.TaskMemoryManager: leak 5.4 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@6f987b8
(273,((2014-04-07 00:00:00.0,139.95),Under Armour Kids' Mercenary Slide))
(273,((2013-12-30 00:00:00.0,27.99),Under Armour Kids' Mercenary Slide))
(273,((2014-01-16 00:00:00.0,55.98),Under Armour Kids' Mercenary Slide))
15. Let's rearrange the content with (productName, orderDate) as the key.
val orderAndProdMapped = orderAndProdJoin.map(row => {
  val orderDate = row._2._1._1
  val orderSubTotal = row._2._1._2
  val productName = row._2._2
  ((productName, orderDate), orderSubTotal)
})
Execution:
orderAndProdMapped: org.apache.spark.rdd.RDD[((String, String), String)] = MapPartitionsRDD[262] at map at <console>:47
16. Let's group the data by the key (productName, orderDate).
val orderAndProdGrouped = orderAndProdMapped.groupByKey()

Execution:
orderAndProdGrouped: org.apache.spark.rdd.RDD[((String, String), Iterable[String])] = ShuffledRDD[263] at groupByKey at <console>:49
17. Check the content of our final grouped RDD.
orderAndProdGrouped.take(1).foreach(println)
((Nike Men's Comfort 2 Slide,2014-03-20 00:00:00.0),CompactBuffer(224.95))
As we can see, the value of the RDD is a CompactBuffer, which needs to be iterated and totalled to get the final aggregate for a given productName and order_date.
val productWiseRevenueMap = orderAndProdGrouped.map(row => {
  val productName = row._1._1
  val orderDate = row._1._2
  val orderSubTotalIterator = row._2
  var total = 0.0
  val it = orderSubTotalIterator.toIterator
  while (it.hasNext) {
    total += it.next.toDouble
  }
  ((productName, orderDate), total)
})
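As a side note, the same aggregation can be done without groupByKey by using reduceByKey, which combines values on the map side before the shuffle and avoids materialising the CompactBuffer. A minimal sketch, starting again from the orderAndProdJoin RDD above:

val productWiseRevenueReduced = orderAndProdJoin.map(row => {
  val orderDate = row._2._1._1
  val orderSubTotal = row._2._1._2.toDouble   // convert the subtotal string once, up front
  val productName = row._2._2
  ((productName, orderDate), orderSubTotal)
}).reduceByKey(_ + _)                          // sum the subtotals per (productName, orderDate)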
18. Oof... check the final output.
scala> productWiseRevenueMap.take(10).foreach(println)
((Nike Men's Comfort 2 Slide,2014-03-20 00:00:00.0),224.95)
((Nike Men's CJ Elite 2 TD Football Cleat,2013-09-04 00:00:00.0),1819.8600000000001)
((Under Armour Men's Tech II T-Shirt,2014-05-17 00:00:00.0),124.95)
((Diamondback Women's Serene Classic Comfort Bi,2013-08-21 00:00:00.0),1799.88)
((LIJA Women's Eyelet Sleeveless Golf Polo,2014-04-25 00:00:00.0),130.0)
((Pelican Sunstream 100 Kayak,2014-05-03 00:00:00.0),5399.729999999997)
((Perfect Fitness Perfect Rip Deck,2013-09-24 00:00:00.0),5459.089999999998)
((Perfect Fitness Perfect Rip Deck,2014-06-16 00:00:00.0),6238.959999999998)
((Under Armour Hustle Storm Medium Duffle Bag,2014-07-13 00:00:00.0),209.94)
((Perfect Fitness Perfect Rip Deck,2014-02-06 00:00:00.0),5939.0099999999975)
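The problem statement also asks to save the output. A minimal sketch for the RDD approach, assuming output directories such as /tmp/revenue_text that do not already exist:

// Plain text: Spark writes each tuple's toString on its own line
productWiseRevenueMap.saveAsTextFile("file:///tmp/revenue_text")

// CSV / TSV: format each record as a delimited string first, then save as text
productWiseRevenueMap
  .map { case ((productName, orderDate), total) => s"$productName,$orderDate,$total" }
  .saveAsTextFile("file:///tmp/revenue_csv")

productWiseRevenueMap
  .map { case ((productName, orderDate), total) => s"$productName\t$orderDate\t$total" }
  .saveAsTextFile("file:///tmp/revenue_tsv")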
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
There is an easier way to get the same result.
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
How to use the sqlContext.sql function to query data from a text file
Our problem statement is the same, i.e. we need to find order revenue by product and order date.
At a high level, we will use the following steps to achieve the requirement:
Read text file into RDD --> Create case classes --> Map the RDD using the case classes --> Convert the mapped RDD into a DataFrame --> Register the DataFrame as a temp table.
1. Create an RDD of the text data using the Spark context.
scala> val orders = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders")
orders: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/orders MapPartitionsRDD[150] at textFile at <console>:27
Check the content.
scala> orders.collect.take(3).foreach(println)
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
2. Since the content of the RDD is all Strings, we need to parse the input String to Int and Double. For this we create two helper functions.
def toInt(s: String): Int = {
  try {
    s.toInt
  } catch {
    case e: Exception => 0
  }
}

def toDouble(s: String): Double = {
  try {
    s.toDouble
  } catch {
    case e: Exception => 0.0
  }
}
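For example, with made-up inputs (not from the dataset), the helpers fall back to 0 / 0.0 on bad input:

toInt("42")              // 42
toInt("not-a-number")    // 0 (the exception is caught and 0 is returned)
toDouble("59.98")        // 59.98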
3. Create a case class for mapping the RDD.
scala> case class Order (order_customer_id: Int, order_date: String, order_id: Int, order_status: String)
defined class Order
4. Map the orders RDD using the Order case class.
val ordersMapped = orders.map(row => {
  val cell = row.split(",")
  val order_id = toInt(cell(0))
  val order_date = cell(1)
  val order_customer_id = toInt(cell(2))
  val order_status = cell(3)
  Order(order_customer_id, order_date, order_id, order_status)
})
5. Convert the RDD into a DataFrame.
scala> val ordersDataFrame = ordersMapped.toDF
ordersDataFrame: org.apache.spark.sql.DataFrame = [order_customer_id: int, order_date: string, order_id: int, order_status: string]
6. Register Orders DataFrame as a temp table ORDERS.
scala> ordersDataFrame.registerTempTable("ORDERS")
7. Check the content of the ORDERS table.
scala> sqlContext.sql("SELECT * FROM ORDERS").limit(2).show
+-----------------+--------------------+--------+---------------+
|order_customer_id| order_date|order_id| order_status|
+-----------------+--------------------+--------+---------------+
| 11599|2013-07-25 00:00:...| 1| CLOSED|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+
8. Similarly, we need to create temp tables for the other RDDs. Let's create one for order_items.
scala> val order_items = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items", 2)
order_items: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/order_items MapPartitionsRDD[159] at textFile at <console>:27
9. Create a case class for order_items.
scala> case class OrderItems(order_item_id: Int, order_item_order_id: Int, order_item_product_id: Int, order_item_product_price: Double, order_item_quantity: Int, order_item_subtotal: Double)
defined class OrderItems
10. Create a map of order_items using the OrderItems case class.
val OrderItemsMapped = order_items.map(row => {
  val cell = row.split(",")
  val order_item_id = toInt(cell(0))
  val order_item_order_id = toInt(cell(1))
  val order_item_product_id = toInt(cell(2))
  val order_item_quantity = toInt(cell(3))
  val order_item_subtotal = toDouble(cell(4))
  val order_item_product_price = toDouble(cell(5))   // the subtotal is the 5th field, the price is the 6th
  OrderItems(order_item_id, order_item_order_id, order_item_product_id, order_item_product_price, order_item_quantity, order_item_subtotal)
})
11. Convert the RDD into a DataFrame.
scala> val OrderItemsDataFrame = OrderItemsMapped.toDF
OrderItemsDataFrame: org.apache.spark.sql.DataFrame = [order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_product_price: double, order_item_quantity: int, order_item_subtotal: double]
12. Register DataFrame into a temp table.
scala> OrderItemsDataFrame.registerTempTable("ORDER_ITEMS")

scala> sqlContext.sql("SELECT * FROM ORDER_ITEMS").limit(3).show
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
| 1| 1| 957| 299.98| 1| 299.98|
| 2| 2| 1073| 199.99| 1| 199.99|
| 3| 2| 502| 50.0| 5| 250.0|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
13. Similarly, read the text file for products and create an RDD and a DataFrame.
scala> val products = sc.textFile("file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products", 2)
products: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/workspace/scala/rajudata/data-master/retail_db/products MapPartitionsRDD[172] at textFile at <console>:27
14. Check the content of the products.
scala> products.collect.take(3).foreach(println)
1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
15. Create a case class for mapping the products RDD.
case class Product(product_id: Int, product_category_id: Int, product_name: String, product_price: Double)
16. Create a products map.
val productsMapped = products.map(row => {
  val cell = row.split(",")
  val product_id = toInt(cell(0))
  val product_category_id = toInt(cell(1))
  val product_name = cell(2)
  val product_price = toDouble(cell(4))
  Product(product_id, product_category_id, product_name, product_price)
})
17. Create DataFrame of products.
scala> val productsDataFrame = productsMapped.toDF
productsDataFrame: org.apache.spark.sql.DataFrame = [product_id: int, product_category_id: int, product_name: string, product_price: double]
18. Register DataFrame as a temp table.
scala> productsDataFrame.registerTempTable("PRODUCTS")
19. Check the content of the PRODUCTS table.
scala> sqlContext.sql("SELECT * FROM PRODUCTS").limit(3).show
20. The final output can be obtained using the sqlContext.sql function.
val finalOutput = sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, ORDER_STATUS, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE ORDER_STATUS='CLOSED' OR ORDER_STATUS='COMPLETE' GROUP BY PRODUCT_NAME, ORDER_DATE, ORDER_STATUS")
scala> finalOutput.limit(10).show
+--------------------+--------------------+------------+------------------+
| PRODUCT_NAME| ORDER_DATE|ORDER_STATUS| SUM|
+--------------------+--------------------+------------+------------------+
|adidas Kids' F5 M...|2014-07-12 00:00:...|      CLOSED|             34.99|
|Under Armour Kids...|2014-07-19 00:00:...|    COMPLETE|             27.99|
|Under Armour Kids...|2013-08-02 00:00:...|    COMPLETE|             27.99|
|Pelican Sunstream...|2013-12-04 00:00:...|    COMPLETE|2199.8900000000003|
|Pelican Sunstream...|2013-09-15 00:00:...|    COMPLETE|           2399.88|
|Pelican Sunstream...|2014-03-16 00:00:...|    COMPLETE|2999.8499999999995|
|Pelican Sunstream...|2013-10-01 00:00:...|      CLOSED|            399.98|
|Pelican Sunstream...|2014-07-11 00:00:...|      CLOSED|            399.98|
|Under Armour Wome...|2014-07-20 00:00:...|    COMPLETE|             31.99|
|Under Armour Wome...|2013-11-28 00:00:...|    COMPLETE|             31.99|
+--------------------+--------------------+------------+------------------+
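The problem statement also asks to save the output to files. A minimal sketch for the DataFrame approach; the output paths are assumptions, and writing CSV via the DataFrame API on older Spark versions may require the external spark-csv package, so a plain text save from the underlying RDD is shown instead:

// Parquet (DataFrameWriter, available from Spark 1.4 onwards)
finalOutput.write.parquet("file:///tmp/revenue_parquet")

// CSV-style text output without extra packages: format each Row and save as text
finalOutput.rdd
  .map(row => row.mkString(","))
  .saveAsTextFile("file:///tmp/revenue_csv_sql")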
Conclusion: The second approach is much easier than the first if one knows SQL.
Now let's check whether both approaches give the same result. Let's find the output from both methods for some product, say "Under Armour Hustle Storm Medium Duffle Bag".
scala> productWiseRevenueMap.filter(x => x._1._1 == "Under Armour Hustle Storm Medium Duffle Bag").collect.take(10).foreach(println)
((Under Armour Hustle Storm Medium Duffle Bag,2014-07-13 00:00:00.0),209.94)
((Under Armour Hustle Storm Medium Duffle Bag,2014-02-06 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-08-01 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-10-15 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2014-01-01 00:00:00.0),139.96)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-17 00:00:00.0),34.99)
((Under Armour Hustle Storm Medium Duffle Bag,2013-10-26 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-14 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-08-02 00:00:00.0),104.97)
((Under Armour Hustle Storm Medium Duffle Bag,2013-09-10 00:00:00.0),174.95)
scala> sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE (ORDER_STATUS='CLOSED' OR ORDER_STATUS='COMPLETE') AND PRODUCT_NAME='Under Armour Hustle Storm Medium Duffle Bag' AND ORDER_DATE='2014-07-13 00:00:00.0' GROUP BY PRODUCT_NAME, ORDER_DATE").limit(10).show
+--------------------+--------------------+------+
| PRODUCT_NAME| ORDER_DATE| SUM|
+--------------------+--------------------+------+
|Under Armour Hust...|2014-07-13 00:00:...|209.94|
+--------------------+--------------------+------+
scala> sqlContext.sql("SELECT PRODUCT_NAME, ORDER_DATE, SUM(ORDER_ITEM_SUBTOTAL) AS SUM FROM ORDERS JOIN ORDER_ITEMS ON (ORDER_ID = ORDER_ITEM_ORDER_ID) JOIN PRODUCTS ON (ORDER_ITEM_PRODUCT_ID = PRODUCT_ID) WHERE (ORDER_STATUS='CLOSED' OR ORDER_STATUS='COMPLETE') AND PRODUCT_NAME='Under Armour Hustle Storm Medium Duffle Bag' AND ORDER_DATE='2014-02-06 00:00:00.0' GROUP BY PRODUCT_NAME, ORDER_DATE").limit(10).show
+--------------------+--------------------+------+
| PRODUCT_NAME| ORDER_DATE| SUM|
+--------------------+--------------------+------+
|Under Armour Hust...|2014-02-06 00:00:...|139.96|
+--------------------+--------------------+------+