Spark parse fixed length file [Java]

lsn24
Hello,

We are running into issues while trying to process fixed-length files using
Spark.

The approach we took is as follows:

1. Read the .bz2 file from HDFS into a dataset using the
spark.read().textFile() API, then create a temporary view.

     Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
     rawDataset.createOrReplaceTempView(tempView);

2. Run a SQL query on the view to slice and dice the data the way we need
it (using SUBSTRING).

     (SELECT
         TRIM(SUBSTRING(value, 1, 16)) AS record1,
         TRIM(SUBSTRING(value, 17, 8)) AS record2,
         TRIM(SUBSTRING(value, 25, 5)) AS record3,
         TRIM(SUBSTRING(value, 30, 16)) AS record4,
         CAST(SUBSTRING(value, 46, 8) AS BIGINT) AS record5,
         CAST(SUBSTRING(value, 54, 6) AS BIGINT) AS record6,
         CAST(SUBSTRING(value, 60, 3) AS BIGINT) AS record7,
         CAST(SUBSTRING(value, 63, 6) AS BIGINT) AS record8,
         TRIM(SUBSTRING(value, 69, 20)) AS record9,
         TRIM(SUBSTRING(value, 89, 40)) AS record10,
         TRIM(SUBSTRING(value, 129, 32)) AS record11,
         TRIM(SUBSTRING(value, 161, 19)) AS record12,
         TRIM(SUBSTRING(value, 180, 1)) AS record13,
         TRIM(SUBSTRING(value, 181, 9)) AS record14,
         TRIM(SUBSTRING(value, 190, 3)) AS record15,
         CAST(SUBSTRING(value, 193, 8) AS BIGINT) AS record16,
         CAST(SUBSTRING(value, 201, 8) AS BIGINT) AS record17
      FROM tempView)

3. Write the output of the SQL query to a Parquet file.
     loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);

Problem:

Step 2 takes much longer when the lines are about 2000 characters long. If
each line in the file is only 1000 characters, it takes only 4 minutes to
process 20 million lines. If we increase the line length to 2000 characters,
it takes 20 minutes to process 20 million lines, so doubling the line length
makes the job five times slower.


Is there a better way in Spark to parse fixed-length lines?


Note: We use Spark 2.2.0, and we are using Spark with Java.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/


Re: Spark parse fixed length file [Java]

geoHeil
I am not 100% sure whether Spark is smart enough to do this in a single pass over the data. If not, you could write a Java UDF that parses all the columns at once.
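
As a rough illustration of parsing all the columns at once, here is a minimal sketch in plain Java. The class name `FixedWidthSlicer` and the `slice` method are hypothetical; only the start/length pairs come from the query in the original post. It returns every field as a trimmed string and leaves numeric conversion to the caller. Wiring it into Spark (for example as a UDF) is deliberately left out so the block runs without a cluster.

```java
import java.util.Arrays;

// Hypothetical helper: cuts one fixed-length line into all 17 fields in a single pass.
final class FixedWidthSlicer {
    // {start, length} pairs copied from the SUBSTRING calls in the query; starts are 1-based.
    static final int[][] LAYOUT = {
        {1, 16}, {17, 8}, {25, 5}, {30, 16}, {46, 8}, {54, 6}, {60, 3}, {63, 6},
        {69, 20}, {89, 40}, {129, 32}, {161, 19}, {180, 1}, {181, 9}, {190, 3},
        {193, 8}, {201, 8}
    };

    static String[] slice(String line) {
        String[] out = new String[LAYOUT.length];
        for (int i = 0; i < LAYOUT.length; i++) {
            int begin = LAYOUT[i][0] - 1;                              // 1-based to 0-based
            int end = Math.min(begin + LAYOUT[i][1], line.length());   // tolerate short lines
            // Note: String.trim() strips all characters <= U+0020, not only spaces.
            out[i] = begin < end ? line.substring(begin, end).trim() : "";
        }
        return out;
    }

    public static void main(String[] args) {
        // A made-up line that fills only the first three fields.
        String line = String.format("%-16s%-8s%-5s", "ACCT001", "20180413", "US");
        System.out.println(Arrays.toString(slice(line)));
    }
}
```

Keeping the layout in one table means the offsets are stated once, and each line is walked once instead of once per SUBSTRING expression.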


Otherwise, you could enable Tungsten off-heap memory, which might speed things up.
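
For reference, off-heap memory is controlled by two Spark configuration properties. The fragment below is a sketch for spark-defaults.conf; the 2g size is an arbitrary placeholder, not a recommendation, and would need tuning for the actual cluster.

```properties
# Enable off-heap allocation; the size must be set to a positive value when this is true.
spark.memory.offHeap.enabled  true
# Placeholder value, chosen only for illustration.
spark.memory.offHeap.size     2g
```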

Re: Spark parse fixed length file [Java]

JayeshLalwani
In reply to this post by lsn24
Is your input data partitioned? How much memory have you assigned to your executors? Have you looked at how much time the executors spend in GC? Is Spark spilling data to disk?

It is likely that the partition is too big. Spark tries to read the whole partition into the memory of one executor node. If the partition is too big, it might cause Spark to run out of memory. One side effect of how the JVM does garbage collection is that applications that use too much memory may just run very slowly instead of crashing.

If the problem is that the partition is too big, increasing executor memory or reducing the partition size will do the trick.
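
To get a feel for the sizes involved, here is a back-of-the-envelope sketch using the figures from the original post (20 million lines of 1000 or 2000 characters). It assumes one byte per character plus a one-byte newline, and a target of 128 MiB per partition; both are assumptions for illustration, and the class and method names are hypothetical. The input is bz2-compressed on disk, so the splits Spark actually sees are based on compressed size, not on these uncompressed numbers.

```java
// Hypothetical estimate of uncompressed data size and partition count.
final class PartitionEstimate {
    // Assumes single-byte characters and one newline byte per line.
    static long uncompressedBytes(long lines, int charsPerLine) {
        return lines * (charsPerLine + 1L);
    }

    // Ceiling division: smallest partition count that keeps each partition <= targetBytes.
    static long partitionsFor(long totalBytes, long targetBytes) {
        return (totalBytes + targetBytes - 1) / targetBytes;
    }

    public static void main(String[] args) {
        long target = 128L * 1024 * 1024; // assumed target partition size (128 MiB)
        for (int chars : new int[] {1000, 2000}) {
            long bytes = uncompressedBytes(20_000_000L, chars);
            System.out.println(chars + " chars/line: " + bytes + " bytes, "
                + partitionsFor(bytes, target) + " partitions of at most 128 MiB");
        }
    }
}
```

Under these assumptions the 2000-character file is about 40 GB uncompressed and would need 299 partitions, versus about 20 GB and 150 partitions for the 1000-character file. A count like this could be passed to `repartition` on the dataset if the default partitioning turns out to be too coarse.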

Re: Spark parse fixed length file [Java]

lsn24
In reply to this post by geoHeil
I was able to solve it by writing a Java method (to slice and dice the data)
and invoking it from Spark's map transformation. This transformed the data
much faster than my previous approach.
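
The actual method was not posted, so the following is only a sketch of the general shape such a solution might take. The names `LineParser`, `Parsed`, `field`, and `toLong` are hypothetical, and only two of the 17 fields are shown for brevity. It uses java.util.stream as a local stand-in so that it runs without Spark; in a Spark job the same `parse` method could be passed to `Dataset.map` together with an `Encoder` (in Java this typically needs a cast to `MapFunction` to resolve the overload). Returning null for non-numeric input is an assumed design choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LineParser {
    // Hypothetical output type holding two of the fields from the query.
    static final class Parsed {
        final String record1; // characters 1-16, trimmed
        final Long record5;   // characters 46-53 as a number, or null if not numeric

        Parsed(String record1, Long record5) {
            this.record1 = record1;
            this.record5 = record5;
        }
    }

    // Returns the trimmed slice starting at the 1-based position, tolerating short lines.
    static String field(String line, int start, int length) {
        int begin = Math.min(start - 1, line.length());
        int end = Math.min(begin + length, line.length());
        return line.substring(begin, end).trim();
    }

    // Assumed behavior: blank or malformed numbers become null instead of failing the job.
    static Long toLong(String s) {
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // One call per line extracts every needed field.
    static Parsed parse(String line) {
        return new Parsed(field(line, 1, 16), toLong(field(line, 46, 8)));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            String.format("%-16s%-29s%8s", "ACCT001", "", "00001234"), // made-up sample
            "short line");
        List<Parsed> parsed = lines.stream().map(LineParser::parse).collect(Collectors.toList());
        for (Parsed p : parsed) {
            System.out.println(p.record1 + " -> " + p.record5);
        }
    }
}
```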

Thanks geoHeil for the pointer.




Re: Spark parse fixed length file [Java]

lsn24
In reply to this post by JayeshLalwani
Thanks for the response, JayeshLalwani. Clearly, in my case the issue was with
my approach, not with memory.

The job was taking much longer even for smaller datasets.

Thanks again!


