Non-deterministic behavior in spark

Non-deterministic behavior in spark

od
Hello,

(Sorry for the sensationalist title) :)

If I run Spark on files from S3 and do basic transformation like:

textFile()
filter
groupByKey
count

I get one number (e.g. 40,000).

If I do the same on the same files from HDFS, the number spat out is completely different (VERY different - something like 13,000).

What would one do in a situation like this? How do I even go about figuring out what the problem is? This is run on a cluster of 15 instances on Amazon.
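
Concretely, the job is roughly this (the bucket, paths and the filter/key extraction below are placeholders, not the real code):

// Same pipeline, run once against S3 and once against HDFS.
val lines = sc.textFile("s3n://my-bucket/events/*.json")   // or "hdfs:///events/*.json"
val grouped = lines
  .filter(line => line.contains("someField"))              // placeholder predicate
  .map(line => (line.split(",")(0), 1))                    // key by some field
  .groupByKey()
grouped.count()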

Thanks,
Ognen

Re: Non-deterministic behavior in spark

yinxusen
Are there any non-deterministic operations in your filter, such as Random.nextInt()? If so, the program loses its idempotence. You should specify a seed.
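
For example, a quick sketch (hypothetical code, not yours; assume lines is the RDD from textFile):

import scala.util.Random

// An unseeded Random inside a filter keeps a different subset on every
// run (and on every recomputation of a partition), so counts change.
val flaky = lines.filter(_ => new Random().nextInt(10) == 0)
flaky.count()    // different each time

// Deriving the seed from the record itself makes the decision repeatable.
val stable = lines.filter(line => new Random(line.hashCode).nextInt(10) == 0)
stable.count()   // same result every run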


--
Best Regards
-----------------------------------
Xusen Yin    尹绪森
Beijing Key Laboratory of Intelligent Telecommunications Software and Multimedia
Beijing University of Posts & Telecommunications
Intel Labs China
Reply | Threaded
Open this post in threaded view
|

Re: Non-deterministic behavior in spark

Ognen Duzlevski-2
No. It is a filter that splits a line in a JSON file and extracts a field at a fixed position - it does the same thing on every run.

That's what bothers me about this.

Ognen


--
"Le secret des grandes fortunes sans cause apparente est un crime oublié, parce qu'il a été proprement fait" - Honore de Balzac

Re: Non-deterministic behavior in spark

yinxusen
1. Is there any in-place operation in your code, such as addi() on a DoubleMatrix? That kind of operation mutates the original data.

2. You could try the Spark replay debugger; it has an assert function. Hope that helps. http://spark-replay-debugger-overview.readthedocs.org/en/latest/
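
For illustration, a tiny sketch of the difference (assuming jblas on the classpath; hypothetical values, not your code):

import org.jblas.DoubleMatrix

val m = new DoubleMatrix(Array(1.0, 2.0, 3.0))
val ones = DoubleMatrix.ones(3)

val sum = m.add(ones)   // pure: returns a new matrix, m stays [1.0, 2.0, 3.0]
m.addi(ones)            // in place: m itself becomes [2.0, 3.0, 4.0]

// If data that backs an RDD is mutated like this, a recomputed partition
// sees the new values, so two runs of the same action can disagree.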


--
Best Regards
-----------------------------------
Xusen Yin    尹绪森
Beijing Key Laboratory of Intelligent Telecommunications Software and Multimedia
Beijing University of Posts & Telecommunications
Intel Labs China

Re: Non-deterministic behavior in spark

Ognen Duzlevski-2
Thanks.

This is a VERY simple example.

I have two 20 GB JSON files. Each line in the files has the same format.
On the first file I run: val events = filter(_.split(something)(get the field)).map(field => (field, 0))
I then run the same filter on the second file and do map(field => (field, 1)) to get val events1

This ensures that events has the form (field, 0) and events1 has the form (field, 1).

I then do val ret = events.union(events1) - this puts all the fields into the same RDD.

Then I do val r = ret.groupByKey().filter(e => e._2.length > 1 && e._2(0)==0) to make sure every group keyed by field has at least two elements and the first one is a zero (so, for example, an entry in this structure will have the form (field, (0, 1, 1, 1, ...))).

I then just do a simple r.count
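
Spelled out, it is roughly this (the field extraction here is a placeholder, not the real parsing):

val day1 = sc.textFile("hdfs:///events/day1.json")
val day2 = sc.textFile("hdfs:///events/day2.json")

def field(line: String): String = line.split(",")(1)   // placeholder extraction

val events  = day1.map(line => (field(line), 0))
val events1 = day2.map(line => (field(line), 1))

val ret = events.union(events1)

// The check described above: at least two elements per key, with a 0 first.
val r = ret.groupByKey().filter(e => e._2.size > 1 && e._2.head == 0)
r.count()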

Ognen



--
"Le secret des grandes fortunes sans cause apparente est un crime oublié, parce qu'il a été proprement fait" - Honore de Balzac

Re: Non-deterministic behavior in spark

Ognen Duzlevski-2
I can confirm that there is something seriously wrong with this.

If I run spark-shell with local[4] on the same cluster and run the same job on the same hdfs:// files, I get output like

res0: Long = 58177

If I run spark-shell against the full 15-node cluster and run the same job, I get

res0: Long = 14137

This is just crazy.

Ognen


--
"Le secret des grandes fortunes sans cause apparente est un crime oublié, parce qu'il a été proprement fait" - Honore de Balzac

Re: Non-deterministic behavior in spark

yinxusen
r = ret.groupByKey().filter(e => e._2.length > 1 && e._2(0)==0)

Why choose `e._2(0) == 0`? What about e._2(0) != 0? I am not sure that groupByKey keeps the order of the elements. How about sampling a subset of your dataset and logging some information, e.g. logInfo(e._2)?
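
A sketch of an order-independent version of the check (same ret as in your message; I am guessing the intent is "seen on day one and seen again"):

// Look for a 0 and a 1 in each group instead of relying on where the 0 sits.
val r = ret.groupByKey().filter { case (_, tags) =>
  tags.exists(_ == 0) && tags.exists(_ == 1)
}
r.count()

// And a quick way to eyeball a few groups:
ret.groupByKey().sample(false, 0.001, 42).take(10).foreach {
  case (key, tags) => println(key + " -> " + tags.mkString(", "))
}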


--
Best Regards
-----------------------------------
Xusen Yin    尹绪森
Beijing Key Laboratory of Intelligent Telecommunications Software and Multimedia
Beijing University of Posts & Telecommunications
Intel Labs China

Re: Non-deterministic behavior in spark

Ognen Duzlevski-2
Ahhh. That explains it (the order is not kept). I was counting on the order, so it made perfect sense to have an extra check where on the first day you get a zero and on the next day a 1 (this is day-to-day retention analysis).
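
For the record, one order-independent way to do the day-to-day check (just a sketch, same events/events1 as before):

// Distinct ids per day, then a join to keep only the ids present on both days.
val day1Ids = events.keys.distinct()
val day2Ids = events1.keys.distinct()

val retained = day1Ids.map(id => (id, ())).join(day2Ids.map(id => (id, ())))
retained.count()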

Thank you!
Ognen


--
"Le secret des grandes fortunes sans cause apparente est un crime oublié, parce qu'il a été proprement fait" - Honore de Balzac