An open HDFS connection fails RDD.take()

An open HDFS connection fails RDD.take()

Mingyu Kim
Here’s a snippet of code that throws an exception. I create a FileSystem object for an HDFS cluster, then try to read a CSV file from that HDFS as an RDD and call take().

public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration(false);
    conf.set("fs.default.name", "hdfs://localhost:8020");
    conf.set("fs.hdfs.impl", DistributedFileSystem.class.getCanonicalName());
    FileSystem fileSystem = FileSystem.get(conf);
    // fileSystem.close();

    JavaSparkContext sc = new JavaSparkContext("spark://localhost:7077", "MySpark", "/path/to/spark", new String[]{});
    JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
    System.out.println(rdd.take(300));
}

It throws the following exception.

Exception in thread "main" java.lang.IllegalStateException: Must not use direct buffers with InputStream API
    at com.google.common.base.Preconditions.checkState(Preconditions.java:149)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:164)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:129)
    at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:559)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:611)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:665)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
    at java.io.DataInputStream.read(DataInputStream.java:100)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
    at scala.collection.Iterator$$anon$18.hasNext(Iterator.scala:381)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.Iterator$$anon$18.foreach(Iterator.scala:379)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
    at scala.collection.Iterator$$anon$18.toBuffer(Iterator.scala:379)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
    at scala.collection.Iterator$$anon$18.toArray(Iterator.scala:379)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:484)
    at org.apache.spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:470)

However, if I uncomment fileSystem.close() in the code above, take() finishes successfully.

This happens not only on my local machine but also on EC2. Is this a bug in Spark, or am I using Spark and HDFS the wrong way?

Thanks,
Mingyu

Re: An open HDFS connection fails RDD.take()

Mingyu Kim
Pinging again… Does anyone have a clue? Is this a bug in Spark?

Mingyu

Re: An open HDFS connection fails RDD.take()

Matei Zaharia
Why are you creating the FileSystem object? You should be able to just pass a full hdfs:// URL to textFile.

It might be that HDFS initialization is not thread-safe, or you can’t have two connections to the same filesystem somehow.
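For reference, here is a minimal sketch of that approach, reusing the master URL, HDFS URL, and paths from the first message; the wrapper class name is made up for illustration:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadCsvDirect {
    public static void main(String[] args) {
        // No explicit FileSystem object is created; the hdfs:// scheme in the
        // path lets Spark's Hadoop input layer resolve the filesystem itself.
        JavaSparkContext sc = new JavaSparkContext(
                "spark://localhost:7077", "MySpark", "/path/to/spark", new String[]{});
        JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
        System.out.println(rdd.take(300));
        sc.stop();
    }
}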

Matei

Re: An open HDFS connection fails RDD.take()

Mingyu Kim
Apparently, I was creating the Hadoop Configuration object with loadDefaults = false, and the problem went away when I initialized it with loadDefaults = true, which is also how Spark creates its own Configuration objects. I don’t understand exactly why that is the case, though.
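For reference, a minimal sketch of the working version; only the Configuration constructor argument differs from the snippet in the first message:

public static void main(String[] args) throws IOException {
    // true loads the default resources (core-default.xml, core-site.xml, ...),
    // matching how Spark builds its own Hadoop Configuration.
    Configuration conf = new Configuration(true); // was: new Configuration(false)
    conf.set("fs.default.name", "hdfs://localhost:8020");
    conf.set("fs.hdfs.impl", DistributedFileSystem.class.getCanonicalName());
    FileSystem fileSystem = FileSystem.get(conf);

    JavaSparkContext sc = new JavaSparkContext("spark://localhost:7077", "MySpark", "/path/to/spark", new String[]{});
    JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
    System.out.println(rdd.take(300));
}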

Mingyu

