Split a row into multiple rows Java


nookala
I'm trying to generate multiple rows from a single row.

I have the schema

Name Id Date 0100 0200 0300 0400

and would like to convert it to a vertical format with the schema

Name Id Date Time

I have the code below and get this error:

Caused by: java.lang.RuntimeException: org.apache.spark.sql.catalyst.expressions.GenericRow is not a valid external type for schema of string
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

   StructType schemata = DataTypes.createStructType(
           new StructField[]{
                   DataTypes.createStructField("Name", DataTypes.StringType, false),
                   DataTypes.createStructField("Id", DataTypes.StringType, false),
                   DataTypes.createStructField("Date", DataTypes.StringType, false),
                   DataTypes.createStructField("Time", DataTypes.StringType, false)
           }
   );
   ExpressionEncoder<Row> encoder = RowEncoder.apply(schemata);
   Dataset<Row> modifiedRDD = intervalDF.flatMap(new FlatMapFunction<Row,Row>() {
       @Override
       public Iterator<Row> call (Row row) throws Exception {
           List<Row> rowList = new ArrayList<Row>();
           String[] timeList = {"0100", "0200", "0300", "0400"};
           for (String time : timeList) {

               Row r1 = RowFactory.create(row.<String>getAs("sdp_id"),
                       "WGL",
                       row.<String>getAs("Name"),
                       row.<String>getAs("Id"),
                       row.<String>getAs("Date"),
                       timeList[0],
                       row.<String>getAs(timeList[0]));

               //updated row by creating new Row
               rowList.add(RowFactory.create(r1));

           }
           return rowList.iterator();
       }
   }, encoder);
   modifiedRDD.write().csv("file:///Users/mod/out");
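
A likely cause of the error, judging only from the code above: RowFactory.create(r1) wraps the existing Row inside a new one-field Row, so the first field holds a GenericRow where the schema expects a string. The values should be passed to RowFactory.create directly, and their count has to match the schema. The sketch below shows just the per-row expansion in plain Java so it can be run without a cluster. RowSplitter and expand are hypothetical names, the Map stands in for a Spark Row, and the fifth output field (the value) is an assumption, since the schema in the post has only four fields.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: expands one wide record into one
// flat array of values per time column.
class RowSplitter {
    static final String[] TIME_COLUMNS = {"0100", "0200", "0300", "0400"};

    // 'wide' stands in for a Spark Row; wide.get(col) plays the role of
    // row.<String>getAs(col) in the original code.
    static List<String[]> expand(Map<String, String> wide) {
        List<String[]> out = new ArrayList<>();
        for (String time : TIME_COLUMNS) {
            // Use the loop variable, not TIME_COLUMNS[0], so each time slot
            // produces its own output record.
            String[] values = {
                wide.get("Name"), wide.get("Id"), wide.get("Date"), time, wide.get(time)
            };
            // In the Spark version this would be
            //   rowList.add(RowFactory.create((Object[]) values));
            // i.e. the values go in directly instead of wrapping another Row.
            out.add(values);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> wide = new LinkedHashMap<>();
        wide.put("Name", "bob");
        wide.put("Id", "b1");
        wide.put("Date", "2018-07-26");
        wide.put("0100", "1");
        wide.put("0200", "2");
        wide.put("0300", "3");
        wide.put("0400", "4");
        for (String[] values : expand(wide)) {
            System.out.println(String.join(",", values));
        }
    }
}
```

With this shape, the StructType passed to RowEncoder would need five string fields (for example Name, Id, Date, Time and a value column) so that it lines up with the five values in each record.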



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

RE: Split a row into multiple rows Java

Patil, Prashasth
Hi,
Have you tried using the Spark DataFrame pivot feature?


-----Original Message-----
From: nookala [mailto:[hidden email]]
Sent: Thursday, July 26, 2018 7:33 AM
To: [hidden email]
Subject: Split a row into multiple rows Java


RE: Split a row into multiple rows Java

nookala
Pivot seems to do the opposite of what I want: it converts rows to columns.

I was able to get this done in Python, but would like to do it in Java.

idfNew = idf.rdd.flatMap(lambda row: [
    (row.Name, row.Id, row.Date, "0100", row["0100"]),
    (row.Name, row.Id, row.Date, "0200", row["0200"]),
    (row.Name, row.Id, row.Date, "0300", row["0300"]),
    (row.Name, row.Id, row.Date, "0400", row["0400"]),
]).toDF()




Re: Split a row into multiple rows Java

Anton Puzanov
You can always use array + explode. I don't know if it's the most elegant or optimal solution (I would be happy to hear from the experts).

Code example:
//create data
Dataset<Row> test= spark.createDataFrame(Arrays.asList(new InternalData("bob", "b1", 1,2,3),
new InternalData("alive", "c1", 3,4,6),
new InternalData("eve", "e1", 7,8,9)
), InternalData.class);

+-----+---------+----+----+----+
| name|otherName|val1|val2|val3|
+-----+---------+----+----+----+
|  bob|       b1|   1|   2|   3|
|alive|       c1|   3|   4|   6|
|  eve|       e1|   7|   8|   9|
+-----+---------+----+----+----+

Dataset<Row> expandedTest = test.selectExpr("name", "otherName", "explode(array(val1, val2, val3)) as time");
expandedTest.show();
+-----+---------+----+
| name|otherName|time|
+-----+---------+----+
|  bob|       b1|   1|
|  bob|       b1|   2|
|  bob|       b1|   3|
|alive|       c1|   3|
|alive|       c1|   4|
|alive|       c1|   6|
|  eve|       e1|   7|
|  eve|       e1|   8|
|  eve|       e1|   9|
+-----+---------+----+

On Wed, Aug 1, 2018 at 11:05 PM, nookala <[hidden email]> wrote:

Re: Split a row into multiple rows Java

nookala
+-----+---------+----+----+----+
| name|otherName|val1|val2|val3|
+-----+---------+----+----+----+
|  bob|       b1|   1|   2|   3|
|alive|       c1|   3|   4|   6|
|  eve|       e1|   7|   8|   9|
+-----+---------+----+----+----+

I need this to become

+-----+---------+----+-----+
| name|otherName|time|value|
+-----+---------+----+-----+
|  bob|       b1|val1|    1|
|  bob|       b1|val2|    2|
|  bob|       b1|val3|    3|
|alive|       c1|val1|    3|
|alive|       c1|val2|    4|
|alive|       c1|val3|    6|
|  eve|       e1|val1|    7|
|  eve|       e1|val2|    8|
|  eve|       e1|val3|    9|
+-----+---------+----+-----+




Re: Split a row into multiple rows Java

Manu Zhang
The following may help, although it is in Scala. The idea is to first concatenate each value with its time label, assemble all the time_value strings into an array and explode it, and finally split each time_value back into time and value.

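  import org.apache.spark.sql.functions._

  // Assumes df has the value columns v1, v2, v3.
  // Tag each value with its column name ("<value>:<name>"), collect the tagged
  // strings into an array, and explode the array into one row per element.
  val ndf = df.select(col("name"), col("otherName"),
    explode(
      array(concat_ws(":", col("v1"), lit("v1")).alias("v1"),
        concat_ws(":", col("v2"), lit("v2")).alias("v2"),
        concat_ws(":", col("v3"), lit("v3")).alias("v3"))
    ).alias("temp"))

  // Split each tagged string back into its time label and its value.
  val fields = split(col("temp"), ":")
  ndf.select(col("name"), col("otherName"),
    fields.getItem(1).alias("time"),
    fields.getItem(0).alias("value"))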

Regards,
Manu Zhang
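
For a Java version, one alternative to the concat/split approach (a swapped-in technique, not what was posted above) is Spark SQL's built-in stack generator, passed as a string to selectExpr. The sketch below only builds that expression string, so it runs without Spark. StackExpr and build are hypothetical names, and it assumes the column names contain no quotes or backticks. Column names are wrapped in backticks so that numeric names such as 0100 are read as identifiers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper, not part of Spark: builds a Spark SQL stack(...)
// expression that turns N wide columns into N (label, value) rows.
class StackExpr {
    static String build(List<String> columns, String labelName, String valueName) {
        StringJoiner pairs = new StringJoiner(", ");
        for (String c : columns) {
            // 'c' is the label literal, `c` is the column reference.
            pairs.add("'" + c + "', `" + c + "`");
        }
        return "stack(" + columns.size() + ", " + pairs + ") as ("
                + labelName + ", " + valueName + ")";
    }

    public static void main(String[] args) {
        // prints: stack(3, 'val1', `val1`, 'val2', `val2`, 'val3', `val3`) as (time, value)
        System.out.println(build(Arrays.asList("val1", "val2", "val3"), "time", "value"));
    }
}
```

Usage would look something like df.selectExpr("name", "otherName", StackExpr.build(Arrays.asList("val1", "val2", "val3"), "time", "value")). As far as I know, stack expects the value columns to share one type, so mixed-type columns may need a cast first.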

On Wed, Aug 8, 2018 at 11:41 AM nookala <[hidden email]> wrote: