Deleting columns within nested arrays/structs?


Jeff Evans
The starting point for the code is the various answers to this StackOverflow question.  After fixing some of the issues there, I end up with the following:

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{array, col, struct}
  import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
        .flatMap(f => {
          if (colName.startsWith(s"${f.name}.")) {
            dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
              case Some(x) => Some((f.name, x))
              case None => None
            }
          } else {
            None
          }
        })
        .foldLeft(df) {
          case (df, (colName, column)) => df.withColumn(colName, column)
        }
  }

  def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(array(struct(innerType.fields
                  .flatMap(f =>
                    dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                      case Some(x) => Some(x.alias(f.name))
                      case None => None
                    })
                  : _*)))
            // arrays of non-struct elements have no sub-columns to drop
            case _ => Some(col)
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }


Now, when I try this out on a simple nested JSON document, it seems to work, in the sense that the non-removed column names still exist.  However, the type of the "surviving" sibling field (i.e. the one not dropped) has become wrapped in an extra array level.  I have spent a while stepping through the code and can't quite understand why this is happening.  Somehow, the GetArrayStructFields expression is involved.

// read some nested JSON with structs/arrays

val json = """{
  "foo": "bar",
  "top": {
    "child1": 5,
    "child2": [{
      "child2First": "one",
      "child2Second": 2
    }]
  }
}"""

val df = spark.read.option("multiLine", "true").json(Seq(json).toDS())

val resultDf = dropColumn(df, "top.child2.child2First")

resultDf.select("top.child2.child2Second")
/*
+------------+
|child2Second|
+------------+
|       [[2]]|
+------------+
*/

// check the same from the original DataFrame

df.select("top.child2.child2Second")
/*
+------------+
|child2Second|
+------------+
|         [2]|
+------------+
*/

// check the field type for "child2Second"
resultDf.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(0).dataType.typeName
// prints array

// check the same from the original DataFrame (when it was index 1)
df.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(1).dataType.typeName
// prints long
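
Digging a bit further, if I understand correctly, the extra nesting appears as soon as getField is applied through the array column, before any re-wrapping on my part (spark-shell, same df as above):

// select the sibling field through the array-of-structs column alone
df.select(col("top.child2").getField("child2Second")).printSchema
// the resulting type is already array<long>: Catalyst resolves this to a
// GetArrayStructFields expression, which collects the field value from
// every element of the array -- so the array(struct(...)) wrapper in
// dropSubColumn would then add a second array level on top of it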


Is the code above incorrect, with regards to dropping nested fields (in this case, a field within a struct, which itself is in an array)?  Or is there some other consideration I'm missing?  Any insight is appreciated.
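
For what it's worth, if I were on a newer Spark I believe this could sidestep the hand-rolled rebuild entirely; this is just a sketch (assuming Spark 3.1+, where Column.dropFields and the Scala transform function are available), not something I've run against this exact schema:

import org.apache.spark.sql.functions.{col, transform}

// rebuild "top", mapping over the child2 array and dropping the
// unwanted struct field from each element
val altDf = df.withColumn("top",
  col("top").withField("child2",
    transform(col("top.child2"), c => c.dropFields("child2First"))))

But I'm still on a version where the recursive approach above is needed, hence the question.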