Spark - Exercise 1: Read HDFS text file and filter records based on certain criteria.

Problem statement: Find all Person records with an age greater than 30 years.

1. Create a file named Person.txt on the local file system.

> vi Person.txt

2. Add the records below.

Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2

3. Create a directory on the HDFS file system.

hadoop fs -mkdir /user/spark/PersonExample/

4. Put the Person.txt file onto HDFS.

hadoop fs -put Person.txt /user/spark/PersonExample/

5. Check whether the file has been uploaded:

[root@localhost PersonExample]# hadoop fs -ls /user/spark/PersonExample/
Found 1 items
-rw-r--r--   1 root supergroup         77 2017-12-17 14:34 /user/spark/PersonExample/Person.txt

6. Start the Spark shell using the spark-shell command.
$> spark-shell

7. Load the file into an RDD using the Spark context (sc).

scala> val persons = sc.textFile("/user/spark/PersonExample/Person.txt")
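
If the default file system is not HDFS, the file can also be loaded with a full HDFS URI. The namenode host and port below are assumptions for a single-node setup and must match fs.defaultFS in core-site.xml:

scala> val persons = sc.textFile("hdfs://localhost:8020/user/spark/PersonExample/Person.txt")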

8. Check the content of the loaded file.
scala> persons
res4: org.apache.spark.rdd.RDD[String] = /user/spark/PersonExample/Person.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> persons.collect
res3: Array[String] = Array(Name, Age, Vinayak, 35, Nilesh, 37, Raju, 30, Karthik, 28, Shreshta,1, Siddhish, 2)

scala> persons.collect.foreach(println)
Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2
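
Note: collect pulls the entire RDD back to the driver, which is fine for this small file but can exhaust driver memory on large data sets. For a quick spot check on bigger files, fetch only a few lines with take:

scala> persons.take(3).foreach(println)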

9. Since our persons RDD contains a header row, we need to remove it before processing further.

scala> val onlyPersons = persons.filter(line => !line.contains("Name") )

The onlyPersons RDD now contains every row except the header.

scala> onlyPersons.collect
res17: Array[String] = Array(Vinayak, 35, Nilesh, 37, Raju, 30, Karthik, 28, Shreshta,1, Siddhish, 2)

Note: We can skip this step if there is no header in the input text file.
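
Filtering on the literal "Name" works for this file, but it would also drop any data row that happens to contain the word "Name". A more robust sketch is to drop only the first line of the first partition:

scala> val onlyPersons = persons.mapPartitionsWithIndex { (idx, iter) =>
     |   if (idx == 0) iter.drop(1) else iter  // skip only the header line
     | }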

10. Define a Person case class and create an RDD of Person objects. The case class must be defined before it is used in the map; its fields match the columns we want (name and age):

scala> case class Person(name: String, age: Int)

scala> val PersonRDD = onlyPersons.map(line => line.split(",")).map(p => Person(p(0), p(1).trim().toInt))

Note: If we don't remove the header first, this map fails, because the header value "Age" cannot be converted to an Int.

11. Create a DataFrame from PersonRDD, which can then be registered as a table.

scala> val PersonDF = PersonRDD.toDF
PersonDF: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> PersonDF.show
+--------+---+
|    name|age|
+--------+---+
| Vinayak| 35|
|  Nilesh| 37|
|    Raju| 30|
| Karthik| 28|
|Shreshta|  1|
|Siddhish|  2|
+--------+---+
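
toDF works here because spark-shell imports the SQL implicits automatically; in a standalone application you would need import sqlContext.implicits._ (or spark.implicits._ on Spark 2.x) first. As a sketch, the same DataFrame can also be built without a case class, by mapping to tuples and naming the columns explicitly (PersonDF2 is a hypothetical name for illustration):

scala> val PersonDF2 = onlyPersons.map(_.split(","))
     |   .map(p => (p(0), p(1).trim.toInt))  // (name, age) tuples
     |   .toDF("name", "age")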

12. We can use the filter method of the DataFrame to find all persons with age greater than 30.
scala> PersonDF.filter("age>30").show
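
The string "age>30" is parsed as a SQL expression; the same filter can also be written with the Column API:

scala> PersonDF.filter(PersonDF("age") > 30).show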
Alternatively, we can register the DataFrame as a temp table and use an sqlContext.sql query to fetch the required details.

scala> PersonDF.registerTempTable("PERSONTABLE");

Create a SQLContext from the Spark context (sc). Note: spark-shell already provides a pre-built sqlContext, so this step is only needed outside the shell.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> sqlContext.sql("SELECT * FROM PERSONTABLE WHERE AGE>30").show
+-------+---+
|   name|age|
+-------+---+
|Vinayak| 35|
| Nilesh| 37|
+-------+---+
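
On Spark 2.x, registerTempTable is deprecated in favour of temp views, and the shell provides a spark session. A sketch of the equivalent:

scala> PersonDF.createOrReplaceTempView("PERSONTABLE")

scala> spark.sql("SELECT * FROM PERSONTABLE WHERE age > 30").show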
