Spark Streaming - Set Parallelism and Optimize driver

5 messages

Spark Streaming - Set Parallelism and Optimize driver

forece85
I am new to Spark Streaming and trying to understand the Spark UI and do some
optimizations.

1. Processing at the executors took less time than at the driver. How can I
optimize to make the driver-side work faster?
2. We are using dstream.repartition(defaultParallelism*3) to increase
parallelism, which is causing heavy shuffles. Is there any option to avoid
calling repartition manually, to reduce data shuffles?
3. I am also trying to understand how 6 tasks in stage 1 and 199 tasks in
stage 2 got created.

*hardware configuration:* executor-cores: 3; driver-cores: 3;
dynamicAllocation is true;
initial,min,maxExecutors: 25

StackOverFlow link for screenshots:
https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Spark Streaming - Set Parallelism and Optimize driver

Russell Spitzer
Without your code this is hard to determine, but a few notes.

The number of partitions is usually dictated by the input source; see if it has any configuration which allows you to increase input splits.

I'm not sure why you think some of the code is running on the driver. All methods on DataFrames and RDDs are executed on the executors; foreachPartition does not run locally.

The difference in partitions is probably the shuffle you added with repartition. I would actually not be surprised if your code ran faster without the repartitioning. But again, without the real code it is very hard to say.
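For receiver-based streams (which the Kinesis integration uses), one such knob is spark.streaming.blockInterval: the receiver cuts incoming data into blocks at that interval, and each block becomes one partition of the batch RDD, so the per-receiver partition count is roughly batchInterval / blockInterval. A minimal sketch of that arithmetic, assuming the 200 ms default block interval (plain Java, no Spark needed):

```java
public class BlockIntervalMath {
    // For a receiver-based DStream, partitions per batch per receiver is
    // approximately batchInterval / spark.streaming.blockInterval.
    static long partitionsPerReceiver(long batchIntervalMs, long blockIntervalMs) {
        return batchIntervalMs / blockIntervalMs;
    }

    public static void main(String[] args) {
        // 10 s batches with the default 200 ms block interval -> ~50 partitions
        System.out.println(partitionsPerReceiver(10_000, 200));
        // Lowering blockInterval to 100 ms doubles that to ~100 partitions
        System.out.println(partitionsPerReceiver(10_000, 100));
    }
}
```

So lowering blockInterval (within reason) raises input parallelism at the source, without a repartition shuffle.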

On Mon, Jul 20, 2020, 6:33 AM forece85 <[hidden email]> wrote:


Re: Spark Streaming - Set Parallelism and Optimize driver

forece85
CONTENTS DELETED
The author has deleted this message.

Re: Spark Streaming - Set Parallelism and Optimize driver

forece85
In reply to this post by Russell Spitzer
Thanks for the reply. Please find pseudocode below. We fetch DStreams from a
Kinesis stream every 10 seconds, perform transformations, and finally persist
to HBase tables using batch insertions.

dStream = dStream.repartition(jssc.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords -> {
    Connection hbaseConnection = ConnectionUtil.getHbaseConnection();
    List<byte[]> listOfRecords = new ArrayList<>();
    while (partitionOfRecords.hasNext()) {
        try {
            listOfRecords.add(partitionOfRecords.next());

            // Flush only when the buffer holds 10 records or the partition is exhausted.
            if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
                continue;

            doJob(listOfRecords, hbaseConnection);
            listOfRecords = new ArrayList<>();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}));

We batch every 10 records and send them to the doJob method, where the actual
transformations happen and each batch is bulk-inserted into the HBase table.

With the above code, can we work out what is happening at Job 1 => 6 tasks,
and how to reduce that time?
Mainly: how to effectively set parallelism while avoiding the repartition()
method.
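The batching loop above can be exercised outside Spark. A minimal sketch, with a stand-in doJob (the real one does the HBase bulk insert):

```java
import java.util.*;
import java.util.function.Consumer;

public class BatchingSketch {
    // Drains the iterator in batches of at most `batchSize`, invoking `doJob`
    // once per batch -- the same shape as the foreachPartition loop above.
    static <T> int drainInBatches(Iterator<T> records, int batchSize,
                                  Consumer<List<T>> doJob) {
        int batches = 0;
        List<T> buffer = new ArrayList<>();
        while (records.hasNext()) {
            buffer.add(records.next());
            // Keep buffering until the batch is full or the iterator is exhausted.
            if (buffer.size() < batchSize && records.hasNext()) continue;
            doJob.accept(buffer);
            batches++;
            buffer = new ArrayList<>();
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 25; i++) data.add(i);
        // 25 records in batches of 10 -> 3 doJob calls (10, 10, 5)
        System.out.println(drainInBatches(data.iterator(), 10, batch -> {}));
    }
}
```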

Thanks in Advance.





Re: Spark Streaming - Set Parallelism and Optimize driver

Russell Spitzer
In reply to this post by forece85
Without seeing the rest (and you can confirm this by looking at the DAG visualization in the Spark UI), I would say your first stage with 6 partitions is:

Stage 1: Read data from Kinesis (or read blocks from the receiver, not sure which method you are using) and write shuffle files for the repartition.
Stage 2: Read the shuffle files and do everything else.

In general I think the biggest issue here is probably not the distribution of tasks, which based on your UI reading were quite small, but the parallelization of the write operation, since it is done synchronously. Instead of trying to increase parallelism by repartitioning, I would suggest having "doJob" run asynchronously, allowing for more parallelism within an executor rather than relying on multiple executor threads/JVMs.

That said, you would probably run faster if you simply skipped the repartition, judging by the speed of the second stage.
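The asynchronous idea can be sketched without Spark: submit each batch's doJob to a small per-partition thread pool and wait for all futures before the partition finishes, so slow HBase writes overlap instead of running one after another. A sketch, with doJob as a placeholder for the real bulk insert and the pool size purely illustrative:

```java
import java.util.*;
import java.util.concurrent.*;

public class AsyncBatchSketch {
    // Submits each batch asynchronously and joins all futures at the end,
    // so the writes run concurrently within one executor task.
    static int processAsync(List<List<byte[]>> batches, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (List<byte[]> batch : batches) {
            pending.add(CompletableFuture.runAsync(() -> doJob(batch), pool));
        }
        // Block until every in-flight write has finished before the task ends.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return pending.size();
    }

    static void doJob(List<byte[]> batch) {
        // Placeholder for the real batch insert into HBase.
    }

    public static void main(String[] args) {
        List<List<byte[]>> batches = new ArrayList<>();
        for (int i = 0; i < 4; i++) batches.add(Collections.singletonList(new byte[0]));
        System.out.println(processAsync(batches, 2)); // 4 batches submitted
    }
}
```

The join before returning matters: the foreachPartition task must not complete while writes are still in flight, or a failed batch could be silently lost.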

On Mon, Jul 20, 2020 at 8:23 AM forece85 <[hidden email]> wrote: