Hi folks, I am trying to read the message headers from a Kafka structured stream, which should be stored in a column named ``headers``. I am trying something like this:
val stream = sparkSession.readStream.format("kafka")...load()

stream.map(row => {
  ...
  val headers = row.getAs[HeaderT]("headers")
  ...
})
My question is: what would be HeaderT?
Thanks in advance
Eugen
Hello,
See if this works; it's from the documentation:
// Subscribe to 1 topic, with headers
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Map)]
Hi German, unfortunately the documentation does not help.
I am actually wondering what this code is supposed to do, since using Map without key and value types is rather strange. And this is what I actually get when I try it:
The error messages make perfect sense. Next I tried adding the type parameters to Map, assuming the key is a String and the value a byte array; in this case I get errors as well.
What seems to work is this:
I will try to continue in this direction and will let you know about my findings. The documentation is in any case wrong.
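For reference, a minimal sketch of reading the headers through the untyped Row API from the original question (not the exact code from this thread). It assumes Spark's standard mapping of an array<struct<key: string, value: binary>> column to Seq[Row], and UTF-8 encoded header values:

import org.apache.spark.sql.Row
import sparkSession.implicits._

val decoded = stream.map { row =>
  // headers is array<struct<key: string, value: binary>>, surfaced as Seq[Row];
  // it can be null for messages that carry no headers, hence the Option guard
  val headers = Option(row.getAs[Seq[Row]]("headers")).getOrElse(Seq.empty)
  headers.map { h =>
    h.getAs[String]("key") -> new String(h.getAs[Array[Byte]]("value"), "UTF-8")
  }
}

So in terms of the original question, Seq[Row] is one valid answer to "what would be HeaderT?".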
Oh, I didn't see that Map, it is weird :/
I did this and it works:

case class Header(key: String, value: Array[Byte])

import spark.implicits._

val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("includeHeaders", "true")
  .option("startingOffsets", "earliest")
  .load()

kafkaDF
  .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
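To have something to read, here is a hypothetical producer-side snippet that writes one record with a header to the same topic. Assumptions not from this thread: that the Spark 3.x Kafka sink accepts an optional headers column of type array<struct<key: string, value: binary>>, and the header key traceId is made up:

import org.apache.spark.sql.functions.{array, lit, struct}
import spark.implicits._

// Hypothetical test producer: one record carrying a single made-up header.
// Depending on the Spark version, an includeHeaders option may be needed here too.
Seq(("k1", "v1")).toDF("key", "value")
  .withColumn("headers",
    array(struct(lit("traceId").as("key"), lit("abc-123".getBytes("UTF-8")).as("value"))))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()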
Probably it's just missing: that was Map in the initial proposal and it was changed to Array during review. It looks like we missed updating the doc in sync during the review.
I'd love to see a volunteer to fix the doc. If there's no volunteer I might be able to take a look.
I could change it to something more generic, like this?

.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]
Yeah, that would be great, once you have manually verified it. Thanks!
Yep, I've tried it this morning: