Spark Structured Streaming + Kafka


Agostino Calamita
I have a problem with Structured Streaming and Kafka.
I have 2 brokers and a topic with 8 partitions and a replication factor of 2.
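In principle, a replication factor of 2 on 2 brokers means every partition has a copy on each broker, so stopping one broker should still leave every partition with a leader. The sketch below is a simplified, hypothetical model of Kafka's clean leader election, in which the new leader is the first assigned replica that is both alive and in the in-sync replica set (ISR). The broker IDs 0 and 1, the round-robin placement, and the class and method names are assumptions for illustration only. The real assignment is chosen by Kafka and can be inspected with `kafka-topics.sh --describe`. The record syntax needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class LeaderFailoverModel {

    // One partition's replica state: replicas in assignment order, plus the
    // brokers currently in the in-sync replica set (ISR).
    record PartitionState(int partition, List<Integer> replicas, Set<Integer> isr) {}

    // Simplified clean leader election: pick the first assigned replica that is
    // alive and in the ISR. Returns -1 if none qualifies (partition unavailable).
    static int electLeader(PartitionState p, Set<Integer> aliveBrokers) {
        for (int broker : p.replicas()) {
            if (aliveBrokers.contains(broker) && p.isr().contains(broker)) {
                return broker;
            }
        }
        return -1;
    }

    // Hypothetical round-robin placement with replication factor 2,
    // e.g. 8 partitions over brokers 0 and 1 as for the topic "acrmonitor".
    static List<PartitionState> assign(int partitions, List<Integer> brokers) {
        List<PartitionState> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            int first = brokers.get(p % brokers.size());
            int second = brokers.get((p + 1) % brokers.size());
            List<Integer> replicas = List.of(first, second);
            // Assumption: every replica is caught up, so the ISR equals the replica list.
            result.add(new PartitionState(p, replicas, Set.copyOf(replicas)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionState> topic = assign(8, List.of(0, 1));
        Set<Integer> alive = Set.of(1); // broker 0 has been stopped
        for (PartitionState p : topic) {
            System.out.printf("partition %d -> leader %d%n", p.partition(), electLeader(p, alive));
        }
    }
}
```

If a partition's only in-sync replica were on the stopped broker, `electLeader` would return -1, which in this model corresponds to a partition that stays unavailable until that broker returns.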

This is my driver program:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.*;

public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession
            .builder()
            .config("spark.sql.streaming.minBatchesToRetain", 5)
            .getOrCreate();
    // 8 shuffle partitions (public-API equivalent of setting SQLConf.SHUFFLE_PARTITIONS)
    spark.conf().set("spark.sql.shuffle.partitions", 8);
    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "host100:9092,host200:9092")
            .option("subscribe", "acrmonitor")
            .option("startingOffsets", "earliest")
            .load();
    String windowSize = "10 minutes";
    String slide = "10 minutes";
    String startTime = "0 minutes";
    // Parse the JSON payload; insertDate is cast before aliasing so the column keeps its name
    Dataset<Row> df3 = df
            .select(
                    get_json_object(col("value").cast("string"), "$.address").alias("address"),
                    get_json_object(col("value").cast("string"), "$.type").alias("type"),
                    get_json_object(col("value").cast("string"), "$.insertDate").cast("timestamp").alias("insertDate"))
            .withWatermark("insertDate", "15 minutes")
            .groupBy(window(col("insertDate"), windowSize, slide, startTime))
            .count(); // the full aggregation is not shown in the post; count() is a placeholder
    String chkptDir = "/tmp/checkPoint";
    StreamingQuery query = df3
            .writeStream()
            .option("checkpointLocation", chkptDir)
            .foreach(new JDBCsink())
            .trigger(Trigger.ProcessingTime(30000))
            .start();
    try {
        query.awaitTermination();
    } catch (Exception e) {
        System.err.println("ERROR: " + e.getMessage());
    }
}
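Because `windowSize` and `slide` are both "10 minutes", the `window(...)` call yields tumbling (non-overlapping) windows. According to the Structured Streaming guide, `withWatermark("insertDate", "15 minutes")` lets Spark treat a window as final once the maximum event time seen, minus 15 minutes, is past the window end. The plain-Java sketch below mimics that arithmetic outside Spark under these assumptions. It is an illustration, not Spark's implementation, and the class name, method names and timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class TumblingWindowSketch {

    // Mirrors the driver: windowSize = slide = "10 minutes",
    // startTime = "0 minutes", watermark delay = "15 minutes".
    static final Duration WINDOW = Duration.ofMinutes(10);
    static final Duration START_OFFSET = Duration.ZERO;
    static final Duration WATERMARK_DELAY = Duration.ofMinutes(15);

    // With slide == window size, each event lands in exactly one window whose
    // start is the event time rounded down to a multiple of the window length.
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW.toMillis();
        long offset = START_OFFSET.toMillis();
        long t = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t - offset, size));
    }

    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW);
    }

    // The watermark trails the maximum event time seen so far by the delay.
    static Instant watermark(Instant maxEventTime) {
        return maxEventTime.minus(WATERMARK_DELAY);
    }

    // A window is treated as final once the watermark has moved past its end.
    static boolean isFinal(Instant windowEnd, Instant maxEventTime) {
        return watermark(maxEventTime).isAfter(windowEnd);
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2017-09-01T10:07:30Z");
        Instant end = windowEnd(event);
        System.out.println("window: [" + windowStart(event) + ", " + end + ")");
        System.out.println("final at max event time 10:20? "
                + isFinal(end, Instant.parse("2017-09-01T10:20:00Z")));
        System.out.println("final at max event time 10:26? "
                + isFinal(end, Instant.parse("2017-09-01T10:26:00Z")));
    }
}
```

Since the query does not set an output mode, it runs in the default append mode. In this model, a window's row would reach `JDBCsink` only after later events push the watermark past that window's end. Spark advances the watermark between micro-batches.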

I use a checkpoint directory.

When I stop one Kafka broker (the other one remains alive), the driver program stops reading messages from the topic and processing them. I waited for more than 5 minutes.
If I restart the driver program (with one broker still down), it reads and processes messages.

If I stop one Kafka broker (so the driver stops reading) and then restart that broker a few minutes later, the driver program starts reading and processing messages again, without being restarted.

Is there a way to let the driver program keep reading the Kafka topic when only one of the brokers goes down, without restarting the driver?