Problem: Find the top 50 voted movies using Spark RDD, DataFrame and SQL

Problem:
1. Download the data from the site below:
https://datasets.imdbws.com/
2. Download the movie data files title.ratings.tsv.gz and title.akas.tsv.gz
3. Find the top 50 most-voted movies
4. Storage details
Columns: titleId,title,region,language,averageRating,numVotes
Store the result at the location below: /home/cloudera/workspace/movies/<Method>/<formatname>
Store the result in the following formats.

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
b. Sequence file.
Compression: BZip2Codec
c. JSON file.
Compression: BZip2Codec
d. Parquet.
Compression:  uncompressed
e. ORC file.
f. Avro file.
Compression:  uncompressed

Use following methods:
Method 1: Use RDD
Method 2: Use DF
Method 3: Use SQL query.

Pre work:

[root@quickstart movies]# hadoop fs -mkdir /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -put /home/cloudera/Downloads/movies/title.ratings.tsv.gz /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 1 items
-rw-r--r--   1 root supergroup    3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz

val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
or
val titleRating = sc.textFile("/home/cloudera/workspace/movies/title.ratings.tsv.gz")

scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)
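Each ratings line is tab-separated: an ID followed by two numeric fields. A minimal, Spark-free sketch of the parsing applied later in the solutions, with `Try`/`getOrElse` so a malformed number falls back to a default instead of crashing the job (the function name is illustrative):

```scala
import scala.util.Try

// Parse one ratings line: tconst \t averageRating \t numVotes.
// Malformed numeric fields fall back to 0 so a bad row cannot fail the job.
def parseRating(line: String): (String, Float, Int) = {
  val f = line.split("\t")
  val id            = f(0).trim
  val averageRating = Try(f(1).trim.toFloat).getOrElse(0.0f)
  val numVotes      = Try(f(2).trim.toInt).getOrElse(0)
  (id, averageRating, numVotes)
}

println(parseRating("tt0000001\t5.8\t1350"))  // (tt0000001,5.8,1350)
println(parseRating("tt9999999\t\\N\t\\N"))   // (tt9999999,0.0,0)
```

IMDb uses "\N" for missing values, which is why the numeric parse needs a fallback.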


[root@quickstart Downloads]# hadoop fs -put /home/cloudera/Downloads/movies/title.akas.tsv.gz /home/cloudera/workspace/movies
[root@quickstart Downloads]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 2 items
-rw-r--r--   1 root supergroup   52301827 2018-02-18 15:32 /home/cloudera/workspace/movies/title.akas.tsv.gz
-rw-r--r--   1 root supergroup    3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz

val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
or
val title = sc.textFile("/home/cloudera/workspace/movies/title.akas.tsv.gz")

scala> title.take(2)
res22: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1 Carmencita - spanyol tánc HU \N imdbDisplay \N 0)


Solution:
-------------------------------------------------------------------------------------------------
Method 1: Using RDD
Output col: titleId,title,region,language,averageRating,numVotes

val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")

val titlefirst = title.first

val titleMap = title.filter(e=> e!=titlefirst).map(e=>{
   val splitted = e.split("\t")
   val titleId = splitted(0).trim
   val title = splitted(2).trim
   val region = splitted(3).trim
   val language = splitted(4).trim
   (titleId, (title, region, language))
})

val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")

val titleRatingfirst = titleRating.first

val titleRatingMap = titleRating.filter(e=>e!=titleRatingfirst).map(e=>{
   val splitted = e.split("\t")
   val titlerid = splitted(0).trim
   val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
   val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
   (titlerid, (averageRating, numVotes))
})

val titlejoin = titleMap.join(titleRatingMap)
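A pair-RDD join keys both datasets by titleId and yields (key, (leftValue, rightValue)) tuples; only keys present on both sides survive. A plain-Scala sketch of the same inner-join shape, on toy data (no Spark required):

```scala
// Toy stand-ins for the two keyed datasets.
val titles  = Seq(("tt1", ("Carmencita", "HU", "\\N")), ("tt2", ("Clown", "US", "en")))
val ratings = Seq(("tt1", (5.8f, 1350)))

// Inner join: each matching key produces (key, (leftValue, rightValue)).
val joined = for {
  (tid, t) <- titles
  (rid, r) <- ratings
  if tid == rid
} yield (tid, (t, r))

println(joined)  // List((tt1,((Carmencita,HU,\N),(5.8,1350))))
```

This is why the fields are pulled out as `e._2._1._1`, `e._2._2._1`, etc. in the next step: the value is a nested pair of the two sides' tuples.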

val titleSBK = titlejoin.map(e=>{
    val titleId = e._1
    val title = e._2._1._1
    val region = e._2._1._2
    val language = e._2._1._3
    val averageRating = e._2._2._1
    val numVotes = e._2._2._2
   (numVotes, (titleId, title, region, language, averageRating, numVotes))
}).sortByKey(false).take(50)

val top50moviesedd = sc.parallelize(titleSBK)
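The top-N pattern above (re-key by the sort field, `sortByKey(false)`, then `take(50)`) mirrors a descending `sortBy` on a plain collection; a minimal sketch with toy vote counts:

```scala
// Toy (title, numVotes) pairs.
val movies = Seq(("A", 120), ("B", 990), ("C", 450), ("D", 701))

// Sort descending on the vote count and keep the top N --
// the same ordering sortByKey(false).take(n) produces on an RDD.
val topN = movies.sortBy(-_._2).take(2)

println(topN)  // List((B,990), (D,701))
```

Note that `take(50)` returns a local Array to the driver, which is why `sc.parallelize` is needed above to turn the result back into an RDD before saving.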

Save the result in the following formats.

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

val top50ratedmoviestext = top50moviesedd.map(e=>{
    val titleId = e._2._1
    val title = e._2._2
    val region = e._2._3
    val language = e._2._4
    val averageRating = e._2._5
    val numVotes = e._2._6
    (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})

top50ratedmoviestext.saveAsTextFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])
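Since every field of the tuple ends up in the output string, Scala's `productIterator.mkString` can build the tab-separated line without naming each field; a stylistic alternative to the concatenation above (sample record is illustrative):

```scala
// A sample record shaped like (titleId, title, region, language, rating, votes).
val rec = ("tt0000001", "Carmencita", "HU", "\\N", 5.8f, 1350)

// Join all tuple elements with tabs in a single call.
val line = rec.productIterator.mkString("\t")

println(line.split("\t").length)  // 6
```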


b. Sequence file.
Compression: BZip2Codec

val top50ratedmoviesseq = top50moviesedd.map(e=>{
    val titleId = e._2._1
    val title = e._2._2
    val region = e._2._3
    val language = e._2._4
    val averageRating = e._2._5
    val numVotes = e._2._6
    (titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})

top50ratedmoviesseq.saveAsSequenceFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviesseq", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))


c. JSON file.
Compression: BZip2Codec

val top50ratedmoviesDF = top50moviesedd.map(e=>{
    val titleId = e._2._1
    val title = e._2._2
    val region = e._2._3
    val language = e._2._4
    val averageRating = e._2._5
    val numVotes = e._2._6
    (titleId, title, region, language, averageRating, numVotes)
}).toDF("titleId", "title", "region", "language", "averageRating", "numVotes")

top50ratedmoviesDF.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviesjson", classOf[org.apache.hadoop.io.compress.BZip2Codec])

d. Parquet.
Compression:  uncompressed

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top50ratedmoviesDF.write.parquet("/home/cloudera/workspace/movies/rdd/top50ratedmoviesparquet")

e. ORC file.

top50ratedmoviesDF.write.orc("/home/cloudera/workspace/movies/rdd/top50ratedmoviesorc")

f. Avro file.
Compression:  uncompressed

import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top50ratedmoviesDF.write.avro("/home/cloudera/workspace/movies/rdd/top50ratedmoviesavro")
-------------------------------------------------------------------------------------------------
Method 2: Using DF
Output col: titleId,title,region,language,averageRating,numVotes

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val title = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")

val titlefirst = title.first

val titleDF = title.rdd.filter(e=> e!=titlefirst).map(e=>{
   val splitted = e.getString(0).split("\t")
   val titleId = splitted(0).trim
   val title = splitted(2).trim
   val region = splitted(3).trim
   val language = splitted(4).trim
   (titleId, title, region, language)
}).toDF("titleId", "title", "region", "language")

val titleRating = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")

val titleRatingfirst = titleRating.first

val titleRatingDF = titleRating.rdd.filter(e=>e!=titleRatingfirst).map(e=>{
   val splitted = e.getString(0).split("\t")
   val titlerid = splitted(0).trim
   val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
   val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
   (titlerid, averageRating, numVotes)
}).toDF("titlerid", "averageRating", "numVotes")

val topRatedMovies = titleDF.join(titleRatingDF, $"titleId" === $"titlerid").orderBy($"numVotes".desc).limit(50)

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

topRatedMovies.rdd.map(e=>{
    val titleId = e.getString(0)
    val title = e.getString(1)
    val region = e.getString(2)
    val language = e.getString(3)
    val averageRating = e.getFloat(5)
    val numVotes = e.getInt(6)
    (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
}).saveAsTextFile("/home/cloudera/workspace/movies/df/topRatedMoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])

b. Sequence file.
Compression: BZip2Codec

topRatedMovies.rdd.map(e=>{
    val titleId = e.getString(0)
    val title = e.getString(1)
    val region = e.getString(2)
    val language = e.getString(3)
    val averageRating = e.getFloat(5)
    val numVotes = e.getInt(6)
    (titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
}).saveAsSequenceFile("/home/cloudera/workspace/movies/df/topRatedMoviesseq", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))

c. JSON file.
Compression: BZip2Codec

topRatedMovies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/df/topRatedMoviesJSON", classOf[org.apache.hadoop.io.compress.BZip2Codec])


d. Parquet.
Compression:  uncompressed

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
topRatedMovies.write.parquet("/home/cloudera/workspace/movies/df/topratedmoviesparquet")

e. ORC file.

topRatedMovies.write.orc("/home/cloudera/workspace/movies/df/topratedmoviesorc")

f. Avro file.
Compression:  uncompressed

sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
topRatedMovies.write.avro("/home/cloudera/workspace/movies/df/topratedmoviesavro")

-------------------------------------------------------------------------------------------------
Method 3: Using SQL
Output col: titleId,title,region,language,averageRating,numVotes

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val title = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")

val titlefirst = title.first

Header of title.akas.tsv: titleId ordering title region language types attributes isOriginalTitle

val titleMap = title.rdd.filter(e=>e!=titlefirst).map(e=>{
    val splitted = e.getString(0).split("\t")
    val titleId = splitted(0).trim
    val ordering = splitted(1).trim
    val title = splitted(2).trim
    val region = splitted(3).trim
    val language = splitted(4).trim
    val types = splitted(5).trim
    val attributes = splitted(6).trim
    val isOriginalTitle = splitted(7).trim
    (titleId, ordering, title, region, language, types, attributes, isOriginalTitle)
})

val titleDF = titleMap.toDF("titleId", "ordering", "title", "region", "language", "types", "attributes", "isOriginalTitle")

titleDF.registerTempTable("title")

val titleRating = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")

val titleRatingfirst = titleRating.first

val titleRatingMap = titleRating.rdd.filter(e=>e!=titleRatingfirst).map(e=>{
    val splitted = e.getString(0).split("\t")
    val titlerId = splitted(0).trim
    val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
    val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
    (titlerId, averageRating, numVotes)
})

val titleRatingMapDF = titleRatingMap.toDF("titlerId", "averageRating", "numVotes")

titleRatingMapDF.registerTempTable("titlerating")


val top50RatedMovies = sqlContext.sql("select titleId, title, region, language, averageRating, numVotes from title inner join titlerating on (titleId = titlerId) order by numVotes desc limit 50")

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

val top50RatedMoviesText = top50RatedMovies.rdd.map(e=>{
    val titleId = e.getString(0).trim
    val title = e.getString(1).trim
    val region = e.getString(2).trim
    val language = e.getString(3).trim
    val averageRating = e.getFloat(4)
    val numVotes = e.getInt(5)
    (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})

top50RatedMoviesText.saveAsTextFile("/home/cloudera/workspace/movies/sql/topratedmoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])

b. Sequence file.
Compression: BZip2Codec

val top50RatedMoviesSeq = top50RatedMovies.rdd.map(e=>{
    val titleId = e.getString(0).trim
    val title = e.getString(1).trim
    val region = e.getString(2).trim
    val language = e.getString(3).trim
    val averageRating = e.getFloat(4)
    val numVotes = e.getInt(5)
    (titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})

top50RatedMoviesSeq.saveAsSequenceFile("/home/cloudera/workspace/movies/sql/topratedmoviessequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))

c. JSON file.
Compression: BZip2Codec

top50RatedMovies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/sql/topratedmoviesjson", classOf[org.apache.hadoop.io.compress.BZip2Codec])

d. Parquet.
Compression:  uncompressed

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top50RatedMovies.write.parquet("/home/cloudera/workspace/movies/sql/topratedmoviesparquet")

e. ORC file.

top50RatedMovies.write.orc("/home/cloudera/workspace/movies/sql/topratedmoviesorc")

f. Avro file.
Compression:  uncompressed

sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top50RatedMovies.write.avro("/home/cloudera/workspace/movies/sql/topratedmoviesavro")
-------------------------------------------------------------------------------------------------

