Multiple joins with same DataFrame throws Ambiguous reference error


Nirav Patel
I am getting an "attribute(s) missing" error after joining the DataFrame df2 twice.

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) fid#49 missing from value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,fId#127 in operator !Join LeftOuter, (mgrId#15 = fid#49);;

!Join LeftOuter, (mgrId#15 = fid#49)
:- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS d31#109]
:  +- Join Inner, (df1Id#13 = fid#49)
:     :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15, _4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
:     :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
:     +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49, _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
:        +- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
+- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127, _4#43 AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
   +- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]

at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at


As you can see, "fId" is present in the plan, but Spark is looking for fid#49 while the operator's output only contains fId#127.

The physical plan of the original df2 is:

== Physical Plan ==

LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]


Looking at the plan, it seems multiple versions of 'fId' get generated (fId#49, fId#127).

Here's the full code.


Code:

    // assumes an active SparkSession named `spark` and `import spark.implicits._` (needed for toDF)
    val seq1 = Seq(
        (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
        (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
        (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
    //val rdd1 = spark.sparkContext.parallelize(seq1)
    val df1 = seq1.toDF("id","value","mgrId", "name", "d1", "d2")
    df1.show()

    val seq2 = Seq(
        (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
        (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
        (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
        (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))

    val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
    df2.explain()
    df2.show()

    val join1 = df1
      .join(df2,
        df1("id") === df2("fid"))
      .select(df1("id"), df1("value"), df1("mgrId"), df1("name"), df2("id").as("df2id"), df2("fid"), df2("value"))
    join1.printSchema()
    join1.show()

    val join2 = join1
      .join(df2,
          join1("mgrId") === df2("fid"),
          "left")
       .select(join1("id"), join1("value"), join1("mgrId"), join1("name"), join1("df2id"),
           join1("fid"), df2("fid").as("df2fid"))
    join2.printSchema()
    join2.show()
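
For what it's worth, a common workaround is to re-project the second copy of df2 with renamed columns before the second join, so its output gets fresh attribute ids that the analyzer can tell apart from the df2 lineage already inside join1. A minimal sketch of that idea (the names df2b, id2, fId2 and the reduced data are mine, not from the original code; assumes a local SparkSession):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("self-join-workaround").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a", 2, "bla"), (2, "b", 1, "bla")).toDF("id", "value", "mgrId", "name")
val df2 = Seq((1, "a1", 1, "duh"), (2, "a2", 2, "duh")).toDF("id", "value", "fId", "name")

// The first join is fine: the two sides carry distinct attribute ids.
val join1 = df1
  .join(df2, df1("id") === df2("fId"))
  .select(df1("id"), df1("value"), df1("mgrId"),
          df2("id").as("df2id"), df2("fId"))

// Re-projecting df2 with renamed columns gives this copy new attribute ids,
// so the second join condition no longer references an ambiguous/missing attribute.
val df2b = df2.select(df2("id").as("id2"), df2("fId").as("fId2"))

val join2 = join1
  .join(df2b, join1("mgrId") === df2b("fId2"), "left")
  .select(join1("id"), join1("value"), join1("mgrId"), join1("df2id"),
          join1("fId"), df2b("fId2").as("df2fid"))

val cols = join2.columns
val n = join2.count()
join2.show()
spark.stop()
```

An equivalent approach is `df2.as("b")` plus qualified column references like `$"b.fId"`; either way, the point is to avoid reusing column references from the original df2 on both sides of a self-join.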









