Structured Streaming With Kafka - processing each event

Structured Streaming With Kafka - processing each event

Sachit Murarka
Hello Users,

I am using Spark 3.0.1 Structured Streaming with PySpark.

My use case:
I get a very large number of records in Kafka (essentially some metadata with the location of the actual data). I have to take that metadata from Kafka and apply some processing.
Processing includes: reading the actual data location from the metadata, fetching the actual data, and applying some operation on it.

What I have tried:

def process_events(event):
    fetch_actual_data()
    # many more steps

def fetch_actual_data():
    # applying operation on actual data
    pass

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream \
    .foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()


My Queries:

1. Will this foreach run across different executor processes? Generally in Spark, foreach means it runs on a single executor.

2. I receive a huge number of records from Kafka, and the above code will run repeatedly, once for each single message. If I change it to foreachBatch, will that optimize it?
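
(For concreteness, a rough sketch of what the foreachBatch variant of the pipeline above might look like; process_batch is a hypothetical name and its body only illustrates the idea:)

# Hypothetical sketch: with foreachBatch, each trigger hands over one
# micro-batch as a regular DataFrame, so processing can be expressed as
# distributed DataFrame operations instead of one callback per message.
def process_batch(batch_df, batch_id):
    # e.g. still apply process_events per row, but batched per trigger and
    # run on the executors; joins, UDFs or a bulk write also work here
    batch_df.foreach(process_events)

query = df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()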


Kind Regards,
Sachit Murarka

Re: Structured Streaming With Kafka - processing each event

Mich Talebzadeh
If you are receiving data from Kafka, wouldn't that be better in JSON format?

        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic']) -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            return streamingDataFrame
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)

and pass a class to the writer

       result = streamingDataFrame. \
                     writeStream. \
                     foreach(ForeachWriter()). \
                     start()
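
(The ForeachWriter class itself is not shown in the snippet above. For anyone following along, a minimal sketch of what such a writer object typically looks like in PySpark; the method bodies are illustrative placeholders, not the actual class used here:)

class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # e.g. open a connection; returning True means "process this partition/epoch"
        return True

    def process(self, row):
        # called once per row on the executor; process() is the only required method
        print(row)

    def close(self, error):
        # e.g. close the connection; error holds the exception, if any
        pass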

You don't want to use a row-by-row (cursor) approach, as it would leave a lot of messages unprocessed (as you correctly stated, it runs on a single JVM).


I am doing the same, trying to process and write messages to BigQuery.


HTH




Re: Structured Streaming With Kafka - processing each event

Mich Talebzadeh
BTW, do you intend to process these every 30 seconds?

processingTime="30 seconds"

So how many rows of data are sent in each micro-batch, and at what interval do you receive the data in batches from the producer?
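
(Related note: if the producer is bursty, the Kafka source's maxOffsetsPerTrigger option can cap how much data each 30-second micro-batch pulls. A minimal sketch based on the snippet in the first post; the limit value is only an illustration:)

# Sketch only: bound the number of Kafka offsets consumed per trigger so a
# 30-second micro-batch never has to drain the whole backlog in one go.
# KAFKA_URL, KAFKA_TOPICS and START_OFFSET are the variables from the original
# snippet; 10000 is an arbitrary illustrative limit.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .option("maxOffsetsPerTrigger", 10000) \
    .load() \
    .selectExpr("CAST(value AS STRING)")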





Re: Structured Streaming With Kafka - processing each event

Mich Talebzadeh
In reply to this post by Sachit Murarka
Hi Sachit,

I managed to make mine work using the foreachBatch function in writeStream.

"foreach" performs custom write logic on each row, and "foreachBatch" performs custom write logic on each micro-batch, through the SendToBigQuery function here.
foreachBatch(SendToBigQuery) expects two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch.
Using foreachBatch, we write each micro-batch to the storage defined in our custom logic. In this case, we store the output of our streaming application in a Google BigQuery table.
Note that we are appending data and the column "rowkey" is defined as a UUID so it can be used as the primary key. batchId is just a counter (a monotonically increasing number).

This is my code:


from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType,IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):

    """
        Below uses standard Spark-BigQuery API to write to the table
        Additional transformation logic will be performed here
    """
    s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])

class MDStreaming:
    def __init__(self, spark_session,spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic']) -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            """
               "foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic on each micro-batch through SendToBigQuery function
                foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as DataFrame or Dataset and second: unique id for each batch
               Using foreachBatch, we write each micro batch to storage defined in our custom logic. In this case, we store the output of our streaming application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is defined as UUID so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     foreachBatch(SendToBigQuery). \
                     outputMode("update"). \
                     start()
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)


        result.awaitTermination()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.fetch_data()


My batch interval is 2 seconds, and in this case I am sending 10 rows for each ticker (security).
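
(Side note: the writeStream in the code above relies on the default trigger. If the 2-second batch interval is meant as the streaming trigger rather than the producer's send interval, it could be made explicit; a simplified, hypothetical sketch that omits the column selection shown above:)

# Hypothetical variant: state the 2-second micro-batch interval explicitly.
result = streamingDataFrame.writeStream \
    .foreachBatch(SendToBigQuery) \
    .outputMode("update") \
    .trigger(processingTime="2 seconds") \
    .start()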

HTH




Re: Structured Streaming With Kafka - processing each event

Sachit Murarka
Hi Mich,

Thanks for the reply. Will check this out.

Kind Regards,
Sachit Murarka



Re: Structured Streaming With Kafka - processing each event

Gourav Sengupta
Hi,
Are you using Structured Streaming? Which Spark version and Kafka version are you on, and where are you fetching the data from?
Semantically speaking, if your data in Kafka represents an action to be performed, then it should really be in a queue like RabbitMQ or SQS. If it is simply data, then it should be in Kafka.
That once again raises the question: what is the data you are fetching? Is it just another field from a table to join, or an explicit fetch operation via an HTTPS or SQL call?
Regards
Gourav
