How does preprocessing fit into Spark MLlib pipeline


aATv
I want to start using PySpark MLlib pipelines, but I don't understand how or where preprocessing fits into the pipeline.

My preprocessing steps are generally in the following form:
   1) Load log files (from S3) and parse them into a Spark DataFrame with columns user_id, event_type, timestamp, etc.
   2) Group by a column, then pivot and count another column
      - e.g. df.groupby("user_id").pivot("event_type").count()
      - We can think of the columns this creates (besides user_id) as features, where the count of each event type is a separate feature
   3) Join the data from step 1 with other metadata, usually stored in Cassandra, then perform a transformation similar to the one in step 2, where the column that is pivoted and counted comes from the Cassandra metadata (see the sketch just after this list).

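For concreteness, here is a rough sketch of steps 1 and 3 (the S3 path, file format, Cassandra keyspace/table, join key, and pivoted metadata column are all hypothetical, and the Cassandra read assumes the spark-cassandra-connector is available):

# Step 1: load raw log files from S3 and parse them into a DataFrame
# (hypothetical path and format)
logs = spark.read.json("s3a://my-bucket/logs/")  # -> user_id, event_type, timestamp, ...

# Step 3: join with metadata stored in Cassandra via the spark-cassandra-connector
# (keyspace, table, and column names are placeholders)
meta = (spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(keyspace="my_keyspace", table="user_metadata")
        .load())

joined = logs.join(meta, on="user_id", how="left")

# then pivot/count a metadata column, analogous to step 2
meta_counts = joined.groupby("user_id").pivot("metadata_category").count()
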
After this preprocessing, I would use transformers to create other features and feed the result into a model, let's say Logistic Regression for example.

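Roughly what I have in mind for that last part is something like the following (a sketch only; it assumes the pivoted counts have already been produced with nulls filled, a label column exists, and the column names are made up):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# nulls produced by the pivot (like the null run count for id2 in the
# example further down) would need to be filled first,
# e.g. counts_df = counts_df.na.fill(0)
assembler = VectorAssembler(inputCols=["fly", "jump", "run"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=[assembler, lr])
# model = pipeline.fit(training_df)  # training_df = pivoted counts plus a "label" column
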
I would like to make at least step 2 a custom transformer and add it to a pipeline, but it doesn't fit the transformer abstraction: it takes a single input column and outputs multiple columns, and it also has a different number of output rows than input rows because of the group by.

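For reference, this is roughly the shape such a transformer would take if I forced it into the API (a sketch only, subclassing pyspark.ml.Transformer with the grouping and pivot columns as plain constructor arguments rather than proper Params):

from pyspark.ml import Transformer

class PivotCounter(Transformer):
    """Wraps step 2: group by one column, pivot another, and count.
    It is not one-column-in / one-column-out, and it changes the
    number of rows, which is where it strains the abstraction."""

    def __init__(self, groupCol="user_id", pivotCol="event_type"):
        super(PivotCounter, self).__init__()
        self.groupCol = groupCol
        self.pivotCol = pivotCol

    def _transform(self, dataset):
        return dataset.groupby(self.groupCol).pivot(self.pivotCol).count()

Something like Pipeline(stages=[PivotCounter(), assembler, lr]) would presumably execute, but the change in row count and the hard-coded column names are what make it feel like a poor fit.
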
Given that, how do I fit this into an MLlib pipeline? And if it doesn't fit as part of a pipeline, what is the best way to structure it in my code so that it can easily be reused both for training and testing, as well as in production?

I'm using PySpark 2.1, and here is an example of step 2:

df = spark.createDataFrame(
    [("id1", "jump"), ("id1", "jump"), ("id1", "run"), ("id2", "jump"),
     ("id2", "fly"), ("id1", "fly"), ("id1", "fly")],
    ("user_id", "event_type"))

df.show()
+-------+----------+
|user_id|event_type|
+-------+----------+
|    id1|      jump|
|    id1|      jump|
|    id1|       run|
|    id2|      jump|
|    id2|       fly|
|    id1|       fly|
|    id1|       fly|
+-------+----------+

df.groupby("user_id").pivot("event_type").count().show()
+-------+---+----+----+
|user_id|fly|jump| run|
+-------+---+----+----+
|    id1|  2|   2|   1|
|    id2|  1|   1|null|
+-------+---+----+----+


Note: my question is somewhat related to the question below, but I don't think it is answered there:
http://apache-spark-developers-list.1001551.n3.nabble.com/Why-can-t-a-Transformer-have-multiple-output-columns-td18689.html

Thanks
Adrian