RDD order guarantees


RDD order guarantees

Ewan Higgs
Hi all,
Quick one: when reading files, are the orders of partitions guaranteed
to be preserved? I am finding some weird behaviour where I run
sortByKeys() on an RDD (which has 16 byte keys) and write it to disk. If
I open a python shell and run the following:

for part in range(29):
    print map(ord, open('/home/ehiggs/data/terasort_out/part-r-000{0:02}'.format(part), 'r').read(16))

Then each partition is in order based on the first value of each partition.
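For anyone re-running the check today, here is a Python 3 sketch of the same test; the output directory, the five-digit part-file naming, and the partition count are assumptions carried over from the snippet above:

```python
from pathlib import Path

def first_keys(out_dir, n_parts):
    """Read the first 16-byte key of each part file, in file-name order."""
    keys = []
    for part in range(n_parts):
        path = Path(out_dir) / "part-r-{:05d}".format(part)
        with open(path, "rb") as f:
            keys.append(f.read(16))
    return keys

def partitions_in_order(keys):
    """True if each partition's first key is >= the previous one's,
    i.e. the partition files are globally sorted by name."""
    return all(a <= b for a, b in zip(keys, keys[1:]))
```

Running `partitions_in_order(first_keys('/home/ehiggs/data/terasort_out', 29))` would reproduce the observation: the files themselves are sorted, so any disorder must come from how Spark lists them on load.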

I can also call TeraValidate.validate from TeraSort and it is happy with
the results. It seems to be on loading the file that the reordering
happens. If this is expected, is there a way to ask Spark nicely to give
me the RDD in the order it was saved?

This is based on trying to fix my TeraValidate code on this branch:
https://github.com/ehiggs/spark/tree/terasort

Thanks,
Ewan

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]


Re: RDD order guarantees

rxin
You are running on a local file system, right? HDFS orders the files based
on names, but local file systems often don't. I think that's why you see the
difference.

We might be able to do a sort and order the partitions when we create an RDD
to make this universal, though.


Re: RDD order guarantees

Ewan Higgs
Yes, I am running on a local file system.

Is there a bug open for this? Mingyu Kim reported the problem last April:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-reads-partitions-in-a-wrong-order-td4818.html

-Ewan


Re: RDD order guarantees

rxin
Hi Ewan,

Not sure if there is a JIRA ticket (there are too many and I lose track).

I chatted briefly with Aaron about this. The way we can solve it is to create
a new FileSystem implementation that overrides the listStatus method, and
then set fs.file.impl to that in the Hadoop Conf.

Shouldn't be too hard. Would you be interested in working on it?
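The actual change would live in Java or Scala against Hadoop's FileSystem API; as a minimal, language-neutral sketch, the essence of an order-preserving listStatus is just a name-sorted listing (the part-file names here are illustrative):

```python
import os

def list_status_sorted(directory):
    """What an overriding listStatus would effectively do: take the
    OS-order directory listing (which a local FS does not guarantee
    to be sorted) and return it ordered by name, so part-r-00000,
    part-r-00001, ... come back in partition order."""
    return sorted(os.listdir(directory))
```

Because the part files are zero-padded, plain lexicographic order coincides with partition order, which is what makes this one-line fix sufficient.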





Re: RDD order guarantees

Ewan Higgs
Hi Reynold.
I'll take a look.

SPARK-5300 is open for this issue.
-Ewan


Re: RDD order guarantees

Antonin Delpeuch
Hi,

Sorry to dig up this thread, but this bug is still present.

The fix proposed in this thread (creating a new FileSystem implementation
which sorts listed files) was rejected, with the suggestion that it is the
FileInputFormat's responsibility to sort the file names if preserving
partition order is desired:
https://github.com/apache/spark/pull/4204

Given that Spark RDDs are supposed to preserve the order of the collections
they represent, I think this still deserves to be fixed in Spark. As a user,
I expect that if I use saveAsTextFile and then load the resulting files with
sparkContext.textFile, I get a dataset in the same order.

Because Spark uses the FileInputFormats exposed by Hadoop, that would mean
either patching Hadoop to sort file names directly (which is likely to fail,
since Hadoop might not care about ordering in general), or creating
subclasses of all the Hadoop formats used in Spark, adding the required
sorting to the listStatus method. The latter strikes me as less elegant than
implementing a new FileSystem as Reynold suggested, though.

Another way to "fix" this would be to state in the docs that order is not
preserved in this scenario, which would hopefully spare others a bad surprise
(just as we already have a caveat about the nondeterministic order after
shuffles).

I would be happy to try submitting a fix for this, if there is a consensus
around the correct course of action.

Cheers,
Antonin





Re: RDD order guarantees

Steve Loughran-2


On Tue, 7 Apr 2020 at 15:26, Antonin Delpeuch <[hidden email]> wrote:
> Hi,
>
> Sorry to dig out this thread but this bug is still present.
>
> The fix proposed in this thread (creating a new FileSystem implementation
> which sorts listed files) was rejected, with the suggestion that it is the
> FileInputFormat's responsibility to sort the file names if preserving
> partition order is desired:
> https://github.com/apache/spark/pull/4204
>
> Given that Spark RDDs are supposed to preserve the order of the collections
> they represent, this would still deserve to be fixed in Spark, I think. As a
> user, I expect that if I use saveAsTextFile and then load the resulting file
> with sparkContext.textFile, I obtain a dataset in the same order.
>
> Because Spark uses the FileInputFormats exposed by Hadoop, that would mean
> either patching Hadoop for it to sort file names directly (which is likely
> going to fail since Hadoop might not care about the ordering in general),

I don't see any guarantees in Hadoop about the order of listLocatedStatus, and for the local FS you get whatever the OS gives you.

What isn't easy is taking an entire listing and sorting it, not when it is potentially millions of entries. That issue is why the newer FS list APIs all return a RemoteIterator<>: incremental paging of values, which reduces the payload of individual RPC messages between the HDFS client and the namenode (HDFS) and allows paged/incremental lists against object stores. You can't provide incremental pages of results *and* sort those results at the same time.
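A toy Python sketch of that paging model makes the tension concrete (the page size and entries are illustrative, not any real Hadoop API): pages come back incrementally, so returning them in globally sorted order would require materialising the whole listing first, defeating the point of paging.

```python
def paged_listing(entries, page_size):
    """Yield a listing one small page at a time, in the spirit of
    Hadoop's RemoteIterator: each page fits in one RPC-sized response.
    Each page can only reflect the order entries arrive in; a global
    sort would need every entry in memory before the first page."""
    for i in range(0, len(entries), page_size):
        yield entries[i:i + page_size]
```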

Which, given those APIs are my problem, means I wouldn't be happy with adding "sort all listings" as a new restriction on FS semantics.

 
> or
> create subclasses of all Hadoop formats used in Spark, adding the required
> sorting to the listStatus method. This strikes me as less elegant than
> implementing a new FileSystem as suggested by Reynold, though.


Again, you've got some scale issues to deal with; as FileInputFormat builds up a list, it's already in trouble if you point it at a sufficiently large directory tree.

The best thing to do would be to add entries to a TreeMap during the recursive treewalk and then serve them up in order from there; no need to do a sort at the end.
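A rough Python stand-in for that idea keeps the listing ordered as entries arrive during the walk instead of sorting at the end (a caveat of the translation: inserting into a Python list is O(n) per insert, so unlike a Java TreeMap this does not beat a final sort asymptotically; it only illustrates the shape of the approach):

```python
import bisect
import os

def walk_sorted(root):
    """Collect file paths during a recursive treewalk, inserting each
    path into its sorted position as it is discovered (the Python
    analogue of feeding a Java TreeMap during the walk), so the
    listing can be served in order with no sort step at the end."""
    ordered = []
    for dirpath, _dirs, files in os.walk(root):
        for name in files:
            bisect.insort(ordered, os.path.join(dirpath, name))
    return ordered
```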

But trying to subclass all the Hadoop formats is itself troublesome. If you go that way, make it an optional interface, and/or talk to the MapReduce project about actually providing a base implementation.

 
> Another way to "fix" this would be to mention in the docs that order is not
> preserved in this scenario, which could hopefully avoid bad surprises to
> others (just like we already have a caveat about nondeterminism of order
> after shuffles).
>
> I would be happy to try submitting a fix for this, if there is a consensus
> around the correct course of action.

Even if it's not the final desired goal, it's a correct description of the current state of the application ...

Re: RDD order guarantees

Antonin Delpeuch
Thanks a lot for the reply, Steve!

If you don't see a way to fix this in Spark itself, then I will try to
improve the docs.

Antonin
