pyspark createTopicAndPartition error


crazyzi
Hi:


I'm developing a Spark Streaming application with PySpark (env: Python 2.7 and Spark 2.0.1). Instead of using checkpointing to save the Kafka offsets, I save them to ZooKeeper manually. But Spark throws this error:
Py4JError: An error occurred while calling o38.createTopicAndPartition. Trace:
py4j.Py4JException: Method createTopicAndPartition([class java.lang.String, class java.lang.String]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
It feels like a Spark bug, but I'm not sure; has anyone run into the same problem? (PS: submitted with --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar)
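
For reference, the Py4J message says the bridge looked for createTopicAndPartition(String, String) and found no such method, which suggests the partition reached the JVM as a string. A minimal sketch of the types createDirectStream expects (the topic name and offset value here are placeholders):

from pyspark.streaming.kafka import TopicAndPartition

# TopicAndPartition takes (topic: str, partition: int), and the
# fromOffsets dict maps each TopicAndPartition to a numeric offset.
from_offsets = {TopicAndPartition('some_topic', 0): 0}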

The core ZK checkpointing code:

from kazoo.exceptions import NoNodeError  # assumption: the ZK client is kazoo, whose get/set raise NoNodeError
from pyspark.streaming.kafka import TopicAndPartition

def _get_topic_and_partition(self, part_and_offset):
    # parse one 'partition:offset' pair written by save() below
    partition, from_offset = part_and_offset.split(':')
    return (TopicAndPartition(self._topic, partition), from_offset)

def get_last(self):
    # read the saved offsets back from ZooKeeper; get() returns (data, stat)
    try:
        offset = self.zk_client.get(CHECKPOINT_ROOT_NODE)[0]
        if not offset:
            return None
    except NoNodeError:
        return None

    return dict([self._get_topic_and_partition(x) for x in offset.split(',')])

def save(self, rdd):
    # serialize this batch's offset ranges as 'partition:offset,...' and store them on the node
    offset = ','.join(['%s:%s' % (x.partition, x.fromOffset) for x in rdd.offsetRanges()])
    try:
        self.zk_client.set(CHECKPOINT_ROOT_NODE, offset)
    except NoNodeError:
        self._init_root_node()
        self.zk_client.set(CHECKPOINT_ROOT_NODE, offset)
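
If that signature mismatch is indeed the problem, here is a sketch of the parsing helper with explicit casts (assuming the same 'partition:offset' format written by save() above):

def _get_topic_and_partition(self, part_and_offset):
    # split() yields strings; cast the partition to int so Py4J can
    # resolve createTopicAndPartition(String, int), and make the offset numeric
    partition, from_offset = part_and_offset.split(':')
    return (TopicAndPartition(self._topic, int(partition)), long(from_offset))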


During initialization, the stream is created with get_last() supplying the starting offsets:

KafkaUtils.createDirectStream(
    ssc,
    [self._config['kafka']['topic']],
    kafka_config,
    fromOffsets=self.checkpoint.get_last(),
    valueDecoder=lambda x: json.loads(x))
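
For completeness, a sketch of how save() could be hooked into the stream, following the offset-tracking pattern from the Spark Streaming + Kafka integration guide (_store_offsets is a closure inside the method that builds the stream, so self is in scope, and process is a placeholder for the real batch logic):

def _store_offsets(rdd):
    # transform() applies this function on the driver once per batch, while
    # the incoming rdd is still the KafkaRDD, so offsetRanges() is available
    self.checkpoint.save(rdd)
    return rdd

stream.transform(_store_offsets).foreachRDD(process)

Note that this records offsets before the batch is processed; saving inside foreachRDD after processing would give at-least-once semantics instead.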