Fwd: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.


Jacek Laskowski
Hi,

Looks like the change has broken the build for me:

[INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
/Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243: error: not found: type Array
  public void loadBytes(Array array) {
                        ^

...

222 warnings found
one error found
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  4.864 s]
[INFO] Spark Project Tags ................................. SUCCESS [  5.689 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  4.646 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  6.074 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  7.355 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  7.639 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
[INFO] Spark Project Core ................................. SUCCESS [02:01 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [  9.711 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
[INFO] Spark Project SQL .................................. FAILURE [02:14 min]

Is it only me, or are others affected too?
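One change that may be worth trying, stated as an assumption rather than a confirmed diagnosis: the failing step is doc-jar, not the regular compile, and `Array` is a nested type of `ColumnVector` that `ArrowColumnVector` only inherits. Referring to it by its qualified name (`ColumnVector.Array`) removes the dependency on inherited member-type lookup. The sketch below shows the shape of that change with hypothetical stand-in classes (`BaseVector`, `ArrowLikeVector`); it is not the Spark code.

```java
// Hypothetical stand-in for ColumnVector: it declares a nested Array holder type.
abstract class BaseVector {
  static final class Array {
    int length;
  }

  abstract void loadBytes(Array array);
}

// Hypothetical stand-in for ArrowColumnVector.
final class ArrowLikeVector extends BaseVector {
  // The parameter type is written as BaseVector.Array instead of the bare,
  // inherited name Array, so no tool has to resolve an inherited member type.
  @Override
  void loadBytes(BaseVector.Array array) {
    throw new UnsupportedOperationException();
  }
}
```

Both spellings name the same type for javac, so the override stays valid either way.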

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski



---------- Forwarded message ----------
From:  <[hidden email]>
Date: Thu, Jul 20, 2017 at 3:00 PM
Subject: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
To: [hidden email]


Repository: spark
Updated Branches:
  refs/heads/master 5d1850d4b -> cb19880cd


[SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.

## What changes were proposed in this pull request?

Introducing `ArrowColumnVector` as a reader for Arrow vectors.
It extends `ColumnVector`, so we will be able to use it with
`ColumnarBatch` and its functionalities.
Currently it supports primitive types and `StringType`, `ArrayType`
and `StructType`.
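
A minimal sketch of the reader pattern described above, assuming simplified stand-in types: the reader inspects the source vector once, picks a type-specific accessor, and every typed getter that the accessor does not override throws `UnsupportedOperationException`. The names `SourceVector`, `IntSource`, `StringSource`, and `ReaderSketch` are hypothetical and do not exist in Arrow or Spark.

```java
// Hypothetical stand-ins for typed source vectors; these are not Arrow classes.
interface SourceVector {
  int valueCount();
  boolean isNull(int rowId);
}

final class IntSource implements SourceVector {
  private final Integer[] values; // a null entry models a SQL NULL
  IntSource(Integer... values) { this.values = values.clone(); }
  @Override public int valueCount() { return values.length; }
  @Override public boolean isNull(int rowId) { return values[rowId] == null; }
  int get(int rowId) { return values[rowId]; } // callers check isNull first
}

final class StringSource implements SourceVector {
  private final String[] values;
  StringSource(String... values) { this.values = values.clone(); }
  @Override public int valueCount() { return values.length; }
  @Override public boolean isNull(int rowId) { return values[rowId] == null; }
  String get(int rowId) { return values[rowId]; }
}

// Base accessor: each typed getter is unsupported unless a subclass overrides it.
abstract class Accessor {
  private final SourceVector vector;
  Accessor(SourceVector vector) { this.vector = vector; }
  final boolean isNullAt(int rowId) { return vector.isNull(rowId); }
  final int getValueCount() { return vector.valueCount(); }
  int getInt(int rowId) { throw new UnsupportedOperationException(); }
  String getString(int rowId) { throw new UnsupportedOperationException(); }
}

final class IntAccessor extends Accessor {
  private final IntSource source;
  IntAccessor(IntSource source) { super(source); this.source = source; }
  @Override int getInt(int rowId) { return source.get(rowId); }
}

final class StringAccessor extends Accessor {
  private final StringSource source;
  StringAccessor(StringSource source) { super(source); this.source = source; }
  @Override String getString(int rowId) { return source.get(rowId); }
}

// Read-only reader: chooses one accessor up front, then delegates every call.
final class ReaderSketch {
  private final Accessor accessor;

  ReaderSketch(SourceVector vector) {
    if (vector instanceof IntSource) {
      accessor = new IntAccessor((IntSource) vector);
    } else if (vector instanceof StringSource) {
      accessor = new StringAccessor((StringSource) vector);
    } else {
      throw new UnsupportedOperationException();
    }
  }

  int getValueCount() { return accessor.getValueCount(); }
  boolean isNullAt(int rowId) { return accessor.isNullAt(rowId); }
  int getInt(int rowId) { return accessor.getInt(rowId); }
  String getString(int rowId) { return accessor.getString(rowId); }
}
```

Choosing the accessor in the constructor keeps the per-row getters free of type checks; asking an integer-backed reader for a string fails fast instead of returning a wrong value.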

## How was this patch tested?

Added tests for `ArrowColumnVector` and existing tests.

Author: Takuya UESHIN <[hidden email]>

Closes #18680 from ueshin/issues/SPARK-21472.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c

Branch: refs/heads/master
Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
Parents: 5d1850d
Author: Takuya UESHIN <[hidden email]>
Authored: Thu Jul 20 21:00:30 2017 +0800
Committer: Wenchen Fan <[hidden email]>
Committed: Thu Jul 20 21:00:30 2017 +0800

----------------------------------------------------------------------
 .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
 .../sql/execution/vectorized/ColumnVector.java  |  16 +-
 .../vectorized/ReadOnlyColumnVector.java        | 251 ++++++++
 .../sql/execution/arrow/ArrowConverters.scala   |  32 +-
 .../spark/sql/execution/arrow/ArrowUtils.scala  | 109 ++++
 .../execution/arrow/ArrowConvertersSuite.scala  |   2 +-
 .../sql/execution/arrow/ArrowUtilsSuite.scala   |  65 ++
 .../vectorized/ArrowColumnVectorSuite.scala     | 410 +++++++++++++
 8 files changed, 1436 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
new file mode 100644
index 0000000..68e0abc
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector backed by Apache Arrow.
+ */
+public final class ArrowColumnVector extends ReadOnlyColumnVector {
+
+  private final ArrowVectorAccessor accessor;
+  private final int valueCount;
+
+  private void ensureAccessible(int index) {
+    if (index < 0 || index >= valueCount) {
+      throw new IndexOutOfBoundsException(
+        String.format("index: %d, valueCount: %d", index, valueCount));
+    }
+  }
+
+  private void ensureAccessible(int index, int count) {
+    if (index < 0 || index + count > valueCount) {
+      throw new IndexOutOfBoundsException(
+        String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
+    }
+  }
+
+  @Override
+  public long nullsNativeAddress() {
+    throw new RuntimeException("Cannot get native address for arrow column");
+  }
+
+  @Override
+  public long valuesNativeAddress() {
+    throw new RuntimeException("Cannot get native address for arrow column");
+  }
+
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (int i = 0; i < childColumns.length; i++) {
+        childColumns[i].close();
+      }
+    }
+    accessor.close();
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.isNullAt(rowId);
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getBoolean(rowId);
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    boolean[] array = new boolean[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getBoolean(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public byte getByte(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getByte(rowId);
+  }
+
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    byte[] array = new byte[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getByte(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Shorts
+  //
+
+  @Override
+  public short getShort(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getShort(rowId);
+  }
+
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    short[] array = new short[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getShort(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Ints
+  //
+
+  @Override
+  public int getInt(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getInt(rowId);
+  }
+
+  @Override
+  public int[] getInts(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    int[] array = new int[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getInt(rowId + i);
+    }
+    return array;
+  }
+
+  @Override
+  public int getDictId(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public long getLong(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getLong(rowId);
+  }
+
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    long[] array = new long[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getLong(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with floats
+  //
+
+  @Override
+  public float getFloat(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getFloat(rowId);
+  }
+
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    float[] array = new float[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getFloat(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public double getDouble(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getDouble(rowId);
+  }
+
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    double[] array = new double[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getDouble(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public int getArrayLength(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getArrayLength(rowId);
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getArrayOffset(rowId);
+  }
+
+  @Override
+  public void loadBytes(Array array) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Decimals
+  //
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    ensureAccessible(rowId);
+    return accessor.getDecimal(rowId, precision, scale);
+  }
+
+  //
+  // APIs dealing with UTF8Strings
+  //
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getUTF8String(rowId);
+  }
+
+  //
+  // APIs dealing with Binaries
+  //
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getBinary(rowId);
+  }
+
+  public ArrowColumnVector(ValueVector vector) {
+    super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
+      MemoryMode.OFF_HEAP);
+
+    if (vector instanceof NullableBitVector) {
+      accessor = new BooleanAccessor((NullableBitVector) vector);
+    } else if (vector instanceof NullableTinyIntVector) {
+      accessor = new ByteAccessor((NullableTinyIntVector) vector);
+    } else if (vector instanceof NullableSmallIntVector) {
+      accessor = new ShortAccessor((NullableSmallIntVector) vector);
+    } else if (vector instanceof NullableIntVector) {
+      accessor = new IntAccessor((NullableIntVector) vector);
+    } else if (vector instanceof NullableBigIntVector) {
+      accessor = new LongAccessor((NullableBigIntVector) vector);
+    } else if (vector instanceof NullableFloat4Vector) {
+      accessor = new FloatAccessor((NullableFloat4Vector) vector);
+    } else if (vector instanceof NullableFloat8Vector) {
+      accessor = new DoubleAccessor((NullableFloat8Vector) vector);
+    } else if (vector instanceof NullableDecimalVector) {
+      accessor = new DecimalAccessor((NullableDecimalVector) vector);
+    } else if (vector instanceof NullableVarCharVector) {
+      accessor = new StringAccessor((NullableVarCharVector) vector);
+    } else if (vector instanceof NullableVarBinaryVector) {
+      accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
+    } else if (vector instanceof ListVector) {
+      ListVector listVector = (ListVector) vector;
+      accessor = new ArrayAccessor(listVector);
+
+      childColumns = new ColumnVector[1];
+      childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
+      resultArray = new Array(childColumns[0]);
+    } else if (vector instanceof MapVector) {
+      MapVector mapVector = (MapVector) vector;
+      accessor = new StructAccessor(mapVector);
+
+      childColumns = new ArrowColumnVector[mapVector.size()];
+      for (int i = 0; i < childColumns.length; ++i) {
+        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+      }
+      resultStruct = new ColumnarBatch.Row(childColumns);
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    valueCount = accessor.getValueCount();
+    numNulls = accessor.getNullCount();
+    anyNullsSet = numNulls > 0;
+  }
+
+  private static abstract class ArrowVectorAccessor {
+
+    private final ValueVector vector;
+    private final ValueVector.Accessor nulls;
+
+    private final int valueCount;
+    private final int nullCount;
+
+    ArrowVectorAccessor(ValueVector vector) {
+      this.vector = vector;
+      this.nulls = vector.getAccessor();
+      this.valueCount = nulls.getValueCount();
+      this.nullCount = nulls.getNullCount();
+    }
+
+    final boolean isNullAt(int rowId) {
+      return nulls.isNull(rowId);
+    }
+
+    final int getValueCount() {
+      return valueCount;
+    }
+
+    final int getNullCount() {
+      return nullCount;
+    }
+
+    final void close() {
+      vector.close();
+    }
+
+    boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getArrayLength(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getArrayOffset(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class BooleanAccessor extends ArrowVectorAccessor {
+
+    private final NullableBitVector.Accessor accessor;
+
+    BooleanAccessor(NullableBitVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final boolean getBoolean(int rowId) {
+      return accessor.get(rowId) == 1;
+    }
+  }
+
+  private static class ByteAccessor extends ArrowVectorAccessor {
+
+    private final NullableTinyIntVector.Accessor accessor;
+
+    ByteAccessor(NullableTinyIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final byte getByte(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class ShortAccessor extends ArrowVectorAccessor {
+
+    private final NullableSmallIntVector.Accessor accessor;
+
+    ShortAccessor(NullableSmallIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final short getShort(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class IntAccessor extends ArrowVectorAccessor {
+
+    private final NullableIntVector.Accessor accessor;
+
+    IntAccessor(NullableIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class LongAccessor extends ArrowVectorAccessor {
+
+    private final NullableBigIntVector.Accessor accessor;
+
+    LongAccessor(NullableBigIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class FloatAccessor extends ArrowVectorAccessor {
+
+    private final NullableFloat4Vector.Accessor accessor;
+
+    FloatAccessor(NullableFloat4Vector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class DoubleAccessor extends ArrowVectorAccessor {
+
+    private final NullableFloat8Vector.Accessor accessor;
+
+    DoubleAccessor(NullableFloat8Vector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class DecimalAccessor extends ArrowVectorAccessor {
+
+    private final NullableDecimalVector.Accessor accessor;
+
+    DecimalAccessor(NullableDecimalVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      if (isNullAt(rowId)) return null;
+      return Decimal.apply(accessor.getObject(rowId), precision, scale);
+    }
+  }
+
+  private static class StringAccessor extends ArrowVectorAccessor {
+
+    private final NullableVarCharVector.Accessor accessor;
+    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
+
+    StringAccessor(NullableVarCharVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      accessor.get(rowId, stringResult);
+      if (stringResult.isSet == 0) {
+        return null;
+      } else {
+        return UTF8String.fromAddress(null,
+          stringResult.buffer.memoryAddress() + stringResult.start,
+          stringResult.end - stringResult.start);
+      }
+    }
+  }
+
+  private static class BinaryAccessor extends ArrowVectorAccessor {
+
+    private final NullableVarBinaryVector.Accessor accessor;
+
+    BinaryAccessor(NullableVarBinaryVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      return accessor.getObject(rowId);
+    }
+  }
+
+  private static class ArrayAccessor extends ArrowVectorAccessor {
+
+    private final UInt4Vector.Accessor accessor;
+
+    ArrayAccessor(ListVector vector) {
+      super(vector);
+      this.accessor = vector.getOffsetVector().getAccessor();
+    }
+
+    @Override
+    final int getArrayLength(int rowId) {
+      return accessor.get(rowId + 1) - accessor.get(rowId);
+    }
+
+    @Override
+    final int getArrayOffset(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class StructAccessor extends ArrowVectorAccessor {
+
+    StructAccessor(MapVector vector) {
+      super(vector);
+    }
+  }
+}
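
For readers following `ArrayAccessor` and `ensureAccessible` in the file above: a minimal sketch, assuming an offsets buffer with `valueCount + 1` entries, of how a list row's offset and length can be derived and bounds-checked. `ListOffsetsSketch` and its plain `int[]` are hypothetical stand-ins, not Arrow or Spark classes.

```java
// Hypothetical stand-in for a list vector's offsets buffer.
final class ListOffsetsSketch {
  private final int[] offsets; // offsets[i] is where row i starts in the child data
  private final int valueCount;

  ListOffsetsSketch(int[] offsets) {
    if (offsets.length == 0) {
      throw new IllegalArgumentException("offsets needs at least one entry");
    }
    this.offsets = offsets.clone();
    this.valueCount = offsets.length - 1; // one more offset than rows
  }

  // Same shape as the single-index check in the diff above.
  private void ensureAccessible(int index) {
    if (index < 0 || index >= valueCount) {
      throw new IndexOutOfBoundsException(
        String.format("index: %d, valueCount: %d", index, valueCount));
    }
  }

  int getArrayOffset(int rowId) {
    ensureAccessible(rowId);
    return offsets[rowId];
  }

  // A row's length is the distance to the next row's start.
  int getArrayLength(int rowId) {
    ensureAccessible(rowId);
    return offsets[rowId + 1] - offsets[rowId];
  }
}
```

With offsets `{0, 2, 2, 5}` there are three rows: the first has two elements, the second is empty, and the third has three elements starting at child index 2.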

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 0c027f8..7796638 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the decimal for rowId.
    */
-  public final Decimal getDecimal(int rowId, int precision, int scale) {
+  public Decimal getDecimal(int rowId, int precision, int scale) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
       return Decimal.createUnsafe(getInt(rowId), precision, scale);
     } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
   }


-  public final void putDecimal(int rowId, Decimal value, int precision) {
+  public void putDecimal(int rowId, Decimal value, int precision) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
       putInt(rowId, (int) value.toUnscaledLong());
     } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the UTF8String for rowId.
    */
-  public final UTF8String getUTF8String(int rowId) {
+  public UTF8String getUTF8String(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
@@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the byte array for rowId.
    */
-  public final byte[] getBinary(int rowId) {
+  public byte[] getBinary(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array array = getByteArray(rowId);
       byte[] bytes = new byte[array.length];
@@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Data type for this column.
    */
-  protected final DataType type;
+  protected DataType type;

   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * If this is a nested type (array or struct), the column for the child data.
    */
-  protected final ColumnVector[] childColumns;
+  protected ColumnVector[] childColumns;

   /**
    * Reusable Array holder for getArray().
    */
-  protected final Array resultArray;
+  protected Array resultArray;

   /**
    * Reusable Struct holder for getStruct().
    */
-  protected final ColumnarBatch.Row resultStruct;
+  protected ColumnarBatch.Row resultStruct;

   /**
    * The Dictionary for this column.

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
new file mode 100644
index 0000000..e9f6e7c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
+
+/**
+ * An abstract class for read-only column vector.
+ */
+public abstract class ReadOnlyColumnVector extends ColumnVector {
+
+  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
+    super(capacity, DataTypes.NullType, memMode);
+    this.type = type;
+    isConstant = true;
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public final void putNotNull(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNull(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNulls(int rowId, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNotNulls(int rowId, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public final void putBoolean(int rowId, boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBooleans(int rowId, int count, boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public final void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Shorts
+  //
+
+  @Override
+  public final void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putShorts(int rowId, int count, short value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Ints
+  //
+
+  @Override
+  public final void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public final void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with floats
+  //
+
+  @Override
+  public final void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public final void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public final void putArray(int rowId, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Byte Arrays
+  //
+
+  @Override
+  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Decimals
+  //
+
+  @Override
+  public final void putDecimal(int rowId, Decimal value, int precision) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // Other APIs
+  //
+
+  @Override
+  public final void setDictionary(Dictionary dictionary) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ColumnVector reserveDictionaryIds(int capacity) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected final void reserveInternal(int newCapacity) {
+    throw new UnsupportedOperationException();
+  }
+}
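
The read-only pattern in the file above can be summarised with a small sketch, assuming a base class that declares both getters and setters: an intermediate abstract class overrides every mutator as `final` and throws, so concrete readers implement only the getters. `WritableVectorSketch`, `ReadOnlyVectorSketch`, and `FixedIntsSketch` are hypothetical names used for illustration, not Spark classes.

```java
// Hypothetical stand-in for a column vector base class with getters and setters.
abstract class WritableVectorSketch {
  protected final int capacity;

  WritableVectorSketch(int capacity) {
    this.capacity = capacity;
  }

  abstract int getInt(int rowId);

  abstract void putInt(int rowId, int value);
}

// Read-only layer: mutators are overridden once, as final, and always throw.
abstract class ReadOnlyVectorSketch extends WritableVectorSketch {
  ReadOnlyVectorSketch(int capacity) {
    super(capacity);
  }

  @Override
  final void putInt(int rowId, int value) {
    throw new UnsupportedOperationException();
  }
}

// Concrete reader: only the getter is left to implement.
final class FixedIntsSketch extends ReadOnlyVectorSketch {
  private final int[] values;

  FixedIntsSketch(int... values) {
    super(values.length);
    this.values = values.clone(); // defensive copy keeps the reader immutable
  }

  @Override
  int getInt(int rowId) {
    return values[rowId];
  }
}
```

Marking the throwing overrides `final` means a subclass cannot accidentally reintroduce a mutator.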

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 6af5c73..c913efe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
 private[sql] object ArrowConverters {

   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
-    dataType match {
-      case BooleanType => ArrowType.Bool.INSTANCE
-      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
-      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-      case ByteType => new ArrowType.Int(8, true)
-      case StringType => ArrowType.Utf8.INSTANCE
-      case BinaryType => ArrowType.Binary.INSTANCE
-      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
-    }
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-    val arrowFields = schema.fields.map { f =>
-      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
-    }
-    new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
    * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
    * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
    */
@@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
       batch: ArrowRecordBatch,
       schema: StructType,
       allocator: BufferAllocator): Array[Byte] = {
-    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
+    val arrowSchema = ArrowUtils.toArrowSchema(schema)
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
     val out = new ByteArrayOutputStream()
     val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
@@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
    * Create an Arrow ColumnWriter given the type and ordinal of row.
    */
   def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
-    val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
+    val dtype = ArrowUtils.toArrowType(dataType)
     dataType match {
       case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
       case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
new file mode 100644
index 0000000..2caf1ef
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.types.FloatingPointPrecision
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+
+import org.apache.spark.sql.types._
+
+object ArrowUtils {
+
+  val rootAllocator = new RootAllocator(Long.MaxValue)
+
+  // todo: support more types.
+
+  def toArrowType(dt: DataType): ArrowType = dt match {
+    case BooleanType => ArrowType.Bool.INSTANCE
+    case ByteType => new ArrowType.Int(8, true)
+    case ShortType => new ArrowType.Int(8 * 2, true)
+    case IntegerType => new ArrowType.Int(8 * 4, true)
+    case LongType => new ArrowType.Int(8 * 8, true)
+    case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+    case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+    case StringType => ArrowType.Utf8.INSTANCE
+    case BinaryType => ArrowType.Binary.INSTANCE
+    case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
+    case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
+  }
+
+  def fromArrowType(dt: ArrowType): DataType = dt match {
+    case ArrowType.Bool.INSTANCE => BooleanType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
+    case float: ArrowType.FloatingPoint
+      if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
+    case float: ArrowType.FloatingPoint
+      if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
+    case ArrowType.Utf8.INSTANCE => StringType
+    case ArrowType.Binary.INSTANCE => BinaryType
+    case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
+    case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
+  }
+
+  def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
+    dt match {
+      case ArrayType(elementType, containsNull) =>
+        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
+        new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
+      case StructType(fields) =>
+        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
+        new Field(name, fieldType,
+          fields.map { field =>
+            toArrowField(field.name, field.dataType, field.nullable)
+          }.toSeq.asJava)
+      case dataType =>
+        val fieldType = new FieldType(nullable, toArrowType(dataType), null)
+        new Field(name, fieldType, Seq.empty[Field].asJava)
+    }
+  }
+
+  def fromArrowField(field: Field): DataType = {
+    field.getType match {
+      case ArrowType.List.INSTANCE =>
+        val elementField = field.getChildren().get(0)
+        val elementType = fromArrowField(elementField)
+        ArrayType(elementType, containsNull = elementField.isNullable)
+      case ArrowType.Struct.INSTANCE =>
+        val fields = field.getChildren().asScala.map { child =>
+          val dt = fromArrowField(child)
+          StructField(child.getName, dt, child.isNullable)
+        }
+        StructType(fields)
+      case arrowType => fromArrowType(arrowType)
+    }
+  }
+
+  def toArrowSchema(schema: StructType): Schema = {
+    new Schema(schema.map { field =>
+      toArrowField(field.name, field.dataType, field.nullable)
+    }.asJava)
+  }
+
+  def fromArrowSchema(schema: Schema): StructType = {
+    StructType(schema.getFields.asScala.map { field =>
+      val dt = fromArrowField(field)
+      StructField(field.getName, dt, field.isNullable)
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 159328c..55b4655 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     val allocator = new RootAllocator(Long.MaxValue)
     val jsonReader = new JsonFileReader(jsonFile, allocator)

-    val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
+    val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
     val jsonSchema = jsonReader.start()
     Validator.compareSchemas(arrowSchema, jsonSchema)


http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
new file mode 100644
index 0000000..638619f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types._
+
+class ArrowUtilsSuite extends SparkFunSuite {
+
+  def roundtrip(dt: DataType): Unit = {
+    dt match {
+      case schema: StructType =>
+        assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
+      case _ =>
+        roundtrip(new StructType().add("value", dt))
+    }
+  }
+
+  test("simple") {
+    roundtrip(BooleanType)
+    roundtrip(ByteType)
+    roundtrip(ShortType)
+    roundtrip(IntegerType)
+    roundtrip(LongType)
+    roundtrip(FloatType)
+    roundtrip(DoubleType)
+    roundtrip(StringType)
+    roundtrip(BinaryType)
+    roundtrip(DecimalType.SYSTEM_DEFAULT)
+  }
+
+  test("array") {
+    roundtrip(ArrayType(IntegerType, containsNull = true))
+    roundtrip(ArrayType(IntegerType, containsNull = false))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
+  }
+
+  test("struct") {
+    roundtrip(new StructType())
+    roundtrip(new StructType().add("i", IntegerType))
+    roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
+    roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
+    roundtrip(new StructType().add(
+      "struct",
+      new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
new file mode 100644
index 0000000..d24a9e1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowColumnVectorSuite extends SparkFunSuite {
+
+  test("boolean") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableBitVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === BooleanType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getBoolean(i) === (i % 2 == 0))
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("byte") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableTinyIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toByte)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ByteType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getByte(i) === i.toByte)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("short") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableSmallIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toShort)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ShortType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getShort(i) === i.toShort)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("int") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === IntegerType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getInt(i) === i)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getInts(0, 10) === (0 until 10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("long") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableBigIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toLong)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === LongType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getLong(i) === i.toLong)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("float") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableFloat4Vector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toFloat)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === FloatType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getFloat(i) === i.toFloat)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("double") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableFloat8Vector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toDouble)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === DoubleType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getDouble(i) === i.toDouble)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("string") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableVarCharVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      mutator.setSafe(i, utf8, 0, utf8.length)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === StringType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
+    }
+    assert(columnVector.isNullAt(10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("binary") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      mutator.setSafe(i, utf8, 0, utf8.length)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === BinaryType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
+    }
+    assert(columnVector.isNullAt(10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("array") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
+      .createVector(allocator).asInstanceOf[ListVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+    val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
+    val elementMutator = elementVector.getMutator()
+
+    // [1, 2]
+    mutator.startNewValue(0)
+    elementMutator.setSafe(0, 1)
+    elementMutator.setSafe(1, 2)
+    mutator.endValue(0, 2)
+
+    // [3, null, 5]
+    mutator.startNewValue(1)
+    elementMutator.setSafe(2, 3)
+    elementMutator.setNull(3)
+    elementMutator.setSafe(4, 5)
+    mutator.endValue(1, 3)
+
+    // null
+
+    // []
+    mutator.startNewValue(3)
+    mutator.endValue(3, 0)
+
+    elementMutator.setValueCount(5)
+    mutator.setValueCount(4)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ArrayType(IntegerType))
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    val array0 = columnVector.getArray(0)
+    assert(array0.numElements() === 2)
+    assert(array0.getInt(0) === 1)
+    assert(array0.getInt(1) === 2)
+
+    val array1 = columnVector.getArray(1)
+    assert(array1.numElements() === 3)
+    assert(array1.getInt(0) === 3)
+    assert(array1.isNullAt(1))
+    assert(array1.getInt(2) === 5)
+
+    assert(columnVector.isNullAt(2))
+
+    val array3 = columnVector.getArray(3)
+    assert(array3.numElements() === 0)
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("struct") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
+    val schema = new StructType().add("int", IntegerType).add("long", LongType)
+    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableMapVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+    val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
+    val intMutator = intVector.getMutator()
+    val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
+    val longMutator = longVector.getMutator()
+
+    // (1, 1L)
+    mutator.setIndexDefined(0)
+    intMutator.setSafe(0, 1)
+    longMutator.setSafe(0, 1L)
+
+    // (2, null)
+    mutator.setIndexDefined(1)
+    intMutator.setSafe(1, 2)
+    longMutator.setNull(1)
+
+    // (null, 3L)
+    mutator.setIndexDefined(2)
+    intMutator.setNull(2)
+    longMutator.setSafe(2, 3L)
+
+    // null
+    mutator.setNull(3)
+
+    // (5, 5L)
+    mutator.setIndexDefined(4)
+    intMutator.setSafe(4, 5)
+    longMutator.setSafe(4, 5L)
+
+    intMutator.setValueCount(5)
+    longMutator.setValueCount(5)
+    mutator.setValueCount(5)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === schema)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    val row0 = columnVector.getStruct(0, 2)
+    assert(row0.getInt(0) === 1)
+    assert(row0.getLong(1) === 1L)
+
+    val row1 = columnVector.getStruct(1, 2)
+    assert(row1.getInt(0) === 2)
+    assert(row1.isNullAt(1))
+
+    val row2 = columnVector.getStruct(2, 2)
+    assert(row2.isNullAt(0))
+    assert(row2.getLong(1) === 3L)
+
+    assert(columnVector.isNullAt(3))
+
+    val row4 = columnVector.getStruct(4, 2)
+    assert(row4.getInt(0) === 5)
+    assert(row4.getLong(1) === 5L)
+
+    columnVector.close()
+    allocator.close()
+  }
+}
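
For readers skimming the test suite above: each primitive test writes ten values, marks row 10 as null, and then reads everything back through a bounds-checked, read-only view. A minimal sketch of that read pattern in plain Java follows, so it can be run without Arrow or Spark on the classpath. `ReadOnlyIntColumn`, its validity `BitSet` and the `sample()` helper are hypothetical stand-ins, not Arrow's or Spark's API, and the range check is written in an overflow-safe form that is this sketch's own variation rather than the commit's code.

```java
import java.util.BitSet;

// Hypothetical stand-in for a read-only column view; not Arrow's or Spark's API.
public final class ReadOnlyIntColumn {
  private final int[] values;
  private final BitSet validity;   // a set bit means the row holds a value
  private final int valueCount;

  public ReadOnlyIntColumn(int[] values, BitSet validity, int valueCount) {
    this.values = values;
    this.validity = validity;
    this.valueCount = valueCount;
  }

  // Single-row bounds check, in the same spirit as ensureAccessible(int) in the diff.
  private void ensureAccessible(int index) {
    if (index < 0 || index >= valueCount) {
      throw new IndexOutOfBoundsException(
        String.format("index: %d, valueCount: %d", index, valueCount));
    }
  }

  // Range check written as "count > valueCount - index" so that it cannot overflow.
  private void ensureAccessible(int index, int count) {
    if (index < 0 || count < 0 || count > valueCount - index) {
      throw new IndexOutOfBoundsException(
        String.format("index: %d, count: %d, valueCount: %d", index, count, valueCount));
    }
  }

  public boolean isNullAt(int rowId) {
    ensureAccessible(rowId);
    return !validity.get(rowId);
  }

  public int numNulls() {
    return valueCount - validity.get(0, valueCount).cardinality();
  }

  public int getInt(int rowId) {
    ensureAccessible(rowId);
    return values[rowId];
  }

  public int[] getInts(int rowId, int count) {
    ensureAccessible(rowId, count);
    int[] array = new int[count];
    System.arraycopy(values, rowId, array, 0, count);
    return array;
  }

  // Mirrors the shape of the test data: rows 0..9 hold their index, row 10 is null.
  public static ReadOnlyIntColumn sample() {
    int[] values = new int[11];
    BitSet validity = new BitSet(11);
    for (int i = 0; i < 10; i++) {
      values[i] = i;
      validity.set(i);
    }
    return new ReadOnlyIntColumn(values, validity, 11);
  }

  public static void main(String[] args) {
    ReadOnlyIntColumn column = sample();
    System.out.println("numNulls = " + column.numNulls());
    System.out.println("isNullAt(10) = " + column.isNullAt(10));
  }
}
```

As in the tests, a caller checks `isNullAt` before reading a row; the getters themselves only guard against out-of-range indexes.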




Re: Fwd: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.

Liang-Chi Hsieh

Yeah, I think it should be "ColumnVector.Array". I've already pinged @ueshin about this issue.
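
A minimal sketch of why the qualified name should help, assuming the failure comes from how the doc-jar step resolves type names in Java sources. In Java, a subclass inherits the member types of its superclass, so javac accepts the simple name `Array` inside a subclass. The "not found: type Array" message suggests the tool running the scaladoc step does not perform that inherited lookup, in which case the fully qualified form is the safer spelling. `BaseVector`, `DerivedVector` and `NestedTypeLookup` below are hypothetical stand-ins, not Spark's classes.

```java
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
abstract class BaseVector {
  // A nested (member) type, similar in shape to a nested Array class.
  public static final class Array {
    final int length;

    Array(int length) {
      this.length = length;
    }
  }
}

final class DerivedVector extends BaseVector {
  // javac resolves the inherited member type by its simple name.
  int sizeUnqualified(Array array) {
    return array.length;
  }

  // The qualified name denotes the same type and does not rely on inherited lookup.
  int sizeQualified(BaseVector.Array array) {
    return array.length;
  }
}

public class NestedTypeLookup {
  static int[] sizes() {
    DerivedVector vector = new DerivedVector();
    BaseVector.Array array = new BaseVector.Array(3);
    return new int[] { vector.sizeUnqualified(array), vector.sizeQualified(array) };
  }

  public static void main(String[] args) {
    int[] result = sizes();
    System.out.println(result[0] + " " + result[1]);
  }
}
```

Both methods take the same type, so switching a signature from `Array` to the qualified form should not change behavior for Java callers.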

Jacek Laskowski wrote
Hi,

Looks like the change has broken the build for me:

[INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
/Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243: error: not found: type Array
  public void loadBytes(Array array) {
                        ^

...

222 warnings found
one error found
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  4.864 s]
[INFO] Spark Project Tags ................................. SUCCESS [  5.689 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  4.646 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  6.074 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  7.355 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  7.639 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
[INFO] Spark Project Core ................................. SUCCESS [02:01 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [  9.711 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
[INFO] Spark Project SQL .................................. FAILURE [02:14 min]

Is this only me or others suffer from it too?

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski



---------- Forwarded message ----------
From:  <[hidden email]>
Date: Thu, Jul 20, 2017 at 3:00 PM
Subject: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
To: [hidden email]


Repository: spark
Updated Branches:
  refs/heads/master 5d1850d4b -> cb19880cd


[SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.

## What changes were proposed in this pull request?

Introducing `ArrowColumnVector` as a reader for Arrow vectors.
It extends `ColumnVector`, so we will be able to use it with
`ColumnarBatch` and its functionalities.
Currently it supports primitive types and `StringType`, `ArrayType`
and `StructType`.

## How was this patch tested?

Added tests for `ArrowColumnVector` and existing tests.

Author: Takuya UESHIN <[hidden email]>

Closes #18680 from ueshin/issues/SPARK-21472.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c

Branch: refs/heads/master
Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
Parents: 5d1850d
Author: Takuya UESHIN <[hidden email]>
Authored: Thu Jul 20 21:00:30 2017 +0800
Committer: Wenchen Fan <[hidden email]>
Committed: Thu Jul 20 21:00:30 2017 +0800

----------------------------------------------------------------------
 .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
 .../sql/execution/vectorized/ColumnVector.java  |  16 +-
 .../vectorized/ReadOnlyColumnVector.java        | 251 ++++++++
 .../sql/execution/arrow/ArrowConverters.scala   |  32 +-
 .../spark/sql/execution/arrow/ArrowUtils.scala  | 109 ++++
 .../execution/arrow/ArrowConvertersSuite.scala  |   2 +-
 .../sql/execution/arrow/ArrowUtilsSuite.scala   |  65 ++
 .../vectorized/ArrowColumnVectorSuite.scala     | 410 +++++++++++++
 8 files changed, 1436 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
new file mode 100644
index 0000000..68e0abc
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector backed by Apache Arrow.
+ */
+public final class ArrowColumnVector extends ReadOnlyColumnVector {
+
+  private final ArrowVectorAccessor accessor;
+  private final int valueCount;
+
+  private void ensureAccessible(int index) {
+    if (index < 0 || index >= valueCount) {
+      throw new IndexOutOfBoundsException(
+        String.format("index: %d, valueCount: %d", index, valueCount));
+    }
+  }
+
+  private void ensureAccessible(int index, int count) {
+    if (index < 0 || index + count > valueCount) {
+      throw new IndexOutOfBoundsException(
+        String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
+    }
+  }
+
+  @Override
+  public long nullsNativeAddress() {
+    throw new RuntimeException("Cannot get native address for arrow column");
+  }
+
+  @Override
+  public long valuesNativeAddress() {
+    throw new RuntimeException("Cannot get native address for arrow column");
+  }
+
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (int i = 0; i < childColumns.length; i++) {
+        childColumns[i].close();
+      }
+    }
+    accessor.close();
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.isNullAt(rowId);
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getBoolean(rowId);
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    boolean[] array = new boolean[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getBoolean(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public byte getByte(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getByte(rowId);
+  }
+
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    byte[] array = new byte[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getByte(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Shorts
+  //
+
+  @Override
+  public short getShort(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getShort(rowId);
+  }
+
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    short[] array = new short[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getShort(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Ints
+  //
+
+  @Override
+  public int getInt(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getInt(rowId);
+  }
+
+  @Override
+  public int[] getInts(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    int[] array = new int[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getInt(rowId + i);
+    }
+    return array;
+  }
+
+  @Override
+  public int getDictId(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public long getLong(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getLong(rowId);
+  }
+
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    long[] array = new long[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getLong(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with floats
+  //
+
+  @Override
+  public float getFloat(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getFloat(rowId);
+  }
+
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    float[] array = new float[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getFloat(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public double getDouble(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getDouble(rowId);
+  }
+
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    ensureAccessible(rowId, count);
+    double[] array = new double[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = accessor.getDouble(rowId + i);
+    }
+    return array;
+  }
+
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public int getArrayLength(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getArrayLength(rowId);
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getArrayOffset(rowId);
+  }
+
+  @Override
+  public void loadBytes(Array array) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Decimals
+  //
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    ensureAccessible(rowId);
+    return accessor.getDecimal(rowId, precision, scale);
+  }
+
+  //
+  // APIs dealing with UTF8Strings
+  //
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getUTF8String(rowId);
+  }
+
+  //
+  // APIs dealing with Binaries
+  //
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    ensureAccessible(rowId);
+    return accessor.getBinary(rowId);
+  }
+
+  public ArrowColumnVector(ValueVector vector) {
+    super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
+      MemoryMode.OFF_HEAP);
+
+    if (vector instanceof NullableBitVector) {
+      accessor = new BooleanAccessor((NullableBitVector) vector);
+    } else if (vector instanceof NullableTinyIntVector) {
+      accessor = new ByteAccessor((NullableTinyIntVector) vector);
+    } else if (vector instanceof NullableSmallIntVector) {
+      accessor = new ShortAccessor((NullableSmallIntVector) vector);
+    } else if (vector instanceof NullableIntVector) {
+      accessor = new IntAccessor((NullableIntVector) vector);
+    } else if (vector instanceof NullableBigIntVector) {
+      accessor = new LongAccessor((NullableBigIntVector) vector);
+    } else if (vector instanceof NullableFloat4Vector) {
+      accessor = new FloatAccessor((NullableFloat4Vector) vector);
+    } else if (vector instanceof NullableFloat8Vector) {
+      accessor = new DoubleAccessor((NullableFloat8Vector) vector);
+    } else if (vector instanceof NullableDecimalVector) {
+      accessor = new DecimalAccessor((NullableDecimalVector) vector);
+    } else if (vector instanceof NullableVarCharVector) {
+      accessor = new StringAccessor((NullableVarCharVector) vector);
+    } else if (vector instanceof NullableVarBinaryVector) {
+      accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
+    } else if (vector instanceof ListVector) {
+      ListVector listVector = (ListVector) vector;
+      accessor = new ArrayAccessor(listVector);
+
+      childColumns = new ColumnVector[1];
+      childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
+      resultArray = new Array(childColumns[0]);
+    } else if (vector instanceof MapVector) {
+      MapVector mapVector = (MapVector) vector;
+      accessor = new StructAccessor(mapVector);
+
+      childColumns = new ArrowColumnVector[mapVector.size()];
+      for (int i = 0; i < childColumns.length; ++i) {
+        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+      }
+      resultStruct = new ColumnarBatch.Row(childColumns);
+    } else {
+      throw new UnsupportedOperationException();
+    }
+    valueCount = accessor.getValueCount();
+    numNulls = accessor.getNullCount();
+    anyNullsSet = numNulls > 0;
+  }
+
+  private static abstract class ArrowVectorAccessor {
+
+    private final ValueVector vector;
+    private final ValueVector.Accessor nulls;
+
+    private final int valueCount;
+    private final int nullCount;
+
+    ArrowVectorAccessor(ValueVector vector) {
+      this.vector = vector;
+      this.nulls = vector.getAccessor();
+      this.valueCount = nulls.getValueCount();
+      this.nullCount = nulls.getNullCount();
+    }
+
+    final boolean isNullAt(int rowId) {
+      return nulls.isNull(rowId);
+    }
+
+    final int getValueCount() {
+      return valueCount;
+    }
+
+    final int getNullCount() {
+      return nullCount;
+    }
+
+    final void close() {
+      vector.close();
+    }
+
+    boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getArrayLength(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    int getArrayOffset(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class BooleanAccessor extends ArrowVectorAccessor {
+
+    private final NullableBitVector.Accessor accessor;
+
+    BooleanAccessor(NullableBitVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final boolean getBoolean(int rowId) {
+      return accessor.get(rowId) == 1;
+    }
+  }
+
+  private static class ByteAccessor extends ArrowVectorAccessor {
+
+    private final NullableTinyIntVector.Accessor accessor;
+
+    ByteAccessor(NullableTinyIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final byte getByte(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class ShortAccessor extends ArrowVectorAccessor {
+
+    private final NullableSmallIntVector.Accessor accessor;
+
+    ShortAccessor(NullableSmallIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final short getShort(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class IntAccessor extends ArrowVectorAccessor {
+
+    private final NullableIntVector.Accessor accessor;
+
+    IntAccessor(NullableIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class LongAccessor extends ArrowVectorAccessor {
+
+    private final NullableBigIntVector.Accessor accessor;
+
+    LongAccessor(NullableBigIntVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class FloatAccessor extends ArrowVectorAccessor {
+
+    private final NullableFloat4Vector.Accessor accessor;
+
+    FloatAccessor(NullableFloat4Vector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class DoubleAccessor extends ArrowVectorAccessor {
+
+    private final NullableFloat8Vector.Accessor accessor;
+
+    DoubleAccessor(NullableFloat8Vector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class DecimalAccessor extends ArrowVectorAccessor {
+
+    private final NullableDecimalVector.Accessor accessor;
+
+    DecimalAccessor(NullableDecimalVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      if (isNullAt(rowId)) return null;
+      return Decimal.apply(accessor.getObject(rowId), precision, scale);
+    }
+  }
+
+  private static class StringAccessor extends ArrowVectorAccessor {
+
+    private final NullableVarCharVector.Accessor accessor;
+    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
+
+    StringAccessor(NullableVarCharVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      accessor.get(rowId, stringResult);
+      if (stringResult.isSet == 0) {
+        return null;
+      } else {
+        return UTF8String.fromAddress(null,
+          stringResult.buffer.memoryAddress() + stringResult.start,
+          stringResult.end - stringResult.start);
+      }
+    }
+  }
+
+  private static class BinaryAccessor extends ArrowVectorAccessor {
+
+    private final NullableVarBinaryVector.Accessor accessor;
+
+    BinaryAccessor(NullableVarBinaryVector vector) {
+      super(vector);
+      this.accessor = vector.getAccessor();
+    }
+
+    @Override
+    final byte[] getBinary(int rowId) {
+      return accessor.getObject(rowId);
+    }
+  }
+
+  private static class ArrayAccessor extends ArrowVectorAccessor {
+
+    private final UInt4Vector.Accessor accessor;
+
+    ArrayAccessor(ListVector vector) {
+      super(vector);
+      this.accessor = vector.getOffsetVector().getAccessor();
+    }
+
+    @Override
+    final int getArrayLength(int rowId) {
+      return accessor.get(rowId + 1) - accessor.get(rowId);
+    }
+
+    @Override
+    final int getArrayOffset(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class StructAccessor extends ArrowVectorAccessor {
+
+    StructAccessor(MapVector vector) {
+      super(vector);
+    }
+  }
+}
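
For readers skimming the diff, the accessor dispatch in ArrowColumnVector can be sketched without any Arrow or Spark dependency. This is only an illustration under stated assumptions: the names AccessorSketch, Accessor, IntAccessor and Column are hypothetical and do not appear in the commit, and a plain int[] stands in for an Arrow vector. It shows the two ideas the class relies on: a base accessor whose getters throw UnsupportedOperationException unless a type-specific subclass overrides them, and a range check performed before every read.

```java
// Hypothetical, dependency-free sketch; none of these names exist in Spark or Arrow.
final class AccessorSketch {

  // Base accessor: every getter is unsupported unless a subclass overrides it.
  abstract static class Accessor {
    private final int valueCount;

    Accessor(int valueCount) {
      this.valueCount = valueCount;
    }

    final int getValueCount() {
      return valueCount;
    }

    int getInt(int rowId) {
      throw new UnsupportedOperationException();
    }

    boolean getBoolean(int rowId) {
      throw new UnsupportedOperationException();
    }
  }

  // Type-specific accessor: overrides only the getter that matches its data.
  static final class IntAccessor extends Accessor {
    private final int[] values; // stand-in for an Arrow vector

    IntAccessor(int[] values) {
      super(values.length);
      this.values = values;
    }

    @Override
    int getInt(int rowId) {
      return values[rowId];
    }
  }

  // Read-only column: checks the requested range, then delegates to the accessor.
  static final class Column {
    private final Accessor accessor;
    private final int valueCount;

    Column(Accessor accessor) {
      this.accessor = accessor;
      this.valueCount = accessor.getValueCount();
    }

    private void ensureAccessible(int index, int count) {
      // Comparing against valueCount - count avoids int overflow in index + count.
      if (index < 0 || count < 0 || index > valueCount - count) {
        throw new IndexOutOfBoundsException(
          String.format("index range: [%d, %d), valueCount: %d",
            index, (long) index + count, valueCount));
      }
    }

    int getInt(int rowId) {
      ensureAccessible(rowId, 1);
      return accessor.getInt(rowId);
    }

    boolean getBoolean(int rowId) {
      ensureAccessible(rowId, 1);
      return accessor.getBoolean(rowId);
    }

    int[] getInts(int rowId, int count) {
      ensureAccessible(rowId, count);
      int[] array = new int[count];
      for (int i = 0; i < count; ++i) {
        array[i] = accessor.getInt(rowId + i);
      }
      return array;
    }
  }
}
```

With a Column built over {10, 20, 30}, getInt(1) returns 20, getBoolean(0) throws UnsupportedOperationException because IntAccessor does not override it, and getInts(2, 2) throws IndexOutOfBoundsException. The subtraction-based range check and the rejection of negative counts are choices of this sketch; the commit's ensureAccessible adds index and count directly.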

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 0c027f8..7796638 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the decimal for rowId.
    */
-  public final Decimal getDecimal(int rowId, int precision, int scale) {
+  public Decimal getDecimal(int rowId, int precision, int scale) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
       return Decimal.createUnsafe(getInt(rowId), precision, scale);
     } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
   }


-  public final void putDecimal(int rowId, Decimal value, int precision) {
+  public void putDecimal(int rowId, Decimal value, int precision) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
       putInt(rowId, (int) value.toUnscaledLong());
     } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the UTF8String for rowId.
    */
-  public final UTF8String getUTF8String(int rowId) {
+  public UTF8String getUTF8String(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
@@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the byte array for rowId.
    */
-  public final byte[] getBinary(int rowId) {
+  public byte[] getBinary(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array array = getByteArray(rowId);
       byte[] bytes = new byte[array.length];
@@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Data type for this column.
    */
-  protected final DataType type;
+  protected DataType type;

   /**
   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * If this is a nested type (array or struct), the column for the child data.
    */
-  protected final ColumnVector[] childColumns;
+  protected ColumnVector[] childColumns;

   /**
    * Reusable Array holder for getArray().
    */
-  protected final Array resultArray;
+  protected Array resultArray;

   /**
    * Reusable Struct holder for getStruct().
    */
-  protected final ColumnarBatch.Row resultStruct;
+  protected ColumnarBatch.Row resultStruct;

   /**
    * The Dictionary for this column.

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
new file mode 100644
index 0000000..e9f6e7c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
+
+/**
+ * An abstract class for read-only column vector.
+ */
+public abstract class ReadOnlyColumnVector extends ColumnVector {
+
+  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
+    super(capacity, DataTypes.NullType, memMode);
+    this.type = type;
+    isConstant = true;
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public final void putNotNull(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNull(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNulls(int rowId, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putNotNulls(int rowId, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public final void putBoolean(int rowId, boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBooleans(int rowId, int count, boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public final void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Shorts
+  //
+
+  @Override
+  public final void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putShorts(int rowId, int count, short value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Ints
+  //
+
+  @Override
+  public final void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public final void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with floats
+  //
+
+  @Override
+  public final void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public final void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public final void putArray(int rowId, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Byte Arrays
+  //
+
+  @Override
+  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // APIs dealing with Decimals
+  //
+
+  @Override
+  public final void putDecimal(int rowId, Decimal value, int precision) {
+    throw new UnsupportedOperationException();
+  }
+
+  //
+  // Other APIs
+  //
+
+  @Override
+  public final void setDictionary(Dictionary dictionary) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ColumnVector reserveDictionaryIds(int capacity) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected final void reserveInternal(int newCapacity) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 6af5c73..c913efe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
 private[sql] object ArrowConverters {

   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
-    dataType match {
-      case BooleanType => ArrowType.Bool.INSTANCE
-      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
-      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-      case ByteType => new ArrowType.Int(8, true)
-      case StringType => ArrowType.Utf8.INSTANCE
-      case BinaryType => ArrowType.Binary.INSTANCE
-      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
-    }
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-    val arrowFields = schema.fields.map { f =>
-      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
-    }
-    new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
    * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
    * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
    */
@@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
       batch: ArrowRecordBatch,
       schema: StructType,
       allocator: BufferAllocator): Array[Byte] = {
-    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
+    val arrowSchema = ArrowUtils.toArrowSchema(schema)
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
     val out = new ByteArrayOutputStream()
     val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
@@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
    * Create an Arrow ColumnWriter given the type and ordinal of row.
    */
   def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
-    val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
+    val dtype = ArrowUtils.toArrowType(dataType)
     dataType match {
       case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
       case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
new file mode 100644
index 0000000..2caf1ef
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.types.FloatingPointPrecision
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+
+import org.apache.spark.sql.types._
+
+object ArrowUtils {
+
+  val rootAllocator = new RootAllocator(Long.MaxValue)
+
+  // todo: support more types.
+
+  def toArrowType(dt: DataType): ArrowType = dt match {
+    case BooleanType => ArrowType.Bool.INSTANCE
+    case ByteType => new ArrowType.Int(8, true)
+    case ShortType => new ArrowType.Int(8 * 2, true)
+    case IntegerType => new ArrowType.Int(8 * 4, true)
+    case LongType => new ArrowType.Int(8 * 8, true)
+    case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+    case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+    case StringType => ArrowType.Utf8.INSTANCE
+    case BinaryType => ArrowType.Binary.INSTANCE
+    case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
+    case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
+  }
+
+  def fromArrowType(dt: ArrowType): DataType = dt match {
+    case ArrowType.Bool.INSTANCE => BooleanType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
+    case float: ArrowType.FloatingPoint
+      if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
+    case float: ArrowType.FloatingPoint
+      if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
+    case ArrowType.Utf8.INSTANCE => StringType
+    case ArrowType.Binary.INSTANCE => BinaryType
+    case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
+    case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
+  }
+
+  def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
+    dt match {
+      case ArrayType(elementType, containsNull) =>
+        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
+        new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
+      case StructType(fields) =>
+        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
+        new Field(name, fieldType,
+          fields.map { field =>
+            toArrowField(field.name, field.dataType, field.nullable)
+          }.toSeq.asJava)
+      case dataType =>
+        val fieldType = new FieldType(nullable, toArrowType(dataType), null)
+        new Field(name, fieldType, Seq.empty[Field].asJava)
+    }
+  }
+
+  def fromArrowField(field: Field): DataType = {
+    field.getType match {
+      case ArrowType.List.INSTANCE =>
+        val elementField = field.getChildren().get(0)
+        val elementType = fromArrowField(elementField)
+        ArrayType(elementType, containsNull = elementField.isNullable)
+      case ArrowType.Struct.INSTANCE =>
+        val fields = field.getChildren().asScala.map { child =>
+          val dt = fromArrowField(child)
+          StructField(child.getName, dt, child.isNullable)
+        }
+        StructType(fields)
+      case arrowType => fromArrowType(arrowType)
+    }
+  }
+
+  def toArrowSchema(schema: StructType): Schema = {
+    new Schema(schema.map { field =>
+      toArrowField(field.name, field.dataType, field.nullable)
+    }.asJava)
+  }
+
+  def fromArrowSchema(schema: Schema): StructType = {
+    StructType(schema.getFields.asScala.map { field =>
+      val dt = fromArrowField(field)
+      StructField(field.getName, dt, field.isNullable)
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 159328c..55b4655 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     val allocator = new RootAllocator(Long.MaxValue)
     val jsonReader = new JsonFileReader(jsonFile, allocator)

-    val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
+    val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
     val jsonSchema = jsonReader.start()
     Validator.compareSchemas(arrowSchema, jsonSchema)


http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
new file mode 100644
index 0000000..638619f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types._
+
+class ArrowUtilsSuite extends SparkFunSuite {
+
+  def roundtrip(dt: DataType): Unit = {
+    dt match {
+      case schema: StructType =>
+        assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
+      case _ =>
+        roundtrip(new StructType().add("value", dt))
+    }
+  }
+
+  test("simple") {
+    roundtrip(BooleanType)
+    roundtrip(ByteType)
+    roundtrip(ShortType)
+    roundtrip(IntegerType)
+    roundtrip(LongType)
+    roundtrip(FloatType)
+    roundtrip(DoubleType)
+    roundtrip(StringType)
+    roundtrip(BinaryType)
+    roundtrip(DecimalType.SYSTEM_DEFAULT)
+  }
+
+  test("array") {
+    roundtrip(ArrayType(IntegerType, containsNull = true))
+    roundtrip(ArrayType(IntegerType, containsNull = false))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
+    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
+  }
+
+  test("struct") {
+    roundtrip(new StructType())
+    roundtrip(new StructType().add("i", IntegerType))
+    roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
+    roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
+    roundtrip(new StructType().add(
+      "struct",
+      new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
new file mode 100644
index 0000000..d24a9e1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowColumnVectorSuite extends SparkFunSuite {
+
+  test("boolean") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableBitVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === BooleanType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getBoolean(i) === (i % 2 == 0))
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("byte") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableTinyIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toByte)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ByteType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getByte(i) === i.toByte)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("short") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableSmallIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toShort)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ShortType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getShort(i) === i.toShort)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("int") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === IntegerType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getInt(i) === i)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getInts(0, 10) === (0 until 10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("long") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableBigIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toLong)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === LongType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getLong(i) === i.toLong)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("float") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableFloat4Vector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toFloat)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === FloatType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getFloat(i) === i.toFloat)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("double") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableFloat8Vector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i.toDouble)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === DoubleType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getDouble(i) === i.toDouble)
+    }
+    assert(columnVector.isNullAt(10))
+
+    assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("string") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableVarCharVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      mutator.setSafe(i, utf8, 0, utf8.length)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === StringType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
+    }
+    assert(columnVector.isNullAt(10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("binary") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      mutator.setSafe(i, utf8, 0, utf8.length)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === BinaryType)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    (0 until 10).foreach { i =>
+      assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
+    }
+    assert(columnVector.isNullAt(10))
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("array") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
+      .createVector(allocator).asInstanceOf[ListVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+    val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
+    val elementMutator = elementVector.getMutator()
+
+    // [1, 2]
+    mutator.startNewValue(0)
+    elementMutator.setSafe(0, 1)
+    elementMutator.setSafe(1, 2)
+    mutator.endValue(0, 2)
+
+    // [3, null, 5]
+    mutator.startNewValue(1)
+    elementMutator.setSafe(2, 3)
+    elementMutator.setNull(3)
+    elementMutator.setSafe(4, 5)
+    mutator.endValue(1, 3)
+
+    // null
+
+    // []
+    mutator.startNewValue(3)
+    mutator.endValue(3, 0)
+
+    elementMutator.setValueCount(5)
+    mutator.setValueCount(4)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === ArrayType(IntegerType))
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    val array0 = columnVector.getArray(0)
+    assert(array0.numElements() === 2)
+    assert(array0.getInt(0) === 1)
+    assert(array0.getInt(1) === 2)
+
+    val array1 = columnVector.getArray(1)
+    assert(array1.numElements() === 3)
+    assert(array1.getInt(0) === 3)
+    assert(array1.isNullAt(1))
+    assert(array1.getInt(2) === 5)
+
+    assert(columnVector.isNullAt(2))
+
+    val array3 = columnVector.getArray(3)
+    assert(array3.numElements() === 0)
+
+    columnVector.close()
+    allocator.close()
+  }
+
+  test("struct") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
+    val schema = new StructType().add("int", IntegerType).add("long", LongType)
+    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableMapVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+    val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
+    val intMutator = intVector.getMutator()
+    val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
+    val longMutator = longVector.getMutator()
+
+    // (1, 1L)
+    mutator.setIndexDefined(0)
+    intMutator.setSafe(0, 1)
+    longMutator.setSafe(0, 1L)
+
+    // (2, null)
+    mutator.setIndexDefined(1)
+    intMutator.setSafe(1, 2)
+    longMutator.setNull(1)
+
+    // (null, 3L)
+    mutator.setIndexDefined(2)
+    intMutator.setNull(2)
+    longMutator.setSafe(2, 3L)
+
+    // null
+    mutator.setNull(3)
+
+    // (5, 5L)
+    mutator.setIndexDefined(4)
+    intMutator.setSafe(4, 5)
+    longMutator.setSafe(4, 5L)
+
+    intMutator.setValueCount(5)
+    longMutator.setValueCount(5)
+    mutator.setValueCount(5)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === schema)
+    assert(columnVector.anyNullsSet)
+    assert(columnVector.numNulls === 1)
+
+    val row0 = columnVector.getStruct(0, 2)
+    assert(row0.getInt(0) === 1)
+    assert(row0.getLong(1) === 1L)
+
+    val row1 = columnVector.getStruct(1, 2)
+    assert(row1.getInt(0) === 2)
+    assert(row1.isNullAt(1))
+
+    val row2 = columnVector.getStruct(2, 2)
+    assert(row2.isNullAt(0))
+    assert(row2.getLong(1) === 3L)
+
+    assert(columnVector.isNullAt(3))
+
+    val row4 = columnVector.getStruct(4, 2)
+    assert(row4.getInt(0) === 5)
+    assert(row4.getLong(1) === 5L)
+
+    columnVector.close()
+    allocator.close()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

Re: Fwd: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.

Takuya UESHIN
Hi,

Thank you for reporting it.

I believe it's fixed now.
If you still have a problem, please let me know.

Thanks.


On Fri, Jul 21, 2017 at 4:57 PM, Liang-Chi Hsieh <[hidden email]> wrote:

Yeah, I think it should be "ColumnVector.Array". I have already pinged
@ueshin about this issue.
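
The suggested fix can be illustrated with a small, self-contained sketch. The classes `BaseVector`, `ReadOnlyVector`, and `ArrowLikeVector` below are hypothetical stand-ins for `ColumnVector`, `ReadOnlyColumnVector`, and `ArrowColumnVector`; they are not Spark's real classes. The sketch assumes, as the thread suggests, that the scaladoc step could not resolve the inherited nested type by its simple name `Array`, so the override names it through its enclosing class instead.

```java
// Hypothetical stand-in for ColumnVector: it declares a nested Array type.
abstract class BaseVector {
  static final class Array {
    final BaseVector data;

    Array(BaseVector data) {
      this.data = data;
    }
  }

  abstract void loadBytes(Array array);
}

// Hypothetical stand-in for ReadOnlyColumnVector.
abstract class ReadOnlyVector extends BaseVector {}

// Hypothetical stand-in for ArrowColumnVector.
final class ArrowLikeVector extends ReadOnlyVector {
  // The parameter type is written as BaseVector.Array rather than Array,
  // so it does not depend on simple-name lookup of an inherited member class.
  @Override
  void loadBytes(BaseVector.Array array) {
    throw new UnsupportedOperationException();
  }
}

class QualifiedNestedTypeDemo {
  // Returns true when loadBytes rejects the call, as the read-only vector should.
  static boolean loadBytesIsUnsupported() {
    ArrowLikeVector vector = new ArrowLikeVector();
    try {
      vector.loadBytes(new BaseVector.Array(vector));
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(loadBytesIsUnsupported());
  }
}
```

Plain `javac` accepts either spelling, because member classes are inherited by subclasses; the qualified form is simply the one that does not rely on that lookup.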


Jacek Laskowski wrote
> Hi,
>
> Looks like the change has broken the build for me:
>
> [INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
> /Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243:
> error: not found: type Array
>   public void loadBytes(Array array) {
>                         ^
>
> ...
>
> 222 warnings found
> one error found
> [INFO] ------------------------------------------------------------------------
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM ........................... SUCCESS [  4.864 s]
> [INFO] Spark Project Tags ................................. SUCCESS [  5.689 s]
> [INFO] Spark Project Sketch ............................... SUCCESS [  4.646 s]
> [INFO] Spark Project Local DB ............................. SUCCESS [  6.074 s]
> [INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
> [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  7.355 s]
> [INFO] Spark Project Unsafe ............................... SUCCESS [  7.639 s]
> [INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
> [INFO] Spark Project Core ................................. SUCCESS [02:01 min]
> [INFO] Spark Project ML Local Library ..................... SUCCESS [  9.711 s]
> [INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
> [INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
> [INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
> [INFO] Spark Project SQL .................................. FAILURE [02:14 min]
>
> Is it only me, or do others suffer from it too?
>
> Regards,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
>
> ---------- Forwarded message ----------
> From: <wenchen@>
> Date: Thu, Jul 20, 2017 at 3:00 PM
> Subject: spark git commit: [SPARK-21472][SQL] Introduce
> ArrowColumnVector as a reader for Arrow vectors.
> To: commits@.apache
>
>
> Repository: spark
> Updated Branches:
>   refs/heads/master 5d1850d4b -> cb19880cd
>
>
> [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow
> vectors.
>
> ## What changes were proposed in this pull request?
>
> Introducing `ArrowColumnVector` as a reader for Arrow vectors.
> It extends `ColumnVector`, so we will be able to use it with
> `ColumnarBatch` and its functionalities.
> Currently it supports primitive types and `StringType`, `ArrayType`
> and `StructType`.
>
> ## How was this patch tested?
>
> Added tests for `ArrowColumnVector` and existing tests.
>
> Author: Takuya UESHIN <ueshin@>
>
> Closes #18680 from ueshin/issues/SPARK-21472.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/spark/repo
> Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
> Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
> Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c
>
> Branch: refs/heads/master
> Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
> Parents: 5d1850d
> Author: Takuya UESHIN <ueshin@>
> Authored: Thu Jul 20 21:00:30 2017 +0800
> Committer: Wenchen Fan <wenchen@>
> Committed: Thu Jul 20 21:00:30 2017 +0800
>
> ----------------------------------------------------------------------
>  .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
>  .../sql/execution/vectorized/ColumnVector.java  |  16 +-
>  .../vectorized/ReadOnlyColumnVector.java        | 251 ++++++++
>  .../sql/execution/arrow/ArrowConverters.scala   |  32 +-
>  .../spark/sql/execution/arrow/ArrowUtils.scala  | 109 ++++
>  .../execution/arrow/ArrowConvertersSuite.scala  |   2 +-
>  .../sql/execution/arrow/ArrowUtilsSuite.scala   |  65 ++
>  .../vectorized/ArrowColumnVectorSuite.scala     | 410 +++++++++++++
>  8 files changed, 1436 insertions(+), 39 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> new file mode 100644
> index 0000000..68e0abc
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> @@ -0,0 +1,590 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.arrow.vector.*;
> +import org.apache.arrow.vector.complex.*;
> +import org.apache.arrow.vector.holders.NullableVarCharHolder;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.execution.arrow.ArrowUtils;
> +import org.apache.spark.sql.types.*;
> +import org.apache.spark.unsafe.types.UTF8String;
> +
> +/**
> + * A column vector backed by Apache Arrow.
> + */
> +public final class ArrowColumnVector extends ReadOnlyColumnVector {
> +
> +  private final ArrowVectorAccessor accessor;
> +  private final int valueCount;
> +
> +  private void ensureAccessible(int index) {
> +    if (index < 0 || index >= valueCount) {
> +      throw new IndexOutOfBoundsException(
> +        String.format("index: %d, valueCount: %d", index, valueCount));
> +    }
> +  }
> +
> +  private void ensureAccessible(int index, int count) {
> +    if (index < 0 || index + count > valueCount) {
> +      throw new IndexOutOfBoundsException(
> +        String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
> +    }
> +  }
> +
> +  @Override
> +  public long nullsNativeAddress() {
> +    throw new RuntimeException("Cannot get native address for arrow column");
> +  }
> +
> +  @Override
> +  public long valuesNativeAddress() {
> +    throw new RuntimeException("Cannot get native address for arrow column");
> +  }
> +
> +  @Override
> +  public void close() {
> +    if (childColumns != null) {
> +      for (int i = 0; i < childColumns.length; i++) {
> +        childColumns[i].close();
> +      }
> +    }
> +    accessor.close();
> +  }
> +
> +  //
> +  // APIs dealing with nulls
> +  //
> +
> +  @Override
> +  public boolean isNullAt(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.isNullAt(rowId);
> +  }
> +
> +  //
> +  // APIs dealing with Booleans
> +  //
> +
> +  @Override
> +  public boolean getBoolean(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getBoolean(rowId);
> +  }
> +
> +  @Override
> +  public boolean[] getBooleans(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    boolean[] array = new boolean[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getBoolean(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Bytes
> +  //
> +
> +  @Override
> +  public byte getByte(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getByte(rowId);
> +  }
> +
> +  @Override
> +  public byte[] getBytes(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    byte[] array = new byte[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getByte(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Shorts
> +  //
> +
> +  @Override
> +  public short getShort(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getShort(rowId);
> +  }
> +
> +  @Override
> +  public short[] getShorts(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    short[] array = new short[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getShort(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Ints
> +  //
> +
> +  @Override
> +  public int getInt(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getInt(rowId);
> +  }
> +
> +  @Override
> +  public int[] getInts(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    int[] array = new int[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getInt(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  @Override
> +  public int getDictId(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Longs
> +  //
> +
> +  @Override
> +  public long getLong(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getLong(rowId);
> +  }
> +
> +  @Override
> +  public long[] getLongs(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    long[] array = new long[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getLong(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with floats
> +  //
> +
> +  @Override
> +  public float getFloat(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getFloat(rowId);
> +  }
> +
> +  @Override
> +  public float[] getFloats(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    float[] array = new float[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getFloat(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with doubles
> +  //
> +
> +  @Override
> +  public double getDouble(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getDouble(rowId);
> +  }
> +
> +  @Override
> +  public double[] getDoubles(int rowId, int count) {
> +    ensureAccessible(rowId, count);
> +    double[] array = new double[count];
> +    for (int i = 0; i < count; ++i) {
> +      array[i] = accessor.getDouble(rowId + i);
> +    }
> +    return array;
> +  }
> +
> +  //
> +  // APIs dealing with Arrays
> +  //
> +
> +  @Override
> +  public int getArrayLength(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getArrayLength(rowId);
> +  }
> +
> +  @Override
> +  public int getArrayOffset(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getArrayOffset(rowId);
> +  }
> +
> +  @Override
> +  public void loadBytes(Array array) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Decimals
> +  //
> +
> +  @Override
> +  public Decimal getDecimal(int rowId, int precision, int scale) {
> +    ensureAccessible(rowId);
> +    return accessor.getDecimal(rowId, precision, scale);
> +  }
> +
> +  //
> +  // APIs dealing with UTF8Strings
> +  //
> +
> +  @Override
> +  public UTF8String getUTF8String(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getUTF8String(rowId);
> +  }
> +
> +  //
> +  // APIs dealing with Binaries
> +  //
> +
> +  @Override
> +  public byte[] getBinary(int rowId) {
> +    ensureAccessible(rowId);
> +    return accessor.getBinary(rowId);
> +  }
> +
> +  public ArrowColumnVector(ValueVector vector) {
> +    super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
> +      MemoryMode.OFF_HEAP);
> +
> +    if (vector instanceof NullableBitVector) {
> +      accessor = new BooleanAccessor((NullableBitVector) vector);
> +    } else if (vector instanceof NullableTinyIntVector) {
> +      accessor = new ByteAccessor((NullableTinyIntVector) vector);
> +    } else if (vector instanceof NullableSmallIntVector) {
> +      accessor = new ShortAccessor((NullableSmallIntVector) vector);
> +    } else if (vector instanceof NullableIntVector) {
> +      accessor = new IntAccessor((NullableIntVector) vector);
> +    } else if (vector instanceof NullableBigIntVector) {
> +      accessor = new LongAccessor((NullableBigIntVector) vector);
> +    } else if (vector instanceof NullableFloat4Vector) {
> +      accessor = new FloatAccessor((NullableFloat4Vector) vector);
> +    } else if (vector instanceof NullableFloat8Vector) {
> +      accessor = new DoubleAccessor((NullableFloat8Vector) vector);
> +    } else if (vector instanceof NullableDecimalVector) {
> +      accessor = new DecimalAccessor((NullableDecimalVector) vector);
> +    } else if (vector instanceof NullableVarCharVector) {
> +      accessor = new StringAccessor((NullableVarCharVector) vector);
> +    } else if (vector instanceof NullableVarBinaryVector) {
> +      accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
> +    } else if (vector instanceof ListVector) {
> +      ListVector listVector = (ListVector) vector;
> +      accessor = new ArrayAccessor(listVector);
> +
> +      childColumns = new ColumnVector[1];
> +      childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
> +      resultArray = new Array(childColumns[0]);
> +    } else if (vector instanceof MapVector) {
> +      MapVector mapVector = (MapVector) vector;
> +      accessor = new StructAccessor(mapVector);
> +
> +      childColumns = new ArrowColumnVector[mapVector.size()];
> +      for (int i = 0; i < childColumns.length; ++i) {
> +        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
> +      }
> +      resultStruct = new ColumnarBatch.Row(childColumns);
> +    } else {
> +      throw new UnsupportedOperationException();
> +    }
> +    valueCount = accessor.getValueCount();
> +    numNulls = accessor.getNullCount();
> +    anyNullsSet = numNulls > 0;
> +  }
> +
> +  private static abstract class ArrowVectorAccessor {
> +
> +    private final ValueVector vector;
> +    private final ValueVector.Accessor nulls;
> +
> +    private final int valueCount;
> +    private final int nullCount;
> +
> +    ArrowVectorAccessor(ValueVector vector) {
> +      this.vector = vector;
> +      this.nulls = vector.getAccessor();
> +      this.valueCount = nulls.getValueCount();
> +      this.nullCount = nulls.getNullCount();
> +    }
> +
> +    final boolean isNullAt(int rowId) {
> +      return nulls.isNull(rowId);
> +    }
> +
> +    final int getValueCount() {
> +      return valueCount;
> +    }
> +
> +    final int getNullCount() {
> +      return nullCount;
> +    }
> +
> +    final void close() {
> +      vector.close();
> +    }
> +
> +    boolean getBoolean(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    byte getByte(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    short getShort(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getInt(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    long getLong(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    float getFloat(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    double getDouble(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    Decimal getDecimal(int rowId, int precision, int scale) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    UTF8String getUTF8String(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    byte[] getBinary(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getArrayLength(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +    int getArrayOffset(int rowId) {
> +      throw new UnsupportedOperationException();
> +    }
> +  }
> +
> +  private static class BooleanAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableBitVector.Accessor accessor;
> +
> +    BooleanAccessor(NullableBitVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final boolean getBoolean(int rowId) {
> +      return accessor.get(rowId) == 1;
> +    }
> +  }
> +
> +  private static class ByteAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableTinyIntVector.Accessor accessor;
> +
> +    ByteAccessor(NullableTinyIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final byte getByte(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class ShortAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableSmallIntVector.Accessor accessor;
> +
> +    ShortAccessor(NullableSmallIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final short getShort(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class IntAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableIntVector.Accessor accessor;
> +
> +    IntAccessor(NullableIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final int getInt(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class LongAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableBigIntVector.Accessor accessor;
> +
> +    LongAccessor(NullableBigIntVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final long getLong(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class FloatAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableFloat4Vector.Accessor accessor;
> +
> +    FloatAccessor(NullableFloat4Vector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final float getFloat(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class DoubleAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableFloat8Vector.Accessor accessor;
> +
> +    DoubleAccessor(NullableFloat8Vector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final double getDouble(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class DecimalAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableDecimalVector.Accessor accessor;
> +
> +    DecimalAccessor(NullableDecimalVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final Decimal getDecimal(int rowId, int precision, int scale) {
> +      if (isNullAt(rowId)) return null;
> +      return Decimal.apply(accessor.getObject(rowId), precision, scale);
> +    }
> +  }
> +
> +  private static class StringAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableVarCharVector.Accessor accessor;
> +    private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
> +
> +    StringAccessor(NullableVarCharVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final UTF8String getUTF8String(int rowId) {
> +      accessor.get(rowId, stringResult);
> +      if (stringResult.isSet == 0) {
> +        return null;
> +      } else {
> +        return UTF8String.fromAddress(null,
> +          stringResult.buffer.memoryAddress() + stringResult.start,
> +          stringResult.end - stringResult.start);
> +      }
> +    }
> +  }
> +
> +  private static class BinaryAccessor extends ArrowVectorAccessor {
> +
> +    private final NullableVarBinaryVector.Accessor accessor;
> +
> +    BinaryAccessor(NullableVarBinaryVector vector) {
> +      super(vector);
> +      this.accessor = vector.getAccessor();
> +    }
> +
> +    @Override
> +    final byte[] getBinary(int rowId) {
> +      return accessor.getObject(rowId);
> +    }
> +  }
> +
> +  private static class ArrayAccessor extends ArrowVectorAccessor {
> +
> +    private final UInt4Vector.Accessor accessor;
> +
> +    ArrayAccessor(ListVector vector) {
> +      super(vector);
> +      this.accessor = vector.getOffsetVector().getAccessor();
> +    }
> +
> +    @Override
> +    final int getArrayLength(int rowId) {
> +      return accessor.get(rowId + 1) - accessor.get(rowId);
> +    }
> +
> +    @Override
> +    final int getArrayOffset(int rowId) {
> +      return accessor.get(rowId);
> +    }
> +  }
> +
> +  private static class StructAccessor extends ArrowVectorAccessor {
> +
> +    StructAccessor(MapVector vector) {
> +      super(vector);
> +    }
> +  }
> +}
>
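To check my reading of the accessor classes above, here is a minimal sketch of the pattern as I understand it: a base class whose getters all throw, subclasses that override only what their vector type supports, and list lengths computed from neighbouring offsets. Every class name here is hypothetical, and plain int arrays stand in for the Arrow vectors.

```java
// Hypothetical stand-ins: none of these classes are Spark or Arrow types.
abstract class SketchAccessor {
  // Default getters reject the call; each subclass overrides only what it supports.
  int getInt(int rowId) {
    throw new UnsupportedOperationException();
  }

  int getArrayOffset(int rowId) {
    throw new UnsupportedOperationException();
  }

  int getArrayLength(int rowId) {
    throw new UnsupportedOperationException();
  }
}

final class SketchIntAccessor extends SketchAccessor {
  private final int[] values;

  SketchIntAccessor(int[] values) {
    this.values = values;
  }

  @Override
  int getInt(int rowId) {
    return values[rowId];
  }
}

final class SketchListAccessor extends SketchAccessor {
  // One more entry than there are rows; row i spans [offsets[i], offsets[i + 1]).
  private final int[] offsets;

  SketchListAccessor(int[] offsets) {
    this.offsets = offsets;
  }

  @Override
  int getArrayOffset(int rowId) {
    return offsets[rowId];
  }

  @Override
  int getArrayLength(int rowId) {
    return offsets[rowId + 1] - offsets[rowId];
  }
}
```

With offsets {0, 2, 2, 5} the three rows have lengths 2, 0 and 3, and calling an unsupported getter on the int accessor throws UnsupportedOperationException.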
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> index 0c027f8..7796638 100644
> --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> @@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the decimal for rowId.
>     */
> -  public final Decimal getDecimal(int rowId, int precision, int scale) {
> +  public Decimal getDecimal(int rowId, int precision, int scale) {
>      if (precision <= Decimal.MAX_INT_DIGITS()) {
>        return Decimal.createUnsafe(getInt(rowId), precision, scale);
>      } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    }
>
>
> -  public final void putDecimal(int rowId, Decimal value, int precision) {
> +  public void putDecimal(int rowId, Decimal value, int precision) {
>      if (precision <= Decimal.MAX_INT_DIGITS()) {
>        putInt(rowId, (int) value.toUnscaledLong());
>      } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the UTF8String for rowId.
>     */
> -  public final UTF8String getUTF8String(int rowId) {
> +  public UTF8String getUTF8String(int rowId) {
>      if (dictionary == null) {
>        ColumnVector.Array a = getByteArray(rowId);
>        return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
> @@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Returns the byte array for rowId.
>     */
> -  public final byte[] getBinary(int rowId) {
> +  public byte[] getBinary(int rowId) {
>      if (dictionary == null) {
>        ColumnVector.Array array = getByteArray(rowId);
>        byte[] bytes = new byte[array.length];
> @@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * Data type for this column.
>     */
> -  protected final DataType type;
> +  protected DataType type;
>
>    /**
>     * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
> @@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
>    /**
>     * If this is a nested type (array or struct), the column for the child data.
>     */
> -  protected final ColumnVector[] childColumns;
> +  protected ColumnVector[] childColumns;
>
>    /**
>     * Reusable Array holder for getArray().
>     */
> -  protected final Array resultArray;
> +  protected Array resultArray;
>
>    /**
>     * Reusable Struct holder for getStruct().
>     */
> -  protected final ColumnarBatch.Row resultStruct;
> +  protected ColumnarBatch.Row resultStruct;
>
>    /**
>     * The Dictionary for this column.
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> new file mode 100644
> index 0000000..e9f6e7c
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> @@ -0,0 +1,251 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.types.*;
> +
> +/**
> + * An abstract class for read-only column vector.
> + */
> +public abstract class ReadOnlyColumnVector extends ColumnVector {
> +
> +  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
> +    super(capacity, DataTypes.NullType, memMode);
> +    this.type = type;
> +    isConstant = true;
> +  }
> +
> +  //
> +  // APIs dealing with nulls
> +  //
> +
> +  @Override
> +  public final void putNotNull(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNull(int rowId) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNulls(int rowId, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putNotNulls(int rowId, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Booleans
> +  //
> +
> +  @Override
> +  public final void putBoolean(int rowId, boolean value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBooleans(int rowId, int count, boolean value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Bytes
> +  //
> +
> +  @Override
> +  public final void putByte(int rowId, byte value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBytes(int rowId, int count, byte value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Shorts
> +  //
> +
> +  @Override
> +  public final void putShort(int rowId, short value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putShorts(int rowId, int count, short value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Ints
> +  //
> +
> +  @Override
> +  public final void putInt(int rowId, int value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putInts(int rowId, int count, int value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Longs
> +  //
> +
> +  @Override
> +  public final void putLong(int rowId, long value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongs(int rowId, int count, long value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with floats
> +  //
> +
> +  @Override
> +  public final void putFloat(int rowId, float value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, float value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with doubles
> +  //
> +
> +  @Override
> +  public final void putDouble(int rowId, double value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, double value) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Arrays
> +  //
> +
> +  @Override
> +  public final void putArray(int rowId, int offset, int length) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Byte Arrays
> +  //
> +
> +  @Override
> +  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // APIs dealing with Decimals
> +  //
> +
> +  @Override
> +  public final void putDecimal(int rowId, Decimal value, int precision) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  //
> +  // Other APIs
> +  //
> +
> +  @Override
> +  public final void setDictionary(Dictionary dictionary) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public final ColumnVector reserveDictionaryIds(int capacity) {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  protected final void reserveInternal(int newCapacity) {
> +    throw new UnsupportedOperationException();
> +  }
> +}
>
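The constructor of ReadOnlyColumnVector took me a moment: it hands a placeholder type to super and only then assigns the real one, which is presumably why the diff drops final from the type field. Below is a minimal sketch of that shape plus the "every mutator throws" idea. All names are hypothetical, strings stand in for DataType, and the claim that the placeholder keeps the base class from setting up storage is my reading, not something the commit states.

```java
// Hypothetical stand-ins, not the Spark classes.
abstract class SketchVector {
  protected String type;        // non-final so a subclass can assign it after super(...)
  protected final int[] data;   // storage owned by writable vectors only

  protected SketchVector(int capacity, String type) {
    this.type = type;
    // In this sketch only "int" columns get a backing buffer.
    this.data = "int".equals(type) ? new int[capacity] : null;
  }

  public void putInt(int rowId, int value) {
    data[rowId] = value;
  }

  public abstract int getInt(int rowId);

  public String type() {
    return type;
  }
}

abstract class SketchReadOnlyVector extends SketchVector {
  protected SketchReadOnlyVector(int capacity, String type) {
    super(capacity, "null");  // placeholder type: the base class allocates nothing
    this.type = type;         // then report the real type to readers
  }

  // Mutators are sealed off once, here, for every read-only implementation.
  @Override
  public final void putInt(int rowId, int value) {
    throw new UnsupportedOperationException();
  }
}

final class SketchWrappedVector extends SketchReadOnlyVector {
  private final int[] external;  // data owned by someone else

  SketchWrappedVector(int[] external) {
    super(external.length, "int");
    this.external = external;
  }

  @Override
  public int getInt(int rowId) {
    return external[rowId];
  }
}
```

A SketchWrappedVector over {4, 5} reports type "int" and serves reads from the wrapped array, while putInt throws UnsupportedOperationException.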
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> index 6af5c73..c913efe 100644
> --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> @@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
>  private[sql] object ArrowConverters {
>
>    /**
> -   * Map a Spark DataType to ArrowType.
> -   */
> -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
> -    dataType match {
> -      case BooleanType => ArrowType.Bool.INSTANCE
> -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
> -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
> -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
> -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> -      case ByteType => new ArrowType.Int(8, true)
> -      case StringType => ArrowType.Utf8.INSTANCE
> -      case BinaryType => ArrowType.Binary.INSTANCE
> -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
> -    }
> -  }
> -
> -  /**
> -   * Convert a Spark Dataset schema to Arrow schema.
> -   */
> -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
> -    val arrowFields = schema.fields.map { f =>
> -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
> -    }
> -    new Schema(arrowFields.toList.asJava)
> -  }
> -
> -  /**
>     * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
>     * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
>     */
> @@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
>        batch: ArrowRecordBatch,
>        schema: StructType,
>        allocator: BufferAllocator): Array[Byte] = {
> -    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
> +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
>      val root = VectorSchemaRoot.create(arrowSchema, allocator)
>      val out = new ByteArrayOutputStream()
>      val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
> @@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
>     * Create an Arrow ColumnWriter given the type and ordinal of row.
>     */
>    def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
> -    val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
> +    val dtype = ArrowUtils.toArrowType(dataType)
>      dataType match {
>        case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
>        case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> new file mode 100644
> index 0000000..2caf1ef
> --- /dev/null
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> @@ -0,0 +1,109 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import scala.collection.JavaConverters._
> +
> +import org.apache.arrow.memory.RootAllocator
> +import org.apache.arrow.vector.types.FloatingPointPrecision
> +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
> +
> +import org.apache.spark.sql.types._
> +
> +object ArrowUtils {
> +
> +  val rootAllocator = new RootAllocator(Long.MaxValue)
> +
> +  // todo: support more types.
> +
> +  def toArrowType(dt: DataType): ArrowType = dt match {
> +    case BooleanType => ArrowType.Bool.INSTANCE
> +    case ByteType => new ArrowType.Int(8, true)
> +    case ShortType => new ArrowType.Int(8 * 2, true)
> +    case IntegerType => new ArrowType.Int(8 * 4, true)
> +    case LongType => new ArrowType.Int(8 * 8, true)
> +    case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> +    case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> +    case StringType => ArrowType.Utf8.INSTANCE
> +    case BinaryType => ArrowType.Binary.INSTANCE
> +    case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
> +    case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
> +  }
> +
> +  def fromArrowType(dt: ArrowType): DataType = dt match {
> +    case ArrowType.Bool.INSTANCE => BooleanType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
> +    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
> +    case float: ArrowType.FloatingPoint
> +      if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
> +    case float: ArrowType.FloatingPoint
> +      if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
> +    case ArrowType.Utf8.INSTANCE => StringType
> +    case ArrowType.Binary.INSTANCE => BinaryType
> +    case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
> +    case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
> +  }
> +
> +  def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
> +    dt match {
> +      case ArrayType(elementType, containsNull) =>
> +        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
> +        new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
> +      case StructType(fields) =>
> +        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
> +        new Field(name, fieldType,
> +          fields.map { field =>
> +            toArrowField(field.name, field.dataType, field.nullable)
> +          }.toSeq.asJava)
> +      case dataType =>
> +        val fieldType = new FieldType(nullable, toArrowType(dataType), null)
> +        new Field(name, fieldType, Seq.empty[Field].asJava)
> +    }
> +  }
> +
> +  def fromArrowField(field: Field): DataType = {
> +    field.getType match {
> +      case ArrowType.List.INSTANCE =>
> +        val elementField = field.getChildren().get(0)
> +        val elementType = fromArrowField(elementField)
> +        ArrayType(elementType, containsNull = elementField.isNullable)
> +      case ArrowType.Struct.INSTANCE =>
> +        val fields = field.getChildren().asScala.map { child =>
> +          val dt = fromArrowField(child)
> +          StructField(child.getName, dt, child.isNullable)
> +        }
> +        StructType(fields)
> +      case arrowType => fromArrowType(arrowType)
> +    }
> +  }
> +
> +  def toArrowSchema(schema: StructType): Schema = {
> +    new Schema(schema.map { field =>
> +      toArrowField(field.name, field.dataType, field.nullable)
> +    }.asJava)
> +  }
> +
> +  def fromArrowSchema(schema: Schema): StructType = {
> +    StructType(schema.getFields.asScala.map { field =>
> +      val dt = fromArrowField(field)
> +      StructField(field.getName, dt, field.isNullable)
> +    })
> +  }
> +}
>
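The toArrowType / fromArrowType pair is meant to be a round trip, at least for the signed integer widths. Here is a minimal sketch of that property, assuming a made-up enum in place of the Spark and Arrow types (SketchIntType and its methods are hypothetical and not part of either library):

```java
// Hypothetical stand-in for the Spark <-> Arrow integer mapping; not the real API.
enum SketchIntType {
  BYTE(8), SHORT(8 * 2), INTEGER(8 * 4), LONG(8 * 8);

  final int bitWidth;

  SketchIntType(int bitWidth) {
    this.bitWidth = bitWidth;
  }

  // Forward direction: the sketch's analogue of toArrowType for signed integers.
  int toBitWidth() {
    return bitWidth;
  }

  // Reverse direction: only signed widths with a known counterpart are accepted.
  static SketchIntType fromBitWidth(int bitWidth, boolean signed) {
    if (signed) {
      for (SketchIntType t : values()) {
        if (t.bitWidth == bitWidth) {
          return t;
        }
      }
    }
    throw new UnsupportedOperationException(
        "Unsupported data type: Int(" + bitWidth + ", " + signed + ")");
  }
}
```

Mapping each constant to its width and back returns the same constant, and an unsigned width is rejected, which mirrors the getIsSigned guard in fromArrowType.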
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> index 159328c..55b4655 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> @@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
>      val allocator = new RootAllocator(Long.MaxValue)
>      val jsonReader = new JsonFileReader(jsonFile, allocator)
>
> -    val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
> +    val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
>      val jsonSchema = jsonReader.start()
>      Validator.compareSchemas(arrowSchema, jsonSchema)
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> new file mode 100644
> index 0000000..638619f
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> @@ -0,0 +1,65 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.types._
> +
> +class ArrowUtilsSuite extends SparkFunSuite {
> +
> +  def roundtrip(dt: DataType): Unit = {
> +    dt match {
> +      case schema: StructType =>
> +        assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
> +      case _ =>
> +        roundtrip(new StructType().add("value", dt))
> +    }
> +  }
> +
> +  test("simple") {
> +    roundtrip(BooleanType)
> +    roundtrip(ByteType)
> +    roundtrip(ShortType)
> +    roundtrip(IntegerType)
> +    roundtrip(LongType)
> +    roundtrip(FloatType)
> +    roundtrip(DoubleType)
> +    roundtrip(StringType)
> +    roundtrip(BinaryType)
> +    roundtrip(DecimalType.SYSTEM_DEFAULT)
> +  }
> +
> +  test("array") {
> +    roundtrip(ArrayType(IntegerType, containsNull = true))
> +    roundtrip(ArrayType(IntegerType, containsNull = false))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
> +    roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
> +  }
> +
> +  test("struct") {
> +    roundtrip(new StructType())
> +    roundtrip(new StructType().add("i", IntegerType))
> +    roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
> +    roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
> +    roundtrip(new StructType().add(
> +      "struct",
> +      new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> new file mode 100644
> index 0000000..d24a9e1
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> @@ -0,0 +1,410 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized
> +
> +import org.apache.arrow.vector._
> +import org.apache.arrow.vector.complex._
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.execution.arrow.ArrowUtils
> +import org.apache.spark.sql.types._
> +import org.apache.spark.unsafe.types.UTF8String
> +
> +class ArrowColumnVectorSuite extends SparkFunSuite {
> +
> +  test("boolean") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableBitVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === BooleanType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getBoolean(i) === (i % 2 == 0))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("byte") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableTinyIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toByte)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ByteType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getByte(i) === i.toByte)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("short") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableSmallIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toShort)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ShortType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getShort(i) === i.toShort)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("int") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === IntegerType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getInt(i) === i)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getInts(0, 10) === (0 until 10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("long") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableBigIntVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toLong)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === LongType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getLong(i) === i.toLong)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("float") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableFloat4Vector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toFloat)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === FloatType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getFloat(i) === i.toFloat)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("double") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableFloat8Vector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      mutator.setSafe(i, i.toDouble)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === DoubleType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getDouble(i) === i.toDouble)
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("string") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableVarCharVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      val utf8 = s"str$i".getBytes("utf8")
> +      mutator.setSafe(i, utf8, 0, utf8.length)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === StringType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("binary") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +
> +    (0 until 10).foreach { i =>
> +      val utf8 = s"str$i".getBytes("utf8")
> +      mutator.setSafe(i, utf8, 0, utf8.length)
> +    }
> +    mutator.setNull(10)
> +    mutator.setValueCount(11)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === BinaryType)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    (0 until 10).foreach { i =>
> +      assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
> +    }
> +    assert(columnVector.isNullAt(10))
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("array") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
> +    val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
> +      .createVector(allocator).asInstanceOf[ListVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +    val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
> +    val elementMutator = elementVector.getMutator()
> +
> +    // [1, 2]
> +    mutator.startNewValue(0)
> +    elementMutator.setSafe(0, 1)
> +    elementMutator.setSafe(1, 2)
> +    mutator.endValue(0, 2)
> +
> +    // [3, null, 5]
> +    mutator.startNewValue(1)
> +    elementMutator.setSafe(2, 3)
> +    elementMutator.setNull(3)
> +    elementMutator.setSafe(4, 5)
> +    mutator.endValue(1, 3)
> +
> +    // null
> +
> +    // []
> +    mutator.startNewValue(3)
> +    mutator.endValue(3, 0)
> +
> +    elementMutator.setValueCount(5)
> +    mutator.setValueCount(4)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === ArrayType(IntegerType))
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    val array0 = columnVector.getArray(0)
> +    assert(array0.numElements() === 2)
> +    assert(array0.getInt(0) === 1)
> +    assert(array0.getInt(1) === 2)
> +
> +    val array1 = columnVector.getArray(1)
> +    assert(array1.numElements() === 3)
> +    assert(array1.getInt(0) === 3)
> +    assert(array1.isNullAt(1))
> +    assert(array1.getInt(2) === 5)
> +
> +    assert(columnVector.isNullAt(2))
> +
> +    val array3 = columnVector.getArray(3)
> +    assert(array3.numElements() === 0)
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +
> +  test("struct") {
> +    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
> +    val schema = new StructType().add("int", IntegerType).add("long", LongType)
> +    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
> +      .createVector(allocator).asInstanceOf[NullableMapVector]
> +    vector.allocateNew()
> +    val mutator = vector.getMutator()
> +    val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
> +    val intMutator = intVector.getMutator()
> +    val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
> +    val longMutator = longVector.getMutator()
> +
> +    // (1, 1L)
> +    mutator.setIndexDefined(0)
> +    intMutator.setSafe(0, 1)
> +    longMutator.setSafe(0, 1L)
> +
> +    // (2, null)
> +    mutator.setIndexDefined(1)
> +    intMutator.setSafe(1, 2)
> +    longMutator.setNull(1)
> +
> +    // (null, 3L)
> +    mutator.setIndexDefined(2)
> +    intMutator.setNull(2)
> +    longMutator.setSafe(2, 3L)
> +
> +    // null
> +    mutator.setNull(3)
> +
> +    // (5, 5L)
> +    mutator.setIndexDefined(4)
> +    intMutator.setSafe(4, 5)
> +    longMutator.setSafe(4, 5L)
> +
> +    intMutator.setValueCount(5)
> +    longMutator.setValueCount(5)
> +    mutator.setValueCount(5)
> +
> +    val columnVector = new ArrowColumnVector(vector)
> +    assert(columnVector.dataType === schema)
> +    assert(columnVector.anyNullsSet)
> +    assert(columnVector.numNulls === 1)
> +
> +    val row0 = columnVector.getStruct(0, 2)
> +    assert(row0.getInt(0) === 1)
> +    assert(row0.getLong(1) === 1L)
> +
> +    val row1 = columnVector.getStruct(1, 2)
> +    assert(row1.getInt(0) === 2)
> +    assert(row1.isNullAt(1))
> +
> +    val row2 = columnVector.getStruct(2, 2)
> +    assert(row2.isNullAt(0))
> +    assert(row2.getLong(1) === 3L)
> +
> +    assert(columnVector.isNullAt(3))
> +
> +    val row4 = columnVector.getStruct(4, 2)
> +    assert(row4.getInt(0) === 5)
> +    assert(row4.getLong(1) === 5L)
> +
> +    columnVector.close()
> +    allocator.close()
> +  }
> +}
>
>

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-spark-git-commit-SPARK-21472-SQL-Introduce-ArrowColumnVector-as-a-reader-for-Arrow-vectors-tp22003p22004.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


--
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin