Building SparkML vectors from long data

Building SparkML vectors from long data

Patrick McCarthy
I work with a lot of data in a long format: cases in which an ID column is repeated, followed by a variable column and a value column, like so:

+---+-----+-------+
|ID | var | value |
+---+-----+-------+
| A | v1  | 1.0   |
| A | v2  | 2.0   |
| B | v1  | 1.5   |
| B | v3  | -1.0  |
+---+-----+-------+

It seems to me that Spark doesn't provide any clear native way to transform data of this format into a Vector() or VectorUDT() type suitable for machine learning algorithms. 
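
Concretely, what I'm after is one row per ID with a vector-typed features column, something like this (assuming v1, v2 and v3 happen to map to indices 0, 1 and 2 in the indexer):

+---+----------------------+
|ID | features             |
+---+----------------------+
| A | (3,[0,1],[1.0,2.0])  |
| B | (3,[0,2],[1.5,-1.0]) |
+---+----------------------+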

The best solution I've found so far (which isn't very good) is to group by ID, perform a collect_list, and then use a UDF to translate the resulting array into a vector datatype. 
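In outline, that looks something like the following (a rough sketch rather than my exact code; to_vector is a placeholder name, and indexed_df / n_vars are assumed to come from a fitted StringIndexer like the one below):

import pyspark.sql.functions as F
from pyspark.ml.linalg import SparseVector, VectorUDT

# n_vars = number of distinct variables, e.g. len(indexer_model.labels)
n_vars = 3

# UDF that turns the collected (index, value) pairs into a SparseVector;
# indices have to be in ascending order, hence the sort
# (assumes each variable appears at most once per ID)
@F.udf(returnType=VectorUDT())
def to_vector(pairs):
    pairs = sorted((int(p['varIdx']), float(p['value'])) for p in pairs)
    return SparseVector(n_vars, [i for i, _ in pairs], [v for _, v in pairs])

vectors = (indexed_df
           .groupBy('ID')
           .agg(F.collect_list(F.struct('varIdx', 'value')).alias('pairs'))
           .withColumn('features', to_vector('pairs')))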

I can get kind of close like so:

import pyspark.ml.feature as MF
import pyspark.sql.functions as F
import pyspark.sql.types as T

indexer = MF.StringIndexer(inputCol='var', outputCol='varIdx')
indexed_df = indexer.fit(df).transform(df)  # df is the long dataframe above

# build 'index:value' strings and collect them per ID
(indexed_df
 .withColumn('val', F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
                             F.lit(':'), F.col('value')))
 .groupBy('ID')
 .agg(F.collect_set('val')))

But the resulting collected 'val' array is out of index order, and it would still need to be parsed from strings back into a vector.

What's the current preferred way to solve a problem like this?

Re: Building SparkML vectors from long data

Nathan Kronenfeld
I don't know if this is the best way or not, but:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{col, collect_set, udf}

val indexer = new StringIndexer().setInputCol("var").setOutputCol("varIdx")
val indexModel = indexer.fit(data)
val indexedData = indexModel.transform(data)
val variables = indexModel.labels.length

// pair up each row's (index, value) so they can be collected per ID
val toSeq = udf((a: Double, b: Double) => Seq(a, b))
// build a SparseVector from the collected pairs; SparseVector wants its
// indices in ascending order, so sort by index first
val toVector = udf((seq: Seq[Seq[Double]]) => {
  val sorted = seq.sortBy(_(0))
  Vectors.sparse(variables, sorted.map(_(0).toInt).toArray, sorted.map(_(1)).toArray)
})

val result = indexedData
  .withColumn("val", toSeq(col("varIdx"), col("value")))
  .groupBy("ID")
  .agg(collect_set(col("val")).name("collected_val"))
  .withColumn("collected_val", toVector(col("collected_val")))

at least works. One thing to watch: SparseVector expects its indices in ascending order, which is why toVector sorts the collected pairs by index first; if the same variable can appear more than once for an ID, you'd also want to remove duplicates there.

