Spark: Read HDFS json file and filter records based on certain criteria.

Spark - Exercise 2: Read HDFS json file and filter records based on certain criteria.

Problem statement: Find all the Person records (from the JSON input file) with an age greater than 30 years.

1. Create a file named Person.json on the local file system.

>vi Person.json

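2. Add the records below, one JSON object per line (by default, Spark's JSON reader treats each line as a separate record).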

{"Name" : "Vinayak", "Age":35}
{"Name" : "Nilesh", "Age":37}
{"Name" : "Raju", "Age":30}
{"Name" : "Karthik", "Age":28}
{"Name" : "Shreshta","Age":1}
{"Name" : "Siddhish", "Age":2}
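If you would rather script this step than type the records in vi, the small Scala program below is one way to do it. It is only a sketch: the object name WritePersonJson is made up for this example, and it writes Person.json to the current working directory, replacing any existing file. Each record is kept on its own line; a record spread over several lines would instead need the reader's multiLine option (Spark 2.2 and later).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical helper that writes the sample records as JSON Lines
// (one JSON object per line) to ./Person.json.
object WritePersonJson {
  val records: Seq[String] = Seq(
    """{"Name" : "Vinayak", "Age":35}""",
    """{"Name" : "Nilesh", "Age":37}""",
    """{"Name" : "Raju", "Age":30}""",
    """{"Name" : "Karthik", "Age":28}""",
    """{"Name" : "Shreshta","Age":1}""",
    """{"Name" : "Siddhish", "Age":2}"""
  )

  def main(args: Array[String]): Unit = {
    val path = Paths.get("Person.json")
    // Terminate every record with a newline; Files.write truncates an existing file.
    val content = records.map(_ + "\n").mkString
    Files.write(path, content.getBytes(StandardCharsets.UTF_8))
    println(s"Wrote ${records.size} records to ${path.toAbsolutePath}")
  }
}
```

Run it from the directory where you plan to call hadoop fs -put, then continue with step 3.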

3. Create a directory on the HDFS file system.

hadoop fs -mkdir /user/spark/PersonJSONExample/

4. Put the Person.json file onto HDFS.

hadoop fs -put Person.json /user/spark/PersonJSONExample/

5. Check whether the file has been uploaded.

[root@localhost PersonExample]# hadoop fs -ls /user/spark/PersonJSONExample/
Found 1 items
-rw-r--r--   1 root supergroup        182 2017-12-17 19:49 /user/spark/PersonJSONExample/Person.json
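Steps 3 to 5 can also be done from inside spark-shell through Hadoop's FileSystem API, which is handy once you are already in the REPL. This is a sketch that assumes a Spark 2.x spark-shell (where spark is predefined), that Spark's Hadoop configuration points at the same HDFS as the hadoop command, and that spark-shell was started from the directory containing Person.json.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse the Hadoop configuration Spark already loaded (core-site.xml, hdfs-site.xml).
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dir = new Path("/user/spark/PersonJSONExample/")

// Step 3, like: hadoop fs -mkdir /user/spark/PersonJSONExample/
// (mkdirs also creates any missing parent directories)
fs.mkdirs(dir)

// Step 4, like: hadoop fs -put Person.json /user/spark/PersonJSONExample/
// The relative source path is resolved on the local file system, against the
// shell's working directory; this two-argument form overwrites an existing copy.
fs.copyFromLocalFile(new Path("Person.json"), dir)

// Step 5, like: hadoop fs -ls /user/spark/PersonJSONExample/
fs.listStatus(dir).foreach { status =>
  println(s"${status.getLen}\t${status.getPath}")
}
```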

6. Start the Spark shell using the spark-shell command.
$>spark-shell

7. Load the file using the SQL context.

scala> val personsjson = sqlContext.read.json("/user/spark/PersonJSONExample/Person.json")
personsjson: org.apache.spark.sql.DataFrame = [Age: bigint, Name: string]

scala> personsjson.printSchema
root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)

8. Use the filter method of the DataFrame to filter the records.
scala> personsjson.filter("Age>30")
res38: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Age: bigint, Name: string]

scala> personsjson.filter("Age>30").show
+---+-------+
|Age|   Name|
+---+-------+
| 35|Vinayak|
| 37| Nilesh|
+---+-------+
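The string "Age>30" is parsed as a SQL expression. The same condition can be written with Column objects, which lets you build it from Scala operators; note that column names are still only checked when Spark analyzes the query, not at compile time. A sketch, assuming a Spark 2.x spark-shell, which already imports spark.implicits._ (the source of the $"..." syntax):

```scala
import org.apache.spark.sql.functions.col

// Assumes spark-shell (Spark 2.x): `spark` is predefined and spark.implicits._ is imported.
val persons = spark.read.json("/user/spark/PersonJSONExample/Person.json")

// Same predicate as filter("Age>30"), expressed as a Column.
val over30 = persons.filter(col("Age") > 30)
over30.show()

// Shorthand using the $"..." interpolator from spark.implicits._
persons.filter($"Age" > 30).show()
```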

Alternative methods:

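8.1 Create a case class whose field names match the JSON attributes, including their case (Name and Age). Declare Age as Long, because Spark infers the JSON numbers as bigint (see the schema above).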
scala> case class Person(Name: String, Age: Long)
defined class Person

A DataFrame can be converted to a Dataset by providing a case class; the columns are mapped to the class fields by name.
scala> val personclass = personsjson.as[Person]
personclass: org.apache.spark.sql.Dataset[Person] = [Age: bigint, Name: string]

Using the typed Dataset, one can filter out the required data:
scala> personclass.filter("Age>30").show
+---+-------+
|Age|   Name|
+---+-------+
| 35|Vinayak|
| 37| Nilesh|
+---+-------+
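Because personclass is a typed Dataset, the filter can also be a plain Scala function over Person objects instead of a SQL string. The sketch below assumes a Spark 2.x spark-shell (where spark.implicits._ supplies the Encoder that .as[Person] needs) and reloads the file so it can be pasted on its own; the names people and over30Typed are just for this example.

```scala
// Assumes spark-shell (Spark 2.x): `spark` is predefined and spark.implicits._ is imported.
case class Person(Name: String, Age: Long)

val people = spark.read.json("/user/spark/PersonJSONExample/Person.json").as[Person]

// The lambda receives a Person, so a misspelled field (e.g. p.age)
// is rejected by the Scala compiler rather than at query analysis time.
val over30Typed = people.filter(p => p.Age > 30)
over30Typed.show()

// A typed Dataset returns Person objects from collect().
val names: Array[String] = over30Typed.collect().map(_.Name)
```

The trade-off is that Spark cannot look inside the lambda, so a string or Column predicate leaves more room for its query optimizer.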

8.2 Use the where method of the DataFrame to fetch the required data.
scala> personsjson.where("Age>30").show
+---+-------+
|Age|   Name|
+---+-------+
| 35|Vinayak|
| 37| Nilesh|
+---+-------+
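In the Dataset API, where is an alias for filter, so both calls run the same query. If you prefer plain SQL, another option is to register the data as a temporary view and query it with spark.sql. A sketch assuming a Spark 2.x spark-shell; the view name person is an arbitrary choice for this example.

```scala
// Assumes spark-shell (Spark 2.x), where `spark` is predefined.
val personsForSql = spark.read.json("/user/spark/PersonJSONExample/Person.json")

// Make the DataFrame visible to SQL queries in this session under the name "person".
personsForSql.createOrReplaceTempView("person")

val over30Sql = spark.sql("SELECT Name, Age FROM person WHERE Age > 30")
over30Sql.show()
```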

8.3 Similarly, the where method of the Dataset (personclass) can be used to fetch the required details.
scala> personclass.where("Age>30").show
+---+-------+
|Age|   Name|
+---+-------+
| 35|Vinayak|
| 37| Nilesh|
+---+-------+
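Everything above runs inside spark-shell. To run the same filter as a packaged job with spark-submit, a minimal application might look like the sketch below. The object name PersonFilterApp and the helper olderThan are hypothetical, the master URL is expected to come from spark-submit, and the input path defaults to the one used in this exercise.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Top-level case class so Spark can derive an Encoder for it.
case class Person(Name: String, Age: Long)

object PersonFilterApp {
  // Hypothetical helper: keep only people strictly older than minAge.
  def olderThan(people: Dataset[Person], minAge: Long): Dataset[Person] =
    people.filter(p => p.Age > minAge)

  def main(args: Array[String]): Unit = {
    // Master URL and other settings are expected to be supplied by spark-submit.
    val spark = SparkSession.builder().appName("PersonJSONExample").getOrCreate()
    import spark.implicits._

    val inputPath =
      if (args.nonEmpty) args(0) else "/user/spark/PersonJSONExample/Person.json"

    val people = spark.read.json(inputPath).as[Person]
    olderThan(people, 30).show()

    spark.stop()
  }
}
```

Package it with your build tool and launch it with spark-submit --class PersonFilterApp, optionally passing a different input path as the first argument.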
