compile error: No classtag available while calling RDD.zip()


compile error: No classtag available while calling RDD.zip()

沈志宏
Hello,

Since Dataset has no zip(..) method, I wrote the following code to zip two datasets:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
  val rdd = m.rdd.zip(n.rdd)  // compile error here: No ClassTag available for Y
  import spark.implicits._
  spark.createDataset(rdd)
}

However, the m.rdd.zip(…) call fails to compile with: No ClassTag available for Y

I know this error goes away if I give Y a ClassTag context bound instead:

def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But then the spark.createDataset(rdd) call reports a new error:
        Unable to find encoder for type stored in a Dataset.

I'm not sure how to solve this. How can I declare Y as both an Encoder and a ClassTag?

Many thanks!

Best regards,
bluejoe
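On the question of declaring Y as both an Encoder and a ClassTag: Scala allows several context bounds on one type parameter (Y: Enc : ClassTag), and a type class that already carries a ClassTag can also supply it directly. The Spark-free sketch below shows both options. Enc, zipNeedsTag, zipBoth and zipDerived are hypothetical stand-ins for Spark's Encoder and RDD.zip; Spark's real Encoder[T] does expose a clsTag: ClassTag[T] member, which Option 2 mimics.

```scala
import scala.reflect.{ClassTag, classTag}

object ZipDemo {
  // Hypothetical stand-in for Spark's Encoder[T]: like the real trait, it carries a ClassTag.
  trait Enc[T] { def clsTag: ClassTag[T] }

  object Enc {
    implicit val intEnc: Enc[Int] = new Enc[Int] { val clsTag: ClassTag[Int] = classTag[Int] }
    implicit val stringEnc: Enc[String] = new Enc[String] { val clsTag: ClassTag[String] = classTag[String] }
  }

  // Hypothetical stand-in for RDD.zip: building an Array[Y] is what requires ClassTag[Y].
  def zipNeedsTag[X, Y: ClassTag](xs: Seq[X], ys: Seq[Y]): Seq[(X, Y)] = {
    val buffered: Array[Y] = ys.toArray
    xs.zip(buffered.toSeq)
  }

  // Option 1: two context bounds on the same type parameter.
  def zipBoth[X: Enc, Y: Enc : ClassTag](xs: Seq[X], ys: Seq[Y]): Seq[(X, Y)] =
    zipNeedsTag(xs, ys)

  // Option 2: keep only the Enc bound and derive the ClassTag from it.
  def zipDerived[X: Enc, Y: Enc](xs: Seq[X], ys: Seq[Y]): Seq[(X, Y)] = {
    implicit val yTag: ClassTag[Y] = implicitly[Enc[Y]].clsTag
    zipNeedsTag(xs, ys)
  }

  def main(args: Array[String]): Unit = {
    println(zipBoth(Seq(1, 2), Seq("a", "b")))
    println(zipDerived(Seq("x"), Seq(42)))
  }
}
```

Option 2 keeps the caller-facing signature identical to the original [X: Encoder, Y: Encoder] form, which matters when callers should not have to know about ClassTag.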

Re: compile error: No classtag available while calling RDD.zip()

Anastasios Zouzias
Hi there,

If you are OK with working with DataFrames, you can do something like this:

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

// Assumes spark-shell, where `spark` and `sc` are predefined
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
// Append a "rowid" column of type Long to the existing schema
val newSchema = StructType(df.schema.fields :+ StructField("rowid", LongType, nullable = false))
// Zip on the RDD level: pair each Row with its 0-based index
val rddWithId = df.rdd.zipWithIndex
// Convert back to a DataFrame
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
  newSchema)
// Show results
dfZippedWithId.show()

Best,
Anastasios
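The same zipWithIndex idea can be used to zip two DataFrames: give each one a row-index column, then join on it. This is a sketch assuming a local SparkSession; withRowId, zipDataFrames and the __rowid column name are hypothetical. The index follows each DataFrame's current partition order, so the pairing is only meaningful when both inputs have a stable, matching row order, and if the inputs share column names the result will contain duplicate names.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ZipByIndex {
  // Hypothetical helper: append a 0-based Long index column named idCol,
  // using RDD.zipWithIndex as in the snippet above.
  def withRowId(df: DataFrame, idCol: String): DataFrame = {
    val schema = StructType(df.schema.fields :+ StructField(idCol, LongType, nullable = false))
    val rows = df.rdd.zipWithIndex.map { case (row, i) => Row.fromSeq(row.toSeq :+ i) }
    df.sparkSession.createDataFrame(rows, schema)
  }

  // Hypothetical helper: pair row i of `a` with row i of `b` by joining on the index,
  // then restore index order and drop the helper column.
  def zipDataFrames(a: DataFrame, b: DataFrame): DataFrame = {
    val idCol = "__rowid" // assumed not to clash with any existing column name
    withRowId(a, idCol)
      .join(withRowId(b, idCol), idCol)
      .orderBy(idCol)
      .drop(idCol)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("zip-by-index").getOrCreate()
    import spark.implicits._
    val left = Seq(1.0, 0.0, 3.0).toDF("x")
    val right = Seq("a", "b", "c").toDF("label")
    zipDataFrames(left, right).show()
    spark.stop()
  }
}
```

Unlike RDD.zip, this does not require both sides to have the same partitioning, at the cost of a join.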



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 wrote:




--
Anastasios Zouzias

Re: compile error: No classtag available while calling RDD.zip()

沈志宏
Thanks for your reply!

Actually, RDD.zip() works fine when the element types are concrete, like this:

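import org.apache.spark.sql.Dataset

def zipDatasets(m: Dataset[String], n: Dataset[Int]) = {
  import m.sparkSession.implicits._  // provides the Encoder[(String, Int)]
  m.sparkSession.createDataset(m.rdd.zip(n.rdd))
}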


But in my project, the element types of the Datasets are chosen by the caller, so I introduced the type parameters X and Y:


import org.apache.spark.sql.{Dataset, Encoder}

def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]) = {
  m.sparkSession.createDataset(m.rdd.zip(n.rdd))  // compile error: No ClassTag available for Y
}


It fails to compile because RDD.zip needs a ClassTag for Y, and the compiler has none for the abstract type parameter Y.
I have no idea how to fix this.

Regards,
bluejoe
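One way to make the generic version compile, sketched under the assumption of Spark 2.x or later: derive the ClassTag that RDD.zip needs from the Encoder itself (every Encoder[T] exposes clsTag), and build the Encoder[(X, Y)] for createDataset explicitly with Encoders.tuple, since the product encoder in spark.implicits._ requires a TypeTag that abstract X and Y do not have. Declaring Y: Encoder : ClassTag would also satisfy the zip call. The object name ZipDatasets is hypothetical. RDD.zip still requires both RDDs to have the same number of partitions and the same number of elements in each partition, so this is only safe when the two Datasets are partitioned identically.

```scala
import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object ZipDatasets {
  // Same signature as in the question: the caller supplies only Encoders.
  def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
    // RDD.zip needs a ClassTag for Y; every Encoder[T] carries one in clsTag.
    implicit val yTag: ClassTag[Y] = implicitly[Encoder[Y]].clsTag
    // createDataset needs an Encoder[(X, Y)]; build it from the two given encoders.
    implicit val pairEncoder: Encoder[(X, Y)] =
      Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
    // Assumes both Datasets have the same number of partitions and
    // the same number of elements in each partition (RDD.zip's contract).
    m.sparkSession.createDataset(m.rdd.zip(n.rdd))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("zip-datasets").getOrCreate()
    import spark.implicits._
    val m = Seq("a", "b", "c").toDS()
    val n = Seq(1, 2, 3).toDS()
    zipDatasets(m, n).show()
    spark.stop()
  }
}
```

Deriving the ClassTag from the Encoder keeps callers unaware of ClassTag entirely; they only need the Encoders they already have from spark.implicits._.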

From: Anastasios Zouzias
Date: Thursday, September 14, 2017, 2:10 AM
To: bluejoe
Cc: user
Subject: Re: compile error: No classtag available while calling RDD.zip()
