Different ways of creating a DataFrame in Spark using Scala


Create a DataFrame using an existing RDD.

Method 1: Create a DataFrame using the sqlContext [SQLContext.createDataFrame(RDD obj)].

1. Create an RDD from a List of tuples.
scala> val mapRDD = sc.parallelize(List((1, "value1"),(2, "value2"),(3, "value3")))
mapRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24

Check the data type of the RDD.
scala> mapRDD
res16: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24

2. Create a SQLContext using the Spark context.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@35a792


3. Create a DataFrame using the SQLContext.

scala> val rddToDF = sqlContext.createDataFrame(mapRDD)
rddToDF: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

4. We can view the contents of the DataFrame using the show function.
scala> rddToDF.show
+---+------+
| _1|    _2|
+---+------+
|  1|value1|
|  2|value2|
|  3|value3|
+---+------+

5. We can select a single column using the select function. Note that by default the columns are named _1, _2, and so on.
scala> rddToDF.select("_1").show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
+---+

6. We can filter the DataFrame using the filter function.
scala> rddToDF.filter("_1>1").show
+---+------+
| _1|    _2|
+---+------+
|  2|value2|
|  3|value3|
+---+------+

Or we can use the where function.

scala> rddToDF.select("_1", "_2").where("_1>1").show
+---+------+
| _1|    _2|
+---+------+
|  2|value2|
|  3|value3|
+---+------+

scala> rddToDF.where("_1>1").show
+---+------+
| _1|    _2|
+---+------+
|  2|value2|
|  3|value3|
+---+------+
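
As a side note, on Spark 2.x the same DataFrame can be built straight from the SparkSession (exposed as spark in spark-shell), without creating a separate SQLContext. A minimal sketch, assuming a Spark 2.x shell:

// Sketch: SparkSession equivalent of Method 1 (Spark 2.x); spark-shell
// provides the SparkSession as 'spark'.
val mapRDD = spark.sparkContext.parallelize(
  List((1, "value1"), (2, "value2"), (3, "value3")))

// createDataFrame on SparkSession behaves the same way as on SQLContext.
val rddToDF = spark.createDataFrame(mapRDD)
rddToDF.show()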

Method 2: Using SQLContext.createDataFrame(RDD obj) and specifying column names with toDF.

1. Create an RDD from a List of tuples.
scala> val mapRDD = sc.parallelize(List((1, "value1"),(2, "value2"),(3, "value3")))
mapRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24

Check the data type of the RDD.
scala> mapRDD
res16: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24

2. Create a SQLContext using the Spark context.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@35a792


3. Create a DataFrame using the SQLContext.
scala> val rddToDF = sqlContext.createDataFrame(mapRDD)
rddToDF: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

4. Create a DataFrame using the SQLContext and specify the column names with toDF. If column names are not
specified, the columns default to _1, _2, and so on.
scala> val rddToDF = sqlContext.createDataFrame(mapRDD).toDF("KEY","VALUE")
rddToDF: org.apache.spark.sql.DataFrame = [KEY: int, VALUE: string]

5. Once we have the DataFrame we can call any of the DataFrame methods to work with its columns and values.
scala> rddToDF.show()
17/12/31 00:10:16 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
+---+------+
|KEY| VALUE|
+---+------+
|  1|value1|
|  2|value2|
|  3|value3|
+---+------+

scala> rddToDF.where("KEY=1").show
+---+------+
|KEY| VALUE|
+---+------+
|  1|value1|
+---+------+
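
Alternatively, once the implicits of the SQLContext are imported, the column names can be supplied directly with toDF on the RDD. A minimal sketch, assuming the mapRDD and sqlContext created above:

// Sketch: convert the RDD and name the columns in one step.
import sqlContext.implicits._

val namedDF = mapRDD.toDF("KEY", "VALUE")   // RDD[(Int, String)] -> DataFrame
namedDF.printSchema()
namedDF.show()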

Method 3: Use the SQLContext to create a DataFrame from a row RDD and a schema.

Method Signature
def createDataFrame(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],
schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.DataFrame

1. Import the necessary classes.
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType

scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField
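
The same imports can also be combined into a single statement:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, StructField}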

2. Create a List of Row objects.

scala> val rowRDD = List(Row(1, "value1"), Row(2, "value2"), Row(3, "value3"))
rowRDD: List[org.apache.spark.sql.Row] = List([1,value1], [2,value2], [3,value3])

3. Parallelize the List to get an RDD[Row].
scala> val rowRDDParallelized =sc.parallelize(rowRDD)
rowRDDParallelized: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[9] at parallelize at <console>:31

scala> rowRDDParallelized
res5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[9] at parallelize at <console>:31

4. Create a schema for the data added in the List.
scala> val schema = StructType( StructField("key", IntegerType, true) ::StructField("value", StringType, false) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,IntegerType,true), StructField(value,StringType,false))

5. Create a DataFrame using the SQLContext, passing the row RDD and the schema.
scala> val rowRDDDataFrame = sqlContext.createDataFrame(sc.parallelize(rowRDD), schema)
rowRDDDataFrame: org.apache.spark.sql.DataFrame = [key: int, value: string]

scala> rowRDDDataFrame.show
+---+------+
|key| value|
+---+------+
|  1|value1|
|  2|value2|
|  3|value3|
+---+------+
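
For reference, the whole of Method 3 can be condensed into a short self-contained snippet. A sketch, assuming the sc and sqlContext created earlier:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Build an RDD[Row] and a matching schema, then combine them into a DataFrame.
val rows = sc.parallelize(Seq(Row(1, "value1"), Row(2, "value2"), Row(3, "value3")))
val schema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("value", StringType, nullable = false)))

val rowRDDDataFrame = sqlContext.createDataFrame(rows, schema)
rowRDDDataFrame.show()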

Method 4: Using the SQLContext to read a text file.

1. Create the text file that needs to be read.
[root@localhost spark]# pwd
/home/hadoop/workspace/spark
[root@localhost spark]# vi keyvalue.txt

2. Add the below content to the text file.
1,value1
2,value2
3,value3

3. Create a new SQLContext using the Spark context.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@167312d

4. Create a DataFrame from the text file.
scala> val keyvalue = sqlContext.read.textFile("file:///home/hadoop/workspace/spark/keyvalue.txt")
keyvalue: org.apache.spark.sql.DataFrame = [value: string]

scala> keyvalue.collect.foreach(println)
[1,value1]
[2,value2]
[3,value3]

5. Check the rows of the DataFrame created from the text file.
scala> keyvalue.show
+--------+
|   value|
+--------+
|1,value1|
|2,value2|
|3,value3|
+--------+


6. Create a case class that will hold the data types of the input data. In the above example there are
two columns: key as Int and value as String.

scala> case class KeyValueClass(key:Int, value:String)
defined class KeyValueClass

7. Since each row of the keyvalue DataFrame is a single string, we need to split and map the data.

scala> val keyvalueJavaRDD = keyvalue.map(row =>{
     | KeyValueClass(row.toString.split(",")(0).toInt,row.toString.split(",")(1).toString)})
keyvalueJavaRDD: org.apache.spark.sql.Dataset[KeyValueClass] = [key: int, value: string]

scala> keyvalueJavaRDD.show
+---+------+
|key| value|
+---+------+
|  1|value1|
|  2|value2|
|  3|value3|
+---+------+
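
On Spark 2.x the built-in CSV reader can load the same file with the schema applied up front, which avoids the manual split in step 7. A sketch, assuming a Spark 2.x shell where the SparkSession is available as spark:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Sketch: read the comma-separated file directly as a DataFrame.
val kvSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("value", StringType, nullable = true)))

val keyvalueDF = spark.read
  .schema(kvSchema)
  .csv("file:///home/hadoop/workspace/spark/keyvalue.txt")

keyvalueDF.show()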
