[How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation


[How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation

OBones
Hello,

I'm trying to extend Spark so that it can use our own binary format as a
read-only source for pipeline-based computations.
I already have a Java class that gives me enough elements to build a
complete StructType with enough metadata (NominalAttribute, for instance).
It also gives me the row count for the file and methods to read any
given cell, as the file basically is a giant array of values stored on disk.
To plug this properly into the Spark framework, I looked at the CSV
source code and created a DefaultSource class in my package, this way:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource
  extends RelationProvider
  with DataSourceRegister {

  override def shortName(): String = "binfile"

  private def checkPath(parameters: Map[String, String]): String = {
    parameters.getOrElse("path", sys.error("'path' must be specified for BinFile data."))
  }

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = checkPath(parameters)
    BinFileRelation(Some(path))(sqlContext)
  }
}

I also created the BinFileRelation like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

case class BinFileRelation /*protected[spark]*/ (
    location: Option[String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  private val reader = new BinFileReader(location.getOrElse(""))

  override val schema: StructType = {
    // retrieve column infos from reader, transform it into a valid StructType
    // with two columns, the first being the label, the second the vector of features
  }

  override def buildScan: RDD[Row] = {
    // I have no idea what to return here, so null for now.
    null
  }
}

So, as you see, I managed to create the required code to return a valid
schema, and was also able to write unit tests for it.
I copied "protected[spark]" from the CSV implementation, but commented
it out because it prevents compilation from succeeding and does not
seem to be required.
Most importantly, I have no idea how to create a valid dataframe to be
returned by buildScan so that the data stored on disk is not loaded
into memory all at once (it may be huge, like hundreds of millions of
rows).
I read the documentation here:
https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
It says "Concrete implementation should inherit from one of the
descendant Scan classes" but I could not find any of those descendants
in the documentation or the source code.

Looking further into the code for "BaseRelation", I found the JDBCRelation
class, which implements buildScan by calling JDBCRDD.scanTable, so I
went looking at that method; it basically creates an instance of the
private JDBCRDD class.
That class extends RDD[InternalRow], so it looks as if I should do the
same for my own use.
However, I'm not sure how to implement the compute method for a simple
read as described above.

Any help would be greatly appreciated.

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: [How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation

Sandeep Joshi


On Thu, Jun 22, 2017 at 7:51 PM, OBones <[hidden email]> wrote:
> Hello,
>
> I'm trying to extend Spark so that it can use our own binary format as a read-only source for pipeline-based computations.
> I already have a Java class that gives me enough elements to build a complete StructType with enough metadata (NominalAttribute, for instance).
> It also gives me the row count for the file and methods to read any given cell, as the file basically is a giant array of values stored on disk.
> To plug this properly into the Spark framework, I looked at the CSV source code and created a DefaultSource class in my package, this way:
>
> class DefaultSource
>   extends RelationProvider
>   with DataSourceRegister {
>
>   override def shortName(): String = "binfile"
>
>   private def checkPath(parameters: Map[String, String]): String = {
>     parameters.getOrElse("path", sys.error("'path' must be specified for BinFile data."))
>   }
>
>   override def createRelation(
>       sqlContext: SQLContext,
>       parameters: Map[String, String]): BaseRelation = {
>     val path = checkPath(parameters)
>     BinFileRelation(Some(path))(sqlContext)
>   }
> }
>
> I also created the BinFileRelation like this:
>
> case class BinFileRelation /*protected[spark]*/ (
>     location: Option[String])(@transient val sqlContext: SQLContext)
>   extends BaseRelation with TableScan {
>
>   private val reader = new BinFileReader(location.getOrElse(""))
>
>   override val schema: StructType = {
>     // retrieve column infos from reader, transform it into a valid StructType
>     // with two columns, the first being the label, the second the vector of features
>   }
>
>   override def buildScan: RDD[Row] = {
>     // I have no idea what to return here, so null for now.
>     null
>   }
> }
>
> So, as you see, I managed to create the required code to return a valid schema, and was also able to write unit tests for it.
> I copied "protected[spark]" from the CSV implementation, but commented it out because it prevents compilation from succeeding and does not seem to be required.
> Most importantly, I have no idea how to create a valid dataframe to be returned by buildScan so that the data stored on disk is not loaded into memory all at once (it may be huge, like hundreds of millions of rows).

You are effectively building a data source for Spark.

You can subclass the RDD class and create your own RDD, which will be
returned by buildScan above.
This RDD class must implement a compute() method that returns an Iterator.
Spark will then invoke iterator.next() as it executes.
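A rough sketch of what such an RDD might look like, assuming your BinFileReader exposes something like a rowCount and a readRow(i) that returns the cell values of row i (those method names are invented here, substitute your actual API). Each partition covers a contiguous row range, and compute() returns a lazy iterator so rows only come off disk as Spark consumes them:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One partition = one contiguous range of rows in the file.
private case class BinFilePartition(index: Int, start: Long, end: Long)
  extends Partition

class BinFileRDD(
    sc: SparkContext,
    location: String,
    numPartitions: Int) extends RDD[Row](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val rowCount = new BinFileReader(location).rowCount
    val step = (rowCount + numPartitions - 1) / numPartitions
    (0 until numPartitions).map { i =>
      BinFilePartition(i, i * step, math.min((i + 1) * step, rowCount))
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val p = split.asInstanceOf[BinFilePartition]
    // Open the reader on the executor, not the driver, so nothing large
    // is serialized or loaded up front.
    val reader = new BinFileReader(location)
    // Lazy iterator: each row is read from disk only when next() is called.
    (p.start until p.end).iterator.map(i => Row.fromSeq(reader.readRow(i)))
  }
}
```

Your buildScan would then just return `new BinFileRDD(sqlContext.sparkContext, location.getOrElse(""), numPartitions)` instead of null.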


 
> I read the documentation here: https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
> It says "Concrete implementation should inherit from one of the descendant Scan classes" but I could not find any of those descendants in the documentation or the source code.

The scan classes referred to here are these, in addition to the CatalystScan at the bottom of the file:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L245-L277
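For reference, those traits (in org.apache.spark.sql.sources, Spark 2.x) boil down to:

```scala
// Abridged from org.apache.spark.sql.sources.interfaces:
trait TableScan {
  def buildScan(): RDD[Row]                                 // full scan, all columns
}

trait PrunedScan {
  def buildScan(requiredColumns: Array[String]): RDD[Row]   // column pruning
}

trait PrunedFilteredScan {
  // column pruning plus filter push-down
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
```

TableScan, which your BinFileRelation already mixes in, is enough for a plain full scan; the pruned variants let Spark push column pruning and filters down into your reader.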




> Looking further into the code for "BaseRelation", I found the JDBCRelation class, which implements buildScan by calling JDBCRDD.scanTable, so I went looking at that method; it basically creates an instance of the private JDBCRDD class.
> That class extends RDD[InternalRow], so it looks as if I should do the same for my own use.
> However, I'm not sure how to implement the compute method for a simple read as described above.
>
> Any help would be greatly appreciated.




Re: [How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation

OBones
Sandeep Joshi wrote:

>
>     So, as you see, I managed to create the required code to return a
>     valid schema, and was also able to write unittests for it.
>     I copied "protected[spark]" from the CSV implementation, but I
>     commented it out because it prevents compilation from being
>     successful and it does not seem to be required.
>     And most importantly, I have no idea how to create a valid
>     dataframe to be returned by buildScan so that the data that is
>     stored on disk is not loaded all at once in memory (it may be very
>     huge, like hundreds of millions of rows).
>
>
>
> You are effectively building a datasource for Spark
>
> You can subclass the RDD class and create your own RDD which will be
> returned in buildScan above.
> This RDD class must implement a compute() method which will return an
> Iterator
> The iterator.next() will then be invoked by Spark as it executes.
>
> Look at how the Cassandra connector does it
> https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L354
Ah yes, that makes sense. Somehow, I was fixating on creating an
RDD[Row] instance instead of deriving my own class from RDD[Row].

>     I read the documentation here:
>     https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
>     It says "Concrete implementation should inherit from one of the
>     descendant Scan classes" but I could not find any of those
>     descendants in the documentation or the source code.
>
>
> The scan classes referred to here are these, in addition to the
> CatalystScan at the bottom of the file
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L245-L277
Great, that clarifies the situation.

With that in mind, I was able to create the complete set of classes and
work with my custom format.
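For the record, once DefaultSource is listed in a
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file
on the classpath, the short name works like the built-in sources (the
path below is just an example):

```scala
val df = spark.read.format("binfile").load("/path/to/data.bin")
df.printSchema()
```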
Thanks for your help.

