how to split RDD by key and save to different path


how to split RDD by key and save to different path

诺铁
hi,


in short, I would like to split the raw data by some key, for example the create date, and put it in a directory named after that date, so that I can easily access a portion of the data later.

for now I have to extract all the keys, then filter by each key and save to a file repeatedly. Is there a better way to do this? Or maybe I shouldn't do such a thing?
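For reference, a minimal sketch of the filter-per-key approach described above; the paths and extractDate are illustrative assumptions, not from the thread:

// Assumes sc: SparkContext and extractDate(line: String): String are already defined.
val raw = sc.textFile("hdfs:///input/raw")            // assumed input path
val keyed = raw.map(line => (extractDate(line), line))
val keys = keyed.keys.distinct().collect()            // pull all distinct keys to the driver
for (key <- keys) {
  keyed.filter { case (k, _) => k == key }            // one full scan of the data per key
       .values
       .saveAsTextFile("hdfs:///output/" + key)       // one directory per key
}

Each key triggers a separate pass over the input, which is the inefficiency being asked about.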

Fwd: how to split RDD by key and save to different path

Fengyun RAO
1. Be careful: HDFS works better with large files than with bunches of small files.

2. If that's really what you want, roll your own, for example:

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write every line in a partition to a file named after its key,
// reusing one writer per key and closing them all when the partition is done.
def writeLines(iterator: Iterator[(String, String)]): Unit = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val (key, line) = iterator.next()
      val writer = writers.getOrElseUpdate(key, {
        val path = args(1) + key // args(1): output directory prefix from the command line
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

val inputData = sc.textFile(args(0))                           // args(0): input path (assumed)
val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey: user-supplied key function
val partitioned = keyValue.partitionBy(new MyPartitioner(10))
partitioned.foreachPartition(writeLines)


import org.apache.spark.Partitioner

class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  // Hash on the key so that all lines with the same key land in the same partition.
  override def getPartition(key: Any): Int =
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
}
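A note on the design: partitionBy with this hash partitioner guarantees that all lines sharing a key land in the same partition, so each key's file is created and written by exactly one task, and no two tasks race on the same HDFS path. Several keys can still share one partition, which is why writeLines keeps a map of one writer per key.

As a hedged alternative (not from this thread): Hadoop's MultipleTextOutputFormat can route each record to a file named after its key through saveAsHadoopFile, which avoids the hand-rolled writer bookkeeping. A minimal sketch:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Name each output file after the record's key and write only the value.
class KeyBasedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
}

keyValue.saveAsHadoopFile(args(1), classOf[String], classOf[String],
  classOf[KeyBasedTextOutputFormat]) // args(1): output directory, as above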



Re: how to split RDD by key and save to different path

诺铁
Understood, thank you.
Small files are a problem; I am considering processing the data before putting it into HDFS.
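One hedged sketch of mitigating the small-files problem inside Spark itself: reduce the number of output partitions before saving, so each save produces a few large files rather than many small ones. Here coalesce(1) is illustrative, and sliceForKey is a hypothetical RDD holding one key's lines:

// Illustrative only: fewer partitions mean fewer, larger files per save.
sliceForKey.coalesce(1)                             // collapse to one output file
           .saveAsTextFile("hdfs:///output/" + key) // assumed path, as above

Whether that is acceptable depends on how large each key's slice is.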

