Database insert happening two times


Database insert happening two times

Harsh Choudhary
Hi

I'm running a Spark job that appends new data to a Parquet file. At the end, I make a log entry in my DynamoDB table recording the number of records appended, the time, etc. Instead of a single entry in the database, multiple entries are being written. Is this because of parallel execution of the code on the workers? If so, how can I make sure the log is written only once?

Thanks!

Cheers!

Harsh Choudhary


Re: Database insert happening two times

ayan guha
Can you share your code?

--
Best Regards,
Ayan Guha

Re: Database insert happening two times

Harsh Choudhary
This is the code:

import java.nio.charset.Charset
import com.amazonaws.services.lambda.AWSLambdaClient
import com.amazonaws.services.lambda.model.InvokeRequest
import org.apache.spark.sql.functions.lit
import play.api.libs.json.{JsValue, Json}

// spark, currentTime, dest_file, fileLineageEntity, fileLineAgeFormat,
// conf, GetAuth and byteBufferToString come from the surrounding job.
val hdfs_path = "<path to a file in hdfs>"

// Pick a reader based on the file extension and add the audit columns.
val data_df = (if (hdfs_path.contains(".avro")) {
    spark.read.format("com.databricks.spark.avro").load(hdfs_path)
  } else if (hdfs_path.contains(".tsv")) {
    spark.read.option("delimiter", "\t").option("header", "true").csv(hdfs_path)
  } else if (hdfs_path.contains(".scsv")) {
    spark.read.option("delimiter", ";").option("header", "true").csv(hdfs_path)
  } else {
    sys.exit(1) // unsupported file type
  })
  .withColumn("edl_created_by", lit("IndexerSpark"))
  .withColumn("edl_created_at", lit(currentTime))

data_df.write.mode("append").parquet(dest_file)

// Runs once, on the driver: invoke a Lambda that writes the lineage log to DynamoDB.
val status1 = AddLogToDynamo(
  Json.toJson(fileLineageEntity)(fileLineAgeFormat),
  conf.getString("lambda.filelineage.dynamodb.update.function.name"),
  GetAuth.getLambdaClient)

def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String, lambdaClient: AWSLambdaClient): String = {
  println("new metadata to be updated: " + updatedLambdaJson)
  val updateLambdaReq = new InvokeRequest()
  updateLambdaReq.setFunctionName(updateFunctionName)
  updateLambdaReq.setPayload(updatedLambdaJson.toString())
  println("Calling lambda to add log")
  byteBufferToString(lambdaClient.invoke(updateLambdaReq).getPayload(), Charset.forName("UTF-8"))
}


Harsh Choudhary


Re: Database insert happening two times

Marco Mistroni
Hi
If the problem really is parallel execution, you can try calling repartition(1) before you save.
Alternatively, try storing the data in a CSV file and see if you get the same behaviour, to rule out DynamoDB issues.
Also, are the multiple rows being written duplicates (do they all have the same fields/values)?
Hth
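
For concreteness, a rough sketch of both suggestions against the snippet above (data_df and dest_file are the names used there; the debug CSV path is just a placeholder):

  // Write through a single partition so only one task produces output files.
  data_df.repartition(1).write.mode("append").parquet(dest_file)

  // To rule out DynamoDB, dump the same data to CSV and inspect it by hand.
  data_df.repartition(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("<debug output path>")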


Re: Database insert happening two times

ayan guha
It should not be parallel execution, as the logging code is called on the driver. Have you checked whether your driver is being rerun by the cluster manager due to a failure or error?
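
One way to check that from the job itself (just a sketch, not part of the original code): tag the lineage log with the Spark application and attempt IDs, so a rerun by the cluster manager shows up as entries with different attempt IDs.

  // In the driver, just before calling AddLogToDynamo:
  val sc = spark.sparkContext
  val attempt = sc.applicationAttemptId.getOrElse("1")
  val runTag = sc.applicationId + "/attempt-" + attempt
  println("Writing lineage log from " + runTag)
  // Include runTag in the JSON payload sent to the Lambda; if the duplicate
  // DynamoDB rows carry different attempt IDs, the driver really was rerun.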

--
Best Regards,
Ayan Guha

Re: Database insert happening two times

Harsh Choudhary
Hi

@Marco, the multiple rows written are not exact duplicates: the current-timestamp field is different in each of them.

@Ayan, I checked and found that my whole job is being run twice. There seems to be no error, though. Could a cluster manager setting be causing it to be re-run?
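
In case it helps: when the rerun comes from the cluster manager, it is typically the resource manager retrying the whole application after the first attempt is reported as failed. Assuming YARN (a hedged example; the equivalent setting differs per cluster manager), the number of application attempts can be capped at submit time:

  spark-submit \
    --conf spark.yarn.maxAppAttempts=1 \
    ... <rest of the submit command> ...

The cluster-side default comes from yarn.resourcemanager.am.max-attempts, so a value above 1 there can make a failed driver run again even though the application code itself has no retry logic.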


