Loading a large parquet file how much memory do I need


Loading a large parquet file how much memory do I need

Alexander Czech
I want to load a 10 TB Parquet file from S3 and I'm trying to decide which EC2 instances to use.

Should I go for instances that together have more than 10 TB of memory? Or is it enough that they have enough SSD storage in total so that everything can be spilled to disk?

thanks
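
[Editor's note: a small illustration of the trade-off being asked about. Spark plans lazily and prunes Parquet columns, so only the columns actually selected (plus any shuffled or cached data) compete for memory; the rest can spill to local disk. A minimal sketch, with a made-up bucket and column names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sizing-check").getOrCreate()

df = spark.read.parquet("s3a://my-bucket/big-table/")   # lazy: nothing is read yet
subset = df.select("url", "url_list")                   # Parquet column pruning
subset.count()                                          # scans only these two columns;
                                                        # shuffle/cache data that does not
                                                        # fit in memory spills to local disk
]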

Re: Loading a large parquet file how much memory do I need

geoHeil
How many columns do you need from the big file?

Also, how CPU- and memory-intensive are the computations you want to perform?

Re: Loading a large parquet file how much memory do I need

Alexander Czech
I have a temporary result file (the 10 TB one) that looks like this:
I have around 3 billion rows of (url, url_list, language, vector, text). The bulk of the data is in url_list, and at the moment I can only guess how large url_list is. I want to give an ID to every url, and then give this ID to every url in url_list, to get an ID-to-ID graph. The columns language, vector and text only have values for 1% of all rows, so they play only a very minor role.

The idea at the moment is to load the url and url_list columns from the Parquet and give every row an ID. Then explode the url_list and join the IDs onto the now-exploded rows. After that I drop the URLs from the url_list column. For the rest of the computation I only load those rows from the Parquet that have values in (language, vector, text) and join them with the ID table.

In the end I will create 3 tables:
1. url, ID
2. ID, ID
3. ID,language,vector,text

Basically there is one very big shuffle going on; the rest is not that heavy. The CPU-intensive lifting was done before that.
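
[Editor's note: a minimal PySpark sketch of this plan. The S3 path is a placeholder, and filtering the sparse rows on language being non-null is an assumption:

from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.appName("url-graph").getOrCreate()

# Load only the two heavy columns (placeholder path).
df = spark.read.parquet("s3a://my-bucket/temp-result/").select("url", "url_list")

# Table 1: url -> ID
ids = df.withColumn("id", func.monotonically_increasing_id())

# Table 2: ID -> ID edges, by exploding url_list and joining the IDs back on.
edges = (ids.select("id", func.explode("url_list").alias("linked_url"))
            .join(ids.select(func.col("url").alias("linked_url"),
                             func.col("id").alias("linked_id")),
                  on="linked_url", how="left_outer")
            .select("id", "linked_id"))

# Table 3: ID plus the sparse columns, loaded separately and joined on url.
sparse = (spark.read.parquet("s3a://my-bucket/temp-result/")
               .select("url", "language", "vector", "text")
               .where(func.col("language").isNotNull()))
table3 = sparse.join(ids.select("url", "id"), on="url").drop("url")

The left_outer join keeps links whose target url never appears in the url column; those rows simply get a null linked_id.]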


Re: Loading a large parquet file how much memory do I need

Gourav Sengupta
Hi,

It would be much simpler if you just provided two tables with samples of the input and output. Going through the verbose text and trying to figure out what is happening is a bit daunting.

Personally, given that your entire data is in Parquet, I do not think that you will need a large cluster at all. You can do it with a small cluster as well, but depending on the cluster size you might want to create intermediate staging tables or persist the data.
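
[Editor's note: a minimal sketch of what persisting or staging an intermediate table could look like in PySpark; bucket, paths and column names are placeholders:

from pyspark import StorageLevel
from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3a://my-bucket/input/").select("url", "url_list")

ids = df.withColumn("id", func.monotonically_increasing_id())
ids.persist(StorageLevel.MEMORY_AND_DISK)   # partitions that do not fit in RAM go to local disk

# Or stage the intermediate result on S3 and read it back for the next step.
ids.write.mode("overwrite").parquet("s3a://my-bucket/staging/ids/")
ids = spark.read.parquet("s3a://my-bucket/staging/ids/")
]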

It would also help if you could kindly provide the EMR version that you are using.


On another note, please also mention the AWS region you are in. If Redshift Spectrum is available, or you can use Athena or Presto, then running massive aggregates over huge data sets at a fraction of the cost and at least 10x the speed may be handy as well.

Let me know in case you need any further help.

Regards,
Gourav


Re: Loading a large parquet file how much memory do I need

Alexander Czech
I don't use EMR; I spin my clusters up using flintrock (being a student, my budget is slim). My code is written in PySpark and my data is in the us-east-1 region (N. Virginia). I will do my best to explain it with tables:

My input, about 10 TB in size, sits in multiple (~150) Parquet files on S3:

+-----------+--------------------------+-------+------+-------+
|        uri|                 link_list|lang_id|vector|content|
+-----------+--------------------------+-------+------+-------+
|www.123.com|[www.123.com,www.abc.com,]|   null|  null|   null|
|www.abc.com|[www.opq.com,www.456.com,]|   null|  null|   null|
|www.456.com|[www.xyz.com,www.abc.com,]|   null|  null|   null|

(link_list is an ArrayType(StringType()))

Step 1: I only load the uri and link_list columns (but they make up the bulk of the data). Then every uri is given a unique ID with df.withColumn('uri_id', func.monotonically_increasing_id()),
resulting in a dataframe that looks like this:

DF_A:
+-----------+--------------------------+-------+
|        uri|                 link_list| uri_id|
+-----------+--------------------------+-------+
|www.123.com|[www.123.com,www.abc.com,]|      1|
|www.abc.com|[www.opq.com,www.456.com,]|      2|
|www.456.com|[www.xyz.com,www.abc.com,]|      3|

Step 2: I create another dataframe, DF_B, containing only the uri and uri_id columns, with uri_id renamed to link_id (a one-line sketch of this step follows the table):

DF_B:
+-----------+--------+
|        uri| link_id|
+-----------+--------+
|www.123.com|       1|
|www.abc.com|       2|
|www.456.com|       3|
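
[Editor's note: a one-line sketch of this step, assuming func is pyspark.sql.functions as in the other snippets:

# Keep only uri plus the ID, renamed to link_id.
DF_B = DF_A.select("uri", func.col("uri_id").alias("link_id"))
]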
Step 3: Now I explode the link_list field in DF_A with DF_A.select("uri_id", func.explode("link_list").alias("link")).
This gives me:

DF_C:
+-----------+-------+
|       link| uri_id|
+-----------+-------+
|www.123.com|      1|
|www.abc.com|      1|
|www.opq.com|      2|
|www.456.com|      2|
|www.xyz.com|      3|
|www.abc.com|      3|


Lastly, I join DF_C with DF_B using DF_C.join(DF_B, DF_C.link == DF_B.uri, "left_outer").drop("uri"), which results in the final dataframe:


+-----------+-------+--------+
|       link| uri_id| link_id|
+-----------+-------+--------+
|www.123.com|      1|       1|
|www.abc.com|      1|       2|
|www.opq.com|      2|    null|
|www.456.com|      2|       3|
|www.xyz.com|      3|    null|
|www.abc.com|      3|       2|
(In the actual code the link field is also dropped, but keeping it here hopefully makes this more intelligible.)

The rest is just joining the uri_id onto the lang_id, vector and content rows that are not null, which is trivial.
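
[Editor's note: a short sketch of that last step, reusing DF_A from step 1. The input path is a placeholder, and filtering on lang_id being non-null assumes it marks exactly the rows that carry the sparse columns:

# Load only the rows that carry the sparse columns and attach their uri_id.
sparse = (spark.read.parquet("s3a://my-bucket/input/")
               .select("uri", "lang_id", "vector", "content")
               .where(func.col("lang_id").isNotNull()))
result = sparse.join(DF_A.select("uri", "uri_id"), on="uri", how="inner").drop("uri")
]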

I hope this makes it more readable. If there is an AWS service that makes it easier for me to deal with the data, I'm also happy to hear about it, since it is basically "just" database operations.
I have a few days on my hands until the preprocessing is done, but I'm not sure whether the explode in step 3 can be done in another AWS service.

thanks!


Re: Loading a large parquet file how much memory do I need

Gourav Sengupta
Hi,

I think that I have mentioned all the required alternatives. However, I am quite curious how you concluded that processing with EMR is going to be more expensive than any other stack. I have been using EMR for the last six years (almost since it came out) and have always found it cheap, reliable, safe and stable (of course it's like fire: if you are not careful it can end up burning you financially).

Regards,
Gourav Sengupta


Re: Loading a large parquet file how much memory do I need

Alexander Czech
Hi
Simply because you have to pay an EMR fee on top of every instance hour. I currently need about 4800 hours of r3.2xlarge; EMR charges $0.18 per instance hour, so that would be $864 just in EMR costs (spot prices are around $0.12/h).

Just to stay on topic: I thought about getting 40 i2.xlarge instances, which have about 1 TB of combined RAM and 32 TB of combined SSD space. Would this be enough to load a 10 TB Parquet file, or do I need more RAM/disk spill space?
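
[Editor's note: whether roughly 1 TB of RAM is enough depends mostly on the big shuffle; what matters is that Spark's spill location points at the local SSDs and that cached data is allowed to fall back to disk. A hedged sketch of the relevant knobs; the values are illustrative, not a recommendation, and on some setups spark.local.dir has to be set via SPARK_LOCAL_DIRS or spark-defaults instead:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("url-graph")
         .config("spark.local.dir", "/media/ssd/spark-tmp")  # shuffle/spill files on the instance SSD
         .config("spark.sql.shuffle.partitions", "4000")     # smaller shuffle blocks spill more gracefully
         .getOrCreate())

df = spark.read.parquet("s3a://my-bucket/input/").select("uri", "link_list")
df.persist(StorageLevel.MEMORY_AND_DISK)   # cache falls back to disk instead of recomputing or OOMing
]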


Re: Loading a large parquet file how much memory do I need

Gourav Sengupta
Hi,

Scanning 10 TB in Athena would cost $50 (at $5 per TB scanned). If your data is in Parquet, it will cost even less because of the columnar striping. So I am genuinely not quite sure what you are talking about. Also, what do you mean by "I currently need"? Are you already processing the data?

Since you mentioned that you are a student, may I ask which university and college you are studying at? I may be able to provide some additional information.


Regards,
Gourav Sengupta

