Possible bug in DatasourceV2


Possible bug in DatasourceV2

assaf.mendelson
Hi,

I created a datasource writer WITHOUT a reader. When I do, I get an
exception: org.apache.spark.sql.AnalysisException: Data source is not
readable: DefaultSource

The reason for this is that when save is called, inside the source match on
WriteSupport we have the following code:

      val source = cls.newInstance().asInstanceOf[DataSourceV2]
      source match {
        case ws: WriteSupport =>
          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
            source,
            df.sparkSession.sessionState.conf)
          val options = sessionOptions ++ extraOptions
          // --> the problematic line:
          val relation = DataSourceV2Relation.create(source, options)

          if (mode == SaveMode.Append) {
            runCommand(df.sparkSession, "save") {
              AppendData.byName(relation, df.logicalPlan)
            }
          } else {
            val writer = ws.createWriter(
              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
              new DataSourceOptions(options.asJava))

            if (writer.isPresent) {
              runCommand(df.sparkSession, "save") {
                WriteToDataSourceV2(writer.get, df.logicalPlan)
              }
            }
          }

but DataSourceV2Relation.create actively creates a reader
(source.createReader) to extract the schema:

  def create(
      source: DataSourceV2,
      options: Map[String, String],
      tableIdent: Option[TableIdentifier] = None,
      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    val reader = source.createReader(options, userSpecifiedSchema)
    val ident = tableIdent.orElse(tableFromOptions(options))
    DataSourceV2Relation(
      source, reader.readSchema().toAttributes, options, ident, userSpecifiedSchema)
  }


This makes me a little confused.

First, the schema is defined by the dataframe itself, not by the data
source, i.e. it should be taken from df.schema rather than from
source.createReader.

Second, I see that the relation is actually only used if the mode is
SaveMode.Append (by the way, this means that if it is only needed there, it
should be defined inside the "if"). I am not sure I understand the AppendData
portion, but why would reading from the source be involved at all?

Am I missing something here?
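
For anyone who wants to reproduce this, here is a minimal sketch of the kind of
write-only source that hits this path (the class name, the package in the usage
comment, and the empty writer are placeholders rather than my actual source; it
is written against the 2.4 WriteSupport interface quoted above):

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType

// A write-only source: it implements WriteSupport but NOT ReadSupport, so
// DataSourceV2Relation.create(source, options) has no reader to build and
// save() fails with "Data source is not readable: DefaultSource".
class DefaultSource extends DataSourceV2 with WriteSupport {
  override def createWriter(
      writeUUID: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] = {
    // A real source would return a DataSourceWriter that writes `schema`-shaped
    // rows to the external system; it is elided here because the failure happens
    // earlier, when the relation (and with it a reader) is created.
    Optional.empty()
  }
}

// Triggering the failure (format/package name is hypothetical):
//   df.write.format("com.example.writeonly").save()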

Thanks,
   Assaf





Re: Possible bug in DatasourceV2

Hyukjin Kwon

+Wenchen, here is where the problem was raised. This might have to be considered as a blocker ...




RE: Possible bug in DatasourceV2

assaf.mendelson

Actually, it is not just a question of a write-only data source. The issue is that in my case (and I imagine this is true for others), the schema is not read from the database but is inferred from the options. This means that I have no way of determining the schema without supplying the read options. When writing, on the other hand, I already have the schema from the dataframe.

 

I know the Data Source V2 API is considered experimental and I have no problem with that; however, this change requires a change in how end users work with the source (they suddenly need to add schema information which they did not need before), not to mention that it is a regression.

 

As for the pull request, it only handles cases where the save mode is not Append: the original example (a non-existent path with Append mode) will still fail, even though according to the documentation of Append, if the path does not exist it should be created.

 

I am currently having problems compiling everything so I can't test it myself, but wouldn't changing the relation definition in "save" to:

 

val relation = DataSourceV2Relation.create(source, options, None, Option(df.schema))

 

and changing create to look like this:

 

  def create(
      source: DataSourceV2,
      options: Map[String, String],
      tableIdent: Option[TableIdentifier] = None,
      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    val schema = userSpecifiedSchema.getOrElse(
      source.createReader(options, userSpecifiedSchema).readSchema())
    val ident = tableIdent.orElse(tableFromOptions(options))
    DataSourceV2Relation(
      source, schema.toAttributes, options, ident, userSpecifiedSchema)
  }

 

correct the problem?

 

Or, alternatively, adding a new create overload which simply takes the schema as a non-optional parameter (see the rough sketch below)?
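
For concreteness, a rough, untested sketch of what such an overload could look like, reusing the names from the create shown above:

  def create(
      source: DataSourceV2,
      schema: StructType,
      options: Map[String, String],
      tableIdent: Option[TableIdentifier]): DataSourceV2Relation = {
    // The write-side schema is passed in directly, so no reader is ever instantiated.
    val ident = tableIdent.orElse(tableFromOptions(options))
    DataSourceV2Relation(
      source, schema.toAttributes, options, ident, userSpecifiedSchema = Some(schema))
  }

The write path in save() would then build the relation from the dataframe's own schema, e.g. DataSourceV2Relation.create(source, df.schema, options, None).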

 

Thanks,

        Assaf

 



Re: Possible bug in DatasourceV2

Hyukjin Kwon
That's why I initially suggested reverting this part out of Spark 2.4 and having more discussion for 3.0, since one of the design goals of Data Source V2 is no behaviour changes for end users.



Re: Possible bug in DatasourceV2

cloud0fan
Hi Hyukjin, can you open a PR to revert it from 2.4? Now I'm kind of convinced this is too breaking and we need more discussion.

+ Ryan Blue
Hi Ryan,
I think we need to look back at the new write API design and consider data sources that don't have a table concept. We should make the schema validation of the append operator opt-in.



Re: Possible bug in DatasourceV2

Hyukjin Kwon
Thanks, Wenchen.

But I believe https://github.com/apache/spark/pull/22688 is still valid for the master branch as well, since the master branch still supports it: the read support should be created only when it's actually needed, and that PR is already open.
