Connecting Spark Streaming with Schema Registry

Guillermo Ortiz
I'm trying to integrate Schema Registry with Spark Streaming; for the moment I want to use GenericRecords. My producer seems to work, and new schemas are published to the _schemas topic.
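For context, the producer is set up along these lines. This is only a minimal sketch: the schema, field names, and broker address are placeholders rather than my exact code.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        // Illustrative schema; the real record has more fields.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Client\",\"fields\":["
              + "{\"name\":\"id\",\"type\":\"string\"},"
              + "{\"name\":\"name\",\"type\":\"string\"},"
              + "{\"name\":\"lastname\",\"type\":\"string\"}]}");

        try (KafkaProducer<byte[], GenericRecord> producer = new KafkaProducer<>(props)) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", "1");
            record.put("name", "John");
            record.put("lastname", "Doe");
            // On first send the serializer registers the schema under the
            // subject "<topic>-value" in the Schema Registry.
            producer.send(new ProducerRecord<>("avro_example_schemaRegistry", record));
        }
    }
}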

When I try to read with my consumer, however, I'm not able to deserialize the data. How can I tell Spark that I want to deserialize to GenericRecord?



import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.datastax.spark.connector.japi.CassandraJavaUtil;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

// getSparkConf(), Constants, and CrmClient are project classes not shown here.
public class SparkStreamingSchemaRegister {

    private static final String TOPIC = "avro_example_schemaRegistry";

    public static void main(String[] args) throws InterruptedException {
        final JavaStreamingContext jssc = new JavaStreamingContext(getSparkConf(),
                Durations.milliseconds(Constants.STREAM_BATCH_DURATION_MILLIS));

        final JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> rawStream =
                KafkaSource.getKafkaDirectStream(jssc);

        rawStream.foreachRDD(rdd -> {
            JavaRDD<CrmClient> javaRddClient = rdd.map(
                    kafkaRecord -> {
                        GenericRecord record = kafkaRecord.value(); // --> ERROR happens here
                        return CrmClient.getCrmClient(record);
                    });

            CassandraJavaUtil
                    .javaFunctions(javaRddClient)
                    .writerBuilder("keyspace", "client", CassandraJavaUtil.mapToRow(CrmClient.class))
                    .withColumnSelector(CassandraJavaUtil.someColumns("id", "name", "lastname"))
                    .saveToCassandra();
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

    private static class KafkaSource {

        public static JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> getKafkaDirectStream(JavaStreamingContext jssc) {
            return KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<byte[], GenericRecord>Subscribe(getKafkaTopic(), getKafkaConf()));
        }

        private static Collection<String> getKafkaTopic() {
            return Collections.singletonList(TOPIC);
        }

        private static Map<String, Object> getKafkaConf() {
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_BOOTSTRAP_SERVERS.getValue(), Constants.KAFKA_BOOTSTRAP_SERVERS);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_KEY_DESERIALIZER.getValue(), ByteArrayDeserializer.class);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_GROUPID.getValue(), Constants.KAFKA_GROUP_ID);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_ENABLE_AUTO_COMMIT.getValue(), false);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_AUTO_OFFSET_RESET.getValue(), "earliest");
            // Confluent's Avro deserializer, reading generic records (specific reader off).
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            kafkaParams.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
            kafkaParams.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            return kafkaParams;
        }
    }
}
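In case it helps the discussion: Confluent's KafkaAvroDeserializer is declared as Deserializer<Object>, so one variant I've seen suggested (not tested here) is to type the stream's value as Object and cast to GenericRecord inside the map, on the executor. A rough sketch, assuming it lives in the same class as the code above and reuses KafkaSource.getKafkaConf():

    private static void runWithObjectValues(JavaStreamingContext jssc) {
        // Value typed as Object, matching what KafkaAvroDeserializer actually returns.
        JavaInputDStream<ConsumerRecord<byte[], Object>> stream =
                KafkaUtils.createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<byte[], Object>Subscribe(
                                Collections.singletonList(TOPIC), KafkaSource.getKafkaConf()));

        stream.foreachRDD(rdd -> {
            JavaRDD<CrmClient> clients = rdd.map(kafkaRecord -> {
                // Cast per record; the Avro payload was already decoded by the deserializer.
                GenericRecord record = (GenericRecord) kafkaRecord.value();
                return CrmClient.getCrmClient(record);
            });
            // ...save to Cassandra exactly as in the original foreachRDD.
        });
    }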