Where can I read the Kafka offsets in a SparkSQL application


Where can I read the Kafka offsets in a SparkSQL application

John, Vishal (Agoda)

Hello all,


I have to read data from a Kafka topic at regular intervals. I create the DataFrame as shown below. I don’t want to start reading from the beginning on each run, but at the same time I don’t want to miss any messages that arrive between runs.

val queryDf = sqlContext
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hosts)
  .option("enable.auto.commit", true)
  .option("subscribe", topicName)
  .option("auto.commit.interval.ms", 1000)
  .option("startingOffsets", " latest")  //??  earliest OR latest
  .load()
  .selectExpr("CAST(value AS STRING) as message")

I would like to understand where the offsets will be stored, so that I can supply them each time the application starts. Alternatively, is there a way to supply a custom location in which to store the offsets?
This is not a Streaming application, so I am not sure whether a checkpoint directory is valid in this case.
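
For reference, the batch Kafka source also accepts explicit offset ranges as JSON via startingOffsets and endingOffsets, so the offsets can be persisted to any custom location between runs. A minimal sketch of that idea, where offsetPath and the offsetsExist/readTextFile/writeTextFile helpers are hypothetical placeholders, not Spark APIs:

import org.apache.spark.sql.functions.{col, max}

// Use the offsets saved by the previous run; on the very first run
// start from the beginning so no messages are missed.
val startOffsets =
  if (offsetsExist(offsetPath)) readTextFile(offsetPath)  // e.g. {"myTopic":{"0":42,"1":17}}
  else "earliest"

val df = sqlContext.read
  .format("kafka")
  .option("kafka.bootstrap.servers", hosts)
  .option("subscribe", topicName)
  .option("startingOffsets", startOffsets)
  .option("endingOffsets", "latest")  // a batch read stops here
  .load()

// ... process df ...

// Persist (last offset + 1) per partition, to be used as
// startingOffsets by the next run.
val parts = df
  .groupBy(col("partition"))
  .agg(max(col("offset")).as("maxOffset"))
  .collect()

// Skip the write when no new messages arrived, so the previous
// offsets stay in place. (A fuller version would also carry forward
// offsets for partitions that happened to receive no new data.)
if (parts.nonEmpty) {
  val nextOffsets = parts
    .map(r => s""""${r.getInt(0)}":${r.getLong(1) + 1}""")
    .mkString(s"""{"$topicName":{""", ",", "}}")
  writeTextFile(offsetPath, nextOffsets)
}

Note that, as far as I know, enable.auto.commit and auto.commit.interval.ms have no effect as written: options without the kafka. prefix are not forwarded to the consumer, and the Spark Kafka source manages offsets itself rather than resuming from consumer-committed offsets.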

Any pointers would be highly helpful.


thanks,
Vishal


Re: Where can I read the Kafka offsets in a SparkSQL application

Gourav Sengupta
Hi,


Can you check whether the checkpointLocation option would work, in case you are using Structured Streaming?
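
For example, a minimal sketch of what that could look like; the parquet sink and both paths are just placeholders, and Trigger.Once makes each scheduled run process whatever is new and then stop:

import org.apache.spark.sql.streaming.Trigger

val streamDf = sqlContext.sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", hosts)
  .option("subscribe", topicName)
  .option("startingOffsets", "latest")  // applies only to the very first run
  .load()
  .selectExpr("CAST(value AS STRING) as message")

streamDf.writeStream
  .format("parquet")                                    // placeholder sink
  .option("path", "/path/to/output")                    // placeholder
  .option("checkpointLocation", "/path/to/checkpoint")  // offsets are tracked here
  .trigger(Trigger.Once())                              // process available data, then stop
  .start()
  .awaitTermination()

Every restart resumes from the offsets recorded under checkpointLocation, so nothing is missed between runs.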

Regards,
Gourav Sengupta
