Spark - Exercise 1: Read HDFS text file and filter records based on certain criteria.
Problem statement: Find all the Person records having age greater than 30 years.
1. Create a file on the local file system with the name Person.txt
>vi Person.txt
2. Add the records below.
Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2
3. Create a directory on the HDFS file system.
hadoop fs -mkdir /user/spark/PersonExample/
4. Put the Person.txt file onto the HDFS system.
hadoop fs -put Person.txt /user/spark/PersonExample/
5. Check whether the file has been uploaded.
[root@localhost PersonExample]# hadoop fs -ls /user/spark/PersonExample/
Found 1 items
-rw-r--r-- 1 root supergroup 77 2017-12-17 14:34 /user/spark/PersonExample/Person.txt
6. Start the Spark shell using the spark-shell command.
$>spark-shell
7. Load the file using the Spark context (sc).
scala> val persons = sc.textFile("/user/spark/PersonExample/Person.txt")
8. Check the content of the loaded file.
scala> persons
res4: org.apache.spark.rdd.RDD[String] = /user/spark/PersonExample/Person.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> persons.collect
res3: Array[String] = Array(Name, Age, Vinayak, 35, Nilesh, 37, Raju, 30, Karthik, 28, Shreshta,1, Siddhish, 2)
scala> persons.collect.foreach(println)
Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2
9. Since our persons RDD has a header row, we need to remove it before processing further.
scala> val onlyPersons = persons.filter(line => !line.contains("Name"))
The onlyPersons RDD will hold all values other than the header.
scala> onlyPersons.collect
res17: Array[String] = Array(Vinayak, 35, Nilesh, 37, Raju, 30, Karthik, 28, Shreshta,1, Siddhish, 2)
Note: We can skip this step if there is no header in the input text file.
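The header-removal step can be tried out locally with a plain Scala collection (a sketch only, no Spark required). One caveat with the filter above: it would also drop any data row whose name happens to contain the text "Name", so dropping the first element by position is more robust. On a real RDD there is no drop method; the positional equivalent is mapPartitionsWithIndex.

```scala
// Local sketch of step 9 using a plain Scala collection instead of an RDD.
// The sample lines mirror Person.txt from the exercise.
val lines = Seq("Name, Age", "Vinayak, 35", "Nilesh, 37")

// Approach from the exercise: drop every line containing "Name".
val byContains = lines.filter(line => !line.contains("Name"))

// More robust alternative: drop the first line by position.
val byPosition = lines.drop(1)

println(byContains)  // List(Vinayak, 35, Nilesh, 37)
println(byPosition)  // List(Vinayak, 35, Nilesh, 37)
```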
10. Create an RDD of Person objects. Define a Person case class first, since the map below constructs Person instances:
scala> case class Person(name: String, age: Int)
scala> val PersonRDD = onlyPersons.map(lines => lines.split(",")).map(p => Person(p(0), p(1).trim().toInt))
Note: If we don't remove the header part, we won't be able to map our records to the Person RDD, because the header's "Age" field cannot be converted to an Int.
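The split-and-trim mapping from step 10 can likewise be exercised on a local collection; a minimal sketch, assuming the two-column "name, age" format used above:

```scala
// Local sketch of step 10: parse "name, age" lines into Person objects.
case class Person(name: String, age: Int)

val rows = Seq("Vinayak, 35", "Nilesh, 37", "Shreshta,1")

val persons = rows
  .map(line => line.split(","))              // split into name and age columns
  .map(p => Person(p(0), p(1).trim().toInt)) // trim removes the space after the comma

persons.foreach(println)
// A header line such as "Name, Age" would throw a NumberFormatException
// in the second map, which is why step 9 removes it first.
```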
11. Create a DataFrame of Person using PersonRDD, which can then be registered as a table.
scala> val PersonDF = PersonRDD.toDF
PersonDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> PersonDF.show
+--------+---+
| name|age|
+--------+---+
| Vinayak| 35|
| Nilesh| 37|
| Raju| 30|
| Karthik| 28|
|Shreshta| 1|
|Siddhish| 2|
+--------+---+
12. We can use the filter method of the DataFrame to find all the persons with age greater than 30.
scala> PersonDF.filter("age>30").show
Alternatively, we can register the DataFrame as a temp table and fetch the required details with a sqlContext.sql query.
scala> PersonDF.registerTempTable("PERSONTABLE")
The spark-shell already provides a ready-made sqlContext; use that one, since a temp table is visible only to the SQLContext it was registered with (a freshly created SQLContext would not see PERSONTABLE).
scala> sqlContext.sql("SELECT * FROM PERSONTABLE WHERE AGE>30").show
+-------+---+
| name|age|
+-------+---+
|Vinayak| 35|
| Nilesh| 37|
+-------+---+
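Putting the pieces together: the whole exercise can be mimicked end to end on a local Scala collection for quick experimentation, with the DataFrame/SQL step replaced by a plain filter (a sketch only; the Spark versions above are what run against HDFS):

```scala
// End-to-end local sketch: strip the header, parse the rows, and keep
// only persons older than 30 (plain Scala, no Spark).
case class Person(name: String, age: Int)

val lines = Seq(
  "Name, Age",
  "Vinayak, 35", "Nilesh, 37", "Raju, 30",
  "Karthik, 28", "Shreshta,1", "Siddhish, 2")

val over30 = lines
  .drop(1)                                   // step 9: remove the header
  .map(_.split(","))                         // step 10: split columns
  .map(p => Person(p(0), p(1).trim().toInt)) // step 10: build Person objects
  .filter(_.age > 30)                        // step 12: age > 30

over30.foreach(println)
// Person(Vinayak,35)
// Person(Nilesh,37)
```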