[Structured Streaming] Connecting to Kafka via a Custom Consumer / Producer



The large international bank I work for has a custom Kafka implementation. The client libraries we use to connect to Kafka add extra security steps. These libraries implement the Kafka Consumer and Producer interfaces, so once we have connected through them we can treat our connections as the standard Kafka interfaces in our code.

We can't use the out-of-the-box Kafka connector from Structured Streaming, because it only works with the concrete KafkaConsumer class rather than the Consumer interface.

Would it be possible / advisable / a good idea to change this to use the Consumer interface and allow users to specify a callback somehow to create their own connection to Kafka?

The signature of this private method in InternalKafkaConsumer would change to return the Consumer interface (as would the rest of the code base), and users would somehow be given the option to create their own Consumer if they wanted. The same would apply to Producers.

    /** Create a KafkaConsumer to fetch records for `topicPartition` */
    private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
      val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParamsWithSecurity)

At the moment we are left with two options. We can copy the Spark code base and swap in our custom Consumer for the KafkaConsumer used in that function (plus a few other changes), which leaves us with a fork to maintain that will drift out of sync with upstream over time. Or we can build and maintain our own custom connector.
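
To make the suggestion a bit more concrete, here is a rough sketch of the kind of hook I have in mind. The names KafkaConsumerFactory, DefaultKafkaConsumerFactory and load are made up for illustration; as far as I know nothing like this exists in Spark today. Only Consumer and KafkaConsumer are real Kafka client classes. How the factory class name would reach Spark (a source option, a config key, etc.) is left open.

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}

// Hypothetical extension point (not an existing Spark API).
// Serializable so that an instance could be shipped to executors.
trait KafkaConsumerFactory extends Serializable {
  def create(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

// Hypothetical default: builds a plain KafkaConsumer from the given params.
class DefaultKafkaConsumerFactory extends KafkaConsumerFactory {
  override def create(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
}

object KafkaConsumerFactory {
  // Instantiate a user-supplied factory by class name via its no-arg
  // constructor, or fall back to the default when none is given.
  def load(className: Option[String]): KafkaConsumerFactory = className match {
    case Some(name) =>
      Class.forName(name)
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[KafkaConsumerFactory]
    case None =>
      new DefaultKafkaConsumerFactory
  }
}
```

With something like this, createConsumer() could return Consumer[Array[Byte], Array[Byte]] and delegate to whichever factory was loaded, and we would only need to supply a small factory class that wraps our bank's client library. A similar factory could be sketched for Producer.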

Best regards,