How to scale more consumers to a Kafka stream


richiesgr
Hi (my previous post has been taken over by someone else)

I'm building an application that reads events from a Kafka stream. In production we have 5 consumers sharing 10 partitions.
But with Spark Streaming's Kafka integration only 1 worker acts as a consumer and then distributes the tasks to the other workers, so only 1 machine ends up consuming. I need more, because a single consumer means lag.

Do you have any idea what I can do? Another interesting point: the master is not loaded at all, it never gets above 10% CPU.

I've tried increasing queued.max.message.chunks on the Kafka client to read more records, thinking it would speed up the reads, but I only get:

ERROR consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
[IA2,7] -> PartitionFetchInfo(929838589,1048576),
[IA2,6] -> PartitionFetchInfo(929515796,1048576),
[IA2,9] -> PartitionFetchInfo(929577946,1048576),
[IA2,8] -> PartitionFetchInfo(930751599,1048576),
[IA2,2] -> PartitionFetchInfo(926457704,1048576),
[IA2,5] -> PartitionFetchInfo(930774385,1048576),
[IA2,0] -> PartitionFetchInfo(929913213,1048576),
[IA2,3] -> PartitionFetchInfo(929268891,1048576),
[IA2,4] -> PartitionFetchInfo(929949877,1048576),
[IA2,1] -> PartitionFetchInfo(930063114,1048576)
java.lang.OutOfMemoryError: Java heap space

Does anyone have ideas?
Thanks
Re: How to scale more consumers to a Kafka stream

Tim Smith
How are you creating your Kafka streams in Spark?

If you have 10 partitions for a topic, you can call "createStream" ten times to create 10 parallel receivers/executors and then use "union" to combine all the DStreams.
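
For example, a minimal Scala sketch of that pattern (assuming Spark 1.x's receiver-based KafkaUtils; the ZooKeeper quorum and group id are placeholders, and "IA2" is the topic from your log):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-parallel-receivers")
    val ssc = new StreamingContext(conf, Seconds(2))

    // One receiver per Kafka partition: 10 partitions -> 10 parallel streams,
    // each running on an executor instead of funnelling through one machine.
    val kafkaStreams = (1 to 10).map { _ =>
      KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", Map("IA2" -> 1))
    }

    // Combine the ten parallel DStreams back into one for downstream processing.
    val unified = ssc.union(kafkaStreams)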



Re: How to scale more consumers to a Kafka stream

Dibyendu Bhattacharya
Hi, 

You can use this Kafka Spark Consumer: https://github.com/dibbhatt/kafka-spark-consumer

It does exactly that: it creates parallel receivers for every Kafka topic partition. You can look at Consumer.java under the consumer.kafka.client package for an example of how to use it.
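
For what it's worth, here is a rough sketch of how such a receiver launcher is typically wired up. The entry point (ReceiverLauncher.launch), its package, and the property names below are assumptions based on the project's README at the time; Consumer.java in the repo is the authoritative example:

    import java.util.Properties
    import org.apache.spark.storage.StorageLevel
    import consumer.kafka.ReceiverLauncher  // package name assumed

    val props = new Properties()
    props.put("zookeeper.hosts", "zk-host")       // property names assumed
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "IA2")
    props.put("kafka.consumer.id", "my-consumer-group")

    // One receiver per Kafka partition: 10 partitions -> 10 receivers.
    val kafkaStream =
      ReceiverLauncher.launch(ssc, props, 10, StorageLevel.MEMORY_ONLY)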

There is some discussion on this consumer; you can find it here: https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards,
Dib



Re: How to scale more consumers to a Kafka stream

richiesgr
Thanks to all.
I'm going to check both solutions.

Re: How to scale more consumers to a Kafka stream

maasg
In reply to this post by Tim Smith
This pattern works. 

One note, though: use 'union' only if you need to group the data from all the RDDs into one RDD for processing (like a distinct count, or when you need a groupBy). If your processing can be parallelized over every incoming stream independently, I suggest you just apply the required transformations on every DStream and avoid 'union' altogether.
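
Concretely, building on the createStream loop sketched above (process() is a hypothetical per-record function):

    // Global aggregates need the union: e.g. counting occurrences of each
    // distinct value across everything the ten receivers pulled in.
    val unified = ssc.union(kafkaStreams)
    unified.map(_._2).countByValue().print()

    // A purely per-record pipeline can skip the union: each DStream is
    // transformed independently and no grouping step is forced on the job.
    kafkaStreams.foreach { stream =>
      stream.map(_._2).foreachRDD { rdd =>
        rdd.foreach(record => process(record))  // process() is hypothetical
      }
    }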

-kr, Gerard.




Re: How to scale more consumers to a Kafka stream

Dibyendu Bhattacharya
I agree, Gerard. Thanks for pointing this out.

Dib
