Multiple Kafka Receivers and Union


Multiple Kafka Receivers and Union

Matt Narrell
Hey,

Spark 1.1.0
Kafka 0.8.1.1
Hadoop (YARN/HDFS) 2.5.1

I have a five-partition Kafka topic.  I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine.  Sifting through the user list and Google, I see that it's possible to split the Kafka receiver among the Spark workers so that I have a receiver per partition, distributed across the workers rather than localized to the driver.  I'm following something like this (but for Kafka, obviously):  https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132  From the Streaming Programming Guide: "Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s)."

However, I'm not able to consume any messages from Kafka after I perform the union operation.  Again, with a single, multi-threaded receiver I can consume messages fine.  If I create five receivers in a loop and call jssc.union(…), I get:

INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks

Do I need to do anything to the unioned DStream?  Am I going about this incorrectly?

Thanks in advance.

Matt

Re: Multiple Kafka Receivers and Union

Tim Smith
Posting your code would be really helpful in figuring out gotchas.


Re: Multiple Kafka Receivers and Union

Matt Narrell
So, this is scrubbed some for confidentiality, but the meat of it is as follows.  Note that if I substitute the commented-out section for the loop, I receive messages from the topic.

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://<some_location>");

List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);

// Five receivers, one per Kafka partition, each with a single consumer thread.
for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
                                           String.class, ProtobufModel.class,
                                           StringDecoder.class, ProtobufModelDecoder.class,
                                           kafkaProps,
                                           Collections.singletonMap(topic, 1),
                                           StorageLevel.MEMORY_ONLY_SER()));
}

// Union the five receiver streams into a single DStream for downstream processing.
final JavaPairDStream<String, ProtobufModel> stream = jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

//  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//                  KafkaUtils.createStream(jsc,
//                                          String.class, ProtobufModel.class,
//                                          StringDecoder.class, ProtobufModelDecoder.class,
//                                          kafkaProps,
//                                          Collections.singletonMap(topic, 5),
//                                          StorageLevel.MEMORY_ONLY_SER());

final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
                return new Tuple2<>(tuple._2().getDeviceId(), 1);
            }
        });

… and further Spark functions …
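
(For completeness: the elided tail is assumed to include an output operation such as print() or foreachRDD, plus the usual jsc.start() and jsc.awaitTermination() calls; without an output operation, Spark Streaming schedules no processing jobs.)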


Re: Multiple Kafka Receivers and Union

Tim Smith
Sorry, I am almost Java-illiterate, but here's my Scala code to do the equivalent (which I have tested to work):

val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
    Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
} // Create 10 receivers across the cluster, one per partition; at most as many
  // receivers are active as there are Kafka partitions

val kInMsg = ssc.union(kInStreams)
  .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
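
(Note: ssc above is the Scala StreamingContext, whose union takes the whole sequence at once. The Java API's equivalent is jssc.union(first, rest), which is why the Java code earlier passes streamList.get(0) and the remaining subList separately.)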





Re: Multiple Kafka Receivers and Union

Matt Narrell
To my eyes, these are functionally equivalent.  I'll try a Scala approach, but this may cause waves for me upstream (i.e., moving off Java).

Thanks for looking at this.  If anyone else can spot a glaring issue in the Java approach, that would be appreciated.

Thanks,
Matt


Re: Multiple Kafka Receivers and Union

Tim Smith
Maybe post the before-code, i.e., the code that worked before you introduced
the loop? I have had similar situations where reviewing the code before
(worked) and after (does not work) helped. The Scala REPL also helps because
I can see the object type returned by each statement.

Other than code, in the driver logs you should see events that say
"Registered receiver for stream 0 from
akka.tcp://[hidden email]:53135"

Now, if you go to "node5" and look at the Spark or YARN container logs
(depending on which is doing resource management), you should be able to see
whether the receiver hits any errors when trying to talk to Kafka.




Re: Multiple Kafka Receivers and Union

Matt Narrell
The part that works is the commented-out, single-receiver stream below the loop.  It seems that when I call KafkaUtils.createStream more than once, I don't receive any messages.

I’ll dig through the logs, but at first glance yesterday I didn’t see anything suspect.  I’ll have to look closer.

mn


Re: Multiple Kafka Receivers and Union

Tim Smith
Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
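For reference, a minimal sketch of the type relationship in question, assuming the Spark 1.1 Java API and reusing the topic, kafkaProps, and decoder classes from the code above: JavaPairReceiverInputDStream is a subclass of JavaPairDStream, so storing the receivers in a List<JavaPairDStream<...>> is only an upcast and should not by itself change behavior.

JavaPairReceiverInputDStream<String, ProtobufModel> receiverStream =
        KafkaUtils.createStream(jsc,
                                String.class, ProtobufModel.class,
                                StringDecoder.class, ProtobufModelDecoder.class,
                                kafkaProps,
                                Collections.singletonMap(topic, 1),
                                StorageLevel.MEMORY_ONLY_SER());

// Compiles because JavaPairReceiverInputDStream is-a JavaPairDStream;
// the upcast itself changes nothing at runtime.
JavaPairDStream<String, ProtobufModel> asPlainStream = receiverStream;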


Re: Multiple Kafka Receivers and Union

Matt Narrell
I suppose I have other problems, as I can't get the Scala example to work either.  Puzzling, as I have coded it exactly like the examples (which are purported to work), but no luck.

mn


Re: Multiple Kafka Receivers and Union

Matt Narrell
Tim,

I think I understand this now.  I had a five-node Spark cluster and a five-partition topic, and I created five receivers.  I found this:  http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming, indicating that if I use all my workers as receivers, there are none left to do the processing.  If I drop the number of partitions/receivers while still having multiple unioned receivers, I see messages.
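
A minimal sketch of that fix, reusing the topic, kafkaProps, and decoder classes from the code above (the core counts here are illustrative assumptions):

// Each receiver occupies one executor core for the life of the application,
// so keep the receiver count below the total cores granted to the app.
int totalCores = 5;                 // assumption: cores available to this app
int numReceivers = totalCores - 2;  // leave cores free for batch processing

List<JavaPairDStream<String, ProtobufModel>> streams = new ArrayList<>(numReceivers);
for (int i = 0; i < numReceivers; i++) {
    streams.add(KafkaUtils.createStream(jsc,
                                        String.class, ProtobufModel.class,
                                        StringDecoder.class, ProtobufModelDecoder.class,
                                        kafkaProps,
                                        Collections.singletonMap(topic, 1),
                                        StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<String, ProtobufModel> unioned =
        jsc.union(streams.get(0), streams.subList(1, streams.size()));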

mn


Re: Multiple Kafka Receivers and Union

Matt Narrell
Additionally,

If I dial up/down the number of executor cores, this does what I want.  Thanks for the extra eyes!
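
(On YARN, for example, the total core count can be raised via spark-submit's --num-executors and --executor-cores flags; with ten cores and five receivers, five cores remain free for batch processing.)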

mn

On Sep 25, 2014, at 12:34 PM, Matt Narrell <[hidden email]> wrote:

Tim,

I think I understand this now.  I had a five node Spark cluster and a five partition topic, and I created five receivers.  I found this:  http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming Indicating that if I use all my workers as receivers, there are none left to do the processing.  If I drop the number of partitions/receivers down while still having multiple unioned receivers, I see messages.

mn
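One concrete way to drop the receiver count while still covering all five partitions is to give each receiver several consumer threads. A sketch reusing the types from the code earlier in the thread (the receiver and thread counts here are illustrative, not the poster's actual values):

// Two receivers (two pinned cores) instead of five, each running multiple
// Kafka consumer threads; the high-level consumer balances the topic's
// partitions across the threads in the group.
List<JavaPairDStream<String, ProtobufModel>> streams = new ArrayList<>(2);
for (int i = 0; i < 2; i++) {
    streams.add(KafkaUtils.createStream(jsc,
                                        String.class, ProtobufModel.class,
                                        StringDecoder.class, ProtobufModelDecoder.class,
                                        kafkaProps,
                                        Collections.singletonMap(topic, 3),  // 2 x 3 threads cover 5 partitions
                                        StorageLevel.MEMORY_ONLY_SER()));
}
final JavaPairDStream<String, ProtobufModel> unioned =
        jsc.union(streams.get(0), streams.subList(1, streams.size()));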


Reply | Threaded
Open this post in threaded view
|

Re: Multiple Kafka Receivers and Union

Tim Smith
Good to know it worked out, and thanks for the update. I didn't realize
you need to provision for receiver workers as well as processing workers. One
would think a worker would process multiple stages of an app/job, and that
receiving is just one stage of the job.
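The Streaming programming guide makes the same point: a receiver runs as a long-running task that occupies one core for the application's lifetime, so the cores allocated to the application must exceed the number of receivers, otherwise data is received but never processed. The classic way to reproduce the starvation locally is a one-core master. A sketch (master string and app name are illustrative):

// With local[1], the single core runs the receiver and batch processing
// is starved indefinitely; local[2] or higher leaves a core free.
SparkConf conf = new SparkConf()
        .setMaster("local[2]")   // must exceed the number of receivers
        .setAppName("receiver-core-demo");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Seconds.apply(1));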


