Spark Streaming ElasticSearch


Spark Streaming ElasticSearch

Siva Samraj
Hi Team,

I have a Spark Streaming job that reads from Kafka and writes into Elasticsearch via HTTP requests.

I want to validate each record from Kafka, change the payload as per business need, and then write it into Elasticsearch.

So far I have used plain HTTP requests to push the data into Elasticsearch. Can someone guide me on how to write the data into ES via a DataFrame?

Code snippet:

    import org.apache.spark.sql.{ForeachWriter, Row}
    import org.apache.spark.sql.streaming.Trigger

    // Read the raw records from the Kafka topic as a stream.
    val dfInput = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("group.id", sourceTopicGroupId)
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()

    import spark.implicits._

    // Keep only the Kafka message value, cast from binary to string.
    val resultDf = dfInput
      .withColumn("value", $"value".cast("string"))
      .select("value")

    // Process each record individually and push it to ES via an HTTP request.
    resultDf.writeStream.foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, version: Long): Boolean = true

      override def process(value: Row): Unit = {
        processEventsData(value.getString(0), deviceIndex, msgIndex, retryOnConflict, auth, refreshInterval, deviceUrl, messageUrl, spark)
      }

      override def close(errorOrNull: Throwable): Unit = {}
    }).trigger(Trigger.ProcessingTime(triggerPeriod)) // triggerPeriod e.g. "1 second"
      .start()
      .awaitTermination()

Please suggest if there is a better approach.

Thanks

Re: Spark Streaming ElasticSearch

jainshasha
Hi Siva

To emit data into ES from a Spark Structured Streaming job you need to use an Elasticsearch jar that provides a sink for Structured Streaming. For this you can use my branch, where we have integrated ES with Spark 3.0 and Scala 2.12:
https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0

From this branch you need to build three jars:
elasticsearch-hadoop-sql
elasticsearch-hadoop-core
elasticsearch-hadoop-mr
which enable writing data into ES through Spark Structured Streaming.

In your application job you can sink the data as follows; keep in mind that ES only supports the append output mode of Structured Streaming.
val esDf = aggregatedDF
        .writeStream
        .outputMode("append")
        .format("org.elasticsearch.spark.sql")
        .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
        .start("aggregation-job-index-latest-1")
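For reference, a minimal sketch of how the connection options might look, assuming a local single-node cluster. The es.nodes and es.port settings are the standard elasticsearch-hadoop options; the host, checkpoint path, and index name below are placeholders, not values from this thread:

    val esQuery = aggregatedDF
      .writeStream
      .outputMode("append")                                // ES sink supports append only
      .format("org.elasticsearch.spark.sql")
      .option("checkpointLocation", "/tmp/checkpoints/es") // placeholder path
      .option("es.nodes", "localhost")                     // placeholder ES host(s)
      .option("es.port", "9200")                           // default REST port
      .start("my-index")                                   // placeholder target index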


Let me know if you face any issues; I'll be happy to help :)






Re: Spark Streaming ElasticSearch

Siva Samraj
Hi Jainshasha,

I need to read each row from the DataFrame and make some changes to it before inserting it into ES.

Thanks
Siva



Re: Spark Streaming ElasticSearch

jainshasha
Hi Siva

In that case you can use the Structured Streaming foreach / foreachBatch functions, which let you process each record (foreach) or each micro-batch (foreachBatch) and write it into any sink, as sketched below.
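
For example, a minimal foreachBatch sketch along these lines. The transformation, index name, and checkpoint path are placeholders; it assumes the ES connector jars above are on the classpath, and binds the batch function to an explicitly typed val to sidestep the foreachBatch overload ambiguity in Scala 2.12:

    import org.apache.spark.sql.{Dataset, Row}

    // Transform each micro-batch, then write it to ES with the connector.
    val writeBatchToEs: (Dataset[Row], Long) => Unit = (batchDF, batchId) => {
      val transformed = batchDF // placeholder: apply validation / payload changes here
      transformed.write
        .format("org.elasticsearch.spark.sql")
        .mode("append")
        .save("my-index") // placeholder target index
    }

    resultDf.writeStream
      .foreachBatch(writeBatchToEs)
      .option("checkpointLocation", "/tmp/checkpoints/es") // placeholder path
      .start()
      .awaitTermination()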



