Structured streaming from Kafka by timestamp


Structured streaming from Kafka by timestamp

Tomas Bartalos
Hello,

I'm trying to read from Kafka via Spark Structured Streaming, restricted to a specific time range:

select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);


The problem is that the timestamp predicate is not pushed down to Kafka, so Spark reads the whole topic from the beginning.


The EXPLAIN output:

....

         +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 > 1535148000000000)) && (timestamp#57 < 1535234400000000))


Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production], start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit) [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58] PushedFilters: [], ReadSchema: struct<key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,times...


As a result, the query takes forever to complete. Is there a solution to this?

I'm using Kafka and the Kafka client, both version 1.1.1.


BR,

Tomas


Re: Structured streaming from Kafka by timestamp

Gabor Somogyi
Hi Tomas,

As a general note, I don't fully understand your use case. You mentioned Structured Streaming, but your query looks more like a one-time SQL statement.
Kafka doesn't support predicates in the way it's integrated with Spark. What can be done from the Spark side is to look up the offset for the lowest timestamp of interest and start reading from there.
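
The offset lookup described above can be sketched as follows. In the Java consumer it is KafkaConsumer.offsetsForTimes, which the 1.1.1 client already provides: for each partition it returns the earliest offset whose timestamp is greater than or equal to the requested one, or null if there is none. The Python below does not talk to a broker; it only emulates that contract over hypothetical in-memory (offset, timestamp_ms) pairs, and it assumes the query's window bounds are UTC. The names to_epoch_ms, offset_for_timestamp and offsets_for_window_start are illustrative, not part of any library.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory stand-in for a partition: (offset, timestamp in epoch ms).
Records = List[Tuple[int, int]]


def to_epoch_ms(text: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM', treating it as UTC (an assumption), to epoch millis."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000


def offset_for_timestamp(records: Records, target_ms: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= target_ms, or None if no such record.

    Mirrors the documented contract of Kafka's offsetsForTimes. A full scan in
    offset order is used because CreateTime timestamps need not be monotonic.
    """
    for offset, ts in sorted(records):
        if ts >= target_ms:
            return offset
    return None


def offsets_for_window_start(partitions: Dict[int, Records], start_ms: int) -> Dict[int, Optional[int]]:
    """Apply the per-partition lookup to every partition of a topic."""
    return {p: offset_for_timestamp(recs, start_ms) for p, recs in partitions.items()}


# Usage with made-up records around the window start of the original query.
start = to_epoch_ms("2019-01-23 01:00")
partitions = {
    0: [(100, start - 60_000), (101, start + 5_000), (102, start + 70_000)],
    1: [(200, start - 1_000), (201, start - 500)],
}
print(offsets_for_window_start(partitions, start))  # {0: 101, 1: None}
```

A partition that maps to None has no record at or after the window start, so a batch read can begin at "latest" for it.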

BR,
G


On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos wrote:


Re: Structured streaming from Kafka by timestamp

Shixiong(Ryan) Zhu
Hey Tomas,

From your description, you ran a batch query rather than a Structured Streaming query. The Kafka data source doesn't support filter pushdown right now, but it's definitely doable. One workaround is to set appropriate "startingOffsets" and "endingOffsets" options when loading from Kafka.
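
A minimal sketch of that workaround, assuming the per-partition offsets have already been looked up (for example by timestamp, as Gabor suggests). The Kafka source takes startingOffsets and endingOffsets as JSON strings keyed by topic and then by partition number, where -2 means earliest and -1 means latest; -2 is not allowed in endingOffsets. The helper name offsets_option and the ending offset 48533000 are hypothetical, and the PySpark call is left as a comment because it needs a running SparkSession and broker.

```python
import json
from typing import Dict, Optional

EARLIEST = -2  # "earliest" sentinel; accepted in startingOffsets only
LATEST = -1    # "latest" sentinel; accepted in startingOffsets and endingOffsets


def offsets_option(topic: str, offsets: Dict[int, Optional[int]], ending: bool = False) -> str:
    """Render {partition: offset} as the JSON string used by the Kafka source's
    startingOffsets / endingOffsets options, e.g. {"my_topic": {"0": 42}}.

    A partition whose lookup found nothing (None) is mapped to LATEST: as a
    start it skips the partition's existing data, as an end it reads up to the
    current end of the log.
    """
    per_partition: Dict[str, int] = {}
    for partition, offset in sorted(offsets.items()):
        value = LATEST if offset is None else offset
        if ending and value == EARLIEST:
            raise ValueError("-2 (earliest) is not allowed in endingOffsets")
        per_partition[str(partition)] = value
    return json.dumps({topic: per_partition})


start_json = offsets_option("my_topic", {0: 48532124, 1: None})
end_json = offsets_option("my_topic", {0: 48533000, 1: None}, ending=True)
print(start_json)  # {"my_topic": {"0": 48532124, "1": -1}}

# With PySpark (not imported here; `spark` is assumed to be an existing SparkSession):
# df = (spark.read.format("kafka")
#       .option("kafka.bootstrap.servers", "server1,server2")
#       .option("subscribe", "my_topic")
#       .option("startingOffsets", start_json)
#       .option("endingOffsets", end_json)
#       .load())
```

Keeping the timestamp condition in the WHERE clause is still worthwhile, since the offsets only bound the scan and do not filter individual records.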

Best Regards,

Ryan


On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi wrote:


Re: Structured streaming from Kafka by timestamp

Tomas Bartalos
Hello,

Sorry for my late answer.
You're right: what I'm doing is a one-time query, not Structured Streaming. It's probably best to describe my use case:
I'd like to expose live data residing in Kafka over JDBC/ODBC, using the power of Spark's distributed SQL engine. As the JDBC server I use the Spark Thrift Server.
Since timestamp pushdown is not possible :-(, this is a very cumbersome task.
Let's say I want to inspect the last 5 minutes of Kafka data. First I have to find, for each partition, the offset (offsetFrom) that corresponds to now() - 5 minutes.
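
The first step boils down to turning now() - 5 minutes into Kafka's timestamp unit, milliseconds since the epoch, and requesting that timestamp for every partition. The sketch below assumes four partitions (0 to 3, as in the table definition that follows) and a fixed, made-up "now" so the result is reproducible. The function names are hypothetical, and the real offsetsForTimes call keys its map by TopicPartition rather than by a bare partition number.

```python
import time
from typing import Dict, Iterable, Optional

WINDOW_SECONDS = 5 * 60  # "the last 5 minutes" from the use case


def window_start_ms(now_ms: Optional[int] = None, window_s: int = WINDOW_SECONDS) -> int:
    """Return now - window as epoch milliseconds, the unit of Kafka record timestamps."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - window_s * 1000


def timestamp_lookup_request(partitions: Iterable[int], now_ms: Optional[int] = None) -> Dict[int, int]:
    """One {partition: timestamp_ms} entry per partition, i.e. the input shape of a
    timestamp-to-offset lookup such as Kafka's offsetsForTimes (simplified keys)."""
    cutoff = window_start_ms(now_ms)
    return {p: cutoff for p in partitions}


# Four partitions; a fixed "now" keeps the output stable.
request = timestamp_lookup_request(range(4), now_ms=1548205500000)
print(request[0])  # 1548205200000
```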
Then I can register a Kafka table:

CREATE TABLE ticket_kafka_x USING kafka OPTIONS (kafka.bootstrap.servers 'server1,server2,...',
  subscribe 'my_topic',
  startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 49456213, "3" : 48400521}}');
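
Because the offsets change every time the window moves, the re-registration can be scripted. This is a sketch, not an existing tool: a hypothetical kafka_table_ddl helper renders DROP TABLE IF EXISTS plus the CREATE TABLE statement above from a {partition: offset} map, using the same option names (kafka.bootstrap.servers, subscribe, startingOffsets). The broker list is a placeholder.

```python
import json
from typing import Dict


def kafka_table_ddl(table: str, servers: str, topic: str, offsets: Dict[int, int]) -> str:
    """Render DROP + CREATE statements for a Kafka-backed Spark SQL table whose
    startingOffsets pin each partition to a precomputed offset.

    Kafka topic names cannot contain single quotes, so embedding the JSON in a
    single-quoted SQL string literal is safe for the topic part.
    """
    starting = json.dumps({topic: {str(p): o for p, o in sorted(offsets.items())}})
    return (
        f"DROP TABLE IF EXISTS {table};\n"
        f"CREATE TABLE {table} USING kafka OPTIONS ("
        f"kafka.bootstrap.servers '{servers}', "
        f"subscribe '{topic}', "
        f"startingOffsets '{starting}');"
    )


ddl = kafka_table_ddl(
    "ticket_kafka_x",
    "server1,server2",  # hypothetical placeholder broker list
    "my_topic",
    {0: 48532124, 1: 49029703, 2: 49456213, 3: 48400521},
)
print(ddl)
```

On each refresh, both statements would be submitted through the Thrift Server before querying the table again.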


Then I can issue queries against this table. (The data in Kafka is stored in Avro format, but I've created a custom GenericUDF to deserialize it.)

select event.id as id, explode(event.picks) as picks from (
  select from_avro(value) as event from ticket_kafka_x where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
) limit 100;


What's even more irritating is that every few minutes I have to re-create this table to reflect the last 5-minute interval; otherwise, query performance suffers as the amount of data to filter keeps growing.

A colleague of mine was able to run direct queries with timestamp pushdown in the latest Hive.
How difficult would it be to implement this feature in Spark? Could you point me to the code where I could have a look?

Thank you,


On Fri, Jan 25, 2019 at 12:32 AM Shixiong(Ryan) Zhu wrote: