KafkaUtils module not found on spark 3 pyspark


KafkaUtils module not found on spark 3 pyspark

aupres
I use Hadoop 3.3.0 and Spark 3.0.1-bin-hadoop3.2, and my Python IDE is
Eclipse version 2020-12. I am trying to develop a Python application with the
KafkaUtils PySpark module. I configured PySpark and Eclipse following this site
<https://enahwe.wordpress.com/2015/11/25/how-to-configure-eclipse-for-developing-with-python-and-spark-on-hadoop/>
. Simple code like the following works without exception.


from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Kafka2RDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5, 6]
distData = sc.parallelize(data)

print(distData.count())


But I found that the Spark 3 PySpark module does not contain KafkaUtils at all.
The code below cannot import KafkaUtils.


# Fails on Spark 3.x: ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import OffsetRange


So I downgraded Spark from 3.0.1-bin-hadoop3.2 to 2.4.7-bin-hadoop2.7. Then
I could successfully import KafkaUtils in the Eclipse IDE, but this time
exceptions related to the Spark version were thrown continuously.


Traceback (most recent call last):
  File "/home/jhwang/eclipse-workspace/BigData_Etl_Python/com/aaa/etl/kafka_spark_rdd.py", line 36, in <module>
    print(distData.count())
  File "/usr/local/spark/python/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/local/spark/python/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/local/spark/python/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
    at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)


How can I import KafkaUtils and related modules on Spark 3.0.1?
Where is the KafkaUtils module in the PySpark of Spark 3.0.1, or how can the
module be installed? Any reply is welcome. Best regards.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: KafkaUtils module not found on spark 3 pyspark

Jungtaek Lim-2
I got a similar question recently, so I had to dig up some history I had missed. If I understand correctly, the class was intentionally removed in Spark 3 because it refers to the "kafka 0.8" module, which isn't guaranteed to work with recent Kafka versions. And it looks like there was a separate decision not to add PySpark support for the "kafka 0.10" DStream module.

Nowadays you're encouraged to use Structured Streaming instead of DStream whenever possible, because the community's main focus is on SQL, which is what Structured Streaming is built on. (Few contributors are willing to maintain DStream; honestly, contributions to DStream have been quite rare.)
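
For reference, here's a minimal sketch of reading from Kafka with Structured Streaming in PySpark. The broker address ("localhost:9092"), topic name ("test-topic"), and app name are placeholders, and it assumes the Kafka connector matching your Spark/Scala version is on the classpath (e.g. via --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1):

from pyspark.sql import SparkSession

# Assumes the Kafka source is available, e.g. submitted with:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 app.py
spark = (SparkSession.builder
         .appName("Kafka2StructuredStreaming")
         .master("local[*]")
         .getOrCreate())

# "localhost:9092" and "test-topic" are placeholders for your broker and topic.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-topic")
      .option("startingOffsets", "earliest")
      .load())

# Kafka records arrive as binary key/value columns; cast them to strings.
values = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Print each micro-batch to the console for a quick smoke test.
query = (values.writeStream
         .format("console")
         .outputMode("append")
         .start())
query.awaitTermination()

See the Structured Streaming + Kafka integration guide for the full set of options.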

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)
