[PySpark] How to write HFiles as an 'append' to the same directory?


[PySpark] How to write HFiles as an 'append' to the same directory?

Gautham Acharya

I have a process in Apache Spark that writes HFiles to S3 in batches. I want the resulting HFiles in the same directory, as they belong to the same column family. However, I get a ‘directory already exists’ error when I run this on AWS EMR. How can I write HFiles via Spark as an ‘append’, the way I can with a CSV?
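
By ‘append’ I mean roughly the behaviour the DataFrame writer gives for CSV output; an illustrative sketch (placeholder path, not from the actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# 'append' mode adds new part files into an existing output directory
# instead of failing because the directory already exists.
df.write.mode("append").csv("s3://some-bucket/some-prefix/")  # placeholder path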

 

The batch writing function looks like this:


for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe,
                                                                 col_group, pandas_udf_func, group_by_args)

    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name, constants.DEFAULT_COL_FAMILY)

 

The actual function that writes the HFiles is this:


rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)

 

The exception I’m getting:

 

Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')
job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]
Traceback (most recent call last):
  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in <module>
    job_module.transform(spark, **job_args)
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:584)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

 

 


Re: [PySpark] How to write HFiles as an 'append' to the same directory?

张 帅
Maybe set spark.hadoop.validateOutputSpecs=false?
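
For example, a minimal (untested) sketch of setting it when building the context. Note this only skips the existing-directory check and is not a true append, so part files with colliding names could still be overwritten:

from pyspark import SparkConf, SparkContext

# Assumption: spark.hadoop.validateOutputSpecs=false only disables the
# "output directory already exists" validation for the save call.
conf = SparkConf().set("spark.hadoop.validateOutputSpecs", "false")
sc = SparkContext(conf=conf)

The same setting can also be passed on the command line with
spark-submit --conf spark.hadoop.validateOutputSpecs=false.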


From: Gautham Acharya <[hidden email]>
Sent: March 15, 2020 3:23
To: [hidden email] <[hidden email]>
Subject: [PySpark] How to write HFiles as an 'append' to the same directory?
 


Re: [PySpark] How to write HFiles as an 'append' to the same directory?

Stephen Coy
In reply to this post by Gautham Acharya
I encountered a similar problem when trying to:

ds.write().save("s3a://some-bucket/some/path/table");

which writes the content as a bunch of parquet files in the “folder” named “table”.

I am using a Flintrock cluster with the Spark 3.0 preview FWIW.

Anyway, I just used the AWS SDK to remove it (and any “subdirectories”) before kicking off the Spark machinery.

I can show you how to do this in Java, but I think the Python SDK may be significantly different.
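
For what it's worth, a rough boto3 sketch of that idea in Python (bucket and prefix are placeholders, untested):

import boto3

def delete_s3_prefix(bucket_name, prefix):
    # Remove every object under the prefix so the subsequent Spark write
    # does not trip over FileAlreadyExistsException.
    bucket = boto3.resource("s3").Bucket(bucket_name)
    bucket.objects.filter(Prefix=prefix).delete()

# e.g. delete_s3_prefix("my-bucket", "path/to/hfiles")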

Steve C

