convert java dataframe to pyspark dataframe

convert java dataframe to pyspark dataframe

Aditya Singh
Hi All,

I am a newbie to Spark and am trying to pass a Java DataFrame to PySpark. The following link has details about what I am trying to do:


Can someone please help me with this?

Thanks,
Re: convert java dataframe to pyspark dataframe

srowen
The problem is that these two are not sharing a SparkContext, as far as I can see, so there is no way to share the object across them, let alone across languages.

You can of course write the data from Java, read it from Python.
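A minimal sketch of that first approach (the path and format here are purely illustrative):

# Assumes the Java application has already persisted the DataFrame, e.g. with
#   df.write().parquet("/tmp/shared_df");   (illustrative path)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reader").getOrCreate()
df = spark.read.parquet("/tmp/shared_df")
df.show()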

In some hosted Spark products, you can access the same session from two languages: register the DataFrame as a temp view in Java, then access it in PySpark.
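A rough sketch of that pattern, assuming both languages really are attached to the same session ("shared_df" is just a made-up view name):

# Java side (same SparkSession): df.createOrReplaceTempView("shared_df");
# Python side, where spark is the shared session object the hosted environment provides:
df = spark.table("shared_df")   # or spark.sql("SELECT * FROM shared_df")
df.show()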


Re: convert java dataframe to pyspark dataframe

Aditya Singh
Hi Sean,

Thanks a lot for replying, and apologies for the late reply (I somehow missed this mail before), but I am under the impression that passing the py4j.java_gateway.JavaGateway object lets PySpark access the Spark context created on the Java side.
My use case is exactly what you mentioned in the last email: I want to access the same Spark session across Java and PySpark. So how can we share the Spark context, and in turn the Spark session, across Java and PySpark?

Regards,
Aditya

Re: convert java dataframe to pyspark dataframe

Khalid Mammadov

Hi Aditya,


I think your original question was about how to convert a DataFrame from a Spark session created on the Java/Scala side to a DataFrame on a Spark session created from Python (PySpark).

So, as I have answered on your SO question:


There is a missing call to entry_point before calling getDf() in your code.

So, try this:

app = gateway.entry_point
j_df = app.getDf()

Additionally, I have created a working copy using Python and Scala (hope you don't mind) below. It shows how, on the Scala side, a py4j gateway is started with a Spark session and a sample DataFrame, and how, on the Python side, I accessed that DataFrame and converted it to a Python List[Tuple] before converting it back to a DataFrame for a Spark session on the Python side:

Python:

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField

if __name__ == '__main__':
    gateway = JavaGateway()

    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True)])

    df = spark.createDataFrame(df_to_list_tuple, schema)

    df.show()

Scala:

import java.nio.file.{Path, Paths}

import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
      .read
      .option("header", "True")
      .csv(myFile.toString)
      .collect()

}

object Py4JServerApp extends App {


  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}




Regards,
Khalid


Re: convert java dataframe to pyspark dataframe

Aditya Singh
Thanks a lot, Khalid, for replying.

I have one question though. The approach you showed needs the Python side to know the data types of the DataFrame's columns beforehand. Can we implement a generic approach where this info is not required and we just have the Java DataFrame as input on the Python side?

Also, one more question: in my use case I will be sending a DataFrame from Java to Python, then on the Python side some transformations will be done on the DataFrame (including Python UDFs) but no actions will be performed, and then it will be sent back to Java, where the actions will be performed. So I also wanted to ask if this is feasible, and if yes, whether we need to send some special jars to the executors so that they can execute the UDFs on the DataFrame.

Re: convert java dataframe to pyspark dataframe

Khalid Mammadov

I think what you want to achieve is what PySpark is actually doing in its API under the hood.

So, specifically, you need to look at PySpark's implementation of the DataFrame, SparkSession and SparkContext APIs. Under the hood, that is what is happening: it starts a py4j gateway and delegates all Spark operations and object creation to the JVM.

Look, for example, here, here and here, where the Python SparkSession/SparkContext communicates with the JVM and creates the SparkSession/SparkContext on the JVM side. The rest of the PySpark code then delegates execution to them.

Once you have these objects created by you, along with a custom Java/Scala application that holds the Spark objects, you can play around with the rest of the DataFrame creation and pass things back and forth. But, I must admit, this is not part of the official documentation; it is playing with Spark internals, which are subject to change (often).

So, I am not sure what your actual requirement is, but you would need to implement your own custom version of the PySpark API to get all the functionality you need and control it on the JVM side.
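For illustration only, here is a rough sketch of what such a bridge could look like on the Python side. This is not an official API, the internals differ between Spark versions, and the getJavaSparkContext()/getDf() accessors are hypothetical methods the Java entry point would have to expose. It assumes the Python SparkContext is built against the same py4j gateway instead of launching its own JVM; note also that recent Spark versions require that gateway to be secured with an auth token:

from py4j.java_gateway import JavaGateway
from pyspark import SparkConf, SparkContext
from pyspark.sql import DataFrame, SparkSession, SQLContext

gateway = JavaGateway()            # connects to the already-running JVM gateway
app = gateway.entry_point

# Reuse the JVM's SparkContext rather than starting a second one.
jsc = app.getJavaSparkContext()    # hypothetical accessor on the Java entry point
conf = SparkConf(_jconf=jsc.getConf())
sc = SparkContext(conf=conf, gateway=gateway, jsc=jsc)
spark = SparkSession(sc)

# Wrap the existing JVM Dataset<Row> directly; its schema comes with it,
# so nothing has to be re-declared or copied row by row on the Python side.
j_df = app.getDf()                 # hypothetical accessor returning the JVM Dataset<Row>
df = DataFrame(j_df, SQLContext(sc, spark))
df.show()

Because the wrapper just points at the same JVM object, no data is moved over py4j and the column types come across automatically.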


Re: convert java dataframe to pyspark dataframe

Aditya Singh
Thanks a lot, this was really helpful.
