[DISCUSS] "latestFirst" option and metadata growing issue in File stream source

[DISCUSS] "latestFirst" option and metadata growing issue in File stream source

Jungtaek Lim-2
Hi devs,

As I have been going through the various issues around metadata log growth, it turns out this is not only an issue for the sink but also for the source. Unlike the sink metadata log, whose entries must remain available to readers, the source metadata log only serves the streaming query restarting from the checkpoint; in theory it should therefore only remember the minimal set of entries needed to prevent processing the same file multiple times.

That minimization is not applied to the file stream source, and I think it's because of the "latestFirst" option, which I haven't seen in any other source. The option makes Spark read files in "backward" order, which means the oldest file and the latest file can be read together in one micro-batch, so the source ends up having to remember every file it has previously read. The option can also be changed across query restarts, so even if the query starts with "latestFirst" set to false it isn't safe to minimize the remembered entries: the option could later be flipped to true and we would read old files again.
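For context, here is a minimal sketch (in Scala, against the current readStream API) of how the two existing options are set today; the input path and app name are just placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-source-example").getOrCreate()

// Reading the newest files first: because processing order is no longer monotonic,
// the source has to remember every file it has ever read to avoid reprocessing.
val df = spark.readStream
  .format("text")
  .option("latestFirst", "true")  // default: false
  .option("maxFileAge", "7d")     // ignored when latestFirst is true, as noted above
  .load("/data/incoming")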

I'm seeing two approaches here:

1) Apply a "retention" option - unlike "maxFileAge", it would also apply when latestFirst is enabled. That is, if retention is set to 7 days, files older than 7 days would never be read under any circumstances, so we can at least drop metadata entries older than the retention window. The open question is how to play nicely with the existing "maxFileAge", which behaves similarly to retention but is ignored when latestFirst is turned on. (Either change the semantics of "maxFileAge", or leave it as a "soft retention" and introduce another option.)

(This approach is being proposed under SPARK-17604, and a PR is available - https://github.com/apache/spark/pull/28422)

2) Replace the "latestFirst" option with an alternative that no longer reads in "backward" order - which doesn't mean we have to read all files to move forward. As we do with Kafka, a start offset could be provided, ideally as a timestamp, from which Spark would read in forward order. This doesn't cover every use case of "latestFirst", but "latestFirst" doesn't fit naturally with the concepts of Structured Streaming (think about watermarks), so I'd prefer to support an alternative instead of struggling with "latestFirst". (Minimal sketches of both options follow below.)
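To make the proposals concrete, here are rough sketches against the current readStream API. The option names - "retention" for option 1 and "startTimestamp" for option 2 - are taken from this discussion or invented purely for illustration; neither exists in Spark today, and the actual naming and semantics would be settled in the PR or a follow-up design.

// Option 1 (sketch): a hard retention that applies even when latestFirst is enabled.
// Files older than the window are never read, so their metadata entries can be dropped.
val withRetention = spark.readStream
  .format("text")
  .option("latestFirst", "true")
  .option("retention", "7d")      // hypothetical option name from this proposal
  .load("/data/incoming")

// Option 2 (sketch): drop latestFirst and start from a timestamp, reading forward only,
// analogous to the Kafka source. "startTimestamp" is illustrative only.
val fromTimestamp = spark.readStream
  .format("text")
  .option("startTimestamp", "2020-07-01T00:00:00Z")  // hypothetical
  .load("/data/incoming")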

Would like to hear your opinions.

Thanks,
Jungtaek Lim (HeartSaVioR)
Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

Jungtaek Lim-2
(Just to add some rationale: you can refer to the original thread on the dev@ list for the earlier efforts on addressing problems in the file stream source / sink - https://lists.apache.org/thread.html/r1cd548be1cbae91c67e5254adc0404a99a23930f8a6fde810b987285%40%3Cdev.spark.apache.org%3E)

Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

Jungtaek Lim-2
Bump - is there any interest in this topic?

Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

vikram.agrawal
If we compare the file stream source with other streaming sources such as Kafka, the current behavior is indeed incomplete. Starting the stream from a custom offset / particular point in time is missing. Typically, file stream sources don't have auto-deletion of older data/files, whereas in Kafka we can define a retention period, so even if we use "earliest" we won't end up reading from the time the topic was created. File-based sources, on the other hand, can hold very old files. It's a very valid use case to read the bulk of the old files with a batch job up to a particular timestamp, and then use a streaming job for real-time updates.

So having support for specifying a timestamp, and only considering files created after that timestamp, would be useful.

Another concern to consider is the listing cost. Is there any way we can avoid listing the entire base directory and then filtering out the new files? If the data is organized into partitions by date, would it help to list only those partitions where new files were added?
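For comparison, this is roughly how a timestamp-based start already looks for the Kafka source (Spark 3.0) via startingOffsetsByTimestamp; the broker address, topic name, and epoch-millisecond timestamps are placeholders. An equivalent for the file source is what's being discussed here.

// Kafka source: start each partition of "events" at a millisecond timestamp
// rather than at earliest/latest, then read strictly forward.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events": {"0": 1595203200000, "1": 1595203200000}}""")
  .load()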


Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

Jungtaek Lim-2
(I'd like to keep this discussion thread focused on the specific topic - let's start separate discussion threads for other topics.)

Thanks for the input. I'd like to emphasize that the point under discussion is the "latestFirst" option - the rationale starts from the growing metadata log issue. I read your input as favoring option 2, but could you please make clear whether you are OK with "replacing" the "latestFirst" option with "starting from a timestamp"?


Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

German Schiavon
Hi Jungtaek,

I have a question: aren't both approaches compatible?

The way I see it, it would be interesting to have a retention period to delete old files and/or the possibility of indicating an offset (timestamp). It would be very "similar" to how we do it with Kafka.

WDYT?

Re: [DISCUSS] "latestFirst" option and metadata growing issue in File stream source

Jungtaek Lim-2
Hi German,

Option 1 isn't about "deleting" the old files, since your input directory may be accessed by multiple queries; Kafka centralizes the maintenance of the input data, so it can apply retention without problems. Option 1 is more about "hiding" the old files from being read, so that end users "may" be able to delete the files once they ensure that all queries accessing the input directory no longer see them.
