broadcast in spark streaming

broadcast in spark streaming

lonely7345
This post has NOT been accepted by the mailing list yet.
My Spark Streaming job has two input streams: a custom HDFS input stream and a Kafka input stream.
For the HDFS stream, I read the HDFS file and broadcast the resulting List<String> inside foreachRDD.
In the Kafka stream I use that broadcast to filter records for a map-side join, but the broadcast value is null.

How can I confirm the broadcast has finished before reading the broadcast value?

Re: broadcast in spark streaming

lonely7345
    // Hold the latest Broadcast in an AtomicReference so the other stream can
    // always read the most recent value. Note that casting a List<String> to
    // scala.collection.convert.Wrappers.SeqWrapper fails at runtime, so the
    // original sites array never worked.
    final AtomicReference<Broadcast<List<String>>> sites = new AtomicReference<>();

        stream.map(new Function<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                return v1._2().toString().split("\\|")[0];
            }
        }).foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                // Runs on the driver for each batch: collect the values,
                // then publish a fresh broadcast of them.
                List<String> values = rdd.collect();
                sites.set(jssc.sparkContext().broadcast(values));
                return null;
            }
        });
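On the consumer side, a way to confirm the broadcast is ready is to read the holder on the driver inside transform(), which runs once per batch, and null-check it before use. This is a sketch, not tested against the original job: `sites` is the AtomicReference holder from the code above, and `kafkaStream` is an assumed name for the Kafka JavaDStream<String>.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

// Sketch: filter the Kafka stream against the latest broadcast list.
kafkaStream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
        // transform() runs on the driver for every batch, so it always
        // sees the latest value published by the HDFS stream's foreachRDD.
        final Broadcast<List<String>> current = sites.get();
        if (current == null) {
            // The first HDFS batch has not completed yet; pass nothing through
            // (or return rdd unchanged, depending on the join semantics wanted).
            return rdd.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String v) {
                    return false;
                }
            });
        }
        return rdd.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v) throws Exception {
                // Keep only records whose first field is in the broadcast list.
                return current.value().contains(v.split("\\|")[0]);
            }
        });
    }
});
```

Capturing the Broadcast inside transform() matters: closures captured directly in filter() are shipped to executors and would not see later updates to the AtomicReference made on the driver.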
