ReliableDeliverySupervisor: Association with remote system


ReliableDeliverySupervisor: Association with remote system

SamyaMaiti
Hi All,


I am new to both Scala & Spark, so please expect some mistakes.

Setup:

Scala : 2.10.2
Spark : Apache 1.1.0
Hadoop : Apache 2.4

Intent of the code: to read from a Kafka topic & do some processing.

Below are the code details and the error I am getting:



import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext._
import scala.collection.IndexedSeq._
import org.apache.spark.streaming.dstream
import java.io.File
import java.util.Properties

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by samyamaiti on 12/25/14.
 */
object Driver {

  def main(args: Array[String]) {

    //Checkpoint dir in HDFS
    val checkpointDirectory = "hdfs://localhost:8020/user/samyamaiti/SparkCheckpoint1"


    //functionToCreateContext
    def functionToCreateContext(): StreamingContext = {
      //Setting conf object
      val conf = new SparkConf()
      conf.setMaster("spark://SamyaMac.local:7077")
      conf.setAppName("SparkStreamingFileProcessor")

      val ssc = new StreamingContext(conf, Seconds(1))

      //Enable checkpointing
      ssc.checkpoint(checkpointDirectory)
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val sscContext = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
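    // Note: on a restart, getOrCreate reconstructs the StreamingContext (and
    // its DStream graph) from the checkpoint data in HDFS;
    // functionToCreateContext is only invoked when no checkpoint exists yet.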

    //Accumulator to keep track of number of messages
    val numInputMessages = sscContext.sparkContext.accumulator(0L, "Kafka messages consumed")
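    // Caveat: accumulator updates performed inside transformations (as in the
    // map further down) can be re-applied if a task is retried; Spark only
    // guarantees exactly-once accumulator updates inside actions.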

    //Number of consumer threads per input DStream
    val consumerThreadsPerInputDStream = 1

    //Setting the topic
    val topics = Map("testTopic" -> consumerThreadsPerInputDStream)

    //Zookeeper quorum address (host:port pairs, no URL scheme)
    val zkQuorum = "localhost:2181"

    //Setting up the DStream
    val kafkaDStreams = {

      val numPartitionsOfInputTopic = 1
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        // The original snippet referenced an undefined `kafkaParams`; the
        // simple createStream overload takes (ssc, zkQuorum, groupId, topics).
        // The consumer group id below is a placeholder.
        KafkaUtils.createStream(sscContext, zkQuorum, "spark-file-processor", topics).map(_._2)
      }

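      // Union the per-partition receiver streams into a single DStream, then
      // repartition so processing parallelism is decoupled from the number of
      // receivers.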
      val unifiedStream = sscContext.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }

    //Stream processing pipeline (stub): count the messages received from Kafka.
    //Eventually this should print the file names received from Kafka & save
    //them to HDFS.
    kafkaDStreams.map {
      case bytes => numInputMessages += 1
    }.foreachRDD(rdd => {
      // An action is required here; without it the lazy map above (and the
      // accumulator update) would never execute.
      rdd.count()
      println("Messages consumed so far: " + numInputMessages.value)
    })

    // Run the streaming job
    sscContext.start()
    sscContext.awaitTermination()

  }
}


build.sbt
---------

name := "SparkFileProcessor"

version := "1.0"

scalaVersion := "2.10.2"


libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0"
)



Error
-----

14/12/25 23:55:06 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
14/12/25 23:55:06 INFO NettyBlockTransferService: Server created on 56078
14/12/25 23:55:06 INFO BlockManagerMaster: Trying to register BlockManager
14/12/25 23:55:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@***.***.***.***:56065] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
14/12/25 23:55:36 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
        at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
        at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
       
       
Regards,
Sam

Re: ReliableDeliverySupervisor: Association with remote system

SamyaMaiti
Sorry for the typo.

The Apache Hadoop version is 2.6.0.

Regards,
Sam

Re: ReliableDeliverySupervisor: Association with remote system

SamyaMaiti
Resolved.

I changed to the Apache Hadoop 2.4.0 & Apache Spark 1.2.0 combination, and everything works fine.

This must be because the Spark 1.2.0 build I used was compiled against Hadoop 2.4.0.
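
For anyone hitting the same issue, the corresponding build.sbt change is just the version bump (a sketch, assuming the standard Scala 2.10 artifacts):

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.2.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.0",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0"
)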