Exercise 3: Read data from MySQL using sqlContext and filter records.

Problem statement: Find all persons with an age greater than 30 years in the MySQL table PERSON using Spark.

1. Create a table called PERSON.
mysql>
CREATE TABLE PERSON(
ID INT NOT NULL AUTO_INCREMENT,
FNAME VARCHAR(100),
AGE INT,
PRIMARY KEY(ID)
);


2. Insert records into the PERSON table.
INSERT INTO PERSON (FNAME, AGE) VALUES('Vinayak', 35);
INSERT INTO PERSON (FNAME, AGE) VALUES('Nilesh', 37);
INSERT INTO PERSON (FNAME, AGE) VALUES('Raju', 30);
INSERT INTO PERSON (FNAME, AGE) VALUES('Karthik', 28);
INSERT INTO PERSON (FNAME, AGE) VALUES('Shreshta',1);
INSERT INTO PERSON (FNAME, AGE) VALUES('Siddhish', 2);

3. Check the data in the PERSON table.

mysql> SELECT * FROM PERSON;
+----+----------+------+
| ID | FNAME    | AGE  |
+----+----------+------+
|  1 | Vinayak  |   35 |
|  2 | Nilesh   |   37 |
|  3 | Raju     |   30 |
|  4 | Karthik  |   28 |
|  5 | Shreshta |    1 |
|  6 | Siddhish |    2 |
+----+----------+------+
6 rows in set (0.00 sec)


4. We need to fetch all PERSON records with age greater than 30 using Spark. First, verify the expected result with plain SQL:

mysql> SELECT * FROM PERSON WHERE AGE>30;
+----+---------+------+
| ID | FNAME   | AGE  |
+----+---------+------+
|  1 | Vinayak |   35 |
|  2 | Nilesh  |   37 |
+----+---------+------+
2 rows in set (0.00 sec)


5. Start the Spark shell with the MySQL driver JAR on the driver classpath:

[root@localhost adminuser]# spark-shell --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.42-bin.jar
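
Note: --driver-class-path puts the JAR on the driver's classpath only. If you run against a cluster, the executors also need the connector; a common approach is to pass the same JAR with --jars as well (adjust the path to wherever your connector JAR actually lives):

[root@localhost adminuser]# spark-shell --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.42-bin.jar --jars /usr/local/hive/lib/mysql-connector-java-5.1.42-bin.jar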

6. Once the Spark shell has started, you will see the scala> prompt as in the commands below.
Create an SQLContext using the Spark context (sc).
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
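
Aside: on Spark 2.x there is no need to construct an SQLContext by hand; the shell predefines a SparkSession named spark. A minimal sketch of the equivalent JDBC read on 2.x (same placeholder credentials as below, not part of the original exercise):

val df = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/userdb").
  option("dbtable", "PERSON").
  option("user", "<?>").
  option("password", "<?>").
  load()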

7. Use the SQLContext to get a DataFrame for the PERSON table.
Note: you may need to change the host, username, and password in the JDBC URL passed to the SQLContext.

val personmysql = sqlContext.jdbc("jdbc:mysql://localhost:3306/userdb?user=<?>&password=<?>", "PERSON")
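
If you would rather keep the credentials out of the URL, the same DataFrame can be obtained by passing a java.util.Properties object (via sqlContext.read.jdbc, available in Spark 1.4+); fill the placeholders with your own credentials:

val props = new java.util.Properties()
props.setProperty("user", "<?>")     // your MySQL username
props.setProperty("password", "<?>") // your MySQL password
val personmysql = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/userdb", "PERSON", props)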


8. Use the show method to check the data, but only if you are sure of the DataFrame's size. If personmysql holds millions of records, it is better to use methods such as take or first (see the sketch after the output below); this avoids an OutOfMemoryError on the driver.

scala> personmysql.show
+---+--------+---+
| ID|   FNAME|AGE|
+---+--------+---+
|  1| Vinayak| 35|
|  2|  Nilesh| 37|
|  3|    Raju| 30|
|  4| Karthik| 28|
|  5|Shreshta|  1|
|  6|Siddhish|  2|
+---+--------+---+
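
A sketch of the bounded alternatives mentioned above; these fetch only a handful of rows to the driver instead of the whole table:

scala> personmysql.take(2)        // Array[Row] with at most 2 rows
scala> personmysql.first          // just the first Row
scala> personmysql.limit(2).show  // a bounded tabular preview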

9. Fetch the required records using any of the methods below.

scala> personmysql.where("AGE>30").show
+---+-------+---+
| ID|  FNAME|AGE|
+---+-------+---+
|  1|Vinayak| 35|
|  2| Nilesh| 37|
+---+-------+---+

9.1: You can also use the filter method to fetch the required records.

scala> personmysql.filter("AGE>30").show
+---+-------+---+
| ID|  FNAME|AGE|
+---+-------+---+
|  1|Vinayak| 35|
|  2| Nilesh| 37|
+---+-------+---+
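
Equivalent alternatives: filter with a Column expression (after importing the SQLContext implicits), or register the DataFrame as a temporary table and query it with plain SQL. Both should return the same two rows:

scala> import sqlContext.implicits._
scala> personmysql.filter($"AGE" > 30).show

scala> personmysql.registerTempTable("person")
scala> sqlContext.sql("SELECT * FROM person WHERE AGE > 30").show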

Note: the sqlContext.jdbc method is deprecated. You can use the read method instead to get a DataFrame (Spark version > 1.4):

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/userdb?user=root",
      "dbtable" -> "userdb.PERSON")).load()

scala> jdbcDF.show
+---+--------+---+
| ID|   FNAME|AGE|
+---+--------+---+
|  1| Vinayak| 35|
|  2|  Nilesh| 37|
|  3|    Raju| 30|
|  4| Karthik| 28|
|  5|Shreshta|  1|
|  6|Siddhish|  2|
+---+--------+---+
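
You can also push the filtering down to MySQL itself by passing a subquery as the dbtable option, so that only the matching rows ever cross the JDBC connection. A minimal sketch (the alias t is required by MySQL for derived tables):

val filteredDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/userdb?user=root",
      "dbtable" -> "(SELECT * FROM PERSON WHERE AGE > 30) AS t")).load()

scala> filteredDF.show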
