Reading configuration file in Spark Scala throws error


Reading configuration file in Spark Scala throws error

Mich Talebzadeh
Hi,

I have a config file application.conf that I am trying to read.

The skeleton code is as follows:

```
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters
  def main(args: Array[String]): Unit = {
    val globalConfig = ConfigFactory.load()  // pass in filename (without extension) to load additional config file in src/main/resources or CLASSPATH
    val conf       = globalConfig.getConfig("database")  // extract out top level key from top level namespace
    conf.entrySet().iterator().forEachRemaining { entry =>
      val key:    String = entry.getKey
      val value:  Any    = entry.getValue.unwrapped()  // access via entry
      val value2: Any    = conf.getAnyRef(key)         // access via hash lookup from config
      println( s"$key : $value | $value2" )              // string interpolation
    }
  }

```

But I am getting the following error:

```
[info] Compiling 1 Scala source to /data6/hduser/scala/testconf/target/scala-2.11/classes...
[error] /data6/hduser/scala/testconf/src/main/scala/myPackage/testconf.scala:10: missing parameter type
[error]     conf.entrySet().iterator().forEachRemaining { entry =>
[error]                                                   ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

```
The application.conf file has the following layout:

```
database = {
  dbDatabase = "trading"
  dbPassword = "mongodb"
  dbUsername = "trading_user_RW"
  bootstrapServers = "rhes75:9092"
}
```

I would appreciate any hints.

Thanks,

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Reading configuration file in Spark Scala throws error

Chris Teoh
This seems to work:

```
val printEntry = new java.util.function.Consumer[java.util.Map.Entry[String, com.typesafe.config.ConfigValue]] {
  override def accept(a: java.util.Map.Entry[String, com.typesafe.config.ConfigValue]): Unit = {
    println(a.getKey)
  }
}

conf.entrySet.iterator.forEachRemaining(printEntry)
```

which returns:

```
scala> conf.entrySet.iterator.forEachRemaining(printEntry)
dbUsername
dbPassword
bootstrapServers
dbDatabase
```


I hope that helps.
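A note on why the original closure failed to compile: `Iterator.forEachRemaining` takes a `java.util.function.Consumer`, and Scala 2.11 only converts a function literal to such a single-abstract-method interface under `-Xexperimental` (Scala 2.12+ does it by default), hence the "missing parameter type" error and the need for the explicit anonymous class. A reusable adapter, sketched here against a plain Java list as a stand-in for `conf.entrySet`, avoids repeating that boilerplate:

```scala
import java.util.function.Consumer

// adapt a plain Scala function to the Java functional interface
// expected by forEachRemaining (needed on Scala 2.11)
def asConsumer[A](f: A => Unit): Consumer[A] =
  new Consumer[A] { override def accept(a: A): Unit = f(a) }

// demo against an ordinary Java collection (stand-in for conf.entrySet)
val items = java.util.Arrays.asList("dbUsername", "dbPassword")
val seen = scala.collection.mutable.Buffer.empty[String]
items.iterator().forEachRemaining(asConsumer[String](seen += _))
println(seen.mkString(", "))  // dbUsername, dbPassword
```

The same adapter can then be passed wherever the anonymous `Consumer` class was written out by hand.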


On Sun, 4 Aug 2019 at 05:29, Mich Talebzadeh <[hidden email]> wrote:
--
Chris

Re: Reading configuration file in Spark Scala throws error

Mich Talebzadeh
Many thanks Chris.

In my Spark Streaming application I would like to read the parameters in from the config file. Taking your example, I have:

```
val globalConfig = ConfigFactory.load()
val conf = globalConfig.getConfig(sparkAppName)  // extract top-level key from the top-level namespace
val printEntry = new java.util.function.Consumer[java.util.Map.Entry[String, com.typesafe.config.ConfigValue]] {
  override def accept(a: java.util.Map.Entry[String, com.typesafe.config.ConfigValue]): Unit = {
    val key = a.getKey
    val value = a.getValue.unwrapped
    // val value2 = conf.getAnyRef(a.getKey)
    println(s"$key = $value")
  }
}
conf.entrySet.iterator.forEachRemaining(printEntry)
```


And my config file for Spark is like below:

```
$ cat md_AerospikeAerospike.conf
md_AerospikeAerospike {
  dbDatabase = "trading"
  dbPassword = "mongodb"
  dbUsername = "trading_user_RW"
  bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094, rhes564:9092, rhes564:9093, rhes564:9094, rhes76:9092, rhes76:9093, rhes76:9094"
  schemaRegistryURL = "http://rhes75:8081"
  zookeeperConnect = "rhes75:2181, rhes564:2181, rhes76:2181"
  zookeeperConnectionTimeoutMs = "10000"
  rebalanceBackoffMS = "15000"
  zookeeperSessionTimeOutMs = "15000"
  autoCommitIntervalMS = "12000"
  topicsValue = "md"
  memorySet = "F"
  enableHiveSupport = null
  enableHiveSupportValue = "true"
  sparkStreamingReceiverMaxRateValue = "0"
  checkpointdir = "/checkpoint"
  mongodbHost = "rhes75"
  mongodbPort = "60100"
  zookeeperHost = "rhes75"
  zooKeeperClientPort = "2181"
  batchInterval = 2
  tickerWatch = "VOD"
  priceWatch = 300.0
  op_type = 1
  currency = "GBP"
  tickerType = "short"
  tickerClass = "asset"
  tickerStatus = "valid"
}
```

So I want them to be imported into the Spark program.
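As a side note on loading (an assumption about the setup, since the snippet above still calls `ConfigFactory.load()` with no arguments): `load()` only picks up `application.conf` from the classpath, so a file named `md_AerospikeAerospike.conf` has to be named explicitly, either as a classpath resource or as a file path, e.g. when shipped to the containers with `spark-submit --files`:

```scala
import com.typesafe.config.{Config, ConfigFactory}
import java.io.File

// Option 1: a classpath resource named md_AerospikeAerospike.conf
val fromClasspath: Config = ConfigFactory.load("md_AerospikeAerospike")

// Option 2: an explicit file path in the container's working directory
// (e.g. when shipped via spark-submit --files md_AerospikeAerospike.conf)
val fromFile: Config =
  ConfigFactory.parseFile(new File("md_AerospikeAerospike.conf")).resolve()

val conf: Config = fromFile.getConfig("md_AerospikeAerospike")
```

Both are standard Typesafe Config calls; which one applies depends on how the job is packaged and submitted.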

Using the above println(s"$key = $value"), I get:

```
zookeeperHost = rhes75
zookeeperSessionTimeOutMs = 15000
memorySet = F
topicsValue = md
currency = GBP
rebalanceBackoffMS = 15000
tickerStatus = valid
enableHiveSupportValue = true
autoCommitIntervalMS = 12000
mongodbPort = 60100
bootstrapServers = rhes75:9092, rhes75:9093, rhes75:9094, rhes564:9092, rhes564:9093, rhes564:9094, rhes76:9092, rhes76:9093, rhes76:9094
zookeeperConnect = rhes75:2181, rhes564:2181, rhes76:2181
zookeeperConnectionTimeoutMs = 10000
dbUsername = trading_user_RW
dbPassword = mongodb
tickerWatch = VOD
tickerClass = asset
checkpointdir = /checkpoint
mongodbHost = rhes75
schemaRegistryURL = http://rhes75:8081
tickerType = short
zooKeeperClientPort = 2181
priceWatch = 300
batchInterval = 2
op_type = 1
sparkStreamingReceiverMaxRateValue = 0
dbDatabase = trading
```

Two things, please. First, the keys come back in a different order from the file; second, the String values are not quoted, e.g. currency = GBP as opposed to currency = "GBP".

What would be the easiest way of resolving these, please?
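Both observations are expected behaviour rather than bugs: `Config.entrySet()` is backed by a hash-based set, so iteration order bears no relation to the file order, and `unwrapped()` returns the raw Java value with no HOCON quoting. One sketch of a workaround sorts the keys explicitly and uses `ConfigValue.render()`, which emits the HOCON form with strings quoted:

```scala
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

val conf = ConfigFactory.load().getConfig("md_AerospikeAerospike")
conf.entrySet().asScala.toSeq
  .sortBy(_.getKey)  // impose a deterministic order; Config does not preserve file order
  .foreach { e =>
    // render() prints the HOCON representation, e.g. currency = "GBP" rather than currency = GBP
    println(s"${e.getKey} = ${e.getValue.render()}")
  }
```

The output is alphabetical rather than file order; recovering the original file order is not possible from a `Config`, since it is stored as a map.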

Regards,

Dr Mich Talebzadeh

 


On Sun, 4 Aug 2019 at 01:55, Chris Teoh <[hidden email]> wrote:

Re: Reading configuration file in Spark Scala throws error

Mich Talebzadeh
OK, it turned out to be pretty simple once I established where to read the conf file from in YARN mode. I get the values for the different keys as follows:

```
val dbHost = conf.getString("dbHost")
val dbPort = conf.getString("dbPort")
val dbConnection = conf.getString("dbConnection")
val namespace = conf.getString("namespace")
val dbSetRead = conf.getString("dbSetRead")
val dbSetWrite = conf.getString("dbSetWrite")
val dbPassword = conf.getString("dbPassword")
val bootstrapServers = conf.getString("bootstrapServers")
val schemaRegistryURL = conf.getString("schemaRegistryURL")
val zookeeperConnect = conf.getString("zookeeperConnect")
val zookeeperConnectionTimeoutMs = conf.getString("zookeeperConnectionTimeoutMs")
val rebalanceBackoffMS = conf.getString("rebalanceBackoffMS")
val zookeeperSessionTimeOutMs = conf.getString("zookeeperSessionTimeOutMs")
val autoCommitIntervalMS = conf.getString("autoCommitIntervalMS")
val topicsValue = conf.getString("topicsValue")
val memorySet = conf.getString("memorySet")
val enableHiveSupportValue = conf.getString("enableHiveSupportValue")
val sparkStreamingReceiverMaxRateValue = conf.getString("sparkStreamingReceiverMaxRateValue")
val checkpointdir = conf.getString("checkpointdir")
val hbaseHost = conf.getString("hbaseHost")
val zookeeperHost = conf.getString("zookeeperHost")
val zooKeeperClientPort = conf.getString("zooKeeperClientPort")
val batchInterval = conf.getInt("batchInterval")
val tickerWatch = conf.getString("tickerWatch")
val priceWatch = conf.getDouble("priceWatch")
val op_type = conf.getInt("op_type")
val currency = conf.getString("currency")
val tickerType = conf.getString("tickerType")
val tickerClass = conf.getString("tickerClass")
val tickerStatus = conf.getString("tickerStatus")
val confidenceLevel = conf.getDouble("confidenceLevel")
var hvTicker = conf.getInt("hvTicker")
```
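One defensive note on the block above (a sketch, not from the original thread): each `getString`/`getInt` call throws `ConfigException.Missing` if the key is absent from the file, so optional keys can be guarded with `hasPath`, or defaults can be layered in with `withFallback`:

```scala
import com.typesafe.config.{Config, ConfigFactory}

val conf: Config = ConfigFactory.load().getConfig("md_AerospikeAerospike")

// guard an optional key instead of letting getInt throw ConfigException.Missing
val batchInterval: Int =
  if (conf.hasPath("batchInterval")) conf.getInt("batchInterval")
  else 2  // illustrative default, not taken from the thread

// alternatively, layer a default config underneath the loaded one
val confWithDefaults: Config =
  conf.withFallback(ConfigFactory.parseString("batchInterval = 2"))
```

With `withFallback`, lookups hit the loaded config first and fall through to the parsed defaults only for missing keys.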



Thanks again.


Dr Mich Talebzadeh

 


On Sun, 4 Aug 2019 at 22:32, Mich Talebzadeh <[hidden email]> wrote: