Spark Structured Streaming + Kafka


Agostino Calamita
Hi,
I have a problem with Structured Streaming and Kafka.
I have 2 brokers and a topic with 8 partitions and replication factor 2.
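
(For reference, that topic layout corresponds to an AdminClient call roughly like the following; this is just to make the setup concrete, the exact way the topic was created doesn't matter here:)

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host100:9092,host200:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "acrmonitor": 8 partitions, replication factor 2
            admin.createTopics(Collections.singletonList(new NewTopic("acrmonitor", 8, (short) 2)))
                 .all().get();
        }
    }
}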

This is my driver program:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.get_json_object;
import static org.apache.spark.sql.functions.window;

public class StreamFromKafka {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("StreamFromKafka")
                // keep only the last 5 micro-batches' metadata in the checkpoint (default is 100)
                .config("spark.sql.streaming.minBatchesToRetain", 5)
                .getOrCreate();

        // match the number of shuffle partitions to the topic's 8 partitions
        spark.sessionState().conf().setConf(org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS(), 8);

        // read the "acrmonitor" topic from both brokers, starting from the earliest offsets
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host100:9092,host200:9092")
                .option("subscribe", "acrmonitor")
                .option("startingOffsets", "earliest")
                .load();

        String windowSize = "10 minutes";
        String slide = "10 minutes";
        String startTime = "0 minutes";

        // extract the JSON fields from the message value and count per address/type/window,
        // with a 15-minute watermark on the event time
        Dataset<Row> df3 = df.select(
                        get_json_object(col("value").cast("string"), "$.address").alias("address"),
                        get_json_object(col("value").cast("string"), "$.type").alias("type"),
                        get_json_object(col("value").cast("string"), "$.insertDate").cast("timestamp").alias("insertDate"))
                .withWatermark("insertDate", "15 minutes")
                .groupBy(
                        col("address"),
                        col("type"),
                        window(col("insertDate"), windowSize, slide, startTime))
                .count();

        String chkptDir = "/tmp/checkPoint";

        // write the updated counts every 30 seconds through a custom ForeachWriter
        StreamingQuery query = df3
                .writeStream()
                .outputMode(OutputMode.Update())
                .option("checkpointLocation", chkptDir)
                .foreach(new JDBCsink())
                .trigger(Trigger.ProcessingTime(30000))
                .start();

        try {
            query.awaitTermination();
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage());
        }

        spark.cloneSession();
    }
}


I use a checkpoint directory.
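
JDBCsink is my own ForeachWriter<Row> that writes each row to a relational database. The real class isn't included here, but it has roughly this shape (the JDBC URL, table and column names below are only placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class JDBCsink extends ForeachWriter<Row> {

    private Connection connection;

    @Override
    public boolean open(long partitionId, long epochId) {
        try {
            // placeholder JDBC URL and credentials
            connection = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/monitor", "user", "pwd");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void process(Row row) {
        // placeholder table and columns; each row contains address, type, window (struct) and count
        try (PreparedStatement stmt = connection.prepareStatement(
                "INSERT INTO window_counts (address, type, window_start, cnt) VALUES (?, ?, ?, ?)")) {
            Row window = row.getAs("window");
            Timestamp windowStart = window.getAs("start");
            Long count = row.getAs("count");
            stmt.setString(1, row.getAs("address"));
            stmt.setString(2, row.getAs("type"));
            stmt.setTimestamp(3, windowStart);
            stmt.setLong(4, count);
            stmt.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (connection != null) connection.close();
        } catch (Exception ignored) {
            // nothing to do if closing fails
        }
    }
}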

When I stop one Kafka broker (the other one remains alive), the driver program stops reading messages from the topic and processing them. I waited for more than 5 minutes.
If I restart the driver program (with one broker still down), it reads and processes messages.

If I stop one Kafka broker (so the driver stops reading) and restart that broker after a few minutes, the driver program starts reading and processing messages again (without restarting the driver program).

Is there a way to let the driver program keep reading from the Kafka topic when only one of the brokers goes down, without restarting the driver?

Thanks.