Reading a MongoDB collection in Spark with arrays

Reading a MongoDB collection in Spark with arrays

Mich Talebzadeh
Hi,

I am reading a MongoDB collection into Spark with Scala.

In general it works:

import com.mongodb.spark._
val rdd = MongoSpark.load(sc)
val inventory = rdd.toDF
scala> inventory.printSchema
root
 |-- Audience: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- column1: string (nullable = true)
 |-- column2: string (nullable = true)
 |-- column3: string (nullable = true)
 |-- item: string (nullable = true)
 |-- qty: double (nullable = true)
 |-- size: struct (nullable = true)
 |    |-- h: double (nullable = true)
 |    |-- w: double (nullable = true)
 |    |-- uom: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
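
For reference, MongoSpark.load(sc) picks up its connection details from the Spark configuration, so the above assumes the shell was launched with the connector and an input URI along these lines (package version and URI here are illustrative, adjust to your deployment):

spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
  --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.inventory"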

A typical document in the collection looks like this:

{
    "_id" : ObjectId("5b8b9f77b5e7ecfa90c3825a"),
    "item" : "second new item",
    "qty" : 2.0,
    "status" : "A",
    "size" : {
        "h" : 20.0,
        "w" : 51.0,
        "uom" : "cm"
    },
    "tags" : [
        "green",
        "red"
    ],
    "Audience" : [
        "Rich",
        "Powerful"
    ],
    "column1" : "final",
    "column2" : "new",
    "column3" : "something"
}

Regardless of whether a DataFrame is practical for unstructured data, I want to do some simple filtering on the data.

I am trying to filter on the column tags, which is an array:

scala> inventory.filter(col("tags").contains("red")).show
org.apache.spark.sql.AnalysisException: cannot resolve 'contains(`tags`, 'red')' due to data type mismatch: argument 1 requires string type, however, '`tags`' is of array<string> type.;;
'Filter Contains(tags#637, red)


The error makes sense: Column.contains does a string match, so it cannot be applied to an array<string> column. I can try to explode the array, as below:

val tags = inventory.select(explode(inventory("tags"))).toDF
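
For instance (a sketch: explode's output column is named "col" by default, and this keeps only the tag values, not the rest of the original row):

import org.apache.spark.sql.functions.col

// each exploded tag is now a row of its own, so a plain string filter works
tags.filter(col("col") === "red").show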

However, is there a way of filtering on an element of the array directly, without exploding it first?

Thanks

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


Re: Reading a MongoDB collection in Spark with arrays

Mich Talebzadeh
Hi,

This is one way to solve it, using the array_contains(...) function:

scala> inventory.where(array_contains(inventory("tags"), "blue")).show
+--------+--------------------+-------+-------+-------+--------+----+-----------------+------+------+
|Audience|                 _id|column1|column2|column3|    item| qty|             size|status|  tags|
+--------+--------------------+-------+-------+-------+--------+----+-----------------+------+------+
|    null|[5b86585f1ef86d8b...|   null|   null|   null|postcard|45.0|[10.0, 15.25, cm]|     A|[blue]|
+--------+--------------------+-------+-------+-------+--------+----+-----------------+------+------+
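
The same function answers the original "red" case; for instance (a sketch, assuming the same inventory DataFrame as above):

import org.apache.spark.sql.functions.{array_contains, col}

// rows whose tags array contains "red"
inventory.filter(array_contains(col("tags"), "red")).show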

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


