Save Spark dataframe as dynamic partitioned table in Hive

Save Spark dataframe as dynamic partitioned table in Hive

Mich Talebzadeh
Hi,

I have an XML file that is read into Spark using the Databricks spark-xml jar file

spark-xml_2.11-0.9.0.jar

I am doing some tests.

This is the format of the XML (one row shown here):

//*
<sms_request>
<sms_campaign_code>SKY</sms_campaign_code>
<target_mobile_no>0123456789</target_mobile_no>
<ocis_party_id>123456789</ocis_party_id>
<brand>XYZ</brand>
<sms_template_code>GLX</sms_template_code>
<sms_request_external_ref>12345678</sms_request_external_ref>
<sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
<sms_template_variable>
</sms_template_variable>
</sms_request>
*/


I am trying to insert data into a Hive partitioned table through Spark as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.util.Date
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.SaveMode

sc.setLogLevel("WARN")
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

// xml stuff
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import com.databricks.spark.xml._
import spark.implicits._
//
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)

val broadcastValue = "123456789"
val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")
df.printSchema
df.show(10,false)

df.createOrReplaceTempView("tmp")
// Need to create and populate target Parquet table michtest.BroadcastStaging
//
HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")

  var sqltext = """
  CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
     partyId STRING
   , phoneNumber STRING
  )
  PARTITIONED BY (
     broadcastId STRING
   , brand STRING)
  STORED AS PARQUET
  """
  HiveContext.sql(sqltext)
  //
  // Put data in Hive table
  //
     // Dynamic partitioning is disabled by default. We turn it on
     spark.sql("SET hive.exec.dynamic.partition = true")
     spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
     // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")


  sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
  FROM tmp
  """

  spark.sql(sqltext)
  spark.sql("select * from michtest.BroadcastStaging").show(10,false)


This does not work because I need to pass a one-time fixed value for the partition column broadcastId and a dynamic value for the brand column taken from the table itself.


This is the outcome of the run:


Started at
[16/04/2020 00:37:34.34]
root
 |-- brand: string (nullable = true)
 |-- ocis_party_id: long (nullable = true)
 |-- sms_campaign_code: string (nullable = true)
 |-- sms_request_external_ref: long (nullable = true)
 |-- sms_request_external_txn_ref: long (nullable = true)
 |-- sms_template_code: string (nullable = true)
 |-- sms_template_variable: string (nullable = true)
 |-- target_mobile_no: long (nullable = true)

+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
|brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
|XYZ  |123456789    |SKY              |12345678                |55555555                    |GLX              |                     |123456789       |
+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+

org.apache.spark.sql.catalyst.parser.ParseException:
missing STRING at ','(line 2, pos 85)

== SQL ==

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = dummy)
-------------------------------------------------------------------------------------^^^
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
  FROM tmp


It fails when passing the partition values.


Thanks,


Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Save Spark dataframe as dynamic partitioned table in Hive

Patrick McCarthy-2
What happens if you change your insert statement to be 

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)

and then add the value for brand into the select as
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
You may need to rearrange the order of the partitions to put static partitions before dynamic ones.
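As a minimal sketch of the suggested statement shape (the static partition key first as a quoted string literal, the dynamic key last, with brand appended to the SELECT list; the tmp view is the one registered from the XML dataframe in the original post, and only the string construction is shown here, not the spark.sql call):

```scala
// Sketch of the suggested INSERT: static partition key (broadcastId) as a
// quoted literal first, dynamic partition key (brand) last, and brand
// added to the SELECT list so the dynamic partition can be resolved.
val sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = '123456789', brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
  FROM tmp
  """

println(sqltext)
```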

On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh <[hidden email]> wrote:
--

Patrick McCarthy 

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Save Spark dataframe as dynamic partitioned table in Hive

Mich Talebzadeh
Thanks Patrick,

The partition broadcastId is static, defined as a value below:


val broadcastValue = "123456789"  // I assume this will be sent as a constant for the batch

// Create a DF on top of XML
val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")

// add this constant column to dataframe itself 
val newDF = df.withColumn("broadcastId", lit(broadcastValue))

newDF.show(100,false)

newDF.createOrReplaceTempView("tmp")

// Need to create and populate target Parquet table michtest.BroadcastStaging
//
HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")

  var sqltext = """
  CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
     partyId STRING
   , phoneNumber STRING
  )
  PARTITIONED BY (
     broadcastId STRING
   , brand STRING
)
  STORED AS PARQUET
  """
  HiveContext.sql(sqltext)

  //
  // Put data in Hive table
  //
     // Dynamic partitioning is disabled by default. We turn it on
     spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

// Now put the static partition (broadcastId) first and the dynamic partition (brand) last

 
sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
  SELECT

          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , broadcastId
        , brand
  FROM tmp
  """
  spark.sql(sqltext)


It is still not working properly:

sqltext: String =
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , broadcastId
        , brand
  FROM tmp

scala>   spark.sql(sqltext)
org.apache.spark.sql.catalyst.parser.ParseException:
missing STRING at ','(line 2, pos 85)


== SQL ==

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
-------------------------------------------------------------------------------------^^^

  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , broadcastId
        , brand
  FROM tmp

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 55 elided


The thing is that if I replace broadcastId = broadcastValue with broadcastId = "123456789" it works!


Thanks,


Dr Mich Talebzadeh

 




On Thu, 16 Apr 2020 at 13:25, Patrick McCarthy <[hidden email]> wrote:


Re: Save Spark dataframe as dynamic partitioned table in Hive

ZHANG Wei
AFAICT, we can use spark.sql(s"select $name ..."), where name is a val in the
Scala context [1].

--
Cheers,
-z

[1] https://docs.scala-lang.org/overviews/core/string-interpolation.html
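A minimal sketch of this suggestion applied to the statement in this thread (assuming the broadcastValue val and tmp view from the earlier posts; only the string construction is shown, not the spark.sql call). String interpolation splices the Scala val into the SQL text, so the parser sees a string literal ('123456789') rather than the bare identifier broadcastValue, which is what caused the "missing STRING" parse error:

```scala
// Splice the Scala val into the SQL text with an s-interpolated string.
// The parser then receives a quoted literal for the static partition value
// instead of the unresolved identifier broadcastValue.
val broadcastValue = "123456789"

val sqltext = s"""
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = '$broadcastValue', brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
  FROM tmp
  """

println(sqltext)
```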

On Fri, 17 Apr 2020 00:10:59 +0100
Mich Talebzadeh <[hidden email]> wrote:

> Thanks Patrick,
>
> The partition  broadcastId is static as defined as a value below
>
>
> val broadcastValue = "123456789"  // I assume this will be sent as a
> constant for the batch
>
> // Create a DF on top of XML
> val df = spark.read.
>                 format("com.databricks.spark.xml").
>                 option("rootTag", "hierarchy").
>                 option("rowTag", "sms_request").
>                 load("/tmp/broadcast.xml")
>
> // add this constant column to dataframe itself
> val newDF = df.withColumn("broadcastId", lit(broadcastValue))
>
> newDF.show(100,false)
>
> newDF.createOrReplaceTempView("tmp")
>
> // Need to create and populate target Parquet table
> michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
>
>   var sqltext = """
>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>      partyId STRING
>    , phoneNumber STRING
>   )
>   PARTITIONED BY (
>      broadcastId STRING
>    , brand STRING
> )
>   STORED AS PARQUET
>   """
>   HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table
>   //
>      // Dynamic partitioning is disabled by default. We turn it on
>      spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
>
> // Now put static partition (broadcastId) first and dynamic partition
> (brand)last
>
>   sqltext = """
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
>   """
>   spark.sql(sqltext)
>
> Still not working properly
>
> sqltext: String =
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
>
> scala>   spark.sql(sqltext)
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
>
> == SQL ==
>
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
> -------------------------------------------------------------------------------------^^^
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
>
>   at
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
>   at
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>   ... 55 elided
>
>
> The thing is that if I replace broadcastId = broadcastValue with  broadcastId
> = " 123456789" it works!
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 16 Apr 2020 at 13:25, Patrick McCarthy
> <[hidden email]> wrote:
>
> > What happens if you change your insert statement to be
> >
> >   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> > broadcastValue, brand)
> >
> > and then add the value for brand into the select as
> >   SELECT
> >           ocis_party_id AS partyId
> >         , target_mobile_no AS phoneNumber
> >         , brand
> > You may need to rearrange the order of the partitions to put dynamic
> > partitions before static.
> >
> > On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh <[hidden email]>
> > wrote:
> >
> >> Hi,
> >>
> >> I have an XML file that is read into Spark using Databa bricks jar file
> >>
> >> spark-xml_2.11-0.9.0.jar
> >>
> >> Doing some tests
> >>
> >> This is the format of XML (one row here)
> >>
> >> //*
> >> <sms_request>
> >> <sms_campaign_code>SKY</sms_campaign_code>
> >> <target_mobile_no>0123456789</target_mobile_no>
> >> <ocis_party_id>123456789</ocis_party_id>
> >> <brand>XYZ</brand>
> >> <sms_template_code>GLX</sms_template_code>
> >> <sms_request_external_ref>12345678</sms_request_external_ref>
> >> <sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
> >> <sms_template_variable>
> >> </sms_template_variable>
> >> </sms_request>
> >> */
> >>
> >> OK I am trying to insert data into a hive partitioned table through spark
> >> as follows:
> >>
> >> import org.apache.spark.sql.DataFrame
> >> import org.apache.spark.sql.functions._
> >> import java.util.Date
> >> import org.apache.spark.sql.functions.col
> >> import org.apache.spark.sql.SaveMode
> >>
> >> sc.setLogLevel("WARN")
> >> import org.apache.log4j.Logger
> >> import org.apache.log4j.Level
> >> Logger.getLogger("org").setLevel(Level.OFF)
> >> Logger.getLogger("akka").setLevel(Level.OFF)
> >>
> >> // xml stuff
> >> import com.databricks.spark.xml.functions.from_xml
> >> import com.databricks.spark.xml.schema_of_xml
> >> import org.apache.spark.sql.SparkSession
> >> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> >> DoubleType}
> >> import com.databricks.spark.xml._
> >> import spark.implicits._
> >> //
> >> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >> println ("\nStarted at"); HiveContext.sql("SELECT
> >> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> >> ").collect.foreach(println)
> >>
> >> val broadcastValue = "123456789"
> >> val df = spark.read.
> >>                 format("com.databricks.spark.xml").
> >>                 option("rootTag", "hierarchy").
> >>                 option("rowTag", "sms_request").
> >>                 load("/tmp/broadcast.xml")
> >> df.printSchema
> >> df.show(10,false)
> >>
> >> df.createOrReplaceTempView("tmp")
> >> // Need to create and populate target ORC table michtest.BroadcastStaging
> >> //
> >> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> >>
> >>   var sqltext = """
> >>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
> >>      partyId STRING
> >>    , phoneNumber STRING
> >>   )
> >>   PARTITIONED BY (
> >>      broadcastId STRING
> >>    , brand STRING)
> >>   STORED AS PARQUET
> >>   """
> >>   HiveContext.sql(sqltext)
> >>   //
> >>   // Put data in Hive table
> >>   //
> >>      // Dynamic partitioning is disabled by default. We turn it on
> >>      spark.sql("SET hive.exec.dynamic.partition = true")
> >>      spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
> >>      // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> * sqltext = """  INSERT INTO TABLE michtest.BroadcastStaging PARTITION
> >> (broadcastId = broadcastValue, brand = brand)  SELECT
> >> ocis_party_id AS partyId        , target_mobile_no AS phoneNumber  FROM
> >> tmp  """*
> >>   spark.sql(sqltext)
> >>   spark.sql("select * from michtest.BroadcastStaging").show(10,false)
> >>
> >>
> >> This does not work because I need to pass onetime fixed value for
> >> partition column value *broadcastId *and dynamic value for *brand* column
> >> from the table itself
> >>
> >>
> >> *This is the outcome of run*
> >>
> >>
> >> Started at
> >> [16/04/2020 00:37:34.34]
> >> root
> >>  |-- brand: string (nullable = true)
> >>  |-- ocis_party_id: long (nullable = true)
> >>  |-- sms_campaign_code: string (nullable = true)
> >>  |-- sms_request_external_ref: long (nullable = true)
> >>  |-- sms_request_external_txn_ref: long (nullable = true)
> >>  |-- sms_template_code: string (nullable = true)
> >>  |-- sms_template_variable: string (nullable = true)
> >>  |-- target_mobile_no: long (nullable = true)
> >>
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >>
> >> |brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >> |XYZ  |123456789    |SKY              |12345678                |55555555
> >>                    |GLX              |
> >>                     |123456789       |
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >>
> >> org.apache.spark.sql.catalyst.parser.ParseException:
> >> missing STRING at ','(line 2, pos 85)
> >>
> >> == SQL ==
> >>
> >>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> >> broadcastValue, brand = dummy)
> >>
> >> -------------------------------------------------------------------------------------^^^
> >>   SELECT
> >>           ocis_party_id AS partyId
> >>         , target_mobile_no AS phoneNumber
> >>   FROM tmp
> >>
> >> It fails passing partition values
> >>
> >>
> >> Thanks,
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> >> any loss, damage or destruction of data or any other property which may
> >> arise from relying on this email's technical content is explicitly
> >> disclaimed. The author will in no case be liable for any monetary damages
> >> arising from such loss, damage or destruction.
> >>
> >>
> >>
> >
> >
> > --
> >
> >
> > *Patrick McCarthy  *
> >
> > Senior Data Scientist, Machine Learning Engineering
> >
> > Dstillery
> >
> > 470 Park Ave South, 17th Floor, NYC 10016
> >
