Getting RabbitMQ Message Delivery Tag (Stratio/spark-rabbitmq)

Daniel de Oliveira Mantovani
Hello,

I'm using Stratio/spark-rabbitmq to read messages from RabbitMQ and write them to Kafka, and I want to acknowledge ("commit") each RabbitMQ message only once it is safely stored on the Kafka broker. For efficiency, I'm using the Kafka producer's buffer together with a callback object, which needs the RabbitMQ message's delivery tag in order to acknowledge the right message on RabbitMQ.
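
To make the idea concrete, here is a minimal sketch of the kind of callback I have in mind. It only assumes the standard Kafka producer Callback interface; the class name AckOnKafkaSuccess and the ack and onFailure functions are hypothetical names of mine, standing in for whatever ends up acknowledging (or rejecting) the delivery on the RabbitMQ side.

```scala
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Hypothetical helper, not part of Stratio/spark-rabbitmq or Kafka.
// `ack` stands in for the RabbitMQ acknowledgement, for example
//   tag => channel.basicAck(tag, false)
// on the com.rabbitmq.client.Channel that delivered the message.
class AckOnKafkaSuccess(
    deliveryTag: Long,
    ack: Long => Unit,
    onFailure: (Long, Exception) => Unit
) extends Callback {

  // Kafka invokes this once the send has either succeeded or failed.
  // A null exception means the broker accepted the record.
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null) ack(deliveryTag)
    else onFailure(deliveryTag, exception)
}
```

It would be passed as the second argument of the producer's send(record, callback). Two caveats I am aware of: a delivery tag is only valid on the channel that delivered the message, and Kafka runs callbacks on the producer's I/O thread, so the acknowledgement would have to reach that channel in a thread-safe way.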

I couldn't find an interface on Stratio/spark-rabbitmq to get the delivery tag:

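import org.apache.kafka.clients.producer.ProducerRecord

val stream = createRabbitStream(ssc)

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val kafkaOpTopic = "evil_queue"
    // getKafka() is called once per partition, on the executor
    val broker = getKafka()
    partition.foreach { record =>
      // record is only the message payload; no delivery tag is available here
      val data = record.toString
      val message = new ProducerRecord[String, String](kafkaOpTopic, null, data)
      broker.send(message)
    }
    // Close the producer once the whole partition has been sent
    broker.close()
  }
}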

Basically, the variable "record" contains only the message itself, not an object with the RabbitMQ message structure, which would include the delivery tag. I really need the delivery tag to write an efficient and safe reader.
Does anyone know how to get the delivery tag? Or should I use another library to read from RabbitMQ?

Thank you



--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341