Exception handling in Spark


Exception handling in Spark

Mich Talebzadeh
Hi,

As I understand it, exception handling in Spark only makes sense when one attempts an action, as opposed to lazy transformations?
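
For RDDs that is broadly true: transformations only build a lineage, and an error such as a missing input surfaces at the first action. A minimal sketch, assuming a non-existent path:

val rdd = sc.textFile("/no/such/path")   // lazy: only a lineage is recorded, no error yet
rdd.count()                              // action: the missing path surfaces here,
                                         // typically as org.apache.hadoop.mapred.InvalidInputException

As the replies below show, DataFrame reads behave differently: spark.read...load() resolves the path eagerly and fails at load time.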

Let us assume that I am reading an XML file from an HDFS directory and creating a DataFrame df on it:

import org.apache.spark.sql.functions.lit

val broadcastValue = "123456789"  // I assume this will be sent as a constant for the batch
// Create a DF on top of XML
val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")

val newDF = df.withColumn("broadcastid", lit(broadcastValue))

newDF.createOrReplaceTempView("tmp")

  // Put data in Hive table
  //
  val sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastid="123456", brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
        , broadcastid
  FROM tmp
  """
//
// Here I am performing an action
try {
  spark.sql(sqltext)
} catch {
  case e: SQLException =>
    e.printStackTrace
    sys.exit()
}
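
Note that for a missing table or path Spark SQL raises org.apache.spark.sql.AnalysisException rather than java.sql.SQLException, so the catch clause above may never match and the exception would simply propagate. A minimal sketch of a clause that would fire:

import org.apache.spark.sql.AnalysisException

try {
  spark.sql(sqltext)
} catch {
  case e: AnalysisException =>  // missing table, column or path
    e.printStackTrace()
    sys.exit(1)
}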

Now the issue I have is: what if the XML file /tmp/broadcast.xml does not exist or has been deleted? I won't be able to catch the error until the Hive table is populated. Of course I can write a shell script to check that the file exists before running the job, or force a small action like df.show(1,0). Are there more general alternatives?

Thanks

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Exception handling in Spark

Brandon Geise

You could use the Hadoop API and check if the file exists.

 



Re: Exception handling in Spark

Mich Talebzadeh
Thanks Brandon!

I should have remembered that.

Basically the code exits with sys.exit(1) if it cannot find the file.

I guess there is no easy way of validating a DF except actioning it with show(1,0) etc. and checking whether it works?

Regards,

Dr Mich Talebzadeh

 


 


Re: Exception handling in Spark

tnist
Could you do something like this prior to calling the action?

import org.apache.hadoop.fs.{FileSystem, Path}

// Create a FileSystem object from the Hadoop configuration
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// This method returns a Boolean (true if the file exists, false if it doesn't)
val fileExists = fs.exists(new Path("<path_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")

Not sure whether that will help you, just a thought.

-Todd





 


Re: Exception handling in Spark

Brandon Geise
In reply to this post by Mich Talebzadeh

Read is an action, so you could wrap it in a Try (or whatever you want)

scala> val df = Try(spark.read.csv("test"))

df: scala.util.Try[org.apache.spark.sql.DataFrame] = Failure(org.apache.spark.sql.AnalysisException: Path does not exist: file:/test;)
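
The value here is a Try[DataFrame], so it still needs unwrapping before use. A minimal sketch that aborts on failure:

import scala.util.Try

val dfOk = Try(spark.read.csv("test")).getOrElse {
  println("Could not read input, aborting")
  sys.exit(1)   // returns Nothing, so dfOk still types as a DataFrame
}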

 


 


Re: Exception handling in Spark

Mich Talebzadeh
In reply to this post by tnist
Thanks Todd. This is what I did before creating the DF on top of that file:

val exists = xmlFileExists(broadcastStagingConfig.xmlFilePath)
if (!exists) {
  println(s"\n Error: The xml file ${broadcastStagingConfig.xmlFilePath} does not exist, aborting!\n")
  sys.exit(1)
}
.
.
def xmlFileExists(hdfsDirectory: String): Boolean = {
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
}

And checked it. It works.
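
FileSystem.get(conf) returns the configuration's default filesystem. A variant sketch, resolving the filesystem from the path itself so fully qualified URIs such as hdfs://... or s3a://... also work:

import org.apache.hadoop.fs.Path

def pathExists(pathStr: String): Boolean = {
  val path = new Path(pathStr)
  // resolve the filesystem from the path's own scheme, using Spark's Hadoop conf
  val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.exists(path)
}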

Dr Mich Talebzadeh

 


 


Re: Exception handling in Spark

Mich Talebzadeh
I am trying this approach:


val broadcastValue = "123456789"  // I assume this will be sent as a constant for the batch
// Create a DF on top of XML
try {
      val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")
          df
}  catch {
    case ex: FileNotFoundException => {
        println (s"\nFile /tmp/broadcast.xml not found\n")
        None
        }
   case unknown: Exception => {
             println(s"\n Error encountered $unknown\n")
        None
        }
}

val newDF = df.withColumn("broadcastid", lit(broadcastValue))

But this does not work

scala> try {
     |       val df = spark.read.
     |                 format("com.databricks.spark.xml").
     |                 option("rootTag", "hierarchy").
     |                 option("rowTag", "sms_request").
     |                 load("/tmp/broadcast.xml")
     |           Some(df)
     | }  catch {
     |     case ex: FileNotFoundException => {
     |         println (s"\nFile /tmp/broadcast.xml not found\n")
     |         None
     |         }
     |    case unknown: Exception => {
     |              println(s"\n Error encountered $unknown\n")
     |         None
     |         }
     | }
res6: Option[org.apache.spark.sql.DataFrame] = Some([brand: string, ocis_party_id: bigint ... 6 more fields])

scala>

scala> df.printSchema
<console>:48: error: not found: value df
       df.printSchema

       

The data frame seems to be lost!
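
The reason: df is declared inside the try block, so it goes out of scope when the block ends; the try/catch is an expression whose Option[DataFrame] value was never bound to a name. A minimal sketch that keeps the result, assuming the same reader options:

import java.io.FileNotFoundException
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

val maybeDF: Option[DataFrame] =
  try {
    Some(spark.read.
      format("com.databricks.spark.xml").
      option("rootTag", "hierarchy").
      option("rowTag", "sms_request").
      load("/tmp/broadcast.xml"))
  } catch {
    case _: FileNotFoundException =>
      println("\nFile /tmp/broadcast.xml not found\n")
      None
    // note: a missing path actually raises AnalysisException, caught by this broader case
    case unknown: Exception =>
      println(s"\n Error encountered $unknown\n")
      None
  }

// proceed only if the read succeeded
maybeDF.foreach { df =>
  val newDF = df.withColumn("broadcastid", lit(broadcastValue))
}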

Thanks,


Dr Mich Talebzadeh

 


 


Re: Exception handling in Spark

Brandon Geise

This is what I had in mind.  Can you give this approach a try?

 

val df = Try(spark.read.csv("")) match {
  case Success(df) => df
  case Failure(e) => throw new Exception("foo")
}

 


 


Re: Exception handling in Spark

Mich Talebzadeh
This is what I get:

scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:47: error: not found: value Try
       val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
                ^
<console>:47: error: not found: value Success
       val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
                                                                                                                                                                         ^
<console>:47: error: not found: value Failure
       val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}


Dr Mich Talebzadeh

 


 


Re: Exception handling in Spark

Brandon Geise

import scala.util.Try
import scala.util.Success
import scala.util.Failure

 


 


Re: Exception handling in Spark

Mich Talebzadeh
In reply to this post by Mich Talebzadeh

scala> import scala.util.{Try, Success, Failure}

import scala.util.{Try, Success, Failure}

scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
<console>:48: error: value Match is not a member of scala.util.Try[org.apache.spark.sql.DataFrame]
       val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) Match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
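
With the imports in place, the remaining error is that Scala's keyword is lowercase match, not Match. A corrected sketch that compiles:

import scala.util.{Try, Success, Failure}

val df = Try(spark.read.
           format("com.databricks.spark.xml").
           option("rootTag", "hierarchy").
           option("rowTag", "sms_request").
           load("/tmp/broadcast.xml")) match {
  case Success(d) => d
  case Failure(e) => throw new Exception("foo")
}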


Mich Talebzadeh

 


 

 

 

On Tue, 5 May 2020 at 16:41, Brandon Geise <[hidden email]> wrote:

You could use the Hadoop API and check if the file exists.

 

From: Mich Talebzadeh <[hidden email]>
Date: Tuesday, May 5, 2020 at 11:25 AM
To: "user @spark" <[hidden email]>
Subject: Exception handling in Spark

 

Hi,

 

As I understand exception handling in Spark only makes sense if one attempts an action as opposed to lazy transformations?

 

Let us assume that I am reading an XML file from the HDFS directory  and create a dataframe DF on it

 

val broadcastValue = "123456789"  // I assume this will be sent as a constant for the batch

// Create a DF on top of XML
val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")

val newDF = df.withColumn("broadcastid", lit(broadcastValue))

newDF.createOrReplaceTempView("tmp")

  // Put data in Hive table
  //
  sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastid="123456", brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
        , brand
        , broadcastid
  FROM tmp
  """
//

// Here I am performing a collection 

try  {

         spark.sql(sqltext)

} catch {

    case e: SQLException => e.printStackTrace

    sys.exit()

}

 

Now the issue I have is that what if the xml file  /tmp/broadcast.xml does not exist or deleted? I won't be able to catch the error until the hive table is populated. Of course I can write a shell script to check if the file exist before running the job or put small collection like df.show(1,0). Are there more general alternatives?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 

Reply | Threaded
Open this post in threaded view
|

Re: Exception handling in Spark

Brandon Geise

Match needs to be lower case “match”
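
That is, keeping everything else in your one-liner the same, the expression becomes (just a sketch of the same call, reformatted for readability):

import scala.util.{Try, Success, Failure}

val df = Try(spark.read.
              format("com.databricks.spark.xml").
              option("rootTag", "hierarchy").
              option("rowTag", "sms_request").
              load("/tmp/broadcast.xml")) match {
    case Success(df) => df
    case Failure(e) => throw new Exception("foo")
}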

 


Re: Exception handling in Spark

Mich Talebzadeh
OK looking promising thanks

scala> import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success, Failure}

scala> val df = Try(spark.read.format("com.databricks.spark.xml").option("rootTag", "hierarchy").option("rowTag", "sms_request").load("/tmp/broadcast.xml")) match {case Success(df) => df case Failure(e) => throw new Exception("foo")}
df: org.apache.spark.sql.DataFrame = [brand: string, ocis_party_id: bigint ... 6 more fields]


regards,


Dr Mich Talebzadeh

 


Re: Exception handling in Spark

Mich Talebzadeh
Hi Brandon.

In dealing with 

case Failure(e) => throw new Exception("foo")

Can one print the Exception message?

Thanks

Dr Mich Talebzadeh

 


Re: Exception handling in Spark

Brandon Geise

Sure, just do case Failure(e) => throw e
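
And if you want to print the message as well, something like this should work (a minimal sketch with the same reader call as before; getMessage on the caught Throwable gives the underlying error text):

import scala.util.{Try, Success, Failure}

val df = Try(spark.read.
              format("com.databricks.spark.xml").
              option("rootTag", "hierarchy").
              option("rowTag", "sms_request").
              load("/tmp/broadcast.xml")) match {
    case Success(df) => df
    case Failure(e) =>
      // print the underlying message, then rethrow the original exception
      println(s"Failed to create the DataFrame: ${e.getMessage}")
      throw e
}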

 
