Problems on Apache Spark RDD, DataFrame, and SQL queries using SQLContext, with solutions
Problem:
1. Download the data from the site below:
https://datasets.imdbws.com/
2. Download the movies data title.ratings.tsv.gz and title.akas.tsv.gz
3. Find the top 5 rated movies with more than 100000 votes.
4. Storage details
Columns: titleId,title,region,language,averageRating,numVotes
Store the result at the following location: /home/cloudera/workspace/movies/<Method>/<formatname>
Store the result in the following formats.
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
b. Sequence file.
Compression: BZip2Codec
c. JSON file.
Compression: BZip2Codec
d. Parquet.
Compression: uncompressed
e. ORC file.
f. Avro file.
Compression: uncompressed
Use the following methods:
Method 1: Use RDD
Method 2: Use DF
Method 3: Use SQL query.
Pre work:
[root@quickstart movies]# hadoop fs -mkdir /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -put /home/cloudera/Downloads/movies/title.ratings.tsv.gz /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 1 items
-rw-r--r-- 1 root supergroup 3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz
val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
or
val titleRating = sc.textFile("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)
[root@quickstart Downloads]# hadoop fs -put /home/cloudera/Downloads/movies/title.akas.tsv.gz /home/cloudera/workspace/movies
[root@quickstart Downloads]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 2 items
-rw-r--r-- 1 root supergroup 52301827 2018-02-18 15:32 /home/cloudera/workspace/movies/title.akas.tsv.gz
-rw-r--r-- 1 root supergroup 3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz
val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
or
val title = sc.textFile("/home/cloudera/workspace/movies/title.akas.tsv.gz")
scala> title.take(2)
res22: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1 Carmencita - spanyol tánc HU \N imdbDisplay \N 0)
Solution:
-----------------------------------------------------------------------------------------
Method 1: Using RDD
scala> titleRating.take(2)
res25: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)
scala> title.take(2)
res26: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1 Carmencita - spanyol tánc HU \N imdbDisplay \N 0)
Output columns: titleId,title,region,language,averageRating,numVotes
val titlefirst = title.first
val titleMap = title.filter(e => e!=titlefirst).map(e => {
val splitted = e.split("\t")
val titleId = splitted(0).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
(titleId, (title, region, language))
})
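The header is removed above by comparing every record against the first line. An alternative sketch (assuming, as with a single gzip input file, that the header sits at the start of partition 0) drops it without touching the other records:

```scala
// Drop only the first line of partition 0; other partitions pass through untouched.
val titleNoHeader = title.mapPartitionsWithIndex { (idx, it) =>
  if (idx == 0) it.drop(1) else it
}
```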
scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)
val titleRatingFirst = titleRating.first
val titleRatingMap = titleRating.filter(e=>e!=titleRatingFirst).map(e=>{
val splitted = e.split("\t")
val titleId = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
(titleId, (averageRating, numVotes))
})
val highVotedMovies = titleRatingMap.filter(e=>{
e._2._2 > 100000
})
val titleRatingMapJoin = highVotedMovies.join(titleMap).distinct
scala> titleRatingMapJoin.lookup("tt0000001")
res32: Seq[((String, String), (String, String, String))] = ArrayBuffer(((5.8,1350),(Carmencita - spanyol tánc,HU,\N)), ((5.8,1350),(Карменсита,RU,\N)), ((5.8,1350),(Carmencita,US,\N)), ((5.8,1350),(Carmencita,\N,\N)))
scala> titleRatingMapJoin.take(2)
res33: Array[(String, ((String, String), (String, String, String)))] = Array((tt0000001,((5.8,1350),(Carmencita - spanyol tánc,HU,\N))), (tt0000001,((5.8,1350),(Карменсита,RU,\N))))
val titleRatingSorted = titleRatingMapJoin.map(e=> {
val averageRating = e._2._1._1
val titleId = e._1
val title = e._2._2._1
val region = e._2._2._2
val language = e._2._2._3
val numVotes = e._2._1._2
(averageRating, (titleId,title,region,language,averageRating,numVotes))
}).sortByKey(false)
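sortByKey(false) shuffles and sorts the entire RDD even though only five records are kept afterwards. A lighter sketch uses RDD.top, which retains only the requested number of elements per partition before merging:

```scala
// top(n) returns the n largest elements under the given ordering; no full sort needed.
val top5ByRating = titleRatingSorted.top(5)(Ordering.by(_._1))
```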
val top5MoviesAvgRating = titleRatingSorted.take(5)
scala> val top5MovieMap = sc.parallelize(top5MoviesAvgRating)
top5MovieMap: org.apache.spark.rdd.RDD[(Float, (String, String, String, String, Float, Int))] = ParallelCollectionRDD[86] at parallelize at <console>:45
Saving the data in different file formats.
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
val tabbedMovies = top5MovieMap.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes
})
tabbedMovies.saveAsTextFile("/home/cloudera/workspace/movies/rdd/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec
val tabbedMoviesSeq = top5MovieMap.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})
tabbedMoviesSeq.saveAsSequenceFile("/home/cloudera/workspace/movies/rdd/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
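To check that the sequence file round-trips, it can be read back. saveAsSequenceFile on (String, String) pairs writes Hadoop Text keys and values, so a read-back sketch looks like:

```scala
import org.apache.hadoop.io.Text

// Text objects are reused by the record reader, so copy them to immutable Strings.
val seqBack = sc.sequenceFile("/home/cloudera/workspace/movies/rdd/Sequence",
    classOf[Text], classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) }
seqBack.take(2)
```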
c. JSON file.
Compression: BZip2Codec
val tabbedMoviesJson = top5MovieMap.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId , title , region , language , averageRating , numVotes)
}).toDF("titleId" , "title" , "region" , "language" , "averageRating" , "numVotes")
tabbedMoviesJson.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/rdd/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])
d. Parquet.
Compression: uncompressed
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
val tabbedMoviesDF = top5MovieMap.map(e=>{
val titleId = e._2._1
val title = e._2._2
val region = e._2._3
val language = e._2._4
val averageRating = e._2._5
val numVotes = e._2._6
(titleId , title , region , language , averageRating , numVotes)
}).toDF("titleId" , "title" , "region" , "language" , "averageRating" , "numVotes")
tabbedMoviesDF.write.parquet("/home/cloudera/workspace/movies/rdd/parquet")
e. ORC file.
tabbedMoviesDF.write.orc("/home/cloudera/workspace/movies/rdd/orc")
f. Avro file.
Compression: uncompressed
import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
tabbedMoviesDF.write.avro("/home/cloudera/workspace/movies/rdd/avro")
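Each of the columnar outputs can be verified by reading it back (a sketch, assuming the shell's context is a HiveContext, which the ORC reader requires in Spark 1.x):

```scala
import com.databricks.spark.avro._

sqlContext.read.parquet("/home/cloudera/workspace/movies/rdd/parquet").show
sqlContext.read.orc("/home/cloudera/workspace/movies/rdd/orc").show
sqlContext.read.avro("/home/cloudera/workspace/movies/rdd/avro").show
```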
-----------------------------------------------------------------------------------------
Method 2: Using DataFrame
File location:
/home/cloudera/workspace/movies/title.ratings.tsv.gz
/home/cloudera/workspace/movies/title.akas.tsv.gz
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val title = sqlContext.read.text("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> title.limit(5).show
+--------------------+
| value|
+--------------------+
|tconst averageRat...|
| tt0000001 5.8 1350|
| tt0000002 6.5 157|
| tt0000003 6.6 933|
| tt0000004 6.4 93|
+--------------------+
val titlerdd = title.rdd
case class Title(titleId:String, averageRating:Float, numVotes:Int)
val titlefirst = titlerdd.first
val titleMapped = titlerdd.filter(e=> e!=titlefirst).map(e=> {
val rowStr = e.getString(0)
val splitted = rowStr.split("\t")
val titleId = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
Title(titleId, averageRating, numVotes)
})
val titleMappedDF = titleMapped.toDF
scala> titleMappedDF.limit(2).show
+---------+-------------+--------+
| titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001| 5.8| 1350|
|tt0000002| 6.5| 157|
+---------+-------------+--------+
val titledetails = sqlContext.read.text("/home/cloudera/workspace/movies/title.akas.tsv.gz")
val titledetailsrdd = titledetails.rdd
val titledetailsrddfirst = titledetailsrdd.first
case class TitleDetail(titleId:String, title:String, region:String, language:String)
val titledetailsMap = titledetailsrdd.filter(e=>e!=titledetailsrddfirst).map(e=>{
val row = e.getString(0)
val splitted = row.split("\t")
val titleId = splitted(0).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
TitleDetail(titleId, title, region, language)
})
val titledetailsDF = titledetailsMap.toDF
scala> titledetailsDF.limit(5).show
+---------+--------------------+------+--------+
| titleId| title|region|language|
+---------+--------------------+------+--------+
|tt0000001|Carmencita - span...| HU| \N|
|tt0000001| Карменсита| RU| \N|
|tt0000001| Carmencita| US| \N|
|tt0000001| Carmencita| \N| \N|
|tt0000002|Le clown et ses c...|    \N|      \N|
+---------+--------------------+------+--------+
scala> titleMappedDF.join(titledetailsDF, titleMappedDF("titleId") === titledetailsDF("titleId"), "inner")
res99: org.apache.spark.sql.DataFrame = [titleId: string, averageRating: float, numVotes: int, titleId: string, title: string, region: string, language: string]
or
val movies = titleMappedDF.join(titledetailsDF, titleMappedDF("titleId").equalTo(titledetailsDF("titleId")), "inner").select(titleMappedDF("titleId"), titledetailsDF("title"),titledetailsDF("region"),titledetailsDF("language"),titleMappedDF("averageRating") , titleMappedDF("numVotes"))
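Because both DataFrames carry a titleId column, the join above yields two identically named columns unless they are selected explicitly, as done here. On Spark 1.6+, a usingColumns join is a shorter sketch that keeps a single titleId automatically:

```scala
// join(right, usingColumns, joinType) deduplicates the join key column
val moviesAlt = titleMappedDF.join(titledetailsDF, Seq("titleId"), "inner")
  .select("titleId", "title", "region", "language", "averageRating", "numVotes")
```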
scala> movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy($"averageRating".desc).limit(5).show
+---------+--------------------+------+--------+-------------+--------+
| titleId| title|region|language|averageRating|numVotes|
+---------+--------------------+------+--------+-------------+--------+
|tt4283088|Battle of the Bas...| US| \N| 9.9| 152823|
|tt4283094| The Winds of Winter| US| \N| 9.9| 105085|
|tt0944947| Igra prestola| RS| \N| 9.5| 1292492|
|tt0944947| Game of Thrones| \N| \N| 9.5| 1292492|
|tt0944947|Samefo Karis Tama...| GE| \N| 9.5| 1292492|
+---------+--------------------+------+--------+-------------+--------+
Equivalent orderings (the col and desc forms require import org.apache.spark.sql.functions.{col, desc}):
movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(movies("averageRating").desc).limit(5).show
movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(col("averageRating").desc).limit(5).show
movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(desc("averageRating")).limit(5).show
val top5movies = movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy($"averageRating".desc).limit(5)
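With only five rows, each save below otherwise produces one small part file per partition. Coalescing to a single partition first gives one output file per format (an optional sketch):

```scala
// One partition -> one part file in each saved format
val top5moviesSingle = top5movies.coalesce(1)
```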
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
val moviestext = top5movies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})
moviestext.saveAsTextFile("/home/cloudera/workspace/movies/df/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec
val moviesSequence = top5movies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})
moviesSequence.saveAsSequenceFile("/home/cloudera/workspace/movies/df/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
c. JSON file.
Compression: BZip2Codec
top5movies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/df/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])
d. Parquet.
Compression: uncompressed
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top5movies.write.parquet("/home/cloudera/workspace/movies/df/parquet")
e. ORC file.
top5movies.write.orc("/home/cloudera/workspace/movies/df/orc")
f. Avro file.
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top5movies.write.avro("/home/cloudera/workspace/movies/df/avro")
-----------------------------------------------------------------------------------------
Method 3: Use SQL query.
File location:
/home/cloudera/workspace/movies/title.ratings.tsv.gz
/home/cloudera/workspace/movies/title.akas.tsv.gz
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val title = sqlContext.read.text("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> title.limit(5).show
+--------------------+
| value|
+--------------------+
|tconst averageRat...|
| tt0000001 5.8 1350|
| tt0000002 6.5 157|
| tt0000003 6.6 933|
| tt0000004 6.4 93|
+--------------------+
val titlerdd = title.rdd
case class Title(titleId:String, averageRating:Float, numVotes:Int)
val titlefirst = titlerdd.first
val titleMapped = titlerdd.filter(e=> e!=titlefirst).map(e=> {
val rowStr = e.getString(0)
val splitted = rowStr.split("\t")
val titleId = splitted(0).trim
val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
Title(titleId, averageRating, numVotes)
})
val titleMappedDF = titleMapped.toDF
scala> titleMappedDF.limit(2).show
+---------+-------------+--------+
| titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001| 5.8| 1350|
|tt0000002| 6.5| 157|
+---------+-------------+--------+
val titledetails = sqlContext.read.text("/home/cloudera/workspace/movies/title.akas.tsv.gz")
val titledetailsrdd = titledetails.rdd
val titledetailsrddfirst = titledetailsrdd.first
case class TitleDetail(titleId:String, title:String, region:String, language:String)
val titledetailsMap = titledetailsrdd.filter(e=>e!=titledetailsrddfirst).map(e=>{
val row = e.getString(0)
val splitted = row.split("\t")
val titleId = splitted(0).trim
val title = splitted(2).trim
val region = splitted(3).trim
val language = splitted(4).trim
TitleDetail(titleId, title, region, language)
})
val titledetailsDF = titledetailsMap.toDF
titleMappedDF.registerTempTable("title")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql("select * from title limit 2").show
Using the above method throws org.apache.spark.sql.AnalysisException: Table not found: title, because the temp table was registered against a different SQLContext than this newly created one.
As a workaround, reuse the context the DataFrame was created with:
scala> val sqlContext = titleMappedDF.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38f221e1
scala> sqlContext.sql("select * from title limit 2").show
+---------+-------------+--------+
| titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001| 5.8| 1350|
|tt0000002| 6.5| 157|
+---------+-------------+--------+
Similarly, register the other table:
scala> titledetailsDF.registerTempTable("titledetail")
scala> sqlContext.sql("select * from titledetail limit 2").show
+---------+--------------------+------+--------+
| titleId| title|region|language|
+---------+--------------------+------+--------+
|tt0000001|Carmencita - span...| HU| \N|
|tt0000001| Карменсита| RU| \N|
+---------+--------------------+------+--------+
val top5movies = sqlContext.sql("select td.titleId, title, region, language, averageRating, numVotes from titledetail td inner join title t on (t.titleId=td.titleId) where numVotes>100000 order by averageRating desc limit 5")
scala> top5movies.show
+---------+--------------------+------+--------+-------------+--------+
| titleId| title|region|language|averageRating|numVotes|
+---------+--------------------+------+--------+-------------+--------+
|tt4283088|Battle of the Bas...| US| \N| 9.9| 152823|
|tt4283094| The Winds of Winter| US| \N| 9.9| 105085|
|tt0944947| Igra prestola| RS| \N| 9.5| 1292492|
|tt0944947| Game of Thrones| \N| \N| 9.5| 1292492|
|tt0944947|Samefo Karis Tama...| GE| \N| 9.5| 1292492|
+---------+--------------------+------+--------+-------------+--------+
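The akas join fans each title out to one row per regional alias, which is why tt0944947 appears three times above. If one row per movie is wanted instead, a window-function sketch can keep a single alias per titleId (this assumes the HiveContext obtained above, since window functions need it in Spark 1.x):

```scala
val onePerMovie = sqlContext.sql("""
  select titleId, title, region, language, averageRating, numVotes
  from (
    select td.titleId, title, region, language, averageRating, numVotes,
           row_number() over (partition by td.titleId order by region) as rn
    from titledetail td
    inner join title t on (t.titleId = td.titleId)
    where numVotes > 100000
  ) x
  where rn = 1
  order by averageRating desc
  limit 5
""")
```

Note that row_number picks one alias per titleId arbitrarily by region order; adjust the inner order by to prefer, say, the US title.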
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
val moviestext = top5movies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})
moviestext.saveAsTextFile("/home/cloudera/workspace/movies/sqlquery/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec
val moviesSequence = top5movies.rdd.map(e=>{
val titleId = e.getString(0)
val title = e.getString(1)
val region = e.getString(2)
val language = e.getString(3)
val averageRating = e.getFloat(4)
val numVotes = e.getInt(5)
(titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})
moviesSequence.saveAsSequenceFile("/home/cloudera/workspace/movies/sqlquery/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
c. JSON file.
Compression: BZip2Codec
top5movies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/sqlquery/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])
d. Parquet.
Compression: uncompressed
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top5movies.write.parquet("/home/cloudera/workspace/movies/sqlquery/parquet")
e. ORC file.
top5movies.write.orc("/home/cloudera/workspace/movies/sqlquery/orc")
f. Avro file.
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top5movies.write.avro("/home/cloudera/workspace/movies/sqlquery/avro")
-----------------------------------------------------------------------------------------