Reading as Parquet a directory created by Spark Structured Streaming - problems


Reading as Parquet a directory created by Spark Structured Streaming - problems

Phillip Henry
Hi,

I write a stream of (String, String) tuples to HDFS partitioned by the first ("_1") member of the pair.
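For reference, the write side looks roughly like this (a sketch, not my exact code; the variable and checkpoint names are placeholders):

scala> stream.writeStream
     |   .format("parquet")
     |   .option("path", "hdfs://---/MY_DIRECTORY")
     |   .option("checkpointLocation", "hdfs://---/CHECKPOINT_DIR")
     |   .partitionBy("_1")
     |   .start()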

Everything looks great when I list the directory via "hadoop fs -ls ...".

However, when I try to read all the data as a single dataframe, I get unexpected results (see below).

I notice that if I remove the metadata directory as so:

$ hadoop fs -rmr hdfs://---/MY_DIRECTORY/_spark_metadata

then I can load all the data in the directory as a single Parquet file as desired with:

scala> spark.read.parquet("hdfs://---/MY_DIRECTORY/").show()
2019-01-09 08:34:45 WARN  SharedInMemoryCache:66 - Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
+--------------------+------------+
|                  _2|          _1|
+--------------------+------------+
|ba1ca2dc033440125...|201901031200|
|ba1ca2dc033440125...|201901031200|
|ba1ca2dc033440125...|201901031200|
.

But I'm not sure I can stream without _spark_metadata, and it makes me nervous to delete it. Can anybody advise?

I'm using Spark 2.4.0, Hadoop 2.7.3.

Thanks!

Phillip

==================================

If I don't delete _spark_metadata then these are the errors I am getting:

scala> spark.read.parquet("hdfs://---/MY_DIRECTORY").show()
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet at . It must be specified manually;
.
.

So, explicitly adding a schema gives:

scala> spark.read.schema(StructType(Seq(StructField("_1",StringType,false), StructField("_2",StringType,true)))).parquet("hdfs://---/MY_DIRECTORY")  .show()
+---+---+
| _1| _2|
+---+---+
+---+---+

Well, that's not what I was expecting, as I can see lots of data in that directory. In fact, I can read the subdirectories of MY_DIRECTORY:

scala> spark.read.schema(StructType(Seq(StructField("_1",StringType,false), StructField("_2",StringType,true)))).parquet("hdfs://---/MY_DIRECTORY/_1=201812030900").show()
+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|

which is not quite correct as the _1 field is null, but the _2 is indeed my data.

If I try to avoid _spark_metadata by using a wildcard, _1=*, I get:

scala> spark.read.schema(StructType(Seq(StructField("_1",StringType,false), StructField("_2",StringType,true)))).parquet("hdfs://---/MY_DIRECTORY/_1=*").show()
+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|

OK, that's all the data (not just a subdirectory) but _1 is always null.

Or, without the explicit schema:

scala> spark.read.parquet("hdfs://---/MY_DIRECTORY/_1=*").show()
+--------------------+
|                  _2|
+--------------------+
|ba1ca2dc033440125...|
|ba1ca2dc033440125...|
|ba1ca2dc033440125...|

Again, all the data but no _1 field.
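
One thing I have not tried yet (just a guess based on the partition-discovery section of the Spark SQL docs): supplying the parent directory as basePath, so that Spark treats the _1=... directory names as a partition column even when reading through the glob:

scala> spark.read
     |   .option("basePath", "hdfs://---/MY_DIRECTORY")
     |   .parquet("hdfs://---/MY_DIRECTORY/_1=*")
     |   .show()

If that works, the _1 values should be recovered from the directory names rather than coming back null.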






Re: Reading as Parquet a directory created by Spark Structured Streaming - problems

ddebarbieux
scala> spark.read.schema(StructType(Seq(StructField("_1",StringType,false), StructField("_2",StringType,true)))).parquet("hdfs://---/MY_DIRECTORY/_1=201812030900").show()
+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|

Is the column '_1' a String or a Number?

Denis


Re: Reading as Parquet a directory created by Spark Structured Streaming - problems

Phillip Henry
Hi, Denis.

It should be a String. Even if it looks like a number when you do hadoop fs -ls ..., it's a String representation of a date/time.

Phillip

On Thu, Jan 10, 2019 at 2:00 PM ddebarbieux <[hidden email]> wrote:
scala> spark.read.schema(StructType(Seq(StructField("_1",StringType,false), StructField("_2",StringType,true)))).parquet("hdfs://---/MY_DIRECTORY/_1=201812030900").show()
+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|
|null|ba1ca2dc033440125...|

Is the column '_1' a String or a Number?

Denis