Use case advice


andraskolbert
Hi,
I would like to get your advice on my use case.
I have a few Spark Streaming applications where I need to keep updating a DataFrame after each batch. Each batch typically affects only a small fraction of the DataFrame (roughly 5k out of 200k records).

The options I have been considering so far:
1) keep the DataFrame on the driver and update it after each batch
2) keep the DataFrame distributed and use checkpointing to truncate the lineage

I solved previous use cases with option 2, but I am not sure it is optimal, as checkpointing is relatively expensive. I have also considered HBase or some other fast-access store, but that is currently not in my stack.
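
Roughly, option 2 looks like the sketch below (the column names, the checkpoint directory and the every-N-batches cadence are illustrative assumptions, not exact code from my jobs):

from pyspark.sql import functions as F

# Assumes an existing SparkSession `spark` and a state DataFrame
# with user_id / last_activity_ts columns.
spark.sparkContext.setCheckpointDir("/tmp/state-checkpoints")  # assumed path

def merge_batch(state_df, batch_df, batch_id):
    # Reduce the batch to one row per affected user (the ~5k records).
    updates = batch_df.groupBy("user_id").agg(F.max("activity_ts").alias("batch_ts"))
    # Overwrite the timestamp only for users seen in this batch.
    state_df = (state_df.join(updates, "user_id", "left")
                        .withColumn("last_activity_ts",
                                    F.coalesce("batch_ts", "last_activity_ts"))
                        .drop("batch_ts"))
    # Checkpoint every N batches to truncate the lineage; doing it every batch is costly.
    if batch_id % 10 == 0:
        state_df = state_df.checkpoint()
    return state_df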

Curious to hear your thoughts

Andras

Re: Use case advice

Artemis User
Could you please clarify what you mean by option 1)? The driver is only
responsible for submitting the Spark job, not performing the computation.

-- ND

Re: Use case advice

andraskolbert
Sorry if my terminology is misleading.

What I meant by option 1) is keeping a local pandas DataFrame on the driver (collecting the data to the master) and updating that, instead of holding this data in a distributed Spark DataFrame.

For example, we have a DataFrame with all users and their corresponding latest activity timestamp. After each streaming batch, aggregations are performed and the result is collected to the driver to update the latest activity timestamp for the affected subset of users.
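
Roughly, what I have in mind is the sketch below (the user_id / activity_ts column names and the max aggregation are illustrative assumptions):

import pandas as pd
import pyspark.sql.functions as F

def update_driver_state(state_pdf: pd.DataFrame, batch_df) -> pd.DataFrame:
    # Aggregate the micro-batch down to one row per affected user before collecting.
    updates = (batch_df
               .groupBy("user_id")
               .agg(F.max("activity_ts").alias("last_activity_ts"))
               .toPandas()
               .set_index("user_id"))
    state_pdf = state_pdf.set_index("user_id")
    # Patch only the matching rows; users absent from this batch keep their old timestamp.
    state_pdf.update(updates)
    return state_pdf.reset_index()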



Re: Use case advice

mmuru
You could try Delta Lake or Apache Hudi for this use case.
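
With Delta Lake, the per-batch update could be a MERGE, along the lines of the sketch below (it assumes a Delta table already written to state_path and keyed by user_id, plus a per-batch DataFrame batch_updates with matching columns; the names are illustrative):

from delta.tables import DeltaTable

# Upsert the batch into the Delta table that holds the per-user state.
state = DeltaTable.forPath(spark, state_path)
(state.alias("s")
      .merge(batch_updates.alias("b"), "s.user_id = b.user_id")
      .whenMatchedUpdate(set={"last_activity_ts": "b.last_activity_ts"})
      .whenNotMatchedInsertAll()
      .execute())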


Re: Use case advice

andraskolbert
Thanks, Muru, very helpful suggestion! Delta Lake is amazing; it has completely changed a few of my projects!

One question regarding that: when I launch PySpark with the following, everything works fine and I can use Delta properly in the Spark context that Jupyter initiates automatically.

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'

PYSPARK_PYTHON=pyspark \
        --master yarn \
        --deploy-mode client \
        --driver-memory 4g \
        --executor-memory 16G \
        --executor-cores 1 \
        --num-executors 8 \
        --conf spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
        --jars hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar


However, I would like to start with a local PySpark session initially, and only connect to YARN when a specific notebook is configured that way.

1)

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'

PYSPARK_PYTHON=pyspark

2)
conf = spark.sparkContext._conf.setAll([
    ('spark.app.name', 'Delta Demo'),
    ('spark.yarn.jars', 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
    ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    ])
spark.sparkContext.stop()

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()
sc = spark.sparkContext

spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
from delta.tables import *
delta_path = "/data/delta-table"
data = spark.range(0, 5)
data.show()
data.write.format("delta").mode("overwrite").save(delta_path)


This way, I keep running into the following error: 'java.lang.ClassNotFoundException: Failed to find data source: delta.'

What did I miss in my configuration / environment variables?

Thanks
Andras
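
One possible way to get the "local by default, YARN only when configured" behaviour described above is sketched here. It assumes Jupyter is started as a plain notebook server (not via PYSPARK_DRIVER_PYTHON=jupyter pyspark), so no JVM is running yet when the session is built and spark.jars.packages can still take effect; the flag and resource settings are illustrative.

from pyspark.sql import SparkSession

USE_YARN = False  # illustrative per-notebook flag, not from the thread

builder = (SparkSession.builder
           .appName("Delta Demo")
           # Resolved from Maven at JVM start-up; only works because no JVM exists yet.
           .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1")
           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"))

if USE_YARN:
    # Assumes HADOOP_CONF_DIR / YARN_CONF_DIR are set in the notebook environment.
    builder = (builder.master("yarn")
               .config("spark.submit.deployMode", "client")
               .config("spark.executor.memory", "16g")
               .config("spark.executor.instances", "8")
               .config("spark.executor.cores", "1")
               .config("spark.driver.memory", "4g"))
else:
    builder = builder.master("local[*]")

spark = builder.getOrCreate()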



Re: Use case advice

andraskolbert
Sorry, I missed out a bit. Here is the updated configuration again, with the YARN settings added (highlighted in yellow in the original mail):

conf = spark.sparkContext._conf.setAll([
    ('spark.app.name', 'Delta Demo'),
    ('spark.yarn.jars', 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
    ('spark.master', 'yarn-client'),
    ('spark.executor.memory', '16g'),
    ('spark.executor.instances', '8'),
    ('spark.executor.cores', '1'),
    ('spark.driver.memory', '4g'),
    ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
])


Re: Use case advice

mmuru
You need to make sure the delta-core_2.11-0.6.1.jar file is in your $SPARK_HOME/jars folder.
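
For example (assuming the jar has been copied into $SPARK_HOME/jars on the host where the driver runs; the paths and app name below are illustrative), a locally created session can then resolve the delta source without any --jars or spark.jars.packages options:

from pyspark.sql import SparkSession

# Sketch: delta-core_2.11-0.6.1.jar is already in $SPARK_HOME/jars, so the JVM
# picks it up at start-up. When running on YARN, the executors still need the
# jar too (e.g. via --jars or the spark.yarn.archive bundle).
spark = (SparkSession.builder
         .appName("Delta Demo")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .getOrCreate())

spark.range(0, 5).write.format("delta").mode("overwrite").save("/data/delta-table")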



Re: Use case advice

Dilip Desavali
Unsubscribe

Re: Use case advice

purav aggarwal
Unsubscribe
