spark-user mailing list archives

From Jacek Laskowski <ja...@japila.pl>
Subject Re: Spark code to write to MySQL and Hive
Date Wed, 29 Aug 2018 15:26:38 GMT
Hi,

I haven't verified this answer (too lazy today), but I think I know what
might be going on.

tl;dr Use cache to preserve the initial set of rows loaded from MySQL.

After you append the new rows, the MySQL table holds twice as many rows as
it did before. Correct?

Since newDF merely references the table, every time you use it in a
structured query, say to write it out, the source table gets re-loaded,
and hence the number of rows changes.

What you should do is execute newDF.cache.count right after val newDF =
mysqlDF.select... so the rows stay cached on the executors and won't get
reloaded.
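
A minimal sketch of that placement, reusing the spark, jdbcUrl, table and
properties values from your snippet (untested here, so treat it as an
illustration rather than verified code):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)

val newDF = mysqlDF.select(
  (col("id") + 10).as("id"),
  col("country"),
  col("city"))

// Materialize newDF BEFORE any write. The count action forces the JDBC
// scan once; later actions reuse the cached rows instead of re-reading
// the (by then appended-to) MySQL table.
newDF.cache()
newDF.count()

// Both writes now see the same initial set of rows.
newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")
```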

Hope that helps.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 29, 2018 at 4:59 PM <ryandam.9@gmail.com> wrote:

> Sorry, the format of my last mail was not good.
>
>
>
> println("Going to talk to mySql")
>
> // Read table from MySQL.
> val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
> println("I am back from mySql")
>
> mysqlDF.show()
>
> // Create a new DataFrame with column 'id' increased to avoid duplicate primary keys.
> val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
> newDF.printSchema()
> newDF.show()
>
> // Insert records into the MySQL table.
> newDF.write
>   .mode(SaveMode.Append)
>   .jdbc(jdbcUrl, table, properties)
>
> // Write to Hive - this creates a new table.
> newDF.write.saveAsTable("cities")
> newDF.show()
>
>
>
>
>
>
>
> Going to talk to mySql
>
> I am back from mySql
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
> root
> |-- id: long (nullable = false)
> |-- country: string (nullable = true)
> |-- city: string (nullable = true)
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> +---+--------------+---------+
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> | 24|          null|     null|
> | 23|           USA|Sunnyvale|
> | 22|Czech Republic|     Brno|
> | 21|           USA|Palo Alto|
> +---+--------------+---------+
>
>
>
> Thanks,
>
> Ravi
>
>
>
> From: ryandam.9@gmail.com <ryandam.9@gmail.com>
> Sent: Wednesday, August 29, 2018 8:19 PM
> To: user@spark.apache.org
> Subject: Spark code to write to MySQL and Hive
>
>
>
> Hi,
>
> Can anyone help me understand what is happening in my code?
>
> I wrote a Spark application that reads from a MySQL table (which already
> has 4 records) and creates a new DataFrame by adding 10 to the id column.
> Then I wanted to write the new DataFrame to MySQL as well as to Hive.
>
> I am surprised to see an additional set of records in Hive! I am not able
> to understand how newDF has records with IDs 21 to 24. I know that a
> DataFrame is immutable. If so, how come it has 4 records at one point and
> 8 records at a later point?
>
>
>
>
> // Read table from MySQL.
> val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
> println("I am back from mySql")
>
> mysqlDF.show()
>
> // Create a new DataFrame with column 'id' increased to avoid duplicate primary keys.
> val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
> newDF.printSchema()
> newDF.show()
>
> // Insert records into the MySQL table.
> newDF.write
>   .mode(SaveMode.Append)
>   .jdbc(jdbcUrl, table, properties)
>
> // Write to Hive - this creates a new table.
> newDF.write.saveAsTable("cities")
> newDF.show()
>
>
> Records already existing in MySQL:
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
> root
> |-- id: long (nullable = false)
> |-- country: string (nullable = true)
> |-- city: string (nullable = true)
>
> newDF.show():
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> +---+--------------+---------+
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> | 24|          null|     null|
> | 23|           USA|Sunnyvale|
> | 22|Czech Republic|     Brno|
> | 21|           USA|Palo Alto|
> +---+--------------+---------+
>
>
>
>
>
> Thanks for your time.
>
> Ravi
>
