Process each kafka record for structured streaming

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Process each kafka record for structured streaming

kumar.rajat20del
Hi,

I want to apply custom logic for each row of data I am getting through kafka and want to do it with microbatch.
When I am running it , it is not progressing.


kafka_stream_df \
        .writeStream \
        .foreach(process_records) \
        .outputMode("append") \
        .option("checkpointLocation", "checkpt") \
        .trigger(continuous="5 seconds").start()

Regards

Rajat


Reply | Threaded
Open this post in threaded view
|

Re: Process each kafka record for structured streaming

Jacek Laskowski
Hi,

Can you use console sink and make sure that the pipeline shows some progress?

On Wed, Jan 20, 2021 at 10:44 AM rajat kumar <[hidden email]> wrote:
Hi,

I want to apply custom logic for each row of data I am getting through kafka and want to do it with microbatch.
When I am running it , it is not progressing.


kafka_stream_df \
        .writeStream \
        .foreach(process_records) \
        .outputMode("append") \
        .option("checkpointLocation", "checkpt") \
        .trigger(continuous="5 seconds").start()

Regards

Rajat