Creating spark context outside of the driver throws error


Mich Talebzadeh

In Structured Streaming with PySpark, I need to do some work on each row via foreach(process_row), as below:


def process_row(row):
    ticker = row['ticker']
    price = row['price']
    if ticker == 'IBM':
        print(ticker, price)
        # read data from BigQuery table for analysis
        appName = config['common']['appName']
        spark_session = s.spark_session(appName)
        dfBatchRead = s.loadTableFromBQ(spark_session, config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])


class MDStreamingRow:

    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        # (abbreviated) the streaming query that feeds each row to process_row
        writeStream. \
            outputMode('append'). \
            option("truncate", "false"). \
            foreach(process_row). \
            ...



The issue I have is that the Spark session is created on the driver (see below), but in order to load data from the BigQuery table I need to access spark_session inside process_row(), as above.


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_context = spark_session.sparkContext  # the context from the driver session
    mdstreaming = MDStreamingRow(spark_session, spark_context)


However, I get this error when it gets to process_row()


raise Exception("SparkContext should only be created and accessed on the driver.")

Exception: SparkContext should only be created and accessed on the driver.


FYI, spark_session() is defined as:

def spark_session(appName):
  return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()

Do I need to create a SparkSessionSingleton, etc.?
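
For reference, that pattern (along the lines of the getSparkSessionInstance helper in the old Spark Streaming examples) would look something like this sketch:

from pyspark.sql import SparkSession

# sketch of a session singleton: cache the session in globals() so
# repeated calls reuse one instance rather than rebuilding it
def getSparkSessionInstance(sparkConf):
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession.builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

Even so, getOrCreate() must still run on the driver, so a singleton alone would not make the session available inside foreach().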

Thanks

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Creating spark context outside of the driver throws error

srowen
Yep, you can never use Spark inside Spark.
You could run N jobs in parallel from the driver using Spark, however.
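
For this foreach case, a common workaround is foreachBatch(): the batch function runs on the driver, so the driver's SparkSession (and hence the BigQuery read) is available there. A rough sketch, reusing the names from the post above (streamingDataFrame is illustrative):

from pyspark.sql import functions as F

# sketch: foreachBatch invokes this on the driver once per micro-batch,
# where the driver-side SparkSession can be used freely
def process_batch(batch_df, batch_id):
    ibm_rows = batch_df.filter(F.col('ticker') == 'IBM')
    if ibm_rows.head(1):
        dfBatchRead = s.loadTableFromBQ(spark_session,
                                        config['MDVariables']['targetDataset'],
                                        config['MDVariables']['targetTable'])
        # ... analyse ibm_rows against dfBatchRead here ...

streamingDataFrame.writeStream \
    .outputMode('append') \
    .foreachBatch(process_batch) \
    .start()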


 


Re: Creating spark context outside of the driver throws error

Mich Talebzadeh
OK, so I am wondering. I am calling this outside of the driver:

appName = config['common']['appName']
spark_session = s.spark_session(appName)

def spark_session(appName):
    return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()

The documentation for getOrCreate() says that if a SparkSession already exists, it returns it. Why is that not happening here?
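
My guess at the mechanism (a sketch, assuming Spark 3.x behaviour): getOrCreate() can only "get" an existing session inside the driver process; process_row executes in an executor's Python worker, where no session exists and where PySpark refuses to create a SparkContext, hence the exception. A minimal illustration:

from pyspark.sql import SparkSession

def on_executor(_):
    # runs in an executor worker: there is no driver session to "get",
    # and creating a SparkContext here is disallowed, so this raises
    SparkSession.builder.getOrCreate()

spark = SparkSession.builder.getOrCreate()                 # fine on the driver
spark.sparkContext.parallelize([1]).foreach(on_executor)   # raises the error above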

Thanks


