Different ways of creating DataFrame in Spark using Scala
Create a DataFrame using an existing RDD.
Method 1: Create a DataFrame using sqlContext [SparkSession.createDataFrame(RDD obj)].
1. Create an RDD from a List.
scala> val mapRDD = sc.parallelize(List((1, "value1"), (2, "value2"), (3, "value3")))
mapRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24
Check the data type of the RDD.
scala> mapRDD
res16: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24
2. Create a sqlContext using the SparkContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@35a792
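Note: the deprecation warning above appears because SQLContext has been superseded by SparkSession in Spark 2.x. A minimal sketch of the newer entry point, assuming a Spark 2.x build (in spark-shell a SparkSession is already available as the variable spark; "rdd-to-df-demo" is just a placeholder application name):

import org.apache.spark.sql.SparkSession

// Build (or reuse) a SparkSession
val spark = SparkSession.builder().appName("rdd-to-df-demo").getOrCreate()

// spark.sparkContext plays the role of sc, and spark.createDataFrame replaces sqlContext.createDataFrame
val demoDF = spark.createDataFrame(spark.sparkContext.parallelize(List((1, "value1"))))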
3. Create the DataFrame using sqlContext.
scala> val rddToDF = sqlContext.createDataFrame(mapRDD)
rddToDF: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
4. We can view the contents of the DataFrame using the show function.
scala> rddToDF.show
+---+------+
| _1| _2|
+---+------+
| 1|value1|
| 2|value2|
| 3|value3|
+---+------+
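Besides show, the inferred schema can be inspected directly; a quick check on the rddToDF created above (for a tuple RDD the column names default to _1, _2, and so on):

rddToDF.printSchema()   // prints each column with its type and nullability
rddToDF.columns         // Array(_1, _2)
rddToDF.dtypes          // Array((_1,IntegerType), (_2,StringType))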
5. We can select specific columns using the select function. Note that by default the columns are named _1, _2, and so on.
scala> rddToDF.select("_1").show
+---+
| _1|
+---+
| 1|
| 2|
| 3|
+---+
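select can also take multiple columns and rename them on the fly. A small sketch using alias (the names key and value here are just illustrative output names):

// Select both columns and give them friendlier names in the result
rddToDF.select(rddToDF("_1").alias("key"), rddToDF("_2").alias("value")).show()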
6. We can filter the DataFrame using the filter function.
scala> rddToDF.filter("_1>1").show
+---+------+
| _1| _2|
+---+------+
| 2|value2|
| 3|value3|
+---+------+
Or we can use the where function.
scala> rddToDF.select("_1", "_2").where("_1>1").show
+---+------+
| _1| _2|
+---+------+
| 2|value2|
| 3|value3|
+---+------+
scala> rddToDF.where("_1>1").show
+---+------+
| _1| _2|
+---+------+
| 2|value2|
| 3|value3|
+---+------+
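The same filter and where calls also accept Column expressions instead of SQL strings. A minimal sketch, assuming the implicits of the sqlContext created above are imported to get the $"..." syntax:

import sqlContext.implicits._   // brings the $"columnName" syntax into scope

// Equivalent to rddToDF.filter("_1>1"), expressed with Column objects
rddToDF.filter($"_1" > 1).show()

// Conditions can be combined with && and ===
rddToDF.where($"_1" > 1 && $"_2" === "value2").show()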
Method 2: Using SparkSession.createDataFrame(RDD obj) and specifying column names.
1. Create an RDD from a List.
scala> val mapRDD = sc.parallelize(List((1, "value1"), (2, "value2"), (3, "value3")))
mapRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24
Check the data type of the RDD.
scala> mapRDD
res16: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:24
2. Create a sqlContext using the SparkContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@35a792
3. Create the DataFrame using sqlContext.
scala> val rddToDF = sqlContext.createDataFrame(mapRDD)
rddToDF: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
4. Create the DataFrame using sqlContext and specify the column names with toDF. If the column names are not specified, the columns are named _1, _2, and so on.
scala> val rddToDF = sqlContext.createDataFrame(mapRDD).toDF("KEY","VALUE")
rddToDF: org.apache.spark.sql.DataFrame = [KEY: int, VALUE: string]
5. Once we have the DataFrame, we can call any of the DataFrame methods to inspect its columns and values.
scala> rddToDF.show()
17/12/31 00:10:16 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
+---+------+
|KEY| VALUE|
+---+------+
| 1|value1|
| 2|value2|
| 3|value3|
+---+------+
scala> rddToDF.where("KEY=1").show
+---+------+
|KEY| VALUE|
+---+------+
| 1|value1|
+---+------+
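With the sqlContext implicits imported, the same DataFrame can be built without calling createDataFrame at all, because toDF is available directly on an RDD of tuples. A minimal sketch, assuming import sqlContext.implicits._ (or spark.implicits._ on Spark 2.x); mapKeyValueDF is just an illustrative name:

import sqlContext.implicits._

// toDF on a tuple RDD infers the types and takes the column names as arguments
val mapKeyValueDF = mapRDD.toDF("KEY", "VALUE")
mapKeyValueDF.where("KEY = 1").show()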
Method 3: Use sqlContext to create a DataFrame using a rowRDD and a schema.
Method Signature
def createDataFrame(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.DataFrame
1. Import the necessary classes.
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.IntegerType
scala> import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StringType
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructField
2. Create a List of Row objects.
scala> val rowRDD = List(Row(1, "value1"), Row(2, "value2"), Row(3, "value3"))
rowRDD: List[org.apache.spark.sql.Row] = List([1,value1], [2,value2], [3,value3])
3. Parallelize the List to get an RDD of Row.
scala> val rowRDDParallelized = sc.parallelize(rowRDD)
rowRDDParallelized: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[9] at parallelize at <console>:31
scala> rowRDDParallelized
res5: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[9] at parallelize at <console>:31
4. Create a schema for the data added to the List.
scala> val schema = StructType(StructField("key", IntegerType, true) :: StructField("value", StringType, false) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,IntegerType,true), StructField(value,StringType,false))
5. Create the DataFrame using sqlContext, passing the row RDD and the schema.
scala> val rowRDDDataFrame = sqlContext.createDataFrame(sc.parallelize(rowRDD), schema)
rowRDDDataFrame: org.apache.spark.sql.DataFrame = [key: int, value: string]
scala> rowRDDDataFrame.show
+---+------+
|key| value|
+---+------+
| 1|value1|
| 2|value2|
| 3|value3|
+---+------+
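Because the schema in Method 3 is built as ordinary data, it can also be generated programmatically, for example from a list of column names. A minimal sketch, assuming all generated columns are nullable strings (columnNames, generatedSchema, stringRows, and generatedDF are illustrative names):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build a StructType from a plain list of column names, all as nullable strings
val columnNames = Seq("key", "value")
val generatedSchema = StructType(columnNames.map(name => StructField(name, StringType, nullable = true)))

// The Row fields must line up with the schema: two string fields per row here
val stringRows = sc.parallelize(Seq(Row("1", "value1"), Row("2", "value2"), Row("3", "value3")))
val generatedDF = sqlContext.createDataFrame(stringRows, generatedSchema)
generatedDF.show()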
Method 4: Using sqlContext to read the text file.
1. Create the text file that needs to be read.
[root@localhost spark]# pwd
/home/hadoop/workspace/spark
[root@localhost spark]# vi keyvalue.txt
2. Add the following content to the text file.
1,value1
2,value2
3,value3
3. Create a new sqlContext using the SparkContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@167312d
4. Create a DataFrame from the text file.
scala> val keyvalue = sqlContext.read.textFile("file:///home/hadoop/workspace/spark/keyvalue.txt")
keyvalue: org.apache.spark.sql.DataFrame = [value: string]
scala> keyvalue.collect.foreach(println)
[1,value1]
[2,value2]
[3,value3]
5. Check the rows of the DataFrame created from the text file.
scala> keyvalue.show
+--------+
| value|
+--------+
|1,value1|
|2,value2|
|3,value3|
+--------+
6. Create a case class that will hold the data types of the input data. In the above example there are two columns, with key as Int and value as String.
scala> case class KeyValueClass(key: Int, value: String)
defined class KeyValueClass
7. Since each value in the keyvalue DataFrame is a single string, we need to split and map the data into the case class.
scala> val keyvalueJavaRDD = keyvalue.map(row => {
     |   KeyValueClass(row.toString.split(",")(0).toInt, row.toString.split(",")(1).toString)})
keyvalueJavaRDD: org.apache.spark.sql.Dataset[KeyValueClass] = [key: int, value: string]
scala> keyvalueJavaRDD.show
+---+------+
|key| value|
+---+------+
| 1|value1|
| 2|value2|
| 3|value3|
+---+------+
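If Spark 2.x is available, the manual split/map step can be avoided by letting the built-in CSV reader apply a schema while reading the file. A minimal sketch, assuming the same keyvalue.txt path and that the file has no header row (csvSchema and keyvalueCsvDF are illustrative names):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val csvSchema = StructType(
  StructField("key", IntegerType, true) ::
  StructField("value", StringType, false) :: Nil)

// Read the comma-separated file directly into a typed DataFrame
val keyvalueCsvDF = sqlContext.read
  .schema(csvSchema)
  .option("header", "false")
  .csv("file:///home/hadoop/workspace/spark/keyvalue.txt")

keyvalueCsvDF.show()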