PySpark .collect() output to Scala Array[Row]


Nick Ruest
Hi,

I've hit a wall with trying to implement a couple of Scala methods in a
Python version of our project. I've implemented a number of these
already, but I'm getting hung up with this one.

My Python function looks like this:

def Write_Graphml(data, graphml_path, sc):
    return sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML(data, graphml_path).apply


where data is a DataFrame that has been collected with data.collect().

On the Scala side, it is basically:

object WriteGraphML {
  def apply(data: Array[Row], graphmlPath: String): Boolean = {
    // ...
    // massages an Array[Row] into GraphML
    // ...
    true
  }
}

When I try to use it in PySpark, I end up getting this error message:

Py4JError: An error occurred while calling None.io.archivesunleashed.app.WriteGraphML. Trace:
py4j.Py4JException: Constructor io.archivesunleashed.app.WriteGraphML([class java.util.ArrayList, class java.lang.String]) does not exist
        at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
        at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
        at py4j.Gateway.invoke(Gateway.java:237)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)


I originally dug into what the error message stated, and tried a variety
of tweaks such as:

sc.getOrCreate()._jvm.io.archivesunleashed.app.WriteGraphML.apply(data, graphml_path)

I even went as far as trying get_attr, calling "WriteGraphML$" and a
few other variants with that method.

Every attempt produced the same kind of error message as above: the
constructor or method does not exist.

After lots of Googling and Stack Overflow searches, I came across
this[1], and it has me thinking that the problem is how Py4J hands the
Python list (data) to the JVM, which then passes it to Scala. It ends
up as a java.util.ArrayList instead of an Array[Row].

Do I need to tweak data before it is passed to Write_Graphml? Or am I
doing something else wrong here?

I had originally posted a version of this message to the dev list, and
Sean Owen suggested WriteGraphML should be implemented as a class, not
an object. Is that the right path? I have a number of other Scala
functions implemented on the PySpark side of our project that are
objects, and everything works fine.

...and is there a best practices guide or documentation for implementing
Scala functions in PySpark? I've found a number of blog posts that have
been helpful.

Thanks in advance for any help!

cheers!

-nruest

[1]
https://stackoverflow.com/questions/61928886/pyspark-list-to-scala-sequence


Re: PySpark .collect() output to Scala Array[Row]

Wim Van Leuven
Looking at the stack trace, your data from Spark gets serialized to an ArrayList (of something), whereas your Scala code expects an Array of Rows. So the types don't line up. That's the exception you are seeing: the JVM searches for a signature that simply does not exist.

Try turning the Array in your Scala signature into a java.util.ArrayList?
-w
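
A different way around the type mismatch described above is to avoid collecting in Python at all, so that Py4J never has to convert a Python list. The following is an untested sketch under several assumptions: the function name write_graphml and its parameters are hypothetical, df._jdf is a private PySpark attribute, collecting through it is assumed to leave the rows on the JVM as an array that Py4J passes by reference, and WriteGraphML is assumed to be callable through its apply method as in the original post.

```python
def write_graphml(df, graphml_path, jvm):
    """Hand a DataFrame's rows to the Scala WriteGraphML object.

    df           -- a PySpark DataFrame that has NOT been collected in Python
    graphml_path -- output path, passed through as a string
    jvm          -- the Py4J JVM view, e.g. the SparkContext's _jvm attribute
    """
    # Assumption: collecting via the underlying Java Dataset keeps the rows
    # on the JVM side, so Py4J hands back a reference to a Java array
    # instead of building a Python list.
    java_rows = df._jdf.collect()

    # Assumption: the Scala object's apply method is reachable as a static
    # method on the class of the same name.
    return jvm.io.archivesunleashed.app.WriteGraphML.apply(java_rows, graphml_path)
```

It would be called with the uncollected DataFrame, for example write_graphml(df, "graph.graphml", sc._jvm), rather than with the result of df.collect().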

On Tue, 26 May 2020 at 03:04, Nick Ruest <[hidden email]> wrote: