Questions about the future of UDTs and Encoders


Questions about the future of UDTs and Encoders

hypatian
Hi, all!


I'm a developer who works to support data scientists at CERT. We've
been having some great success working with Spark for data analysis,
and I have some questions about how we could contribute to work on
Spark in support of our goals.

Specifically, we have some interest in user-defined types, or their
equivalents.


When Spark 2 arrived, user-defined types (UDTs) were made private and
seem to have fallen by the wayside in favor of using encoders for
Datasets. I have some questions about the future of these mechanisms,
and was wondering if there's been a plan published anywhere for the
future of these mechanisms, or anyone I could talk to about where
things are going with them.

I've roughly outlined our experience with these two mechanisms below,
and our hopes for what might be accomplished in the future.

We'd love to spend some effort on development here, but haven't been
able to figure out if anyone is already working on improvements in
this area, or if there's some plan in place for where things are going
to go.

So, I'd love to get in touch with anyone who might know more.


Background:

Much of the work in my group is analysis of Internet protocol data,
and I think that IP addresses are a great example of how a custom atomic
type can be helpful.

IP addresses (including both 32-bit IPv4 addresses and 128-bit IPv6
addresses) have a natural binary form (a sequence of bytes). Using
this format makes the default implementation of certain basic
operations sensible (equality and comparison, for example). Defining
UDFs for more complicated operations is not terribly difficult. But
this format is not human-friendly to view.

The human-readable presentations of IP addresses, on the other hand,
are large and unwieldy to work with computationally. There is a
canonical textual form for both IPv4 and IPv6 addresses, but
converting back and forth between that form and the binary form is
expensive, and the text form is generally at least twice as large as
the binary form. The text form is suitable for presenting to human
beings, but that's about it.
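
For concreteness, the conversion in question is roughly the following (a
sketch using java.net.InetAddress rather than our own code):

import java.net.InetAddress

val bytes = Array(172, 217, 5, 238).map(_.toByte)           // the 4-byte binary form
val text  = InetAddress.getByAddress(bytes).getHostAddress  // "172.217.5.238"
val back  = InetAddress.getByName(text).getAddress          // parsed back to bytes

Doing that round trip per value, per query, is the cost we'd like to avoid.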

There are also a variety of other types of Internet data that are best
represented by byte arrays and the like, meaning that simply saying
"just use a byte array for this column!" can be unfortunate for both
type-safety and comprehensibility of a collection of data.


When we were working on top of Spark 1, we had begun to look at using
UDTs to represent IP addresses. There were some issues with working
with UDTs and working with the built-in operations like comparisons,
but we had some hope for improvements with future Spark releases.

With Spark 2.0, the UDT API was made private, and the encoder
mechanism was suggested for use instead. For a bit, we experimented
with using the API anyway by putting stubs into Spark's namespace, but
there weren't really a lot of good places to hook various operations
like equality that one would expect to work on an atomic type.


We also tried using the encoder APIs, and ran into a few problems
there as well. Encoders are well suited to handling "top-level"
values, but the most convenient way to work with encoded data is by
having your top level be a case class defining types and names for a
record type. And here, there's a problem, because encoders from the
implicit environment are not available when encoding the fields of a
case class. So, if we defined a custom encoder for our IPAddress type,
and then included an IPAddress as a field of a record, this would
result in an error.
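
A rough sketch of that failure mode (assuming a SparkSession named spark, and
a plain class rather than a case class for IPAddress):

import org.apache.spark.sql.{Encoder, Encoders}

class IPAddress(val bytes: Array[Byte]) extends Serializable
case class Rec(a: IPAddress, b: IPAddress)

// A custom encoder is fine for a top-level Dataset[IPAddress]:
implicit val ipEncoder: Encoder[IPAddress] = Encoders.kryo[IPAddress]
val ips = spark.createDataset(Seq(new IPAddress(Array[Byte](1, 2, 3, 4))))

// But deriving an encoder for Rec does not consult ipEncoder for its fields,
// so asking for a Dataset[Rec] fails when Spark builds the Rec encoder:
// import spark.implicits._
// val recs = spark.createDataset(Seq(Rec(new IPAddress(...), new IPAddress(...))))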

One approach we tried to get around that was to make IP addresses
themselves into case classes as well, so that only the default
encoders would be required. This eliminated the error, but made
working with the values a nightmare. If we made a Dataset[IPAddress],
the byte array would be presented in a reasonable manner, but a
Dataset[Rec] where Rec had IPAddress fields was another story,
resulting in the default toString of Java arrays being used:

+-------------+-------------+
|            a|            b|
+-------------+-------------+
|[[B@47260109]|[[B@3538740a]|
|[[B@617f4814]|[[B@77e69bee]|
+-------------+-------------+

(See code snippet at the end of this message for details.)

Now basically all interactions would have to go through UDFs,
including remembering to format the IPAddress field if you wanted any
useful information out of it at all.


As a result, since our initial experiments with 2.0 we dropped back
and punted to using text for all IP addresses. But, we'd still like to
do better. What we optimally want is some mechanism for a user-defined
atomic type (whether via encoders or via registering a new type) which
allows for:

 * An appropriately efficient underlying form to be used. (A struct
   with a byte array field would be fine; so would a bare byte array.)

 * A display form that is meaningful to the user (the expected form
   like "172.217.5.238" and "2607:f8b0:4004:800::200e".)

 * At least some support for standard SQL operators like equality and
   comparison, and the ability to define UDFs that work with the type.

Longer term, it would be lovely to:

 * Be able to work with values of the type in an appropriate way in
   different source languages (i.e. make it not hard to work with the
   values in Python or R, although the restrictions of those languages
   will require additional implementation work.)

 * Be able to provide new Catalyst optimizations specific to the type
   and functions defined on the type.

We'd love to provide some effort at achieving these goals, but aren't
sure where to start. We'd like to avoid stepping in the way of any
efforts that might already be underway to improve these mechanisms.


Thanks very much!

Katherine Prevost
Carnegie Mellon / Software Engineering Institute / CERT


-------------------------------------------------------------------->8--

// Simple example demonstrating the treatment of a case class with a
// byte array within another case class. (Run in spark-shell, where `sc`
// and spark.implicits._ are already in scope.)

case class IPAddress(bytes: Array[Byte]) {
  override def toString: String = s"IPAddress(Array(${bytes.mkString(", ")}))"
}

val a = IPAddress(Array(1,2,3,4))
val b = IPAddress(Array(5,6,7,8))
val c = IPAddress(Array(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16))
val d = IPAddress(Array(17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32))

val x = Array(a, b, c, d)
val xs = sc.parallelize(x).toDS

/*
scala> xs.show
+--------------------+
|               bytes|
+--------------------+
|       [01 02 03 04]|
|       [05 06 07 08]|
|[01 02 03 04 05 0...|
|[11 12 13 14 15 1...|
+--------------------+
*/

case class Rec(a: IPAddress, b: IPAddress) {
  override def toString: String = s"Rec($a, $b)"
}

val e = Rec(a, b)
val f = Rec(c, d)
val y = Array(e, f)
val ys = sc.parallelize(y).toDS

/*
scala> ys.show
+-------------+-------------+
|            a|            b|
+-------------+-------------+
|[[B@47260109]|[[B@3538740a]|
|[[B@617f4814]|[[B@77e69bee]|
+-------------+-------------+
*/

-------------------------------------------------------------------->8--


Re: Questions about the future of UDTs and Encoders

Jörn Franke
Not sure I fully understand the issue (source code is always helpful ;-) but why don't you override the toString method of IPAddress? The IP address could still be bytes, but when it is displayed, toString would convert the byte form into something human-readable.


Re: Questions about the future of UDTs and Encoders

Katherine Prevost
I'd say the quick summary of the problem is this:

The encoder mechanism does not deal well with fields of case classes (you must use built-in types, including other case classes, for case class fields), and UDTs are not currently available (and never integrated well with built-in operations).

Encoders work great for individual fields if you're using tuples, but once you get up over four or five fields this becomes incomprehensible. And, of course, encoders do nothing for you once you are in the realm of dataframes (including operations on fields, results of dataframe-based methods, and working in languages other than Scala.)

The sort of machinations I describe below are unpleasant but not a huge deal for people who are trained as developers... but they're a much bigger mess when we have to provide these interfaces to our data scientists. Yes, they can do it, but the "every address is a string and you have to use these functions that parse the strings over and over again" approach is easier to use (if massively inefficient).

I would like to improve Spark so that we can provide these types that our data scientists need to use *all the time* in a way that's both efficient and easy to use.

Hence, my interest in doing work on the UDT and/or Encoder mechanisms of Spark (or equivalent, if something new is in the works), and my interest in hearing from anybody who is already working in this area, or hearing about any future plans that have already been made in this area.


In more detail:

On Wed, Aug 16, 2017 at 2:49 AM Jörn Franke <[hidden email]> wrote:
Not sure I fully understand the issue (source code is always helpful ;-) but why don't you override the toString method of IPAddress? The IP address could still be bytes, but when it is displayed, toString would convert the byte form into something human-readable.
 
There are a couple of reasons it's not that simple. (If you look at the sample snippets of code I did include, you'll see that I did define toString methods.)

The first problem is that toString is never invoked when you're working with DataFrames, which are often the result of common Spark operations in Scala (though staying in the realm of Datasets is getting easier, and apparently also becoming more efficient). Outside of Scala, it's DataFrames all the way down.

(If you look at my example code, you'll also see what happens when you have a DataFrame with a field that is a struct with a byte array in it, and nobody ever wants to see "[B@617f4814".)

You can get around that (as long as you're still in a Dataset) with something like this (this is using the IPAddress.toString method to produce "IPAddress(Array(1,2,3,4))"):

scala> ys.take(20)
res10: Array[Rec] = Array(Rec(IPAddress(Array(1, 2, 3, 4)), IPAddress(Array(5, 6, 7, 8))), Rec(IPAddress(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)), IPAddress(Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32))))

But then of course you lose any easy ability to view Rec fields in columns. (And while you could make something that prints Rec as columns, what happens once you transform your record and turn it into a tuple?)

The second problem is that operating on the fields is still rather painful, even if the values were displayed cleanly. This is what you have to do to search for rows that have a specific IPAddress value (ys("a") is a column of IPAddress, a is an IPAddress):
scala> ys.select(ys("a.bytes") === a.bytes)
res9: org.apache.spark.sql.DataFrame = [(a.bytes AS `bytes` = X'01020304'): boolean]

It's worth noting that an implicit conversion from IPAddress to Array[Byte] or to Column wouldn't work here, because === accepts Any.
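
For completeness, the UDF-based workaround we end up living with looks roughly like this (formatIp and ipEquals are our own names, not a Spark API):

import org.apache.spark.sql.functions.{lit, udf}

val formatIp = udf { bytes: Array[Byte] =>
  java.net.InetAddress.getByAddress(bytes).getHostAddress
}
val ipEquals = udf { (x: Array[Byte], y: Array[Byte]) => java.util.Arrays.equals(x, y) }

ys.select(formatIp(ys("a.bytes")), formatIp(ys("b.bytes"))).show()
ys.filter(ipEquals(ys("a.bytes"), lit(a.bytes))).show()

Every display and every comparison has to remember to go through helpers like these.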


katherine.


Re: Questions about the future of UDTs and Encoders

Erik Erlandson-2
I've been working on packaging some UDTs as well. I have them working in Scala and PySpark, although I haven't been able to get them to serialize to Parquet, which puzzles me.

Although it works, I have to define UDTs under the org.apache.spark scope due to the privatization, which is a bit awkward.
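
The shape of that workaround is roughly the following (a sketch only: these are private[spark] APIs, so treat the exact signatures as assumptions that may differ between Spark versions; the package choice is what grants access to them):

package org.apache.spark.example

import org.apache.spark.sql.types._

class IPAddress(val bytes: Array[Byte]) extends Serializable

class IPAddressUDT extends UserDefinedType[IPAddress] {
  override def sqlType: DataType = BinaryType
  override def serialize(ip: IPAddress): Any = ip.bytes
  override def deserialize(datum: Any): IPAddress =
    new IPAddress(datum.asInstanceOf[Array[Byte]])
  override def userClass: Class[IPAddress] = classOf[IPAddress]
}

// Registered once during startup, e.g.:
// UDTRegistration.register(classOf[IPAddress].getName, classOf[IPAddressUDT].getName)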


Re: Questions about the future of UDTs and Encoders

Patrick GRANDJEAN
In reply to this post by Katherine Prevost
Hi Katherine,

I am also interested in UDTs in order to support serialization of some legacy third-party types. I have been monitoring the following JIRA issue:



Patrick.



Re: Questions about the future of UDTs and Encoders

mlopez
In reply to this post by Katherine Prevost
Hello everyone!

I'm a developer at a security ratings company. We've been moving to Spark
for our data analytics and nearly every dataset we have contains IP
addresses or variable-length subnets. Katherine's descriptions of use cases
and attempts to emulate networking types overlap with ours. I would add that
we also need to write complex queries over subnets in addition to IP
addresses.

Has there been any update on this topic?
https://github.com/apache/spark/pull/16478 was last updated in February of
this year.

I would also like to know if it would be better to work toward IP networking
types. Supposing Spark had UDT support, would it be just as good as built-in
support for networking types? Where would they fall short? Would it be
possible to pass custom rules to Catalyst for optimizing expressions with
networking types?

We have to write complex joins over predicates like subnet containment, and
we have to resort to difficult-to-read tricks to keep Spark from falling back
to an inefficient join strategy. For example, it would be great to
simply write `df1.join(df2, contains($"src_net", $"dst_net"))` to join
records from one dataset that have subnets that are contained in another.
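
For IPv4 addresses stored as longs, the kind of rewrite involved looks
something like this (a sketch; the column layout is hypothetical):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, pow}

// net/prefixLen describe a subnet, ip is an address; all stored as longs.
def contains(net: Column, prefixLen: Column, ip: Column): Column = {
  val size = pow(lit(2.0), lit(32) - prefixLen).cast("long") // addresses in the subnet
  ip >= net && ip < net + size
}

// df1.join(df2, contains(df2("net"), df2("prefix_len"), df1("ip")))

Even written as column arithmetic, a join condition with no equality component
tends to push Spark into a nested-loop or cartesian strategy, which is the
inefficiency we keep having to work around.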



-----
Michael Lopez
Cheerful Engineer!
--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/


Re: Questions about the future of UDTs and Encoders

Patrick GRANDJEAN
Hi Michael,

Having faced the same limitation, I have found these two libraries to be helpful:


Both use Shapeless to derive Datasets.
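
For the IPAddress example from earlier in the thread, the Injection-based approach looks roughly like this (a sketch from memory, so treat the exact API as an assumption):

import frameless.Injection

case class IPAddress(bytes: Array[Byte])

// Teach frameless to encode IPAddress via a type it already supports:
implicit val ipInjection: Injection[IPAddress, Array[Byte]] =
  Injection(_.bytes, IPAddress.apply)

// With the injection in scope, a TypedEncoder[IPAddress] (and hence encoders
// for case classes with IPAddress fields) can be derived.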

I hope it helps.

Patrick.



Re: Questions about the future of UDTs and Encoders

mlopez
Thank you for your response, Grandjean.

Frameless looks great, but it is not quite what I need. From what I can tell, Frameless provides a layer of type-safety on top of Spark facilities, like column expressions and encoders. There are also some great quality enhancements in Frameless, like Injections for creating custom encoders. But I need support for network types and their fundamental operators, just like in Postgres (https://www.postgresql.org/docs/current/static/functions-net.html) and Cassandra (http://cassandra.apache.org/doc/latest/cql/types.html).

Specifically, I'm looking for the following.

- Column expressions for manipulating network values like IP addresses and variable-length subnets.
- Tungsten support for optimal data representations of network types. While this is easy to emulate for IPv4 addresses (32-bit integers), it is messy to emulate variable-length IPv6 subnets.
- Support for custom Catalyst optimization rules for predicates like subnet containment.

Can UDTs even support the above? Or would we need to add network types to the list of built-ins to achieve these features?
