How to implement a "saveAsBinaryFile" function?


How to implement a "saveAsBinaryFile" function?

Duan,Bing
Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles function into an RDD[Array[Byte]], and that works fine. But when I save it back to the filesystem with saveAsTextFile, the quotation marks get escaped, like this:
"\"201900002_1\"",1,24,0,2,"\"S66.000x001\"", which should be "201900002_1",1,24,0,2,"S66.000x001".

Could anyone give me a tip on how to implement a saveAsBinaryFile-like function to persist the RDD[Array[Byte]]?

Bests!

Bing

Re: How to implement a "saveAsBinaryFile" function?

Driesprong, Fokko
Hi Bing,

Good question, and the answer is: it depends on your use case.

If you really just want to write raw bytes, you could use a foreach where you open an OutputStream and write the bytes to a file yourself. But this is probably not what you want, and in practice it isn't very handy, since you usually want to keep the records.
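A minimal sketch of that raw-bytes approach (the RDD, output directory, and file-naming scheme here are hypothetical, just to illustrate the shape):

```scala
import java.io.FileOutputStream
import java.util.UUID

// rdd is a hypothetical RDD[Array[Byte]]; each partition writes its records to its own file.
rdd.foreachPartition { records =>
  // Everything captured in this closure runs on the executors and must be serializable.
  val out = new FileOutputStream(s"/tmp/raw-bytes-${UUID.randomUUID()}")
  try records.foreach(bytes => out.write(bytes))
  finally out.close()
}
```

Note that once the byte arrays are concatenated into a file, the record boundaries are gone, which is exactly why a structured format is usually the better choice.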

My suggestion would be to write it as Parquet or Avro, into a binary field. Avro has the bytes primitive, which maps to Array[Byte] in Spark: https://avro.apache.org/docs/1.9.1/spec.html Similarly, Parquet has BYTE_ARRAY: https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0

In the words of Linus Torvalds: talk is cheap, show me the code:

MacBook-Pro-van-Fokko:~ fokkodriesprong$ spark-shell
20/01/16 10:58:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.20.10.3:4040
Spark context available as 'sc' (master = local[*], app id = local-1579168731763).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data: Array[Array[Byte]] = Array(
     |   Array(0x19.toByte, 0x25.toByte)
     | )
data: Array[Array[Byte]] = Array(Array(25, 37))

scala> val rdd = sc.parallelize(data, 1);
rdd: org.apache.spark.rdd.RDD[Array[Byte]] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.toDF("byte")
res1: org.apache.spark.sql.DataFrame = [byte: binary]

scala> val df = rdd.toDF("byte")
df: org.apache.spark.sql.DataFrame = [byte: binary]

scala> df.write.parquet("/tmp/bytes/")



MacBook-Pro-van-Fokko:~ fokkodriesprong$ ls -lah /tmp/bytes/
total 24
drwxr-xr-x   6 fokkodriesprong  wheel   192B 16 jan 11:01 .
drwxrwxrwt  16 root             wheel   512B 16 jan 11:01 ..
-rw-r--r--   1 fokkodriesprong  wheel     8B 16 jan 11:01 ._SUCCESS.crc
-rw-r--r--   1 fokkodriesprong  wheel    12B 16 jan 11:01 .part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet.crc
-rw-r--r--   1 fokkodriesprong  wheel     0B 16 jan 11:01 _SUCCESS
-rw-r--r--   1 fokkodriesprong  wheel   384B 16 jan 11:01 part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

MacBook-Pro-van-Fokko:~ fokkodriesprong$ parquet-tools schema /tmp/bytes/part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet
message spark_schema {
  optional binary byte;
}
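For the Avro route mentioned above, the same DataFrame can be written with the spark-avro module (an assumption here: that it is on the classpath, e.g. via --packages org.apache.spark:spark-avro_2.11:2.4.4); the binary column then maps to Avro's bytes type:

```scala
// Reusing the df with the single binary column from the transcript above.
df.write.format("avro").save("/tmp/bytes-avro/")

// Reading it back should yield the same Array[Byte] values.
val roundTrip = spark.read.format("avro").load("/tmp/bytes-avro/")
roundTrip.printSchema()
```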


Hope this helps.

Cheers, Fokko



Re: How to implement a "saveAsBinaryFile" function?

Long, Andrew-2
In reply to this post by Duan,Bing

Hey Bing,

 

There are a couple of different approaches you could take. The quickest and easiest is to use the existing APIs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val bytes = spark.range(1000)

bytes.foreachPartition { partition =>
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the Hadoop conf; see the Hadoop wrapper class in the source.
  val writer = FileSystem.get(new Configuration()).create(new Path("s3://..."))
  partition.foreach(b => writer.write(b.toInt))
  writer.close()
}

The more complicated but prettier approach would be to implement a custom datasource.
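A middle ground between the two (offered as a sketch, not something from the thread) is the Hadoop output-format API that pair RDDs already expose: wrapping each Array[Byte] in a BytesWritable and writing a SequenceFile preserves the bytes without any text escaping:

```scala
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

// rdd is a hypothetical RDD[Array[Byte]]; the output path is illustrative.
rdd
  .map(bytes => (NullWritable.get, new BytesWritable(bytes)))
  .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[NullWritable, BytesWritable]]("/tmp/binary-out")

// Reading back:
// sc.sequenceFile[NullWritable, BytesWritable]("/tmp/binary-out")
//   .map { case (_, bw) => bw.copyBytes() }
```

The trade-off is that the output is a SequenceFile rather than raw concatenated bytes, but the record boundaries survive the round trip.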

 


Re: How to implement a "saveAsBinaryFile" function?

Maxim Gekk
Hi Bing,

You can try the Text datasource. It shouldn't modify the strings:
scala> Seq(""""201900002_1",1,24,0,2,”S66.000x001”""").toDS.write.text("tmp/text.txt")
$ cat tmp/text.txt/part-00000-256d960f-9f85-47fe-8edd-8428276eb3c6-c000.txt
"201900002_1",1,24,0,2,”S66.000x001”

Maxim Gekk

Software Engineer

Databricks B. V. 




Re: How to implement a "saveAsBinaryFile" function?

Duan,Bing
Hi Fokko, Maxim, Long:

Thanks!

This save occurs in a custom datasource, as below:

override def createRelation(…) {
  blocks.map(block => block.bytes).saveAsTextFile(parameters("path"))
  ...
}
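If the goal is just to stop the escaping inside that createRelation, one possibility (a sketch, under the assumption that blocks is an RDD and parameters("path") points at a writable directory) is to replace saveAsTextFile with a raw write along the lines Andrew suggested:

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

blocks.map(_.bytes).foreachPartition { partition =>
  // Runs on the executors; one output file per partition, named with a random suffix.
  val fs = FileSystem.get(new Configuration())
  val out = fs.create(new Path(parameters("path") + "/part-" + UUID.randomUUID()))
  try partition.foreach(bytes => out.write(bytes))
  finally out.close()
}
```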

I am new to Spark and will try the methods you all provided.

Best!

Bing.


Re: How to implement a "saveAsBinaryFile" function?

jelmer
In reply to this post by Duan,Bing
