Building SparkML vectors from long data


Patrick McCarthy
I work with a lot of data in long format: an ID column repeated across rows, followed by a variable column and a value column, like so:

+---+-----+-------+
|ID | var | value |
+---+-----+-------+
| A | v1  | 1.0   |
| A | v2  | 2.0   |
| B | v1  | 1.5   |
| B | v3  | -1.0  |
+---+-----+-------+
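
For anyone who wants to follow along, the example frame can be built like so (assuming an active SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

long_df = spark.createDataFrame(
    [('A', 'v1', 1.0), ('A', 'v2', 2.0), ('B', 'v1', 1.5), ('B', 'v3', -1.0)],
    ['ID', 'var', 'value'])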

It seems to me that Spark doesn't provide any clear native way to transform data of this format into a Vector() or VectorUDT() type suitable for machine learning algorithms. 

The best solution I've found so far (which isn't very good) is to group by ID, perform a collect_list, and then use a UDF to translate the resulting array into a vector datatype. 
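
Spelled out, that looks something like the sketch below (untested; it reuses long_df from above, and varIdx comes from the fitted StringIndexer). It works, but it's exactly the kind of hand-rolled UDF I'd hoped to avoid:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT
import pyspark.sql.functions as F

indexer = StringIndexer(inputCol='var', outputCol='varIdx').fit(long_df)
n_features = len(indexer.labels)

# SparseVector accepts a list of (index, value) pairs and sorts them itself
to_vector = F.udf(
    lambda pairs: SparseVector(n_features, [(int(p[0]), float(p[1])) for p in pairs]),
    VectorUDT())

vectors = (indexer.transform(long_df)
    .groupBy('ID')
    .agg(F.collect_list(F.struct('varIdx', 'value')).alias('pairs'))
    .withColumn('features', to_vector('pairs'))
    .drop('pairs'))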

Trying to stay with native functions instead, I can get kind of close like so:

import pyspark.ml.feature as MF
import pyspark.sql.functions as F
import pyspark.sql.types as T

indexer = MF.StringIndexer(inputCol='var', outputCol='varIdx')
indexed_df = indexer.fit(long_df).transform(long_df)

(indexed_df
 # build 'idx:value' strings, e.g. '0:1.5'
 .withColumn('val', F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
                             F.lit(':'),
                             F.col('value')))
 .groupBy('ID')
 .agg(F.collect_set('val')))

But the resulting 'val' array is out of index order, and the 'idx:value' strings would still need to be parsed into an actual vector.

What's the current preferred way to solve a problem like this?

Re: Building SparkML vectors from long data

Nathan Kronenfeld
I don't know if this is the best way or not, but:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.functions.{col, collect_set, udf}

val indexer = new StringIndexer().setInputCol("var").setOutputCol("varIdx")
val indexModel = indexer.fit(data)
val indexedData = indexModel.transform(data)
val variables = indexModel.labels.length

// pair each index with its value so they stay together through the aggregation
val toSeq = udf((a: Double, b: Double) => Seq(a, b))
// sort the pairs by index, since SparseVector expects increasing indices
val toVector = udf((seq: Seq[Seq[Double]]) => {
  val sorted = seq.sortBy(_(0))
  new SparseVector(variables, sorted.map(_(0).toInt).toArray, sorted.map(_(1)).toArray)
})

val result = indexedData
  .withColumn("val", toSeq(col("varIdx"), col("value")))
  .groupBy("ID")
  .agg(collect_set(col("val")).name("collected_val"))
  .withColumn("collected_val", toVector(col("collected_val")))

That at least works. The pairs get sorted by index inside toVector, since SparseVector expects its indices in increasing order; if the same index can appear more than once for an ID, you'd also want to remove duplicates there.



Re: Building SparkML vectors from long data

Patrick McCarthy
I'm still validating my results, but my solution for the moment looks like the code below. I'm presently dealing with one-hot encoded values, so all the numbers in my array are 1.0:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT
import pyspark.sql.functions as F

def udfMaker(feature_len):
    # one-hot: every collected index gets the value 1.0, and the
    # indices are sorted because SparseVector expects increasing order
    return F.udf(
        lambda x: SparseVector(feature_len, sorted(int(i) for i in x), [1.0] * len(x)),
        VectorUDT())

indexer = StringIndexer(inputCol='contentStrings', outputCol='indexedContent').fit(source_df)

makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

sparse_content = (indexed_data
    .groupBy('ID')
    .agg(F.collect_set('indexedContent').alias('contentIdx'))
    .withColumn('content', makeVec(F.col('contentIdx')))
    .drop('contentIdx')
)
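
And just to confirm the result plugs into the ML APIs, something like this works (hypothetical example; any estimator with a featuresCol should accept the 'content' column):

from pyspark.ml.clustering import KMeans

# 'content' is now a proper ml.linalg vector column; k here is arbitrary
kmeans = KMeans(featuresCol='content', k=10)
model = kmeans.fit(sparse_content)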
