Re: Error Saving Dataframe to Hive with Spark 2.0.0

Re: Error Saving Dataframe to Hive with Spark 2.0.0

Chetan Khatri
Okay, so you are saying that 2.0.0 doesn't have that patch? cc @dev.
I don't like having to change service versions every time!

Thanks.

On Mon, Jan 30, 2017 at 1:10 AM, Jacek Laskowski <[hidden email]> wrote:
Hi, 

I think you have to upgrade to 2.1.0. There have been a few changes related to that error since 2.0.0.
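
(For reference, a minimal sketch of what the version bump might look like, assuming the project is built with sbt; adjust accordingly for Maven or Gradle, and the Scala version shown is an assumption:)

// build.sbt (sketch): bump the Spark artifacts to 2.1.0
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql"  % "2.1.0",
  "org.apache.spark" %% "spark-hive" % "2.1.0"
)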

Jacek 


On 29 Jan 2017 9:24 a.m., "Chetan Khatri" <[hidden email]> wrote:
Hello Spark Users,

I am getting an error while saving a Spark DataFrame to a Hive table:
Hive 1.2.1
Spark 2.0.0
Local environment.
Note: the job executes successfully and produces the output I want, but the exception below is still raised.
Source Code:

package com.chetan.poc.hbase

/**
  * Created by chetan on 24/1/17.
  */
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.KeyValue.Type
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._
import java.util.Date
import java.text.SimpleDateFormat

object IncrementalJob {
  val APP_NAME: String = "SparkHbaseJob"
  var HBASE_DB_HOST: String = null
  var HBASE_TABLE: String = null
  var HBASE_COLUMN_FAMILY: String = null
  var HIVE_DATA_WAREHOUSE: String = null
  var HIVE_TABLE_NAME: String = null

  def main(args: Array[String]) {
    // Initializing HBase configuration variables
    HBASE_DB_HOST = "127.0.0.1"
    HBASE_TABLE = "university"
    HBASE_COLUMN_FAMILY = "emp"
    // Initializing Hive metastore configuration
    HIVE_DATA_WAREHOUSE = "/usr/local/hive/warehouse"
    // Initializing Hive table name - target table
    HIVE_TABLE_NAME = "employees"

    // Setting up the Spark application (old SparkContext/SQLContext approach, kept for reference)
    // val sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local")
    // val sparkContext = new SparkContext(sparkConf)
    // val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

    // Enable Hive support with the Hive warehouse location in SparkSession
    val spark = SparkSession.builder()
      .appName(APP_NAME)
      .config("hive.metastore.warehouse.dir", HIVE_DATA_WAREHOUSE)
      .config("spark.sql.warehouse.dir", HIVE_DATA_WAREHOUSE)
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    import spark.sql

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE)
    conf.set(TableInputFormat.SCAN_COLUMNS, HBASE_COLUMN_FAMILY)

    // Load an RDD of (ImmutableBytesWritable, Result) tuples from the HBase table
    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    println(hBaseRDD.count())
    // hBaseRDD.foreach(println)

    // keyValue is an RDD[java.util.List[KeyValue]]
    val keyValue = hBaseRDD.map(x => x._2).map(_.list)

    // outPut is a DataFrame in which each row represents one cell read from HBase
    val outPut = keyValue.flatMap(x => x.asScala.map(cell =>
      HBaseResult(
        Bytes.toInt(CellUtil.cloneRow(cell)),
        Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
        Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
        cell.getTimestamp,
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date(cell.getTimestamp)),
        Bytes.toStringBinary(CellUtil.cloneValue(cell)),
        Type.codeToType(cell.getTypeByte).toString
      )
    )).toDF()
    // Show the output DataFrame
    outPut.show

    // Parse the timestamp threshold
    val datetimestamp_threshold = "2016-08-25 14:27:02:001"
    val datetimestampformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").parse(datetimestamp_threshold).getTime()

    // Result set filtering based on timestamp
    val filtered_output_timestamp = outPut.filter($"colDatetime" >= datetimestampformat)
    // Result set filtering based on a colDatetime range
    val filtered_output_row = outPut.filter($"colDatetime".between(1668493360, 1668493365))

    // Saving the DataFrame to the Hive table succeeds, but the exception below is still logged
    filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
  }

  case class HBaseResult(rowkey: Int, colFamily: String, colQualifier: String, colDatetime: Long, colDatetimeStr: String, colValue: String, colType: String)
}

Error:
17/01/29 13:51:53 INFO metastore.HiveMetaStore: 0: create_database: Database(name:default, description:default database, locationUri:hdfs://localhost:9000/usr/local/hive/warehouse, parameters:{})
17/01/29 13:51:53 INFO HiveMetaStore.audit: ugi=hduser	ip=unknown-ip-addr	cmd=create_database: Database(name:default, description:default database, locationUri:hdfs://localhost:9000/usr/local/hive/warehouse, parameters:{})	
17/01/29 13:51:53 ERROR metastore.RetryingHMSHandler: AlreadyExistsException(message:Database default already exists)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
	at com.sun.proxy.$Proxy21.create_database(Unknown Source)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
	at com.sun.proxy.$Proxy22.createDatabase(Unknown Source)
	at org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:309)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:280)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:269)
	at org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:308)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:99)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
	at org.apache.spark.sql.hive.HiveExternalCatalog.createDatabase(HiveExternalCatalog.scala:98)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.<init>(SessionCatalog.scala:89)
	at org.apache.spark.sql.hive.HiveSessionCatalog.<init>(HiveSessionCatalog.scala:51)
	at org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:49)
	at org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
	at org.apache.spark.sql.hive.HiveSessionState$$anon$1.<init>(HiveSessionState.scala:63)
	at org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
	at org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
	at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
	at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:441)
	at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:395)
	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
	at com.chetan.poc.hbase.IncrementalJob$.main(IncrementalJob.scala:58)
	at com.chetan.poc.hbase.IncrementalJob.main(IncrementalJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Thanks.


Re: Error Saving Dataframe to Hive with Spark 2.0.0

Michael Allman
That's understandable. Maybe I can help. :)

What happens if you set `HIVE_TABLE_NAME = "default.employees"`?

Also, does that table exist before you call `filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)`?
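
As a minimal sketch (assuming the `spark` session and the `filtered_output_timestamp` DataFrame from your code), the check could look something like this:

// Sketch: qualify the table with its database and check whether it already exists
val qualifiedTable = "default.employees"
if (spark.catalog.tableExists("default", "employees")) {
  println(s"$qualifiedTable already exists; appending to it")
} else {
  println(s"$qualifiedTable does not exist; saveAsTable will create it")
}
filtered_output_timestamp.write.mode("append").saveAsTable(qualifiedTable)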

Cheers,

Michael

On Jan 29, 2017, at 9:52 PM, Chetan Khatri <[hidden email]> wrote:

Okay, so you are saying that 2.0.0 doesn't have that patch? cc @dev.
I don't like having to change service versions every time!

Thanks.
