How to properly execute `foreachPartition` in Spark 2.2


How to properly execute `foreachPartition` in Spark 2.2

Liana Napalkova

Hi,


I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I explain the problem in detail. I appreciate any help.


In Spark 1.6 I was doing something similar to this:


DstreamFromKafka.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    println("Setting the producer.")
    val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                        mySet.value("batchSize"),
                                        mySet.value("lingerMS"))
    partitionOfRecords.foreach { s =>
      // ... send each record with the producer
    }
  }
}

However, I cannot find the proper way to do a similar thing in Spark 2.2. I tried to write my own class by extending `ForeachWriter`, but I get a task serialization error when passing the `KafkaProducer`.

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}

class MyTestClass(/* val inputparams: String */) extends Serializable {

  val spark = SparkSession
    .builder()
    .appName("TEST")
    //.config("spark.sql.warehouse.dir", kafkaData)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val df: Dataset[String] = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "true")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] // Kafka sends bytes
    .map(_._2)

  val producer: KafkaProducer[String, String] = ??? // create the KafkaProducer (on the driver)

  val writer = new MyForeachWriter(producer)

  val query = df
    .writeStream
    .foreach(writer)
    .start()

  query.awaitTermination()

  spark.stop()
}

class MyForeachWriter extends ForeachWriter[String] with Serializable {

  var producer: KafkaProducer[String, String] = _

  def this(producer: KafkaProducer[String, String]) {
    this()
    this.producer = producer
  }

  override def process(row: String): Unit = {
    // ...
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }
}


Liana Napalkova, PhD

Big Data Analytics Unit



T  +34 93 238 14 00 (ext. 1248)
M +34 633 426 677

[hidden email]


Carrer Camí Antic de València 54-56, Edifici A - 08005 - Barcelona 
www.eurecat.org 


Re: How to properly execute `foreachPartition` in Spark 2.2

Silvio Fiorito

Why don’t you just use the Kafka sink for Spark 2.2?

 

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
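For reference, a minimal sketch of what using the sink could look like, reusing `df: Dataset[String]` from the snippet above (the topic, broker address and checkpoint path are placeholders, not values from the thread):

// The Kafka sink reads the payload from a string or binary column named "value".
val query = df
  .toDF("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .start()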

 

 

 


Re: How to properly execute `foreachPartition` in Spark 2.2

Liana Napalkova

I first need to read from a Kafka queue into a DataFrame. Then I should perform some transformations on the data. Finally, for each row in the DataFrame I should conditionally apply a KafkaProducer in order to send some data to Kafka.

So I am both consuming data from and producing data to Kafka.





Re: How to properly execute `foreachPartition` in Spark 2.2

Timur Shenkao
Spark Dataset / DataFrame has foreachPartition() as well. Its implementation is much more efficient than the RDD one.
There are plenty of code snippets, e.g. https://github.com/hdinsight/spark-streaming-data-persistence-examples/blob/master/src/main/scala/com/microsoft/spark/streaming/examples/common/DataFrameExtensions.scala
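For a static (non-streaming) Dataset, the 1.6 pattern carries over almost unchanged. A rough sketch, reusing the hypothetical Utils.createProducer helper and config values from the original post:

// One producer per partition of a batch Dataset[String]; helper and config names are assumptions.
staticDs.foreachPartition { (partition: Iterator[String]) =>
  val producer = Utils.createProducer(brokerList, batchSize, lingerMs)
  partition.foreach { record =>
    // conditionally send `record` with the producer
  }
  producer.close()
}

Note that this only works on a static Dataset; a streaming Dataset does not support such actions directly, so there a sink or a ForeachWriter is needed instead.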


Re: How to properly execute `foreachPartition` in Spark 2.2

Silvio Fiorito
In reply to this post by Liana Napalkova

Couldn’t you readStream from Kafka, do your transformations, map your rows from the transformed input into what you need to send to Kafka, then writeStream to Kafka?
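Sketched out, that flow might look roughly like this (`transform`, `shouldSend` and the topic are placeholders for Liana's logic, not code from the thread):

val toKafka = df                 // df: Dataset[String] from the Kafka source
  .map(transform)                // apply the transformations
  .filter(shouldSend _)          // conditionally select what gets produced
  .toDF("value")                 // payload column expected by the Kafka sink

toKafka.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/transformed")
  .start()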

 

 


Re: How to properly execute `foreachPartition` in Spark 2.2

Liana Napalkova

If there is no other way, then I will follow this recommendation.




Re: How to properly execute `foreachPartition` in Spark 2.2

Cody Koeninger
In reply to this post by Silvio Fiorito
You can't create a network connection to Kafka on the driver and then serialize it to send to the executor. That's likely why you're getting serialization errors.

Kafka producers are thread safe and designed for use as a singleton.

Use a lazy singleton instance of the producer on the executor, don't pass it in.
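A rough sketch of that pattern (the object name, broker address and topic below are made up for illustration; nothing here is serialized from the driver):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Hypothetical lazy singleton: one producer per executor JVM, created on first use.
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

class KafkaForeachWriter extends ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: String): Unit = {
    // looked up lazily on the executor, never passed through the constructor
    ProducerHolder.producer.send(new ProducerRecord[String, String]("output-topic", row))
  }
  override def close(errorOrNull: Throwable): Unit = {}
}

Because the producer lives in a lazy val inside an object, each executor JVM creates it the first time process touches it, so nothing needs to be passed in from the driver.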


Re: How to properly execute `foreachPartition` in Spark 2.2

Liana Napalkova
In reply to this post by Timur Shenkao

Thanks, Timur.

The problem is that if I run `foreachPartition`, then I cannot `start` the streaming query. Or perhaps I am missing something.


