Persisting RDD to Redis

Persisting RDD to Redis

Luis Ángel Vicente Sánchez
I'm trying to create a simple Twitter word counter with Spark Streaming, and I would like to store the word counts in Redis. The program looks like this:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }

    val (master, filters) = (args.head, args.tail)

    val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)

    val words = stream.flatMap(status ⇒ status.getText.toLowerCase.split(" ")).map(word ⇒ (word, 1L))

    val cntWords = words.reduceByKey(_ + _)

    cntWords.foreach(rdd ⇒
      pool.withJedisClient { client ⇒
        val pipeline = client.pipelined()
        rdd.foreach {
          case (word, count) ⇒
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
      }
    )

    ssc.start()
  }
}

Every time I run this program, I get this error:

[error] 14/02/02 15:16:20 ERROR scheduler.JobScheduler: Error running job streaming job 1391354180000 ms.0
[error] org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: redis.clients.jedis.Pipeline
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
[error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:915)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$2.apply(DAGScheduler.scala:912)
[error] at scala.Option.foreach(Option.scala:236)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:912)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:911)
[error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[error] at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:911)
[error] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
[error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
[error] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
[error] at akka.actor.ActorCell.invoke(ActorCell.scala:456)
[error] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
[error] at akka.dispatch.Mailbox.run(Mailbox.scala:219)
[error] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
[error] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[error] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[error] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[error] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have tried not using Redis pipelines, but then I get the same error, just related to the Jedis client instead.

Has anybody done something similar?

Kind regards,

Luis

P.S. I have attached my build.sbt and the Scala source code to this message.




Re: Persisting RDD to Redis

Ewen Cheslack-Postava
If you use anything created in the driver program within functions run on workers, it needs to be serializable, but your pool of Redis connections is not. Normally, the simple way to fix this is to use the *With methods of RDD (mapWith, flatMapWith, filterWith, and in this case, foreachWith) to instantiate a connection to Redis on a per-partition basis.

But your logic for outputting the data doesn't look right. cntWords.foreach's function parameter would be getting each (word,count) element, *not the whole RDD*. You probably want to share a single Redis pipeline for each RDD partition, which you can accomplish with foreachPartition. It gives you an iterator over all elements in each partition. It would look something like this:

cntWords.foreachPartition { it =>
  // Build the connection here, inside the per-partition function, so nothing
  // non-serializable has to be shipped from the driver.
  val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
  pool.withJedisClient { client =>
    val pipeline = client.pipelined()
    it.foreach { case (word, count) => pipeline.incrBy(word, count) }
    pipeline.sync()
  }
}

Of course you wouldn't actually need the pool in that case, but I'm not familiar with the Jedis library so I'm not sure how you'd create just a single connection.
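
For what it's worth, here is a minimal sketch of the single-connection variant. It assumes Jedis exposes a plain single-host constructor (new Jedis("localhost")) and a quit() method to close the connection, and writePartition is just an illustrative name, not an existing method:

// Sketch only: write one partition's (word, count) pairs through a single
// plain Jedis connection instead of a pool. (Illustrative helper, not an
// existing API; call it once per partition.)
def writePartition(it: Iterator[(String, Long)]): Unit = {
  val client = new Jedis("localhost")
  val pipeline = client.pipelined()
  it.foreach { case (word, count) => pipeline.incrBy(word, count) }
  pipeline.sync()
  client.quit()
}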

-Ewen

Re: Persisting RDD to Redis

Luis Ángel Vicente Sánchez
Thanks, Ewen! Now I understand why I was getting the error message. It seems that foreachPartition doesn't exist on the DStream class :-\ I will check the API docs to find other alternatives.




Re: Persisting RDD to Redis

Ewen Cheslack-Postava
Ah, sorry, I read too quickly and missed that this was for Spark Streaming. In that case you will get RDDs from foreach, so I guess you want to use RDD.foreachPartition inside the call to DStream.foreach.
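
Something like this, as a rough sketch of that nesting (assuming the same plain new Jedis("localhost") connection used elsewhere in this thread):

cntWords.foreach { rdd =>          // DStream.foreach hands each batch's RDD to this driver-side function
  rdd.foreachPartition { it =>     // runs on the workers, once per partition
    val client = new Jedis("localhost")
    val pipeline = client.pipelined()
    it.foreach { case (word, count) => pipeline.incrBy(word, count) }
    pipeline.sync()
    client.quit()
  }
}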

-Ewen

Re: Persisting RDD to Redis

Luis Ángel Vicente Sánchez
Yes, I found that after sending my response; the final program is:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import redis.clients.jedis._

object TwitterWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: TwitterWordCount <master> [filter1] [filter2] ... [filterN]")
      System.exit(1)
    }

    System.setProperty("spark.cleaner.ttl", "600")

    val (master, filters) = (args.head, args.tail)

    val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
    val words = stream.flatMap(status ⇒ status.getText.toLowerCase.split(" ")).countByValue()
    val countWords = words.reduceByKey(_ + _)

    countWords.foreachRDD(rdd ⇒
      rdd.foreachPartition { iterator ⇒
        // The Jedis connection is created on the worker, once per partition,
        // so nothing non-serializable is captured from the driver.
        val client = new Jedis("localhost")
        val pipeline = client.pipelined()
        iterator.foreach {
          case (word, count) ⇒
            pipeline.incrBy(word, count)
        }
        pipeline.sync()
        client.quit()
      }
    )

    ssc.start()
  }
}


