Exercise 3: Read data from MySQL using sqlContext and filter records.
Problem statement: Find all persons with age greater than 30 years in the MySQL database table PERSON, using Spark.
1. Create a table called PERSON.
mysql> CREATE TABLE PERSON(
    ID INT NOT NULL AUTO_INCREMENT,
    FNAME VARCHAR(100),
    AGE INT,
    PRIMARY KEY(ID)
);
2. Insert records into the PERSON table.
INSERT INTO PERSON (FNAME, AGE) VALUES ('Vinayak', 35);
INSERT INTO PERSON (FNAME, AGE) VALUES ('Nilesh', 37);
INSERT INTO PERSON (FNAME, AGE) VALUES ('Raju', 30);
INSERT INTO PERSON (FNAME, AGE) VALUES ('Karthik', 28);
INSERT INTO PERSON (FNAME, AGE) VALUES ('Shreshta', 1);
INSERT INTO PERSON (FNAME, AGE) VALUES ('Siddhish', 2);
3. Check the data in the PERSON table.
mysql> SELECT * FROM PERSON;
+----+----------+------+
| ID | FNAME    | AGE  |
+----+----------+------+
|  1 | Vinayak  |   35 |
|  2 | Nilesh   |   37 |
|  3 | Raju     |   30 |
|  4 | Karthik  |   28 |
|  5 | Shreshta |    1 |
|  6 | Siddhish |    2 |
+----+----------+------+
6 rows in set (0.00 sec)
4. We need to fetch all the PERSON records with age greater than 30 (using Spark). The equivalent SQL query and its result:
mysql> SELECT * FROM PERSON WHERE AGE>30;
+----+---------+------+
| ID | FNAME   | AGE  |
+----+---------+------+
|  1 | Vinayak |   35 |
|  2 | Nilesh  |   37 |
+----+---------+------+
2 rows in set (0.00 sec)
5. Start the Spark shell with the MySQL JDBC driver on the driver class path.
[root@localhost adminuser]# spark-shell --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.42-bin.jar
6. Once the Spark shell has started, you will see the scala> prompt, as in the commands below. Create a sqlContext using the Spark context (sc).
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
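Note: in Spark 1.3 and later the spark-shell already creates a SQLContext at startup (announced as "SQL context available as sqlContext"), so you can often use that pre-built sqlContext directly; creating your own, as above, works as well.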
7. Use sqlContext to get a DataFrame of the PERSON table. Note that you may need to change the host, username and password details in the URL passed to the sqlContext.
val personmysql = sqlContext.jdbc("jdbc:mysql://localhost:3306/userdb?user=<?>&password=<?>", "PERSON")
8. Use the show method to check the data. Use show only if you are sure of the DataFrame size; if personmysql holds millions of records, prefer methods such as take or first, which avoid running out of memory.
scala> personmysql.show
+---+--------+---+
| ID|   FNAME|AGE|
+---+--------+---+
|  1| Vinayak| 35|
|  2|  Nilesh| 37|
|  3|    Raju| 30|
|  4| Karthik| 28|
|  5|Shreshta|  1|
|  6|Siddhish|  2|
+---+--------+---+
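For large tables, the safer alternatives mentioned above look like this; take(n) brings at most n rows to the driver and first returns a single Row, so neither can exhaust driver memory:
scala> personmysql.take(2)   // Array[Row] with at most 2 rows
scala> personmysql.first     // the first Row only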
9. Fetch the required records using either of the methods below.
scala> personmysql.where("AGE>30").show
+---+-------+---+
| ID| FNAME|AGE|
+---+-------+---+
| 1|Vinayak| 35|
| 2| Nilesh| 37|
+---+-------+---+
9.1: One can also use the filter method to fetch the same records.
scala> personmysql.filter("AGE>30").show
+---+-------+---+
| ID| FNAME|AGE|
+---+-------+---+
| 1|Vinayak| 35|
| 2| Nilesh| 37|
+---+-------+---+
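Both where and filter also accept Column expressions instead of SQL strings; a small sketch on the same personmysql DataFrame:
scala> personmysql.filter(personmysql("AGE") > 30).show   // column-expression form, same result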
Note: the sqlContext.jdbc method is deprecated. One can use the read method to get the DataFrame instead (Spark 1.4 and later).
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/userdb?user=root",
      "dbtable" -> "userdb.PERSON")).load()
scala> jdbcDF.show
+---+--------+---+
| ID|   FNAME|AGE|
+---+--------+---+
|  1| Vinayak| 35|
|  2|  Nilesh| 37|
|  3|    Raju| 30|
|  4| Karthik| 28|
|  5|Shreshta|  1|
|  6|Siddhish|  2|
+---+--------+---+
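With the read API, the filter can also be pushed down to MySQL itself by passing a subquery as the dbtable option, so only the matching rows are transferred over the network. A sketch under the same connection assumptions as above:
val filteredDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/userdb?user=root",
      "dbtable" -> "(SELECT * FROM PERSON WHERE AGE > 30) AS t")).load()
filteredDF.show   // fetches only Vinayak and Nilesh from MySQL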