DataSourceV2 sync tomorrow


DataSourceV2 sync tomorrow

Ryan Blue

Hi everyone,
I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at 17:00 PST, which is 01:00 UTC.

Here are some of the topics under discussion in the last couple of weeks:

• Read API for v2 - see Wenchen’s doc
• Capabilities API - see the dev list thread
• Using CatalogTableIdentifier to reliably separate v2 code paths - see PR #21978
• A replacement for InternalRow

I know that a lot of people are also interested in combining the source API for micro-batch and continuous streaming. Wenchen and I have been discussing a way to do that, and Wenchen has added it to the Read API doc as Alternative #2. I think this would be a good thing to plan on discussing.

rb

Here’s some additional background on combining micro-batch and continuous APIs:

The basic idea is to update how tasks end so that the same tasks can be used in micro-batch or continuous streaming. For tasks that are naturally limited, like data files, Spark stops reading when the data is exhausted. For tasks that are not limited, like a Kafka partition, Spark decides when to stop: in micro-batch mode it stops when the task reaches a pre-determined LocalOffset, and in continuous mode it can just keep running.

Note that a task deciding to stop can happen in both modes, either when a task is exhausted in micro-batch or when a stream needs to be reconfigured in continuous.

Here’s the task reader API. The offset returned is optional so that a task can avoid stopping if there isn’t a resumable offset, for example when it is in the middle of an input file:

interface StreamPartitionReader<T> extends InputPartitionReader<T> {
  Optional<LocalOffset> currentOffset(); // empty if there is no resumable offset right now
  boolean next();                        // from InputPartitionReader
  T get();                               // from InputPartitionReader
}
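
To make the optional offset concrete, here’s a rough sketch of a reader over a Kafka-style partition, where every record boundary is a safe place to resume. Everything except StreamPartitionReader itself is made up for illustration: LocalOffset is assumed to be a simple marker interface, and LongOffset and the iterator-backed source are placeholders, not part of the proposal.

import java.util.Iterator;
import java.util.Optional;

// Hypothetical LocalOffset implementation for this sketch only.
class LongOffset implements LocalOffset {
  final long value;
  LongOffset(long value) { this.value = value; }
}

// Wraps an unbounded, record-oriented source (think: one Kafka partition).
class QueuePartitionReader<T> implements StreamPartitionReader<T> {
  private final Iterator<T> records; // stands in for the partition's record stream
  private long position;             // offset of the last record returned
  private T current;

  QueuePartitionReader(Iterator<T> records, long startOffset) {
    this.records = records;
    this.position = startOffset;
  }

  @Override
  public boolean next() {            // from InputPartitionReader
    if (!records.hasNext()) {
      return false;                  // a naturally limited task simply ends itself here
    }
    current = records.next();
    position += 1;
    return true;
  }

  @Override
  public T get() {                   // from InputPartitionReader
    return current;
  }

  @Override
  public Optional<LocalOffset> currentOffset() {
    // every record boundary is resumable for this source; a file-backed reader
    // could return Optional.empty() while it is in the middle of a file
    return Optional.of(new LongOffset(position));
  }

  @Override
  public void close() {              // from InputPartitionReader (Closeable)
    // nothing to release in this sketch
  }
}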

The streaming code would look something like this:

val stream: Stream = scan.toStream()
val factory: StreamReaderFactory = stream.createReaderFactory()

while (true) {
  val start: Offset = stream.currentOffset()
  val end: Option[Offset] = if (isContinuousMode) {
    None
  } else {
    // rate limiting would happen here
    Some(stream.latestOffset())
  }

  val parts: Array[InputPartition] = stream.planInputPartitions(start)

  // returns when needsReconfiguration is true or all tasks finish
  runTasks(parts, factory, end)

  // the stream's current offset has been updated at the last epoch
}
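
runTasks isn’t spelled out above; one plausible shape for the per-task loop is sketched below. The process and reachedEnd helpers are made up for illustration, and the proposal doesn’t define how offsets are compared, so treat this as one reading of the idea rather than the actual mechanism:

import java.util.Optional;

// "end" is empty in continuous mode (run until reconfiguration) and carries the
// pre-planned LocalOffset in micro-batch mode.
class TaskRunner<T> {
  void runTask(StreamPartitionReader<T> reader, Optional<LocalOffset> end) throws Exception {
    try {
      while (reader.next()) {               // false: a naturally limited task is exhausted
        process(reader.get());              // hand the record to downstream operators

        Optional<LocalOffset> offset = reader.currentOffset();
        if (end.isPresent() && offset.isPresent() && reachedEnd(offset.get(), end.get())) {
          break;                            // micro-batch boundary reached
        }
      }
    } finally {
      reader.close();
    }
  }

  private void process(T record) {
    // placeholder for handing the record to the rest of the query
  }

  private boolean reachedEnd(LocalOffset current, LocalOffset end) {
    // placeholder: some source-specific comparison, e.g. a Kafka offset reaching the end offset
    return current.equals(end);
  }
}
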
--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Cody Koeninger
Am I the only one for whom the livestream link didn't work last time? I would like to be able to at least watch the discussion this time around.
On Tue, Nov 13, 2018 at 6:01 PM Ryan Blue <[hidden email]> wrote:


Re: DataSourceV2 sync tomorrow

Arun Mahadevan
In reply to this post by Ryan Blue
IMO, the currentOffset should not be optional.
For continuous mode, I assume this offset gets periodically checkpointed (so it is mandatory)?
For micro-batch mode, the currentOffset would be the start offset of a micro-batch.

And if a micro-batch could be executed without knowing the 'latest' offset (say, by running until 'next' returns false), we only need the current offset (to figure out the offset boundaries of a micro-batch), and then the 'latest' offset may not be needed at all.
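
Concretely, the non-optional variant would look roughly like this (just a sketch of the suggestion, not a final API):

interface StreamPartitionReader<T> extends InputPartitionReader<T> {
  LocalOffset currentOffset();   // always defined: the start/checkpoint offset is always available
  boolean next();                // from InputPartitionReader
  T get();                       // from InputPartitionReader
}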

- Arun


On Tue, 13 Nov 2018 at 16:01, Ryan Blue <[hidden email]> wrote:


Re: DataSourceV2 sync tomorrow

Jamison Bennett
Hi Spark Team,

I would like to join this meeting because I am interested in the data source v2 APIs. I couldn't find information about this meeting, so could someone please share the link?

Thanks,

Jamison Bennett

Cloudera Software Engineer

[hidden email]

515 Congress Ave, Suite 1212   |   Austin, TX   |   78701



On Wed, Nov 14, 2018 at 1:51 AM Arun Mahadevan <[hidden email]> wrote:

Re: DataSourceV2 sync tomorrow

Ryan Blue
Jamison, I've added you to the invite. If anyone else wants to be invited, please send me a request. You can send it directly to me to avoid too many messages on this thread.

On Wed, Nov 14, 2018 at 8:57 AM Jamison Bennett <[hidden email]> wrote:


--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Ryan Blue
In reply to this post by Ryan Blue

Some people said that it didn't work last time. I'm not sure why that would happen, but I don't use these much so I'm no expert. If you can't join the live stream, then feel free to join the meet up.

I'll also plan on joining earlier than I did last time, in case the meet/hangout needs to be up for people to view the live stream.

rb

On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue <[hidden email]> wrote:



--
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Ryan Blue
In reply to this post by Ryan Blue

Below are my notes from the sync yesterday. Thanks to everyone that participated!
For the format of this sync, I think it would help to make a small change. Since we have so many people and it takes a long time to introduce everyone, let’s try to get to the content faster by skipping the round of introductions and topic gathering. Instead, please send your topics to the sync thread on this list ahead of time. Just make sure I have them and I’ll add them to the invite and agenda, along with any links for background.

I also want to add a quick note about the live stream. After running a couple of tests, it looks like live streams only work within an organization. In the future, I won’t add a live stream since no one but people from Netflix can join.

Last, here are the notes:

Attendees

Ryan Blue - Netflix
John Zhuge - Netflix
Yuanjian Li - Baidu - Interested in Catalog API
Felix Cheung - Uber
Hyukjin Kwon - Hortonworks
Vinoo Ganesh - Palantir
Soumya Sanyal - ?
Bruce Robbins - Cloudera
Alessandro Bellina - Oath, here to learn
Jamison Bennett - Cloudera - Interested in Catalog API
Anton Okolnychyi - Apple
Gengliang Wang - Databricks - ORC source
Wenchen Fan - Databricks
Dilip Biswal - IBM - Push-down of new operators like limit
Kevin Yu - IBM
Matt Cheah - Palantir - Interested in CBO
Austin Nobis - Cloudera
Jungtaek Lim - Hortonworks - Interested in exactly-once semantics
Vikram Agrawal - Custom metrics
Sribasti Chakravarti

Suggested Topics

  • DSv2 API changes
    • New proposal
    • Alternative #1: Combining Scan with Batch or Stream
    • Alternative #2: Combining micro-batch and continuous APIs
  • Capabilities API
  • CatalogTableIdentifier
  • Push-down API
  • CBO and stats API
  • Exactly-once semantics

Discussion

The entire discussion was about the DSv2 API changes in Wenchen’s design doc.

  • Wenchen went through the current status and the new proposal.
    • Not many questions, the API and behavior are clear and understandable.
    • Some discussion, started by Dilip, about how join push-down will work. Ryan noted that we just need to make sure that the design doesn’t preclude reasonable options for later. Wenchen suggested one such option: adding methods to push a join into the ScanBuilder. It isn’t clear how exactly this will work, but the consensus seemed to be that this will not break later designs. Dilip has a join push-down design doc (please reply with a link!).
    • Consensus was to go with the new proposal.
  • Wenchen went through alternative #1, which merges Scan into the next layer to produce BatchScan, MicroBatchStreamScan, ContinuousStreamScan
    • Ryan’s commentary: concerned that Scan is a distinct concept and may be useful in implementations. Would merging it into other objects cause duplication or force an inheritance hierarchy? Clearly, the names show that it is mixing two concepts: BatchScan = Batch + Scan
    • Matt commented that it seems unlikely that Scan will be independently useful
    • Wenchen noted that we can merge later if it isn’t useful
    • Ryan noted that separate interfaces give the most flexibility for implementations. An implementation can create BatchScan that extends both.
    • Conclusion: keep the interfaces separate for now and reassess later (a rough sketch of what this allows follows these notes).
  • Ryan went through alternative #2, which merges micro-batch and continuous read interfaces
    • To merge execution code, Spark would be responsible for stopping tasks. Tasks would attempt to read forever and Spark determines whether to run a batch or run forever.
    • Some tasks are naturally limited, like data files added to a table. Spark would need to handle tasks stopping themselves early.
    • Some tasks are naturally boundless, like Kafka topic partitions. Tasks would need to provide offsets for Spark to decide when to stop reading.
    • The resulting task reader behavior is awkward and no longer fits either naturally limited tasks (they must provide an “offset”) or naturally boundless tasks (why stop early? why use micro-batch?)
    • Conclusion was to have simpler APIs by keeping modes separate.
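
For reference, here is a rough sketch of what the “keep the interfaces separate” conclusion for alternative #1 allows. The names and method signatures below are placeholders for illustration (assuming the usual reader types like StructType and InputPartition are in scope), not the final API:

// Scan and Batch stay separate concepts, and a source that wants a single
// object can still implement both in one class.
interface Scan {
  StructType readSchema();                 // what the scan will produce
}

interface Batch {
  InputPartition[] planInputPartitions();  // batch-specific planning
}

class SimpleBatchScan implements Scan, Batch {
  private final StructType schema;
  private final InputPartition[] partitions;

  SimpleBatchScan(StructType schema, InputPartition[] partitions) {
    this.schema = schema;
    this.partitions = partitions;
  }

  @Override public StructType readSchema() { return schema; }
  @Override public InputPartition[] planInputPartitions() { return partitions; }
}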

On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue <[hidden email]> wrote:



--
Ryan Blue
Software Engineer
Netflix