Using Spark as an ETL tool for moving data from Hive tables to BigQuery

Mich Talebzadeh

Hi,

To move data from Hive to Google BigQuery, one needs to create a staging table in Hive in a storage format that BigQuery can read. Both the Avro and ORC file formats in Hive work, but the files cannot be compressed.

In addition, to handle both Date and Double types, it is best to convert these columns into String in the staging table.

For example, I have the following Hive table (ORC, compressed):

0: jdbc:hive2://rhes75:10099/default> desc ll_18201960;
+-------------------------+------------+----------+
|        col_name         | data_type  | comment  |
+-------------------------+------------+----------+
| transactiondate         | date       |          |
| transactiontype         | string     |          |
| sortcode                | string     |          |
| accountnumber           | string     |          |
| transactiondescription  | string     |          |
| debitamount             | double     |          |
| creditamount            | double     |          |
| balance                 | double     |          |
+-------------------------+------------+----------+


Note the date and Double types.

I now create a normal ORC table in Hive as follows:

drop table if exists accounts.ll_18201960_gcp;
CREATE TABLE accounts.ll_18201960_gcp
STORED AS ORC
AS SELECT
          CAST(transactiondate AS String) AS transactiondate
        , transactiontype
        , sortcode, accountnumber
        , transactiondescription
        , CAST(debitamount AS String) AS debitamount
        , CAST(creditamount AS String) AS creditamount
        , CAST(balance AS String) as balance
FROM ll_18201960;

Note the casting to String type.
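To illustrate why the String detour is safe for these two types, here is a minimal plain-Scala sketch (no Hive or Spark involved). It assumes that dates are rendered as yyyy-MM-dd and that doubles are rendered the way the JVM renders them; Hive's exact string output for doubles may differ in edge cases. The object name StringRoundTrip and its methods are hypothetical, used only for illustration.

```scala
import java.sql.Date

// Hypothetical illustration only: mimics the String staging columns
// on the JVM, without Hive or Spark.
object StringRoundTrip {

  // Render a date and an amount as the plain strings a staging table would hold
  def toStaging(d: Date, amount: Double): (String, String) =
    (d.toString, amount.toString)

  // Parse the strings back into typed values, as the later cast step does
  def fromStaging(d: String, amount: String): (Date, Double) =
    (Date.valueOf(d), amount.toDouble)
}
```

A value pushed through toStaging and then fromStaging should come back unchanged, which is the property the later cast back to DATE and DOUBLE relies on.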

I then extract the table files from HDFS 

hdfs dfs -get /user/hive/warehouse/accounts.db/ll_18201960_gcp .

and copy them to a GCP bucket

gsutil rm -r gs://xxxx/accounts/ll_18201960_gcp
gsutil cp -r ll_18201960_gcp gs://xxxx/accounts/ll_18201960_gcp


I then create a BigQuery table with autodetect and load the data

bq load --autodetect --replace=true --source_format=ORC accounts.ll_18201960_gcp "gs://xxxx/accounts/ll_18201960_gcp/*"
bq query --use_legacy_sql=false "select * from accounts.ll_18201960_gcp"

So a staging table in BigQuery is created.

I then use Spark as an ETL tool to move data from the BigQuery staging table into its final format, taking care of casting etc. For Spark, I create an uber JAR file from the following code:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.fs.Path
import scala.util.Random
import org.apache.spark.sql.functions._
//
//
import org.apache.log4j.Logger
import org.apache.log4j.Level
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.conf.Configuration

import org.apache.avro.generic.GenericData
import com.google.cloud.hadoop.io.bigquery.AvroBigQueryInputFormat
import com.samelamin.spark.bigquery._

object simple
{
  private var sparkAppName = "simple"
  private var sparkDefaultParllelism = null
  private var sparkDefaultParallelismValue = "12"
  private var sparkSerializer = null
  private var sparkSerializerValue = "org.apache.spark.serializer.KryoSerializer"
  private var sparkNetworkTimeOut = null
  private var sparkNetworkTimeOutValue = "3600"
  private var sparkStreamingUiRetainedBatches = null
  private var sparkStreamingUiRetainedBatchesValue = "5"
  private var sparkWorkerUiRetainedDrivers = null
  private var sparkWorkerUiRetainedDriversValue = "5"
  private var sparkWorkerUiRetainedExecutors = null
  private var sparkWorkerUiRetainedExecutorsValue = "30"
  private var sparkWorkerUiRetainedStages = null
  private var sparkWorkerUiRetainedStagesValue = "100"
  private var sparkUiRetainedJobs = null
  private var sparkUiRetainedJobsValue = "100"
  private var sparkJavaStreamingDurationsInSeconds = "10"
  private var sparkNumberOfSlaves = 14
  private var sparkRequestTopicShortName = null
  private var sparkImpressionTopicShortName = null
  private var sparkClickTopicShortName = null
  private var sparkConversionTopicShortName = null
  private var sparkNumberOfPartitions = 30
  private var sparkClusterDbIp = null
  private var clusterDbPort = null
  private var insertQuery = null
  private var insertOnDuplicateQuery = null
  private var sqlDriverName = null
  private var memorySet = "F"
  private var enableHiveSupport = null
  private var enableHiveSupportValue = "true"
  def main(args: Array[String]): Unit =
  {

   var startTimeQuery = System.currentTimeMillis
  // Create a SparkSession. No need to create SparkContext. In Spark 2.0 the same effects can be achieved through SparkSession, without explicitly creating SparkConf, SparkContext or SQLContext as they are encapsulated within the SparkSession
  val spark =  SparkSession.
               builder().
               appName(sparkAppName).
               config("spark.driver.allowMultipleContexts", "true").
               config("spark.hadoop.validateOutputSpecs", "false").
               // change the values accordingly.
               // These must use the actual Spark property names and be set before the session is created.
               config("spark.default.parallelism", sparkDefaultParallelismValue).
               config("spark.serializer", sparkSerializerValue).
               config("spark.network.timeout", sparkNetworkTimeOutValue).
               getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
println ("\nStarted at"); spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val HadoopConf = spark.sparkContext.hadoopConfiguration
//get and set the env variables
val bucket = HadoopConf.get("fs.gs.system.bucket")
val projectId = HadoopConf.get("fs.gs.project.id")
val jobName = "simplejob"
HadoopConf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
HadoopConf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
val inputTable = "accounts.ll_18201960_gcp"
val fullyQualifiedInputTableId = projectId+":"+inputTable
val outputTable = "test.ll_18201960"
val fullyQualifiedOutputTableId = projectId+":"+outputTable
val jsonKeyFile="xxxxxx.json"
val datasetLocation="xxxxx"
val writedisposition="WRITE_TRUNCATE"
// Set up GCP credentials
spark.sqlContext.setGcpJsonKeyFile(jsonKeyFile)
// Set up BigQuery project and bucket
spark.sqlContext.setBigQueryProjectId(projectId)
spark.sqlContext.setBigQueryGcsBucket(bucket)
spark.sqlContext.setBigQueryDatasetLocation(datasetLocation)

// Delete existing data in BigQuery table if any
println("\nDeleting data from output table " + outputTable)
var sqltext = "DELETE from " + outputTable + " WHERE True"
spark.sqlContext.runDMLQuery(sqltext)
//read data from the staging (input) Table
println("\nreading data from " + inputTable)
val df = spark.sqlContext
          .read
          .format("com.samelamin.spark.bigquery")
          .option("tableReferenceSource",fullyQualifiedInputTableId)
          .load()
println("\nInput table schema")
df.printSchema
// Create a new DF based on CAST columns
//
val df2 = df.select(
     'transactiondate.cast("DATE").as("transactiondate"),
     'transactiontype.as("transactiontype"),
     substring('sortcode,2,8).as("sortcode"),
     'accountnumber.as("accountnumber"),
     'transactiondescription.as("transactiondescription"),
     'debitamount.cast("DOUBLE").as("debitamount"),
     'creditamount.cast("DOUBLE").as("creditamount"),
     'balance.cast("DOUBLE").as("balance"))
println("\nModified table schema for output storage")
df2.printSchema
// Save data to a BigQuery table
println("\nsaving data to " + outputTable)
df2.saveAsBigQueryTable(fullyQualifiedOutputTableId)
println("\nreading data from " + outputTable + ", and counting rows")
// Load everything from the table and count the number of rows
val ll_18201960 = spark.sqlContext.bigQueryTable(fullyQualifiedOutputTableId)
ll_18201960.agg(count("*").as("Rows in the output table " + outputTable)).show
  }
}

I have tested this and it works fine with an uber JAR file created using SBT.
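As one possible refinement, the explicit casts in the select above could be driven by a map of column name to target type, together with a check for values that fail to parse. The following is only a sketch: CastHelper, castColumns and failedCasts are hypothetical names, not part of Spark or the BigQuery connector. It also assumes non-ANSI cast semantics (the default in Spark 2.x and 3.x), where a string that cannot be parsed becomes NULL instead of raising an error.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of Spark or the BigQuery connector
object CastHelper {

  // Cast the columns named in `types` to the given Spark SQL type,
  // keeping every other column unchanged and preserving column order.
  def castColumns(df: DataFrame, types: Map[String, String]): DataFrame = {
    val projected = df.columns.map { name =>
      types.get(name) match {
        case Some(t) => col(name).cast(t).as(name)
        case None    => col(name)
      }
    }
    df.select(projected: _*)
  }

  // For each column to be cast, count the rows whose value is non-null
  // as a string but becomes null after the cast (assumes non-ANSI casts).
  def failedCasts(df: DataFrame, types: Map[String, String]): Map[String, Long] =
    types.map { case (name, t) =>
      name -> df.filter(col(name).isNotNull && col(name).cast(t).isNull).count()
    }
}
```

In the job above this would be called as CastHelper.castColumns(df, Map("transactiondate" -> "date", "debitamount" -> "double", "creditamount" -> "double", "balance" -> "double")). The substring on sortcode would still need its own step, and running failedCasts with the same map before saving would show whether any staged string could not be converted.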

I would like to share views and experience with other members. I am sure there are better ways of making this work, so I am happy to hear your views.

Regards,

Mich

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.