Spark 2.3.0 Structured Streaming Kafka Timestamp


Yuta Morisawa
Hi All

I'm trying to extract the Kafka record timestamp from Kafka topics.

The timestamp I get does not contain millisecond information,
but it should, because the ConsumerRecord class in Kafka 0.10
supports millisecond timestamps.

How can I get the millisecond timestamp from Kafka topics?


These are the pages I referred to:
 
https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
 
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html


And this is my code.
----
import spark.implicits._ // encoder for .as[(Long, String)]

val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
   .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
   .as[(Long, String)]
----
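In Spark SQL, casting a timestamp to long returns whole seconds since the epoch, so CAST(timestamp AS LONG) in the query above would drop the millisecond part no matter what Kafka stored. A minimal local sketch to see this, assuming Spark 2.x on the classpath; the object name and the sample value are hypothetical, chosen only because the value has a non-zero millisecond part:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

object CastTruncationDemo {
  // Hypothetical sample instant in epoch milliseconds (note the .972 part).
  val SampleMillis: Long = 1525851739972L

  // Runs CAST(timestamp AS LONG) on a one-row local DataFrame and returns the result.
  def castToLong(): Long = {
    val spark = SparkSession.builder().master("local[1]").appName("cast-demo").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(Tuple1(new Timestamp(SampleMillis))).toDF("timestamp")
      // Same expression as in the question: timestamp -> long.
      df.selectExpr("CAST(timestamp AS LONG)").as[Long].head()
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints 1525851739 vs 1525851739972: the cast keeps whole seconds only.
    println(s"CAST AS LONG: ${castToLong()}, original millis: $SampleMillis")
  }
}
```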

Regards,
Yuta


---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Spark 2.3.0 --files vs. addFile()

Marius
Hey,

I am using Spark to distribute the execution of a binary tool and to do some further calculations downstream. I want to distribute the binary using either the --files option or SparkContext.addFile() so that it is available on each worker node. Spark logs that it added the file:
2018-05-09 07:42:19 INFO  SparkContext:54 - Added file s3a://executables/blastp at s3a://executables/foo with timestamp 1525851739972
2018-05-09 07:42:20 INFO  Utils:54 - Fetching s3a://executables/foo to /tmp/spark-54931ea6-b3d6-419b-997b-a498da898b77/userFiles-5e4b66e5-de4a-4420-a641-4453b9ea2ead/fetchFileTemp3437582648265876247.tmp

However, when I try to execute the tool using pipe(), it does not work. My current assumption is that the file is only downloaded to the master node, but I am not sure whether I misunderstood how adding files works in Spark or whether I did something wrong.
I get the path with SparkFiles.get(). That call works, but the binary is not at the returned path.

This is my call:

spark-submit \
--class de.jlu.bioinfsys.sparkBlast.run.Run \
--master $master \
--jars ${awsPath},${awsJavaSDK} \
--files s3a://database/a.a.z,s3a://database/a.a.y,s3a://database/a.a.x,s3a://executables/tool \
--conf spark.executor.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.driver.extraClassPath=${awsPath}:${awsJavaSDK} \
--conf spark.hadoop.fs.s3a.endpoint=https://s3.computational.bio.uni-giessen.de/ \
--conf spark.hadoop.fs.s3a.access.key=$s3Access \
--conf spark.hadoop.fs.s3a.secret.key=$s3Secret \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
${execJarPath}

I am using Spark 2.3.0 with Scala on a standalone cluster with three workers.
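The SparkContext.addFile Scaladoc says to call SparkFiles.get(fileName) inside Spark jobs to find a file's download location. One thing worth ruling out, as an assumption rather than a diagnosis: if SparkFiles.get is evaluated on the driver and the resulting string is handed to pipe(), it names the driver's own userFiles directory under /tmp (as in the log above), which need not exist on the workers. The sketch below resolves the path inside each task instead and feeds the partition to the tool's stdin via scala.sys.process; PipeAddedTool and runOnPartitions are hypothetical names, and this does not address the standalone-mode bug mentioned in the reply.

```scala
import java.io.{ByteArrayInputStream, File}
import java.nio.charset.StandardCharsets
import org.apache.spark.{SparkContext, SparkFiles}
import scala.sys.process._

object PipeAddedTool {
  // Runs a binary shipped with --files / sc.addFile once per partition.
  // toolName is the bare file name (e.g. "tool"), not the s3a:// URI.
  def runOnPartitions(sc: SparkContext, toolName: String,
                      inputs: Seq[String], numSlices: Int): Array[String] = {
    sc.parallelize(inputs, numSlices).mapPartitions { lines =>
      // Evaluated inside the task, so this is the executor's local copy.
      val toolPath = SparkFiles.get(toolName)
      new File(toolPath).setExecutable(true)
      val stdin = lines.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
      // Write the partition to stdin; !! returns stdout and throws on a non-zero exit code.
      val stdout = (Seq(toolPath) #< new ByteArrayInputStream(stdin)).!!
      stdout.split("\n").iterator.filter(_.nonEmpty)
    }.collect()
  }
}
```

Unlike rdd.pipe(command), where the command string is built on the driver, this keeps every path lookup on the executor that actually runs the process.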

Cheers
Marius




Re: Spark 2.3.0 --files vs. addFile()

JayeshLalwani

This is a long-standing bug in Spark: --jars and --files don't work in standalone mode.

https://issues.apache.org/jira/browse/SPARK-4160

 

From: Marius <[hidden email]>
Date: Wednesday, May 9, 2018 at 3:51 AM
To: "[hidden email]" <[hidden email]>
Subject: Spark 2.3.0 --files vs. addFile()

[...]


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

Yuta Morisawa
In reply to this post by Yuta Morisawa
The problem is solved.
The actual schema of the Kafka source differs from the documentation.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

The documentation says the "timestamp" column is of type long,
but its actual type is timestamp.


Here is the code I used to check the schema, and its result:

-code
val df = spark
       .read
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option(subscribeType, topics) // e.g. "subscribe" and "topic1,topic2"
       .load()
df.printSchema()

-result
root
  |-- key: binary (nullable = true)
  |-- value: binary (nullable = true)
  |-- topic: string (nullable = true)
  |-- partition: integer (nullable = true)
  |-- offset: long (nullable = true)
  |-- timestamp: timestamp (nullable = true)
  |-- timestampType: integer (nullable = true)
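Given that the column really is a timestamp, one way to keep the milliseconds (a sketch, not taken from this thread) is to read it as java.sql.Timestamp and call getTime on the JVM side, since Timestamp.getTime returns milliseconds since the epoch. KafkaMillis and toMillis are hypothetical names; main reuses the placeholder servers and topics from the original question and assumes the spark-sql-kafka-0-10 package is on the classpath.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object KafkaMillis {
  // Maps the Kafka source's (timestamp, value) columns to
  // (epoch milliseconds, UTF-8 string) without CAST(... AS LONG).
  def toMillis(spark: SparkSession, kafka: DataFrame): Dataset[(Long, String)] = {
    import spark.implicits._
    kafka
      .selectExpr("timestamp", "CAST(value AS STRING)")
      .as[(Timestamp, String)]                           // tuple fields bind by column position
      .map { case (ts, value) => (ts.getTime, value) }   // getTime keeps the milliseconds
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-millis").getOrCreate()
    // Placeholder servers and topics, as in the original question.
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .load()
    toMillis(spark, kafka).writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

The typed map deserializes each row into JVM objects, which is slightly more work than a pure SQL expression, but it avoids the floating-point rounding that a route through CAST(timestamp AS DOUBLE) could introduce.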


Regards,
Yuta

On 2018/05/09 16:14, Yuta Morisawa wrote:

> [...]


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

Michael Armbrust
Hmm, yeah, that does look wrong. It would be great if someone opened a PR to correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa <[hidden email]> wrote:
> [...]