DataSourceV2 sync notes

Ryan Blue

Here are my notes from the DSv2 sync last night.

As usual, I didn’t take great notes because I was participating in the discussion. Feel free to send corrections or clarification.

Attendees:

Ryan Blue
John Zhuge
Xiao Li
Reynold Xin
Felix Cheung
Anton Okolnychyi
Bruce Robbins
Dale Richardson
Dongjoon Hyun
Gengliang Wang
Matt Cheah
Russell Spitzer
Wenchen Fan
Maryann Xue
Jacky Lee


Topics:

  • Passing query ID to DSv2 write builder
  • OutputMode in DSv2 writes
  • Catalog API framing to help clarify the discussion
  • Catalog API plugin system
  • Catalog API approach (separate TableCatalog, FunctionCatalog, UDFCatalog, etc.)
  • Proposed TableCatalog API for catalog plugins (SPIP)
  • Migration to new Catalog API from ExternalCatalog
  • Proposed user-facing API for table catalog operations (SPIP)


Notes:

  • Passing query ID to DSv2 write builder
    • Wenchen: query ID is needed for some sources
    • Reynold: this may be useful for batch if batch ever supports recovery or resuming a write
    • Ryan: this may also be useful in sources to identify the outputs of a particular batch. Iceberg already generates a UUID for each write.
    • Wenchen proposed adding query ID to the newWriteBuilder factory method
    • Ryan: if args to newWriteBuilder will change over time, then it is easier to maintain them as builder methods
    • Reynold: what does the read side do? [Note: it passes no arguments to newScanBuilder]
    • No one knew, so we tabled further discussion
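
Ryan's argument for builder methods over factory arguments can be sketched as follows. This is a minimal illustration, not Spark's actual API: `WriteBuilder`, `withQueryId`, `SketchTable`, `UuidTaggingBuilder`, and `LegacyBuilder` are hypothetical names modeled on the discussion. The assumption is that a new input arrives as a default builder method, so `newWriteBuilder` can stay argument-free like `newScanBuilder`, and existing sources keep compiling.

```java
import java.util.UUID;

// Hypothetical stand-in for a DSv2 write builder; not the real Spark interface.
interface WriteBuilder {
    // Added later as a default method: sources that ignore the query ID need no changes.
    default WriteBuilder withQueryId(String queryId) {
        return this;
    }

    // Describes the write that would run; a real builder would return a write object.
    String build();
}

// Hypothetical table whose factory method takes no arguments, mirroring newScanBuilder.
interface SketchTable {
    WriteBuilder newWriteBuilder();
}

// A source that uses the query ID when given one and falls back to its own UUID otherwise.
final class UuidTaggingBuilder implements WriteBuilder {
    private String writeId = UUID.randomUUID().toString();

    @Override
    public WriteBuilder withQueryId(String queryId) {
        this.writeId = queryId;
        return this;
    }

    @Override
    public String build() {
        return "write[" + writeId + "]";
    }
}

// A source written before withQueryId existed; it inherits the no-op default.
final class LegacyBuilder implements WriteBuilder {
    @Override
    public String build() {
        return "write[legacy]";
    }
}
```

With this shape, a caller would write `table.newWriteBuilder().withQueryId(id).build()` against any source. Adding another input later means one more default method rather than a new factory signature.
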
  • OutputMode
    • Wenchen: OutputMode instructs current sources how to handle rows written to them.
    • Reynold: discussed this with Michael; OutputMode doesn’t need to be in the new public API.
    • Ryan: the problem with OutputMode, specifically Update, is that it is ambiguous like SaveMode. Update mode doesn’t tell a sink how to identify the records to replace.
    • Reynold: nothing implements Update and it is only defined internally; only Append and Complete are needed
    • Ryan: that works for source implementations, but users can pass the mode in the API, so Spark should still throw an exception
    • Reynold added SupportsTruncate (similar to SupportsOverwrite) to the proposal to configure Complete mode.
    • Ryan added SupportsUpdate to the proposal for how Update mode could be configured
    • Discussion clarified that truncate, like overwrite, is not required to happen immediately when the builder is configured. This would allow sources to implement truncate and append in a single atomic operation for each epoch.
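
One way to picture the outcome of the OutputMode discussion is a planner that translates the user-facing mode into builder configuration and rejects modes the source cannot handle. The sketch below is an assumption-laden illustration, not the proposed API: `EpochWriteBuilder`, `Planner`, `MemorySink`, and `commitEpoch` are hypothetical, and only `SupportsTruncate` takes its name from the notes. Update is rejected here because the sketch does not model a `SupportsUpdate` mix-in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins modeled on the notes; not Spark's real interfaces.
enum OutputMode { APPEND, COMPLETE, UPDATE }

interface EpochWriteBuilder {
    // Commits one epoch of rows and returns the resulting table contents.
    List<String> commitEpoch(List<String> rows);
}

// Mix-in a source implements to advertise that it can replace all existing data.
interface SupportsTruncate extends EpochWriteBuilder {
    EpochWriteBuilder truncate();
}

final class Planner {
    // Translate the user-facing mode into builder configuration, failing on unsupported modes.
    static EpochWriteBuilder configure(EpochWriteBuilder builder, OutputMode mode) {
        switch (mode) {
            case APPEND:
                return builder;
            case COMPLETE:
                if (builder instanceof SupportsTruncate) {
                    return ((SupportsTruncate) builder).truncate();
                }
                throw new UnsupportedOperationException("Source cannot truncate for Complete mode");
            default:
                // Update: nothing tells the sink which records to replace, so reject it.
                throw new UnsupportedOperationException("Unsupported output mode: " + mode);
        }
    }
}

// In-memory sink: truncate() only records intent. The delete happens at commit,
// together with the append, so each epoch is a single replace operation.
final class MemorySink implements SupportsTruncate {
    private final List<String> table;
    private boolean truncateOnCommit = false;

    MemorySink(List<String> table) {
        this.table = table;
    }

    @Override
    public EpochWriteBuilder truncate() {
        truncateOnCommit = true; // deferred: the table is untouched until commitEpoch
        return this;
    }

    @Override
    public List<String> commitEpoch(List<String> rows) {
        if (truncateOnCommit) {
            table.clear();
        }
        table.addAll(rows);
        return new ArrayList<>(table);
    }
}
```

Because `truncate()` only sets a flag, configuring the builder has no side effects. A real source could use the same deferral to make truncate plus append a single atomic commit per epoch.
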
  • Catalog API framing: there are several issues to discuss. To make discussions more effective, let’s try to keep these areas separate:
    • A plugin system for catalog implementations
    • The approach to replace ExternalCatalog with multiple interfaces for specific purposes (TableCatalog, FunctionCatalog, etc.)
    • The proposed TableCatalog methods
    • How to migrate from ExternalCatalog to a new catalog API
    • A public API that can handle operations that are currently supported only in DDL
    • Let’s try to keep these discussions separate to make progress on each area
  • TableCatalog API proposal: an interface for plugins to expose create/load/alter/drop operations for tables.
    • Ryan provided a summary and pointed out that the part under discussion is the plugin API, TableCatalog, NOT the public API
    • Maryann: the proposed API uses 2-level identifiers, but many databases support 3 levels
    • Ryan: are 2-level identifiers in a catalog a reasonable simplification? Presto does this. A catalog could be used for each entry in the top level of a 3-level database. Otherwise, would Spark support 4 levels?
    • Someone?: identifiers should support arbitrary levels and the problem should be delegated to the catalog. Flexible table identifiers could also be used to support path-based tables in the same API
    • Consensus seemed to be around using arbitrary identifiers. [Will follow up with a DISCUSS thread]
    • Ryan: identifier is actually orthogonal to most of the proposal; it is enough that we assume there is some identifier. Let’s consider this proposal with TableIdentifier as a place-holder. Any other concerns about this API?
    • Maryann: what about replacing an entire schema? [Note: Hive syntax supports this.] Alter table passes individual changes, which is difficult for some sources to handle.
    • Ryan: the purpose of individual changes is to match how SQL changes are expressed. This avoids forcing sources to diff schema trees, which would be error prone. Mirroring SQL also avoids schema evolution problems caused by replacing a schema with another schema because deletes and adds are explicit.
    • Matt: this API is simple and can express all of the changes needed
    • Xiao: why is bucketBy in the proposed public API?
    • Dale: is it possible to pass through column-level configuration? For example, column-level compression or encoding config for Kudu.
    • Reynold: StructType supports key-value metadata for this
    • Ryan: the public API should support that metadata
    • Reynold: can this proposal cover how to replace all methods of ExternalCatalog?
    • Ryan: this proposal isn’t intended to replace all of ExternalCatalog, only the table CRUD methods. Some ExternalCatalog methods may not be in the new API.
    • Reynold: it would be helpful to have that information
    • At this point, time ran out and we decided to resume next time discussing a plan to replace ExternalCatalog with a new catalog API.
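
To make the TableCatalog discussion concrete, here is a minimal sketch of a plugin interface with create/load/alter/drop operations, where alter takes explicit individual changes instead of a replacement schema. Everything in it is an assumption for illustration: the method signatures, `InMemoryCatalog`, the `TableChange` factory methods, and the use of a list of name parts as the identifier (the identifier question was left for a follow-up DISCUSS thread). A schema is reduced to a list of column names. The actual proposal would use Spark's schema types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: one explicit change, mirroring a single SQL ALTER TABLE clause.
interface TableChange {
    void applyTo(List<String> columns);

    static TableChange addColumn(String name) {
        return columns -> {
            if (columns.contains(name)) {
                throw new IllegalArgumentException("Column already exists: " + name);
            }
            columns.add(name);
        };
    }

    static TableChange dropColumn(String name) {
        return columns -> {
            if (!columns.remove(name)) {
                throw new IllegalArgumentException("No such column: " + name);
            }
        };
    }

    static TableChange renameColumn(String from, String to) {
        return columns -> {
            int index = columns.indexOf(from);
            if (index < 0) {
                throw new IllegalArgumentException("No such column: " + from);
            }
            columns.set(index, to);
        };
    }
}

// Hypothetical plugin API. The identifier is a list of name parts so the catalog,
// not Spark, decides how many levels it supports.
interface TableCatalog {
    List<String> createTable(List<String> ident, List<String> columns);
    List<String> loadTable(List<String> ident);
    List<String> alterTable(List<String> ident, TableChange... changes);
    boolean dropTable(List<String> ident);
}

final class InMemoryCatalog implements TableCatalog {
    private final Map<List<String>, List<String>> tables = new HashMap<>();

    @Override
    public List<String> createTable(List<String> ident, List<String> columns) {
        if (tables.containsKey(ident)) {
            throw new IllegalStateException("Table already exists: " + ident);
        }
        tables.put(new ArrayList<>(ident), new ArrayList<>(columns));
        return loadTable(ident);
    }

    @Override
    public List<String> loadTable(List<String> ident) {
        List<String> columns = tables.get(ident);
        if (columns == null) {
            throw new IllegalStateException("No such table: " + ident);
        }
        return new ArrayList<>(columns);
    }

    @Override
    public List<String> alterTable(List<String> ident, TableChange... changes) {
        // Apply to a copy so a failing change leaves the stored schema untouched.
        List<String> updated = loadTable(ident);
        for (TableChange change : changes) {
            change.applyTo(updated);
        }
        tables.put(new ArrayList<>(ident), updated);
        return new ArrayList<>(updated);
    }

    @Override
    public boolean dropTable(List<String> ident) {
        return tables.remove(ident) != null;
    }
}
```

Since renames, adds, and drops arrive as separate operations, the catalog never has to diff two schema trees to guess whether a column was renamed or dropped and re-added.
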
Ryan Blue
Software Engineer