Kafka structured streaming - how to read headers


eugen.wintersberger
Hi folks,
  I am trying to read the message headers from a Kafka structured stream; they should be stored in a column named `headers`.
I am trying something like this:

val stream = sparkSession.readStream.format("kafka")......load()
stream.map(row => {
  ...
  val headers = row.getAs[HeaderT]("headers")
  ...
})

My question is: what would HeaderT be?

Thanks in advance

 Eugen
Re: Kafka structured streaming - how to read headers

German Schiavon Matteo
Hello, 

see if this works, from the documentation:


// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Map)]

Re: Kafka structured streaming - how to read headers

eugen.wintersberger
Hi German,
  unfortunately, the documentation does not help.

I am actually wondering what this code is doing, since using Map without key and value types is rather strange. This is what I actually get when I try it:

[screenshot of the compiler errors, not preserved]

The error messages make total sense. Next I can try adding the type parameters to Map, assuming the key is a String and the value a byte array; in that case I get:

[screenshot, not preserved]

What seems to work is this:

[screenshot, not preserved]

I will continue in this direction and will let you know about my findings. The documentation is in any case wrong.


Re: Kafka structured streaming - how to read headers

German Schiavon Matteo
Oh, I didn't see that Map; it is weird :/

I did this and it works:

case class Header(key: String, value: Array[Byte])

import spark.implicits._

val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("includeHeaders", "true")
  .option("startingOffsets", "earliest")
  .load()

kafkaDF
  .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
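Once the headers column is decoded into an array of (key, value) pairs, working with it is plain Scala. Here is a minimal Spark-free sketch of looking up a header by key; the header names and values (`traceId`, `contentType`, and the sample bytes) are invented for illustration, not part of the thread:

```scala
// Headers in the decoded shape: an array of (key, value) pairs,
// with each value as raw bytes.
val headers: Array[(String, Array[Byte])] = Array(
  ("contentType", "application/json".getBytes("UTF-8")),
  ("traceId", "abc-123".getBytes("UTF-8"))
)

// Last-value-wins lookup table; fine when keys are unique.
val headerMap: Map[String, Array[Byte]] = headers.toMap

// Decode a single header value back to a string.
val traceId: Option[String] =
  headerMap.get("traceId").map(bytes => new String(bytes, "UTF-8"))
```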

Re: Kafka structured streaming - how to read headers

Jungtaek Lim
Probably the doc is just stale - headers were a Map in the initial proposal and were changed to an Array during review. It looks like we missed updating the doc in sync.

I'd love to see a volunteer fix the doc. If there's no volunteer, I might be able to take a look.
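One reason Array[(String, Array[Byte])] fits better than a Map is that Kafka allows the same header key to appear more than once on a record, which a Map cannot represent without dropping values. A Spark-free sketch of the difference (the `retry`/`source` keys and sample bytes are invented for illustration):

```scala
// Headers with a repeated key, which Kafka permits.
val raw: Array[(String, Array[Byte])] = Array(
  ("retry", Array[Byte](1)),
  ("retry", Array[Byte](2)),
  ("source", "svcA".getBytes("UTF-8"))
)

// toMap silently keeps only the last value for each key...
val lossy: Map[String, Array[Byte]] = raw.toMap

// ...whereas grouping preserves every value per key.
val lossless: Map[String, Seq[Array[Byte]]] =
  raw.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toSeq }
```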

Re: Kafka structured streaming - how to read headers

German Schiavon Matteo
I could change it to something more generic, like this?

.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]
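For completeness, records of this tuple shape can be handled with ordinary Scala once they leave Spark. A Spark-free sketch; the record contents (`user-42`, the JSON value, the `source` header) are invented for illustration:

```scala
// The element type of the Dataset that the corrected .as[...] produces.
type KafkaRecord = (String, String, Array[(String, Array[Byte])])

val rec: KafkaRecord =
  ("user-42", """{"event":"click"}""", Array(("source", "svcA".getBytes("UTF-8"))))

val (key, value, headers) = rec

// First header value for a given key, decoded to a string.
val source: Option[String] =
  headers.collectFirst { case ("source", v) => new String(v, "UTF-8") }
```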

Re: Kafka structured streaming - how to read headers

Jungtaek Lim
Yeah, that would be great once you've manually verified it. Thanks!

Re: Kafka structured streaming - how to read headers

German Schiavon Matteo
Yep, I tried it this morning:

[attached screenshot "Captura de pantalla 2020-12-04 a las 15.42.09.png" showing the verified output]
