In Structured Streaming with PySpark, I need to do some work on each row via foreach(process_row), as below:
def process_row(row):
    ticker = row['ticker']
    price = row['price']
    if ticker == 'IBM':
        print(ticker, price)
# read data from BigQuery table for analysis
appName = config['common']['appName']
spark_session = s.spark_session(appName)
dfBatchRead = s.loadTableFromBQ(spark_session,
                                config['MDVariables']['targetDataset'],
                                config['MDVariables']['targetTable'])
class MDStreamingRow:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        # (abridged) the streaming DataFrame is written out like this:
        writeStream. \
            outputMode('append'). \
            option("truncate", "false"). \
            foreach(process_row). \
            start()
The issue I have is that the Spark session is created on the driver (see below), and in order to load data from the BigQuery table I need to call spark_session inside process_row() as above.
if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    mdstreaming = MDStreamingRow(spark_session, spark_context)
However, I get this error when it gets to process_row()
raise Exception("SparkContext should only be created and accessed on the driver.")
Exception: SparkContext should only be created and accessed on the driver.
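For context on why this fails: foreach(process_row) serializes process_row and runs it on the executors, where a SparkSession/SparkContext cannot be created (hence the error message). One common workaround is foreachBatch, whose callback runs on the driver, so driver-side helpers such as spark_session and loadTableFromBQ (names taken from the snippets above) remain usable. A minimal sketch, with the stream wiring commented out because the source streaming DataFrame is not shown in the question:

```python
# Sketch: per-batch processing with foreachBatch instead of foreach.
# The callback receives an ordinary DataFrame plus a batch id, and it
# executes on the driver, so driver-only objects (the SparkSession, the
# BigQuery helper from the question) are safe to use inside it.
def process_batch(batch_df, batch_id):
    # Mirrors the ticker check in process_row, but set-based:
    ibm_rows = batch_df.filter(batch_df['ticker'] == 'IBM')
    ibm_rows.show(truncate=False)
    # Driver-side calls such as s.loadTableFromBQ(spark_session, ...)
    # could be made here if the batch needs to be joined with BigQuery data.

# Wiring (commented out; 'streamingDF' is a placeholder for the source
# streaming DataFrame, which is not shown in the snippet above):
# streamingDF.writeStream. \
#     outputMode('append'). \
#     foreachBatch(process_batch). \
#     start()
```

Note that foreachBatch trades per-row processing for per-micro-batch processing, which is usually what you want when each row's handling needs the full Spark API.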
FYI, spark_session is defined as:

def spark_session(appName):
    return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()
Do I need to create a SparkSessionSingleton, etc.?
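For what it's worth, getOrCreate() already behaves like a singleton on the driver: repeated calls in the same process return the same session rather than building a new one. A plain-Python sketch of that pattern (illustrative names only, not Spark API); note that even with such a singleton, a SparkSession still cannot be created on an executor, which is exactly what the error above enforces:

```python
# Plain-Python sketch of the getOrCreate-style singleton pattern that
# SparkSession.builder.getOrCreate() implements: the first call builds
# the instance, later calls return the cached one.
class SessionSingleton:
    _instance = None

    def __init__(self, app_name):
        self.app_name = app_name

    @classmethod
    def get_or_create(cls, app_name):
        if cls._instance is None:
            cls._instance = cls(app_name)
        return cls._instance

a = SessionSingleton.get_or_create("MyApp")
b = SessionSingleton.get_or_create("MyApp")
print(a is b)  # → True
```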
Thanks
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw