spark 2.3 dataframe join bug


李斌松
Hi Spark devs:
     I'm using Spark 2.3 and have found a bug in Spark DataFrames. Here is my code:

        from pyspark.sql import SparkSession

        sparkSession = SparkSession.builder.getOrCreate()
        sc = sparkSession.sparkContext

        # build a small source table and cache it (condition 1 below)
        tmp = sparkSession.createDataFrame(sc.parallelize([[1, 2, 3, 4], [1, 2, 5, 6], [2, 3, 4, 5], [2, 3, 5, 6]])).toDF('a', 'b', 'c', 'd')
        tmp.createOrReplaceTempView('tdl_spark_test')
        sparkSession.sql('cache table tdl_spark_test')

        # base dataframe: the distinct (a, b) pairs
        df = sparkSession.sql('select a, b from tdl_spark_test group by a, b')
        df.printSchema()

        # first child dataframe: collect_set(array(c)) per (a, b)
        df1 = sparkSession.sql('select a, b, collect_set(array(c)) as c from tdl_spark_test group by a, b')
        df1 = df1.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df1.a1, df.b == df1.b1]
        df = df.join(df1, cond, 'inner').drop('a1', 'b1')

        # second child dataframe: collect_set(array(d)) per (a, b)
        df2 = sparkSession.sql('select a, b, collect_set(array(d)) as d from tdl_spark_test group by a, b')
        df2 = df2.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df2.a1, df.b == df2.b1]
        df = df.join(df2, cond, 'inner').drop('a1', 'b1')

        df.show()
        sparkSession.sql('uncache table tdl_spark_test')
        

        As you can see, the above code just creates a base DataFrame and two child DataFrames and joins them together. The expected answer is:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [6]]|
        |  1|  2|[[5], [3]]|[[6], [4]]|
        +---+---+----------+----------+

        However, we got this unexpected answer:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [4]]|
        |  1|  2|[[5], [3]]|[[5], [3]]|
        +---+---+----------+----------+

        It seems that the column of the first child DataFrame has covered (overwritten) the column of the second one: the d column repeats the values of the c column.
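
        If it helps with triage, one way to check our guess at the cause is to dump the query plans right before the show(). We suspect the second aggregate is being resolved to the same attribute as the first one when the source table is cached, but that is not confirmed:

        # hypothetical diagnostic: print the parsed/analyzed/optimized/physical
        # plans to see whether collect_set(array(d)) resolved to the same
        # attribute as collect_set(array(c))
        df.explain(True)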

        In addition, this error occurs only when all of the following conditions hold at the same time:
        1. the root table is cached.
        2. "group by" is used in the child DataFrames.
        3. "array" is used inside "collect_set" in the child DataFrames.
        4. the join condition is "df.a==df2.a1, df.b==df2.b1" instead of "['a', 'b']" (a workaround based on this follows below).
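
        Based on condition 4, joining on the column-name list avoids the wrong result for us. A minimal workaround sketch, assuming the same cached tdl_spark_test view as above:

        # workaround sketch: join on column names instead of expression
        # conditions; this also makes the a1/b1 renames and drops unnecessary
        df = sparkSession.sql('select a, b from tdl_spark_test group by a, b')
        df1 = sparkSession.sql('select a, b, collect_set(array(c)) as c from tdl_spark_test group by a, b')
        df2 = sparkSession.sql('select a, b, collect_set(array(d)) as d from tdl_spark_test group by a, b')
        df = df.join(df1, ['a', 'b'], 'inner').join(df2, ['a', 'b'], 'inner')
        df.show()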