Structured streaming, Writing Kafka topic to BigQuery table, throws error


Mich Talebzadeh
With the old Spark Streaming (DStream) API (example in Scala), this would have been easier through RDDs. You could read the data like this:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)

dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty) {  // data exists in RDD
    // write to DB
  }
}


Now, with Structured Streaming in Python, you read data into a DataFrame with readStream and load():


schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())

ds = (self.spark
      .readStream
      .format("kafka")
      # .......
      .load()
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value")))

ds2 = (ds
       .select(col("parsed_value.rowkey").alias("rowkey"),
               col("parsed_value.ticker").alias("ticker"),
               col("parsed_value.timeissued").alias("timeissued"),
               col("parsed_value.price").alias("price"))
       .withColumn("currency", lit(config['MDVariables']['currency']))
       .withColumn("op_type", lit(config['MDVariables']['op_type']))
       .withColumn("op_time", current_timestamp()))

# write to console
query = (ds2
         .writeStream
         .outputMode("append")
         .format("console")
         .start())
ds2.printSchema()


But writing to BigQuery through the BigQuery API does not work:


s.writeTableToBQ(ds2, "overwrite", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])

query.awaitTermination()



This is the output of the run, including the error:


root
 |-- rowkey: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- timeissued: timestamp (nullable = true)
 |-- price: float (nullable = true)
 |-- currency: string (nullable = false)
 |-- op_type: string (nullable = false)
 |-- op_time: timestamp (nullable = false)


'write' can not be called on streaming Dataset/DataFrame;, quitting


I gather I need to create an RDD from the DataFrame, or is there another way to write streaming data to a database directly from the DataFrame?

Thanks


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

Jungtaek Lim-2
If your code doesn't require end-to-end exactly-once semantics, you could leverage foreachBatch, which enables you to use a batch sink.
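A minimal sketch of the foreachBatch route, assuming the existing batch writer is wrapped in a one-argument function (the hypothetical `write_batch`, for example a wrapper around the thread's `s.writeTableToBQ`). A static DataFrame is written directly; for a streaming one, where `df.write` raises the error quoted in the first message, the same function runs once per micro-batch:

```python
def write_table(df, write_batch):
    """Write `df` with a batch-only writer, whether or not it is streaming.

    `write_batch(batch_df)` is a hypothetical batch writer, e.g. a wrapper
    around a Spark-BigQuery `df.write ... save()` call.
    """
    if not df.isStreaming:
        # Static DataFrame: df.write (and any batch connector) is allowed.
        return write_batch(df)

    # Streaming DataFrame: df.write would fail with
    # "'write' can not be called on streaming Dataset/DataFrame",
    # so run the batch writer once per micro-batch instead.
    def _per_batch(batch_df, batch_id):
        write_batch(batch_df)

    return df.writeStream.foreachBatch(_per_batch).start()
```

The returned StreamingQuery still needs `awaitTermination()` on the driver. Because this callback ignores `batch_id`, a micro-batch that Spark replays after a failure is written again, which is the at-least-once behaviour described in this reply.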

If your code requires end-to-end exactly-once semantics, that's a different story. I'm not familiar with BigQuery and have no idea how its sink is implemented, but quick googling tells me that a transaction with multiple DML statements isn't supported, so end-to-end exactly-once cannot be implemented in any way.

If you ensure the write in the query is idempotent, then it doesn't matter at all.
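One way to make the write idempotent, sketched here under stated assumptions rather than as a BigQuery feature: the Structured Streaming guide notes that foreachBatch is at-least-once by default but that the `batchId` passed to the function can be used to deduplicate the output. `IdempotentBatchSink` and `write_fn` are hypothetical names, and the in-memory set only stands in for a durable ledger (for example a table keyed by batch id) that would have to survive restarts:

```python
class IdempotentBatchSink:
    """foreachBatch callback that writes each batch_id at most once per ledger.

    `write_fn(batch_df)` is a hypothetical writer (e.g. a BigQuery append).
    `committed` is the ledger of finished batch ids; the default in-memory
    set is illustrative only and is lost when the driver restarts.
    """

    def __init__(self, write_fn, committed=None):
        self.write_fn = write_fn
        self.committed = set() if committed is None else committed

    def __call__(self, batch_df, batch_id):
        if batch_id in self.committed:
            # Spark re-ran a batch that was already written: skip it.
            return False
        if len(batch_df.head(1)) == 0:
            # Empty micro-batch, like the DStream `if (!pricesRDD.isEmpty)` guard.
            self.committed.add(batch_id)
            return False
        self.write_fn(batch_df)
        # Not atomic with the write: a crash between these two steps can
        # still duplicate the batch on replay.
        self.committed.add(batch_id)
        return True
```

Since PySpark invokes the foreachBatch function with `(DataFrame, batch_id)`, an instance should be usable directly, e.g. `.foreachBatch(IdempotentBatchSink(my_writer))`. The last comment in the code is the crux of the exactly-once discussion above: without a transaction covering both the data write and the ledger update, this narrows the duplicate window but does not close it.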

In reply to Mich Talebzadeh, Tue, Feb 23, 2021 at 10:35 PM.

Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

Mich Talebzadeh
Thanks Jungtaek.

I am stuck on how to add rows to BigQuery. The Spark API in PySpark does it fine. However, we are talking about Structured Streaming with PySpark.

This is my code, which reads and displays data on the console fine:

import sys

from config import config
from sparkutils import sparkstuff as s
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, FloatType, TimestampType
from google.cloud import bigquery

class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def startStreaming(self):
        self.sc.setLogLevel("ERROR")
        #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            return streamingDataFrame
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

    def processData(self, streamingDataFrame):

        result = streamingDataFrame. \
                     writeStream. \
                     foreach(ForeachWriter()). \
                     start()

        result.awaitTermination()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.startStreaming()
    mdstreaming.processData(streamingDataFrame)

The ForeachWriter class is supposed to add data (a batch size of 10 rows) to the GCP BigQuery table; my code is below. However, it does not seem to invoke the methods in this class. Every 2 seconds a batch of 10 rows is passed to this class. Specifically, in the method process(self, row), what does rows_to_insert = [... signify?


class ForeachWriter:
    '''
    Class to send a set of rows to BigQuery.
    When used with `foreach`, copies of this class are going to be used to write
    multiple rows in the executor. See the Python docs for `DataStreamWriter.foreach`
    for more details.
    '''

    def open(self, partition_id, epoch_id):
        # This is called first when preparing to send multiple rows.
        # PySpark passes the partition id and the epoch id of the data.
        # Put all the initialization code inside open() so that a fresh
        # copy of this class is initialized in the executor where open()
        # will be called.
        self.config = config
        self.table_id = self.config['MDVariables']['fullyQualifiedoutputTableId']
        self.client = bigquery.Client(self.config['MDVariables']['projectId'])
        self.table_ref = self.client.dataset(self.config['MDVariables']['targetDataset']).table(self.table_id)
        return True

    def process(self, row):
        # This is called for each row after open() has been called.
        # This implementation sends one row at a time.
        # A more efficient implementation can be to send batches of rows at a time.
        rows_to_insert = [
            {u"full_name": u"Phred Phlyntstone", u"age": 32},
            {u"full_name": u"Wylma Phlyntstone", u"age": 29},
        ]

        errors = self.client.insert_rows_json(
            self.table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
        )  # Make an API request.

        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))

    def close(self, err):
        # This is called after all the rows have been processed.
        if err:
            raise err
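In the process() method above, rows_to_insert appears to be the hard-coded sample from the google-cloud-bigquery streaming-insert example (two made-up people): the list of dicts, keyed by column name, that insert_rows_json sends. It never touches the incoming Spark row. A sketch of a writer that sends the actual rows instead, following PySpark's foreach protocol (open(partition_id, epoch_id), then process(row) per row, then close(error)). The class name, its constructor arguments and the use of rowkey as the insert ID (row_ids, which requests best-effort de-duplication) are assumptions for illustration; timestamps are sent as ISO-8601 strings and time-zone handling is not addressed.

```python
import datetime


def _to_json_value(value):
    # insert_rows_json needs JSON-serialisable values; send timestamps as ISO strings.
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    return value


class BigQueryRowWriter:
    """Sketch of a writer for DataStreamWriter.foreach.

    Spark calls open(partition_id, epoch_id) once per partition and epoch,
    process(row) for every row, then close(error). Rows are buffered and
    sent in a single streaming insert from close().
    """

    def __init__(self, project_id, table_id):
        self.project_id = project_id  # hypothetical, e.g. config['MDVariables']['projectId']
        self.table_id = table_id      # fully qualified "project.dataset.table"
        self.client = None
        self.buffer = []

    def open(self, partition_id, epoch_id):
        # Runs on the executor; the client is created here rather than in
        # __init__ because the writer object is pickled and shipped there.
        from google.cloud import bigquery
        self.client = bigquery.Client(project=self.project_id)
        self.buffer = []
        return True  # True: process the rows of this partition

    def process(self, row):
        # One dict per row, keyed by column name, like the sample rows_to_insert.
        self.buffer.append({k: _to_json_value(v) for k, v in row.asDict().items()})

    def close(self, error):
        if error is not None:
            raise error
        if not self.buffer:
            return
        errors = self.client.insert_rows_json(
            self.table_id,
            self.buffer,
            row_ids=[r.get("rowkey") for r in self.buffer],
        )
        if errors:
            raise RuntimeError(f"BigQuery insert errors: {errors}")
```

It would be attached with something like `.foreach(BigQueryRowWriter(project_id, table_id))` on the flattened DataFrame (the ds2-style columns), since `asDict()` on the raw `parsed_value` struct would return a nested Row that is not JSON-serialisable.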





In reply to Jungtaek Lim, Tue, 23 Feb 2021 at 22:48.

Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

Mich Talebzadeh
Hi,

I managed to make mine work using the foreachBatch function in writeStream.

"foreach" performs custom write logic on each row, and "foreachBatch" performs custom write logic on each micro-batch, here through the SendToBigQuery function.
foreachBatch(SendToBigQuery) expects a function with 2 parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch.
Using foreachBatch, we write each micro-batch to storage defined in our custom logic. In this case, we store the output of our streaming application in a Google BigQuery table.
Note that we are appending data, and column "rowkey" is defined as a UUID so it can be used as the primary key. batchId is just a counter (a monotonically increasing number).

This is my code:


from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql.functions import col, current_timestamp, from_json, lit
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):

    """
        Below uses standard Spark-BigQuery API to write to the table
        Additional transformation logic will be performed here
    """
    s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])

class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            # "foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic
            # on each micro-batch, here through the SendToBigQuery function.
            # foreachBatch(SendToBigQuery) expects a function with 2 parameters: first, the micro-batch as a
            # DataFrame or Dataset, and second, a unique id for each batch.
            # Using foreachBatch, we write each micro-batch to storage defined in our custom logic. In this case,
            # we store the output of our streaming application in a Google BigQuery table.
            # Note that we are appending data and column "rowkey" is defined as a UUID so it can be used as the primary key.
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     foreachBatch(SendToBigQuery). \
                     outputMode("update"). \
                     start()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        result.awaitTermination()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    mdstreaming.fetch_data()


My batch interval is 2 seconds, and in this case I am sending 10 rows for each ticker (security).

As I am running this on-premises and sending data to Google BigQuery using a single JVM, the performance is terrible, but that is something I will need to worry about later.
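The 2-second batch interval is not set anywhere in the posted code; when no trigger is specified, Structured Streaming starts the next micro-batch as soon as the previous one finishes. A sketch of pinning the interval with `trigger(processingTime=...)` and giving the query a checkpoint location, assuming a durable `checkpoint_dir` path (hypothetical). With a checkpoint, a restarted query resumes from the offsets recorded there, and the batchId values passed to SendToBigQuery continue from the checkpoint rather than starting over, which matters if batchId is used for de-duplication:

```python
def start_bigquery_stream(df, send_batch, checkpoint_dir, interval="2 seconds"):
    """Start a foreachBatch query on a fixed micro-batch interval (sketch).

    `send_batch(batch_df, batch_id)` is the per-batch writer (SendToBigQuery
    in the code above); `checkpoint_dir` is a hypothetical durable path.
    """
    return (df.writeStream
              .foreachBatch(send_batch)
              # Offsets and batch ids are stored here, so a restart resumes.
              .option("checkpointLocation", checkpoint_dir)
              # Start a micro-batch every `interval` instead of back-to-back.
              .trigger(processingTime=interval)
              .start())
```

This only controls how often batches are produced; it does not address the single-JVM throughput issue mentioned above.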




LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



In reply to Mich Talebzadeh, Wed, 24 Feb 2021 at 17:02.