Spark code to write to MySQL and Hive

Spark code to write to MySQL and Hive

ryandam.9

Hi,

Can anyone help me understand what is happening with my code?

I wrote a Spark application to read from a MySQL table [which already has 4 records], create a new DF by adding 10 to the ID field, and then write the new DF both to MySQL and to Hive.

I am surprised to see an additional set of records in Hive! I cannot understand how newDF ends up with records whose IDs are 21 to 24. I know that a DF is immutable; if so, how can it have 4 records at one point and 8 records at a later point?

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Read the table from MySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased by 10 to avoid duplicate primary keys.
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert the records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

Records already existing in MySQL:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

newDF.show() before the writes:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

newDF.show() after the writes:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

 

Thanks for your time.

Ravi


RE: Spark code to write to MySQL and Hive

ryandam.9

Sorry, the format of my last mail was not good. Here it is again:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

println("Going to talk to mySql")

// Read the table from MySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased by 10 to avoid duplicate primary keys.
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert the records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

Going to talk to mySql
I am back from mySql

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

newDF.show() before the writes:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

newDF.show() after the writes:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

Thanks,

Ravi

 



Re: Spark code to write to MySQL and Hive

Jacek Laskowski
Hi,

I haven't checked my answer (too lazy today), but I think I know what might be going on.

tl;dr Use cache to preserve the initial set of rows from MySQL.

After you append the new rows, you have twice as many rows as you had previously. Correct?

Since newDF references the source table every time you use it in a structured query, say to write it to a table, the source table gets reloaded, and hence the number of rows changes.

What you should do is execute newDF.cache.count right after val newDF = mysqlDF.select... so the data (rows) stays on the executors and won't get reloaded. A sketch follows below.

Hope that helps.
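
A minimal sketch of that suggestion, reusing the variables from your code:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))

// count() is an action, so it forces the JDBC read to happen exactly once
// here and pins the four rows in the executors' cache.
newDF.cache()
newDF.count()

// Both writes now consume the cached rows instead of re-reading MySQL,
// so the Hive table ends up with IDs 11-14 only.
newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")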


RE: Spark code to write to MySQL and Hive

ryandam.9

Thank you, Jacek. You're right. Unless a DF is persisted/cached [cache() is just persist() with the default storage level, I believe], the DF is re-evaluated every time it is referenced in an action. [??]

So far I have been reading from and writing to files, so this point never came to my attention.

So, instead of thinking of DataFrames as "fixed data once created by an action", it makes sense to think of them as a kind of "expression": unless persisted, they are re-evaluated and can gain or lose data based on changes in their sources, as the small demo below illustrates.
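
A tiny demo of that mental model, assuming the same JDBC variables as before:

// df is only a plan at this point; nothing has been read yet.
val df = spark.read.jdbc(jdbcUrl, table, properties)
df.count() // action: the table is read now
// ...rows are appended to the source table in the meantime...
df.count() // the plan is re-evaluated, so the count can differ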

 

Is there any way to restrict Spark from reading the newly inserted data? [The reads look like Uncommitted Reads in DB2.]

Thanks,

Ravi

 



Re: Spark code to write to MySQL and Hive

Sonal Goyal
If you have the flexibility to append a new column to the table, you could add an isUpdated column that defaults to 0: mysqlDF would then read the rows with isUpdated = 0, and newDF would insert/append the rows with isUpdated = 1. See the sketch below.

Actually, what's your use case? You seem to be updating a table while also trying to keep its old state. What keeps you from writing to a new table instead?
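
A minimal sketch of that idea (the isUpdated column and its 0/1 convention are assumptions about the schema, not something from the thread):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

// Read only the original rows; re-evaluation stays stable because the
// appended rows carry isUpdated = 1 and are filtered out.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
  .filter(col("isUpdated") === 0)

// Derive the new rows and mark them as updated before appending.
val newDF = mysqlDF
  .select((col("id") + 10).as("id"), col("country"), col("city"))
  .withColumn("isUpdated", lit(1))

newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)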

Thanks,
Sonal
Nube Technologies 




