Behavior of Fetching File using local cluster

Fabrizio Milo aka misto
Hello everyone,

I am running Spark in local mode (master local[16]) on a single machine.
In the logs I see the following entries:

14/02/16 07:39:57 INFO Executor: Fetching http://172.16.8.90:33074/files/live_score_00001-00500 with timestamp 1392565196629
14/02/16 07:39:57 INFO Utils: Fetching http://172.16.8.90:33074/files/live_score_00001-00500 to /tmp/fetchFileTemp5957197159047481909.tmp
14/02/16 07:40:11 INFO Executor: Fetching http://172.16.8.90:33074/files/live_score_00501-01000 with timestamp 1392565196077
14/02/16 07:40:11 INFO Utils: Fetching http://172.16.8.90:33074/files/live_score_00501-01000 to /tmp/fetchFileTemp6587513385976219356.tmp
14/02/16 07:40:53 INFO Executor: Fetching http://172.16.8.90:33074/files/live_score_01001-01500 with timestamp 1392565195274
14/02/16 07:40:53 INFO Utils: Fetching http://172.16.8.90:33074/files/live_score_01001-01500 to /tmp/fetchFileTemp3395098906540533732.tmp

I was wondering why, if the files are already local to the machine, the executor
still needs to copy them to /tmp.

Is there a way to avoid that?

This is the code I am using:

  import java.io.File
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._  // implicit AccumulatorParam[Int] for sc.accumulator(0)
  import org.apache.spark.rdd.RDD

  // Assumed to be defined elsewhere: master, inputDir, and the Record class.
  val conf = new SparkConf()
    .setMaster(master)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", "512")
    .set("spark.executor.memory", "32g")
    .set("spark.local.dir", "/tmp")
    .set("spark.cores.max", "64")
    .set("spark.default.parallelism", "16")
    .set("spark.akka.timeout", "240")
    .set("spark.storage.memoryFraction", "0.8")
    .set("spark.storage.blockManagerTimeoutIntervalMs", "120000")
    .set("spark.storage.blockManagerHeartBeatMs", "60000")
    .setJars(SparkContext.jarOfClass(this.getClass))

  val sc = new SparkContext(conf)

  val invalidLineCounter = sc.accumulator(0)
  val validLineCounter = sc.accumulator(0)

  var rddList = Seq[RDD[String]]()

  for (inputFile <- new File(inputDir).listFiles) {
    val filePath = inputDir + "/" + inputFile.getName

    // Ships the file to every executor; this is what produces the
    // "Fetching ... to /tmp/fetchFileTemp..." log lines above.
    sc.addFile(filePath)

    rddList = rddList :+ sc.textFile(filePath)
  }

  val bigdata = sc.union(rddList)

  val dataset = bigdata.flatMap(line => {
    try {
      // split() takes a regex, so the pipe must be escaped.
      var fields = line.split("\\|")
      if (fields.length <= 1) {
        fields = line.split("\\^")  // no '|' in the line: fall back to '^'
      }
      val record = new Record(fields)
      validLineCounter += 1  // count only lines that parsed successfully
      Some(record)
    } catch {
      case ex: Exception => {
        println(line)
        println(line.split("\\|").length)
        println(line.split("\\^").length)
        println(line.split("[|\\^]").mkString("\n"))
        println("\n" + ex)
        println("\n" + ex.getStackTrace.mkString("\n") + "\n")
        invalidLineCounter += 1
        None
      }
    }
  })
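One detail in the parsing step is worth spelling out: `String.split` takes a regular expression, and an unescaped `"|"` is an empty alternation that matches between every pair of characters, so `line.split("|")` splits the line into single characters rather than fields. A minimal sketch of the parsing step in isolation, assuming the intent is "split on `|`, fall back to `^`" (the `LineParsing` object and `parseFields` name are hypothetical):

```scala
object LineParsing {
  // Hypothetical helper: split on '|' and fall back to '^' when the line
  // contains no pipe. Both delimiters are escaped because String.split
  // interprets its argument as a regular expression.
  def parseFields(line: String): Array[String] = {
    val byPipe = line.split("\\|")
    if (byPipe.length > 1) byPipe else line.split("\\^")
  }

  def main(args: Array[String]): Unit = {
    println(parseFields("a|b|c").mkString(","))  // a,b,c
    println(parseFields("a^b").mkString(","))    // a,b
  }
}
```

Note that `split` drops trailing empty fields, so a line such as `a|` yields a single field and would fall through to the `^` branch; passing a negative limit, e.g. `line.split("\\|", -1)`, keeps them if empty trailing columns matter.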

Thanks

Fabrizio
--
LinkedIn: http://linkedin.com/in/fmilo
Twitter: @fabmilo
Github: http://github.com/Mistobaan/
-----------------------
Simplicity, consistency, and repetition - that's how you get through.
(Jack Welch)
Perfection must be reached by degrees; she requires the slow hand of
time (Voltaire)
The best way to predict the future is to invent it (Alan Kay)

Re: Behavior of Fetching File using local cluster

Matei Zaharia
Administrator
This is mostly done to make sure that your program will execute the same way as in distributed mode, so that you don’t get a surprise when you run it on a cluster. If you’d like to avoid this, just don’t call addFile when you’re in local mode. addFile is supposed to copy a file to the cluster in cluster mode so that your executors can see it.
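Following that advice, the `addFile` call can simply be skipped when the master URL is a local one. A minimal sketch, assuming the master string passed to `setMaster` is available in the driver (the `FileShipping` object and `needsAddFile` function are hypothetical); the check treats `local` and `local[N]` as local mode:

```scala
object FileShipping {
  // Hypothetical helper: true when the master URL points at a real cluster,
  // i.e. when executors may not be able to read the driver's local disk.
  // "local" and "local[N]" run all tasks inside the driver JVM, so the
  // files are already readable and no copy is needed.
  def needsAddFile(master: String): Boolean =
    !(master == "local" || master.startsWith("local["))

  def main(args: Array[String]): Unit = {
    println(needsAddFile("local[16]"))          // false
    println(needsAddFile("spark://host:7077"))  // true
  }
}
```

In the loop from the original post this would become `if (FileShipping.needsAddFile(master)) sc.addFile(filePath)`. `SparkContext` also exposes an `isLocal` member that performs a similar check; confirm its behaviour for your Spark version before relying on it. In cluster mode, tasks locate their shipped copy with `SparkFiles.get(fileName)`.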

Actually, the way you are using addFile will probably not be a great fit in cluster mode anyway: it will cause every file to be copied to every executor. It would be better to put the files in a shared filesystem, such as an NFS mount or HDFS, and just call textFile on them. As long as executors can see that same filesystem, they’ll be able to read from there and only work with the parts of the files that they require.
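With a shared filesystem, the per-file `addFile`/`textFile`/`union` loop can collapse into a single `textFile` call: `textFile` accepts a directory, a glob, or (as far as I know, because Spark hands the path string to Hadoop's `FileInputFormat`, which splits it on commas) a comma-separated list of paths. A minimal sketch of building such a list, assuming `inputDir` sits on a mount every executor can read (the `InputPaths` object is hypothetical):

```scala
import java.io.File

object InputPaths {
  // Hypothetical helper: join every regular file in `dir` into one
  // comma-separated string suitable for a single sc.textFile(...) call.
  // listFiles returns null for a missing directory, hence the Option.
  // Caveat: a path that itself contains a comma would be split incorrectly.
  def commaSeparated(dir: String): String =
    Option(new File(dir).listFiles)
      .getOrElse(Array.empty[File])
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .sorted
      .mkString(",")
}
```

Usage would look like `val bigdata = sc.textFile(InputPaths.commaSeparated(inputDir))`, or simply `sc.textFile(inputDir)` when every file in the directory is input. Either way, each executor reads only the splits assigned to it instead of receiving a full copy of every file.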

Matei

On Feb 16, 2014, at 1:45 PM, Fabrizio Milo aka misto wrote:

> I was wondering why, if the files are already local to the machine, the executor
> still needs to copy them to /tmp.
>
> Is there a way to avoid that?