Spark SQL StructType error


nsalian
This post has NOT been accepted by the mailing list yet.
Hello,

I am parsing a text file and inserting the parsed values into a Hive table.

Code:

from email.parser import Parser
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType

# sc (SparkContext) and sqlContext (HiveContext) are assumed to be created earlier in project.py (not shown)
files = sc.wholeTextFiles("hdfs://nameservice1:8020/user/root/email.txt", minPartitions=16, use_unicode=True)
# Setting use_unicode=False didn't help either


sqlContext.sql("DROP TABLE emails")
sqlContext.sql("CREATE TABLE IF NOT EXISTS emails (subject STRING, email_to STRING, email_from STRING, date DATE, from_name STRING, to_name STRING)")
#df = sqlContext.sql("SELECT * FROM emails limit 0")

# Schema mirrors the column order and types of the emails table
fields = [StructField("subject", StringType(), True),
          StructField("email_to", StringType(), True),
          StructField("email_from", StringType(), True),
          StructField("date", DateType(), True),
          StructField("from_name", StringType(), True),
          StructField("to_name", StringType(), True)]

schema = StructType(fields)


# Parse the headers of each (path, content) pair on the driver
for x,v in files.collect():
    j = Parser()
    headers = j.parsestr(str(v))
    subject = str(headers['subject'])
    email_to = str(headers['to'])
    email_from = str(headers['from'])
    date = str(headers['date'])
    from_name = str(headers['x-from'])
    to_name = str(headers['x-to'])
    # Build a DataFrame for this email and register it as a temp table
    emaildf = sqlContext.createDataFrame(Row(subject,email_to,email_from,date,from_name,to_name),schema)\
        .registerTempTable("emailTemp")

sc.stop()
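In the loop, the Date header is kept as a string (str(headers['date'])), but the schema declares date as DateType, and PySpark's type verification for DateType accepts only datetime.date or datetime.datetime objects, so that string would also be rejected once records reach the check. Below is a minimal sketch using only the standard library, assuming RFC 2822 style Date headers such as "Mon, 14 May 2001 16:39:00 -0700". parse_email is a hypothetical helper name, not part of the original script. It keeps the calendar date as written in the header (the time zone offset is ignored) and returns None for missing headers instead of the string "None" that str(None) produces.

```python
import datetime
from email.parser import Parser
from email.utils import parsedate_tz


def parse_email(raw_text):
    """Hypothetical helper (not from the original script).

    Returns one record tuple in the emails table's column order:
    (subject, email_to, email_from, date, from_name, to_name).
    """
    headers = Parser().parsestr(raw_text, headersonly=True)

    def get(name):
        # Header lookup is case-insensitive; missing headers give None.
        value = headers[name]
        return None if value is None else str(value)

    # parsedate_tz returns a 10-tuple starting (year, month, day, ...),
    # or None if the header cannot be parsed.
    raw_date = headers["date"]
    parsed = parsedate_tz(raw_date) if raw_date else None
    date = datetime.date(*parsed[:3]) if parsed else None

    return (get("subject"), get("to"), get("from"), date,
            get("x-from"), get("x-to"))
```

Because the date field is now a datetime.date, the record can pass the DateType check; the tuples can be collected into a list (or produced with a map over the RDD) before a single createDataFrame call.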


Error:
Traceback (most recent call last):
  File "project.py", line 49, in <module>
    emaildf = sqlContext.createDataFrame(Row(subject,email_to,email_from,date,from_name,to_name),schema)\
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 425, in createDataFrame
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 350, in _createFromLocal
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1134, in _verify_type
   
TypeError: StructType can not accept object 'Hello Subject' in type <type 'str'>

Not sure if this is a bug or something I am doing wrong.
Any thoughts?
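The traceback is consistent with how createDataFrame treats its first argument: it expects a collection of records (for example a list or an RDD) and iterates over it. pyspark.sql.Row is a subclass of tuple, so a single Row is itself iterable, and each of its fields ('Hello Subject' first) is checked against the StructType as if it were a whole record, which raises exactly this TypeError. The plain-Python snippet below only illustrates that iteration; it does not call Spark, and records_seen_by_create_dataframe is a hypothetical stand-in for createDataFrame's first step.

```python
# Plain-Python illustration (no Spark): a Row behaves like this tuple.
record = ("Hello Subject", "to@example.com", "from@example.com",
          "Mon, 14 May 2001 16:39:00 -0700", "Alice", "Bob")


def records_seen_by_create_dataframe(data):
    """Hypothetical stand-in: treat `data` as a collection of records."""
    return list(data)


# Passing the tuple itself: every field becomes a separate "record",
# so the first thing checked against the StructType is a bare string.
wrong = records_seen_by_create_dataframe(record)

# Wrapping it in a list: one record with six fields, as the schema expects.
right = records_seen_by_create_dataframe([record])
```

In the script, the corresponding change would be passing [Row(...)] or, better, one list or RDD holding every parsed email to a single createDataFrame call. Two related points: registerTempTable returns None, so emaildf ends up as None, and re-registering "emailTemp" on each iteration replaces the previous one. Building one DataFrame and writing it with df.write.insertInto("emails") avoids both. The Date header string would also need converting to a datetime.date to satisfy DateType.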
Neelesh S. Salian  
Cloudera