Extract value from streaming Dataframe to a variable

Nick Dawes
I need to extract a value from a PySpark Structured Streaming DataFrame into a string variable so that I can check something.

I tried this code. 

agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]

This works only on a non-streaming DataFrame. On a streaming DataFrame, collect is not supported.

Is there any workaround for this?

Nick


Re: Extract value from streaming Dataframe to a variable

Nick Dawes
Streaming experts, any clues on how to achieve this?

After extracting a few variables, I need to run them through a REST API for verification and decision making.

Thanks for your help. 

Nick

On Fri, Jan 17, 2020, 6:27 PM Nick Dawes <[hidden email]> wrote:
I need to extract a value from a PySpark Structured Streaming DataFrame into a string variable so that I can check something.

Re: Extract value from streaming Dataframe to a variable

Jungtaek Lim-2
Hi,

You can try foreachBatch, which lets you apply batch query operations to the output of each micro-batch.
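
A minimal sketch of how that could look in PySpark 2.4 or later, offered as an illustration rather than a definitive implementation. It assumes `agentName` is an array column (as the `getItem(0)` call in the original snippet suggests); `extract_agent_names`, `make_batch_handler`, `start_query`, and the `on_agent_name` callback are hypothetical names introduced here. Inside `foreachBatch`, each micro-batch arrives as an ordinary non-streaming DataFrame, so `collect` is allowed:

```python
def extract_agent_names(batch_df):
    """Return the first element of agentName for every row of one micro-batch."""
    # batch_df is a regular (non-streaming) DataFrame, so collect() works here.
    rows = batch_df.selectExpr("agentName[0] AS agentName").collect()
    return [row["agentName"] for row in rows]


def make_batch_handler(on_agent_name):
    """Build a foreachBatch callback that hands each name to on_agent_name."""
    def handle_batch(batch_df, batch_id):
        for name in extract_agent_names(batch_df):
            # Hypothetical hook: a REST verification call could go here.
            on_agent_name(name, batch_id)
    return handle_batch


def start_query(streaming_df, on_agent_name):
    """Attach the handler to a streaming DataFrame (needs Spark 2.4 or later)."""
    return (
        streaming_df.writeStream
        .foreachBatch(make_batch_handler(on_agent_name))
        .start()
    )
```

With the DataFrame from the question, this might be started as `start_query(kinesisDF, my_callback)`, where `my_callback(name, batch_id)` is whatever check you need. Keep in mind that `collect` pulls every row of the micro-batch to the driver, so this suits small batches.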

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes <[hidden email]> wrote:
Streaming experts, any clues how to achieve this?

Re: Extract value from streaming Dataframe to a variable

Nick Dawes
Thanks for your reply. 

I'm using Spark 2.3.2. It looks like the foreach operation is only supported for Java and Scala. Is there an alternative for Python?

On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim <[hidden email]> wrote:
Hi,

You can try foreachBatch, which lets you apply batch query operations to the output of each micro-batch.

Re: Extract value from streaming Dataframe to a variable

Jungtaek Lim-2
`foreachBatch` was added in Spark 2.4, if I understand correctly, so whichever language you use, you'll need to upgrade to Spark 2.4.x to use it. PySpark is supported as well.
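
As a rough illustration of that version requirement, the sketch below checks a Spark version string before relying on `foreachBatch`. The helper name `supports_foreach_batch` is hypothetical, and the 2.4 threshold is taken from the statement above rather than verified independently:

```python
def supports_foreach_batch(spark_version):
    """Return True if a version string such as '2.3.2' is 2.4 or newer."""
    # Compare only major and minor; suffixes such as '-SNAPSHOT' sit in the
    # patch component and are ignored.
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    return (major, minor) >= (2, 4)
```

In a running session the string could come from `spark.version`, where `spark` is assumed to be your `SparkSession`.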



On Wed, Jan 22, 2020 at 1:12 AM Nick Dawes <[hidden email]> wrote:
Thanks for your reply. 

I'm using Spark 2.3.2. It looks like the foreach operation is only supported for Java and Scala. Is there an alternative for Python?
