Problem: Find the top 50 voted movies using Spark RDD, DataFrame and SQL
Problem:
1. Download data from below site.
https://datasets.imdbws.com/
2. Download the movies data title.ratings.tsv.gz and title.akas.tsv.gz
3. Find the top 50 voted movies
4. Storage details
Columns: titleId,title,region,language,averageRating,numVotes
Store the result at below location: /home/cloudera/workspace/movies/<Method>/<formatname>
Store the result in the following formats.
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
b. Sequence file.
Compression: BZip2Codec
c. JSON file.
Compression: BZip2Codec
d. Parquet.
Compression: uncompressed
e. ORC file.
f. Avro file.
Compression: uncompressed
Use following methods:
Method 1: Use RDD
Method 2: Use DF
Method 3: Use SQL query.
Pre work:
[root@quickstart movies]# hadoop fs -mkdir /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -put /home/cloudera/Downloads/movies/title.ratings.tsv.gz /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 1 items
-rw-r--r-- 1 root supergroup 3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz
val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
or
val titleRating = sc.textFile("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)
[root@quickstart Downloads]# hadoop fs -put /home/cloudera/Downloads/movies/title.akas.tsv.gz /home/cloudera/workspace/movies
[root@quickstart Downloads]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 2 items
-rw-r--r-- 1 root supergroup 52301827 2018-02-18 15:32 /home/cloudera/workspace/movies/title.akas.tsv.gz
-rw-r--r-- 1 root supergroup 3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz
val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
or
val title = sc.textFile("/home/cloudera/workspace/movies/title.akas.tsv.gz")
scala> title.take(2)
res22: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1 Carmencita - spanyol tánc HU \N imdbDisplay \N 0)
Solution:
-------------------------------------------------------------------------------------------------
Method 1: Using RDD
Output col: titleId,title,region,language,averageRating,numVotes
val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
val titlefirst = title.first
val titleMap = title.filter(e=> e!=titlefirst).map(e=>{
val splitted = e.split("\t")
val titleId = splitted(0).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
(titleId, (title, region, language))
})
val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
val titleRatingfirst = titleRating.first
val titleRatingMap = titleRating.filter(e=>e!=titleRatingfirst).map(e=>{
val splitted = e.split("\t")
val titlerid = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
(titlerid, (averageRating, numVotes))
})
val titlejoin = titleMap.join(titleRatingMap)
val titleSBK = titlejoin.map(e=>{
val titleId = e._1
val title = e._2._1._1
val region = e._2._1._2
val language = e._2._1._3
val averageRating = e._2._2._1
val numVotes = e._2._2._2
(numVotes, (titleId, title, region, language, averageRating, numVotes))
}).sortByKey(false).take(50)
val top50moviesedd = sc.parallelize(titleSBK)
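The sortByKey(false).take(50) above sorts the entire joined RDD just to keep 50 rows. As an alternative sketch (assuming the same titlejoin RDD as above), RDD.top keeps only the 50 largest keys per partition before merging, avoiding the full sort:

// Sketch: top(50) uses the implicit tuple ordering, so the primary
// sort key is numVotes and ties are broken by the remaining fields.
val top50ByVotes = titlejoin.map(e => (e._2._2._2, (e._1, e._2._1._1, e._2._1._2, e._2._1._3, e._2._2._1, e._2._2._2))).top(50)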
Save the result in the following formats.
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
val top50ratedmoviestext = top50moviesedd.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})
top50ratedmoviestext.saveAsTextFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])
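As an optional sanity check, the compressed output can be read straight back; Spark decompresses the .bz2 part files transparently:

// Read back the BZip2-compressed text output and print a few rows
sc.textFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviestext").take(5).foreach(println)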
b. Sequence file.
Compression: BZip2Codec
val top50ratedmoviesseq = top50moviesedd.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})
top50ratedmoviesseq.saveAsSequenceFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviesseq", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
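The sequence file can likewise be verified; sc.sequenceFile derives the Writable converters from the type parameters:

// Read back the (key, value) pairs written above
sc.sequenceFile[String, String]("/home/cloudera/workspace/movies/rdd/top50ratedmoviesseq").take(5).foreach(println)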
c. JSON file.
Compression: BZip2Codec
val top50ratedmoviesDF = top50moviesedd.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId, title, region, language, averageRating, numVotes)
}).toDF("titleId", "title", "region", "language", "averageRating", "numVotes")
top50ratedmoviesDF.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/rdd/top50ratedmoviesjson", classOf[org.apache.hadoop.io.compress.BZip2Codec])
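To verify, the compressed JSON can be loaded back into a DataFrame (read.json also handles the .bz2 parts transparently):

// Schema is inferred from the JSON records
sqlContext.read.json("/home/cloudera/workspace/movies/rdd/top50ratedmoviesjson").show(5)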
d. Parquet.
Compression: uncompressed
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top50ratedmoviesDF.write.parquet("/home/cloudera/workspace/movies/rdd/top50ratedmoviesparquet")
e. ORC file.
top50ratedmoviesDF.write.orc("/home/cloudera/workspace/movies/rdd/top50ratedmoviesorc")
f. Avro file.
Compression: uncompressed
import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top50ratedmoviesDF.write.avro("/home/cloudera/workspace/movies/rdd/top50ratedmoviesavro")
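As an optional check of the columnar outputs (a sketch: the Avro read assumes the same com.databricks:spark-avro package is on the classpath, and on the CDH quickstart shell sqlContext is a HiveContext, which the ORC source requires):

// Read each columnar format back and show a few rows
sqlContext.read.parquet("/home/cloudera/workspace/movies/rdd/top50ratedmoviesparquet").show(5)
sqlContext.read.orc("/home/cloudera/workspace/movies/rdd/top50ratedmoviesorc").show(5)
sqlContext.read.avro("/home/cloudera/workspace/movies/rdd/top50ratedmoviesavro").show(5)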
-------------------------------------------------------------------------------------------------
Method 2: Using DF
Output col: titleId,title,region,language,averageRating,numVotes
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val title = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
val titlefirst = title.first
val titleDF = title.rdd.filter(e=> e!=titlefirst).map(e=>{
val splitted = e.getString(0).split("\t")
val titleId = splitted(0).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
(titleId, title, region, language)
}).toDF("titleId", "title", "region", "language")
val titleRating = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
val titleRatingfirst = titleRating.first
val titleRatingDF = titleRating.rdd.filter(e=>e!=titleRatingfirst).map(e=>{
val splitted = e.getString(0).split("\t")
val titlerid = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
(titlerid, averageRating, numVotes)
}).toDF("titlerid", "averageRating", "numVotes")
val topRatedMovies = titleDF.join(titleRatingDF, $"titleId" === $"titlerid").orderBy($"numVotes".desc).limit(50)
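Note that the joined DataFrame still carries the duplicate join key titlerid between language and averageRating, which is why the positional accessors below use indexes 5 and 6. If you prefer to work with exactly the six required output columns, a sketch:

// Drop the redundant join key; averageRating/numVotes then sit at indexes 4 and 5
val topRatedMoviesClean = topRatedMovies.drop("titlerid")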
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
topRatedMovies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(5)
val numVotes = e.getInt(6)
(titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
}).saveAsTextFile("/home/cloudera/workspace/movies/df/topRatedMoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec
topRatedMovies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(5)
val numVotes = e.getInt(6)
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
}).saveAsSequenceFile("/home/cloudera/workspace/movies/df/topRatedMoviesseq", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
c. JSON file.
Compression: BZip2Codec
topRatedMovies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/df/topRatedMoviesJSON", classOf[org.apache.hadoop.io.compress.BZip2Codec])
d. Parquet.
Compression: uncompressed
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
topRatedMovies.write.parquet("/home/cloudera/workspace/movies/df/topratedmoviesparquet")
e. ORC file.
topRatedMovies.write.orc("/home/cloudera/workspace/movies/df/topratedmoviesorc")
f. Avro file.
Compression: uncompressed
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
topRatedMovies.write.avro("/home/cloudera/workspace/movies/df/topratedmoviesavro")
-------------------------------------------------------------------------------------------------
Method 3: Using SQL
Output col: titleId,title,region,language,averageRating,numVotes
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val title = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
val titlefirst = title.first
Header row: titleId, ordering, title, region, language, types, attributes, isOriginalTitle
val titleMap = title.rdd.filter(e=>e!=titlefirst).map(e=>{
val splitted = e.getString(0).split("\t")
val titleId = splitted(0).trim
val ordering = splitted(1).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
val types = splitted(5).trim
val attributes = splitted(6).trim
val isOriginalTitle = splitted(7).trim
(titleId, ordering, title, region, language, types, attributes, isOriginalTitle)
})
val titleDF = titleMap.toDF("titleId", "ordering", "title", "region", "language", "types", "attributes", "isOriginalTitle")
titleDF.registerTempTable("title")
val titleRating = sqlContext.read.text("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
val titleRatingfirst = titleRating.first
val titleRatingMap = titleRating.rdd.filter(e=>e!=titleRatingfirst).map(e=>{
val splitted = e.getString(0).split("\t")
val titlerId = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
(titlerId, averageRating, numVotes)
})
val titleRatingMapDF = titleRatingMap.toDF("titlerId", "averageRating", "numVotes")
titleRatingMapDF.registerTempTable("titlerating")
val top50RatedMovies = sqlContext.sql("select titleId, title, region, language, averageRating, numVotes from title inner join titlerating on (titleId = titlerId) order by numVotes desc limit 50")
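A quick schema check before writing confirms that averageRating and numVotes come back from the query as float and int rather than strings, so the Row accessors must match these types:

// Inspect column types and a sample of the result
top50RatedMovies.printSchema
top50RatedMovies.show(5)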
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
val top50RatedMoviesText = top50RatedMovies.rdd.map(e=>{
val titleId = e.getString(0).trim
val title = e.getString(1).trim
val region = e.getString(2).trim
val language = e.getString(3).trim
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})
top50RatedMoviesText.saveAsTextFile("/home/cloudera/workspace/movies/sql/topratedmoviestext", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec
val top50RatedMoviesSeq = top50RatedMovies.rdd.map(e=>{
val titleId = e.getString(0).trim
val title = e.getString(1).trim
val region = e.getString(2).trim
val language = e.getString(3).trim
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})
top50RatedMoviesSeq.saveAsSequenceFile("/home/cloudera/workspace/movies/sql/topratedmoviessequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
c. JSON file.
Compression: BZip2Codec
top50RatedMovies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/sql/topratedmoviesjson", classOf[org.apache.hadoop.io.compress.BZip2Codec])
d. Parquet.
Compression: uncompressed
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top50RatedMovies.write.parquet("/home/cloudera/workspace/movies/sql/topratedmoviesparquet")
e. ORC file.
top50RatedMovies.write.orc("/home/cloudera/workspace/movies/sql/topratedmoviesorc")
f. Avro file.
Compression: uncompressed
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top50RatedMovies.write.avro("/home/cloudera/workspace/movies/sql/topratedmoviesavro")
-------------------------------------------------------------------------------------------------