Rdd - zip with index


Rdd - zip with index

khajaasmath786
Hi,

I have a 10 GB file that should be loaded into a Spark dataframe. The file is a CSV with a header, and we were using rdd.zipWithIndex to get the column names and convert to Avro accordingly.

I am assuming this is why it is taking a long time: only one executor runs and it never achieves parallelism. Is there an easy way to achieve parallelism after filtering out the header?

I am also interested in a solution that removes the header from the file so that I can supply my own schema. That way I can split the files.

rdd.partitions is always 1 for this, even after repartitioning the dataframe after zipWithIndex. Any help on this topic, please.

Thanks,
Asmath

Sent from my iPhone


Re: Rdd - zip with index

"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"
Hi Mohammed,
I think the reason only one executor is running with a single partition is that you have a single file, which might be read/loaded into memory whole.

In order to achieve better parallelism I'd suggest splitting the CSV file.

Another question: why are you using an RDD at all?
Just spark.read.format("csv").option("header", true).load(...).select(...).write.format("avro").save(...)
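For reference, a minimal sketch of that DataFrame-based approach (the paths below are placeholders, and writing Avro assumes the external spark-avro module is on the classpath, e.g. via --packages org.apache.spark:spark-avro_2.12:&lt;spark-version&gt;):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-to-avro").getOrCreate()

// The header row is consumed as column names rather than as data,
// so there is no need for zipWithIndex to strip it.
val df = spark.read
  .option("header", "true")
  .csv("/path/to/input.csv")   // placeholder input path

df.write
  .format("avro")
  .save("/path/to/output")     // placeholder output path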




Re: Rdd - zip with index

khajaasmath786
So Spark by default doesn't split the large 10 GB file when loaded?

Sent from my iPhone



Re: Rdd - zip with index

srowen
In reply to this post by "Yuri Oleynikov (‫יורי אולייניקוב‬‎)"
I don't think that would change partitioning? Try .repartition(). It isn't necessary to write the file out, let alone in Avro.
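As a rough sketch of that suggestion (the partition count of 200 is only an illustrative value, and the path is a placeholder):

// Repartition right after the read to spread the data across the cluster.
val df = spark.read
  .option("header", "true")
  .csv("/path/to/input.csv")   // placeholder path
  .repartition(200)            // illustrative value; tune to your cluster

println(df.rdd.getNumPartitions)   // should now report 200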



Re: Rdd - zip with index

"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"
In reply to this post by khajaasmath786
I'm not a Spark core developer and do not want to confuse you, but it seems logical to me that just reading from a single file (no matter what format the file is in) gives no parallelism unless you repartition by some column right after the CSV load. But you're telling us you've already tried repartitioning with no luck...




Re: Rdd - zip with index

srowen
It would split 10GB of CSV into multiple partitions by default, unless it's gzipped. Something else is going on here.



Re: Rdd - zip with index

ayan guha
The best option is to use a dataframe; df.columns will automatically give you the column names. Are you sure your file is indeed CSV? Maybe it is easier if you share the code?


--
Best Regards,
Ayan Guha

Re: Rdd - zip with index

Mich Talebzadeh
If it is a CSV then it is a flat file somewhere in a directory, I guess.

Get the header out by doing:

/usr/bin/zcat csvfile.gz | head -n 1
Title Number,Tenure,Property Address,District,County,Region,Postcode,Multiple Address Indicator,Price Paid,Proprietor Name (1),Company Registration No. (1),Proprietorship Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company Registration No. (2),Proprietorship Category (2),Country Incorporated (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2) Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company Registration No. (4),Proprietorship Category (4),Country Incorporated (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4) Address (3),Date Proprietor Added,Additional Proprietor Indicator


10 GB is not much of a big CSV file.

That will resolve the header anyway.


Also, how are you running Spark: in local mode (single JVM) or one of the distributed modes (YARN, standalone)?


HTH




Re: Rdd - zip with index

srowen
No need to do that. Reading the header with Spark automatically is trivial.



Re: Rdd - zip with index

Mich Talebzadeh
How does Spark establish that there is a CSV header, as a matter of interest?

Example:

val df = spark.read.option("header", true).csv(location)

I need to tell Spark to ignore the header, correct?


If you have a header with column names in the file, you need to explicitly set the header option to true with option("header", true); without it, the API treats the header row as a data record.

A second point, which may not apply to newer versions of Spark: my understanding is that a gz file is not splittable, therefore Spark needs to read the whole file using a single core, which slows things down (CPU intensive). After the read is done, the data can be shuffled to increase parallelism.
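As an illustrative sketch, supplying your own schema avoids any inference pass while the header option still skips the first row (the column names and types below are made up):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema; replace with the real column names and types.
val schema = StructType(Seq(
  StructField("title_number", StringType, nullable = true),
  StructField("price_paid", IntegerType, nullable = true)
))

val df = spark.read
  .schema(schema)                // no schema inference needed
  .option("header", "true")      // header row is skipped, not read as data
  .csv("/path/to/file.csv.gz")   // placeholder path; a .gz still arrives as one partition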

HTH





Re: Rdd - zip with index

khajaasmath786
Thanks, Mich. I understand what I am supposed to do now and will try these options.

I still don't understand how Spark will split the large file. I have a 10 GB file which I want to be split automatically after reading. I can split the file before loading, but that is a very big requirement change for our whole data pipeline.

Is there a way to split the file once it is read, to achieve parallelism? I will try a groupBy on one column to see if that improves my job.



Re: Rdd - zip with index

srowen
In reply to this post by Mich Talebzadeh
Right, that's all you do to tell it to treat the first line of the files as a header defining the column names.
Yes, .gz files still aren't splittable by nature. One huge .csv file would be split into partitions, but one .gz file would not, which can be a problem.
To be clear, you do not need to do anything to let Spark read parts of a large file in parallel (assuming compression isn't the issue).



Re: Rdd - zip with index

ayan guha
Hi

"I still don't understand how Spark will split the large file" -- this is actually achieved by something called an InputFormat, which in turn depends on what type of file it is and what the block size is. Example: with a block size of 64 MB, a 10 GB file roughly translates to 10240/64 = 160 partitions (roughly, because line boundaries are taken into account). Spark launches one task for each partition, so you should see 160 tasks created.

Because .gz is not splittable, Spark uses a different InputFormat, and hence the number of tasks is the same as the number of files, not one per split (aka partition). Hence a 10 GB .gz file will incur only one task.

Now these tasks are the unit of parallelism and they can run in parallel. You can roughly translate this to the number of cores available to the cluster while reading the file. How many cores are available? Well, that depends on how you launch the job. For example, launching with local[*] means you want all of your local cores to be used.

In a distributed setting, you can ask Spark to group cores (and RAM); such a group is called an executor. Each executor can have one or more cores (driven by SparkConf). Each executor takes some of the tasks created above and runs them in parallel. That's what you see on the Spark UI Executors page.

So, depending on how you are launching the job, you should see
(a) how many executors are running, and with how many cores
(b) how many tasks are scheduled to run
(c) which executor is running those tasks

As a framework, Spark does all of this without you needing to do anything, as Sean said above. The question, then, is why you see no parallelism. I hope these pointers lead you to at least look in the right places. Please share the format of the file, how you are launching the job and, if possible, screenshots of the Spark UI pages, and I am sure the good people of this forum will help you out.
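A small sketch of how you might check those numbers from the Spark shell (the path is a placeholder):

// Partitions created by the read == tasks launched for the first stage.
val df = spark.read.option("header", "true").csv("/path/to/input")  // placeholder path
println(s"partitions after read: ${df.rdd.getNumPartitions}")

// Roughly, the total cores the application can use across its executors.
println(s"default parallelism: ${spark.sparkContext.defaultParallelism}")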

HTH....




--
Best Regards,
Ayan Guha

Re: Rdd - zip with index

Mich Talebzadeh
In reply to this post by khajaasmath786
Hi Asmath,

Have you actually managed to process this single file? Because Spark (as brought up a few times already) will pull the whole of the gz file into a single partition, and you can get an out-of-memory error.

HTH






Re: Rdd - zip with index

khajaasmath786
Hi Mich,

Yes, you are right. We were getting gz files and this is causing the issue. I will change it to bzip2 or another splittable format and try running it again today.

Thanks,
Asmath

Sent from my iPhone



Re: Rdd - zip with index

Mich Talebzadeh
Pretty easy if you do it efficiently


gunzip --to-stdout csvfile.gz | bzip2 > csvfile.bz2


Just create a simple bash script to do it and print the timings:

cat convert_file.sh

#!/bin/bash
# Fail the pipeline if gunzip fails, not only if bzip2 (the last command) fails.
set -o pipefail
GZFILE="csvfile.gz"
FILE_NAME=`basename $GZFILE .gz`
BZFILE="$FILE_NAME.bz2"
echo `date` " ""=======  Started compressing file $GZFILE  ======"
gunzip --to-stdout $GZFILE | bzip2 > $BZFILE
if [ $? != 0 ]
then
  echo `date` " ""======= Could not process GZFILE, aborting ======"
  exit 1
else
  echo `date` " ""======= bz2 file $BZFILE created OK ======"
  ls -ltr $BZFILE
  exit 0
fi

./convert_file.sh
Thu Mar 25 12:51:55 GMT 2021  =======  Started compressing file csvfile.gz  ======
Thu Mar 25 12:52:00 GMT 2021  ======= bz2 file csvfile.bz2 created OK ======
-rw-r--r-- 1 hduser hadoop 4558525 Mar 25 12:52 csvfile.bz2

HTH





