Spark App Writes Nothing to HDFS


Soheil Pourbafrani
Hi, I submit an app to a Spark 2 cluster using the standalone scheduler in client mode.
The app saves the results of its processing to HDFS. There are no errors in the output logs, and the app finishes successfully.
The problem is that it only creates a _SUCCESS file and an empty part-00000 file in the output directory! I checked the app logic in local mode and it works correctly.
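
For context, the application creates its context roughly like this (the master URL and app name here are placeholders, not necessarily the exact values):

import org.apache.spark.{SparkConf, SparkContext}

// Rough sketch of the setup; "spark://master:7077" and the app name
// are placeholders for the actual values.
val conf = new SparkConf()
  .setAppName("SparkSum")
  .setMaster("spark://master:7077") // standalone master; client deploy mode is the default
val sc = new SparkContext(conf)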

Here are the relevant parts of the logs:

18/12/17 20:57:10 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 78.0 (TID 350) in 13 ms on localhost (executor driver) (8/9)
18/12/17 20:57:10 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 9 blocks
18/12/17 20:57:10 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/12/17 20:57:10 INFO executor.Executor: Finished task 8.0 in stage 78.0 (TID 357). 1779 bytes result sent to driver
18/12/17 20:57:10 INFO scheduler.TaskSetManager: Finished task 8.0 in stage 78.0 (TID 357) in 6 ms on localhost (executor driver) (9/9)
18/12/17 20:57:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 78.0, whose tasks have all completed, from pool
18/12/17 20:57:10 INFO scheduler.DAGScheduler: ShuffleMapStage 78 (sortBy at SparkSum.scala:260) finished in 0.016 s
18/12/17 20:57:10 INFO scheduler.DAGScheduler: looking for newly runnable stages
18/12/17 20:57:10 INFO scheduler.DAGScheduler: running: Set()
18/12/17 20:57:10 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 79)
18/12/17 20:57:10 INFO scheduler.DAGScheduler: failed: Set()
18/12/17 20:57:10 INFO scheduler.DAGScheduler: Submitting ResultStage 79 (MapPartitionsRDD[240] at saveAsTextFile at SparkSum.scala:261), which has no missing parents
18/12/17 20:57:10 INFO memory.MemoryStore: Block broadcast_46 stored as values in memory (estimated size 73.0 KB, free 1987.9 MB)
18/12/17 20:57:10 INFO memory.MemoryStore: Block broadcast_46_piece0 stored as bytes in memory (estimated size 26.6 KB, free 1987.9 MB)
18/12/17 20:57:10 INFO storage.BlockManagerInfo: Added broadcast_46_piece0 in memory on 172.16.20.4:40007 (size: 26.6 KB, free: 2004.4 MB)
18/12/17 20:57:10 INFO spark.SparkContext: Created broadcast 46 from broadcast at DAGScheduler.scala:996
18/12/17 20:57:10 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 79 (MapPartitionsRDD[240] at saveAsTextFile at SparkSum.scala:261)
18/12/17 20:57:10 INFO scheduler.TaskSchedulerImpl: Adding task set 79.0 with 1 tasks
18/12/17 20:57:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 79.0 (TID 358, localhost, executor driver, partition 0, PROCESS_LOCAL, 5812 bytes)
18/12/17 20:57:10 INFO executor.Executor: Running task 0.0 in stage 79.0 (TID 358)
18/12/17 20:57:10 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 9 blocks
18/12/17 20:57:10 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/12/17 20:57:10 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
18/12/17 20:57:10 INFO output.FileOutputCommitter: Saved output of task 'attempt_20181217205709_0079_m_000000_358' to hdfs://master:9000/home/hduser/soheil/res/_temporary/0/task_20181217205709_0079_m_000000
18/12/17 20:57:10 INFO mapred.SparkHadoopMapRedUtil: attempt_20181217205709_0079_m_000000_358: Committed
18/12/17 20:57:10 INFO executor.Executor: Finished task 0.0 in stage 79.0 (TID 358). 1722 bytes result sent to driver
18/12/17 20:57:10 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 79.0 (TID 358) in 75 ms on localhost (executor driver) (1/1)
18/12/17 20:57:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 79.0, whose tasks have all completed, from pool
18/12/17 20:57:10 INFO scheduler.DAGScheduler: ResultStage 79 (saveAsTextFile at SparkSum.scala:261) finished in 0.076 s
18/12/17 20:57:10 INFO scheduler.DAGScheduler: Job 5 finished: saveAsTextFile at SparkSum.scala:261, took 0.150206 s
Stopping ...
Elapsed Time: 218 Secs

18/12/17 20:57:10 INFO spark.SparkContext: Invoking stop() from shutdown hook
18/12/17 20:57:10 INFO server.ServerConnector: Stopped ServerConnector@5cc5b667{HTTP/1.1}{0.0.0.0:4040}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5ae76500{/stages/stage/kill,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2fd1731c{/jobs/job/kill,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5ae81e1{/api,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@59fc684e{/,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@46c670a6{/static,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@782a4fff{/executors/threadDump/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@30ed9c6c{/executors/threadDump,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@53499d85{/executors/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@64712be{/executors,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@759fad4{/environment/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6ea1bcdc{/environment,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@26be6ca7{/storage/rdd/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@32057e6{/storage/rdd,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@62417a16{/storage/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1b0a7baf{/storage,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@70fab835{/stages/pool/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@22c86919{/stages/pool,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@72b16078{/stages/stage/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@21694e53{/stages/stage,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@7caa550{/stages/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@3b9d6699{/stages,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@47428937{/jobs/job/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2c715e84{/jobs/job,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@d78795{/jobs/json,null,UNAVAILABLE}
18/12/17 20:57:10 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@46074492{/jobs,null,UNAVAILABLE}
18/12/17 20:57:10 INFO ui.SparkUI: Stopped Spark web UI at http://master:4040
18/12/17 20:57:10 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/12/17 20:57:10 INFO memory.MemoryStore: MemoryStore cleared
18/12/17 20:57:10 INFO storage.BlockManager: BlockManager stopped
18/12/17 20:57:10 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/12/17 20:57:10 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/12/17 20:57:10 INFO spark.SparkContext: Successfully stopped SparkContext
18/12/17 20:57:10 INFO util.ShutdownHookManager: Shutdown hook called
18/12/17 20:57:10 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-811c8c87-54f7-45ea-8953-9c29f6438c5e
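
Reading the output directory back also shows nothing besides the marker file (a quick check, assuming the same path that appears in the save call below):

val written = sc.textFile("hdfs://master:9000/res")
println(s"Lines in output: ${written.count()}") // prints 0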

and here is the code for the writing part:

for (_ <- 1 to 30) {
  // Redistribute each page's rank evenly over its outgoing links
  val contribs = links.join(ranks)
    .values
    .flatMap { case (ids, rank) =>
      val size = ids.size
      ids.map(id => (id, rank / size))
    }

  // Recombine contributions and apply the damping factor
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}

ranks
  .mapValues(truncateAt(_, 5))
  .sortBy(row => row._2)
  .saveAsTextFile("hdfs://master:9000/res")
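
For debugging, one option would be to materialize the RDD and log its size right before the save (truncateAt here is a simple truncation helper along these lines, since I didn't include it above):

// Assumed shape of the truncateAt helper used above:
def truncateAt(x: Double, precision: Int): Double = {
  val factor = math.pow(10, precision)
  math.floor(x * factor) / factor
}

// Debugging sketch: count the records before writing, to confirm
// whether the final RDD is already empty on the cluster.
val output = ranks
  .mapValues(truncateAt(_, 5))
  .sortBy(row => row._2)
println(s"Records to write: ${output.count()}")
output.saveAsTextFile("hdfs://master:9000/res")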

What could be the problem? One detail I notice in the logs is that the shuffle fetch right before the save reports "Getting 0 non-empty blocks out of 9 blocks", as if the final RDD were already empty at that point.