How to read a tab separated text file and display the details in a table using Spark and Scala

1. Read a text (tab separated) file which needs to be processed.

scala> val election2014 = sc.textFile("file:///home/hadoop/workspace/scala/rajudata/data-master/electionresults/ls2014.tsv")
election2014: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/scala/rajudata/data-master/electionresults/ls2014.tsv MapPartitionsRDD[59] at textFile at <console>:24
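
Optionally, a quick count of the lines is a cheap sanity check that the file loaded (the exact number depends on your copy of the data):

scala> election2014.count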


2. The input text file has a header row, which needs to be filtered out before the actual processing.

scala> election2014.first
res72: String = state        constituency        candidate_name        sex        age        category        partyname        partysymbol        general        postal        total        pct_of_total_votes        pct_of_polled_votes        totalvoters
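
To see each column name together with the index it will have after splitting on tabs (handy for step 7 below), you can split the header line:

scala> election2014.first.split("\t").zipWithIndex.foreach(println)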

3. Remove the header using the mapPartitionsWithIndex method. The header line lives in the first partition (index 0), so we drop the first row there.

scala> val electionDataWithoutHeader = election2014.mapPartitionsWithIndex((idx, rows) =>
          // The header is the first line of partition 0; drop it and pass every other partition through.
          if (idx == 0) rows.drop(1) else rows
       )

scala> electionDataWithoutHeader.take(3).foreach(println)
Andhra Pradesh        Adilabad         GODAM NAGESH        M        49        ST        TRS        Car        425762        5085        430847        31.07931864        40.81807244        1386282
Andhra Pradesh        Adilabad         NARESH        M        37        ST        INC        Hand        257994        1563        259557        18.72324679        24.59020587        1386282
Andhra Pradesh        Adilabad         RAMESH RATHOD        M        48        ST        TDP        Bicycle        182879        1319        184198        13.28719553        17.45075933        1386282

The electionDataWithoutHeader RDD now contains only the data rows, each as a single tab-separated line.
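
As an aside, a common alternative to mapPartitionsWithIndex is to filter the header out by value. This sketch assumes no data row is ever identical to the header line:

scala> val header = election2014.first
scala> val electionDataWithoutHeader = election2014.filter(row => row != header)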

4. Import the required library.
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
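
Note: the toDF conversion used in step 9 relies on spark.implicits._, which the spark-shell imports automatically. If you move this code into a standalone application, you need to import it yourself:

import spark.implicits._   // 'spark' here is your SparkSession instance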


5. Create a case class for the election data, with one field per column in the input file.

case class Election2014(state: String, constituency: String, candidate_name: String,
  sex: String, age: Int, category: String, partyname: String, partysymbol: String,
  general: String, postal: String, total: Long, pct_of_total_votes: Double,
  pct_of_polled_votes: Double, totalvoters: Long)
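
As a quick check that the field order matches the file, you can build one instance by hand from the first data row shown in step 3:

scala> val sample = Election2014("Andhra Pradesh", "Adilabad", "GODAM NAGESH", "M", 49,
         "ST", "TRS", "Car", "425762", "5085", 430847L, 31.07931864, 40.81807244, 1386282L)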

6. Define some utility functions to parse the numeric columns safely, and execute them in the spark-shell. Each one falls back to zero when a cell is empty or malformed.

// Parse an Int, falling back to 0 on empty or malformed input.
def toInt(s: String): Int = {
  try {
    s.toInt
  } catch {
    case e: Exception => 0
  }
}

// Parse a Long, falling back to 0 on empty or malformed input.
def toLong(s: String): Long = {
  try {
    s.toLong
  } catch {
    case e: Exception => 0
  }
}

// Parse a Double, falling back to 0.0 on empty or malformed input.
def toDouble(s: String): Double = {
  try {
    s.toDouble
  } catch {
    case e: Exception => 0.0
  }
}
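
You can verify the fallback behaviour directly in the shell:

scala> toInt("49")              // returns 49
scala> toInt("not a number")    // returns the fallback value 0
scala> toDouble("")             // returns the fallback value 0.0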

7. Now parse each row of the electionDataWithoutHeader RDD[String], splitting on tabs and converting each cell to the matching case-class field.

scala> val election2014Obj = electionDataWithoutHeader.map(row => {
      val cells = row.split("\t")
      val state = cells(0)
      val constituency = cells(1)
      val candidate_name = cells(2)
      val sex = cells(3)
      val age = toInt(cells(4))
      val category = cells(5)
      val partyname = cells(6)
      val partysymbol = cells(7)
      val general = cells(8)
      val postal = cells(9)
      val total = toLong(cells(10))
      val pct_of_total_votes = toDouble(cells(11))
      val pct_of_polled_votes = toDouble(cells(12))
      val totalvoters = toLong(cells(13))
      Election2014(state, constituency, candidate_name, sex, age, category, partyname,
        partysymbol, general, postal, total, pct_of_total_votes, pct_of_polled_votes,
        totalvoters)
   })
election2014Obj: org.apache.spark.rdd.RDD[Election2014] = MapPartitionsRDD[66] at map at <console>:39

8. Check the data type of election2014Obj.
scala> election2014Obj
res73: org.apache.spark.rdd.RDD[Election2014] = MapPartitionsRDD[66] at map at <console>:39
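
You can also pull back one parsed record to confirm the cell-level conversion worked:

scala> election2014Obj.first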

9. Convert election2014Obj into a DataFrame.
scala> val election2014DataFrame = election2014Obj.toDF
election2014DataFrame: org.apache.spark.sql.DataFrame = [state: string, constituency: string ... 12 more fields]
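
Before selecting columns, it can help to inspect the schema that Spark derived from the case class:

scala> election2014DataFrame.printSchema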

10. Display the content of the DataFrame.

scala> election2014DataFrame.select("partyname", "total").limit(3).show
+---------+------+
|partyname| total|
+---------+------+
|      TRS|430847|
|      INC|259557|
|      TDP|184198|
+---------+------+
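
If you prefer SQL for this kind of display, you can register the DataFrame as a temporary view and query it. The view name here is our own choice:

scala> election2014DataFrame.createOrReplaceTempView("election2014")
scala> spark.sql("SELECT partyname, total FROM election2014 LIMIT 3").show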
