HashingTFModel/IDFModel in Structured Streaming

HashingTFModel/IDFModel in Structured Streaming

Davis Varghese
I have built an ML pipeline model on static Twitter data for sentiment analysis. When I use the model on a structured stream, it always throws "Queries with streaming sources must be executed with writeStream.start()". This particular model doesn't contain any documented "unsupported" operations; it only calls the transform() method of the stages. Has anyone encountered this issue? If the model doesn't contain HashingTFModel/IDFModel, it works fine, but then I cannot create feature vectors from the tweets.
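
For context, a minimal sketch of the pattern described above (the socket source, column name, and model path are illustrative assumptions, not the actual pipeline; imports are as in the full programs later in this thread):

// Load a pipeline model trained on static data and apply it to a stream.
SparkSession spark = SparkSession.builder().appName("ScoreTweets").master("local[2]").getOrCreate();
PipelineModel model = PipelineModel.load("/tmp/sentiment.model");   // hypothetical path

Dataset<Row> tweets = spark.readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumnRenamed("value", "tweet");

// As described above, transform() on the streaming Dataset throws the
// AnalysisException when the model contains HashingTFModel/IDFModel stages.
StreamingQuery query = model.transform(tweets)
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();
query.awaitTermination();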

Thanks,
Davis





Re: HashingTFModel/IDFModel in Structured Streaming

Joseph Bradley
Hi Davis,
We've started tracking these issues under this umbrella: https://issues.apache.org/jira/browse/SPARK-21926
I'm hoping we can fix some of these for 2.3.
Thanks,
Joseph

--

Joseph Bradley

Software Engineer - Machine Learning

Databricks, Inc.

http://databricks.com


Re: HashingTFModel/IDFModel in Structured Streaming

Bago Amirbekian
Davis, I'm looking into this. If you could include some code that I can use to reproduce the error, along with the stack trace, it would be really helpful.



Re: HashingTFModel/IDFModel in Structured Streaming

Davis Varghese
Sure, I will get one over the weekend.





Re: HashingTFModel/IDFModel in Structured Streaming

Bago Amirbekian
Davis, were you able to find an example? Anything you have could help.



Re: HashingTFModel/IDFModel in Structured Streaming

Davis Varghese
Bago,

The code I wrote is not reproducing the issue. In our case, we build an ML pipeline from a UI, and it is done in a particular fashion so that a user can create a pipeline behind the scenes using drag and drop. I have yet to dig deeper and recreate the same thing as standalone code. Meanwhile, I am sharing similar code that I wrote, below. I hope to find time next week to get the correct version.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class StreamingIssueCountVectorizerSplit {

  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = SparkSession.builder()
        .appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", "Positive"),
        RowFactory.create("fantastic morning match", "Positive"),
        RowFactory.create("good morning", "Positive"),
        RowFactory.create("boring evening", "Negative"),
        RowFactory.create("tragic evening event", "Negative"),
        RowFactory.create("today is bad ", "Negative")
    );
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning"),
        RowFactory.create("bad evening")
    );
    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
    });
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("features")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);
    Dataset<Row> words = tokenizer.transform(trainData);
    CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);

    LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001);
    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predicted")
        .setLabels(labelIndexerModel.labels());

    countVectorizerModel.setMinTF(1);
    Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{
            labelIndexerModel, tokenizer, countVectorizerModel, lr, labelConverter});
    ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.1, 0.01})
        .addGrid(lr.fitIntercept())
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();

    MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
    evaluator.setLabelCol("label");
    evaluator.setPredictionCol("prediction");

    TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.7);

    // Fit the pipeline to training documents.
    TrainValidationSplitModel trainValidationSplitModel = trainValidationSplit.fit(trainData);

    trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");

    TrainValidationSplitModel _loadedModel = TrainValidationSplitModel.load("/tmp/CountSplit.model");
    PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();

    //Test on non-streaming data
    Dataset<Row> predicted = loadedModel.transform(testData);
    List<Row> rows = predicted.select("tweet", "predicted").collectAsList();
    for (Row r : rows) {
      System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
    }

    //Test on streaming data
    Dataset<Row> lines = sparkSession
        .readStream()
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load();
    lines = lines.withColumnRenamed("value", "tweet");

    StreamingQuery query = loadedModel.transform(lines).writeStream()
        .outputMode("append")
        .format("console")
        .start();

    query.awaitTermination();
  }
}







Re: HashingTFModel/IDFModel in Structured Streaming

Davis Varghese
Bago,

Finally I am able to create one which fails consistently. I think the issue is caused by the VectorAssembler in the model. In the new code, I have 2 features (1 text and 1 number), and I have to run them through a VectorAssembler before giving them to LogisticRegression. Code and test data are below.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * A tweet-sentiment classification pipeline with one text feature and one
 * numeric feature (combined by VectorAssembler); reproduces the streaming failure.
 */
public class StreamingIssueCountVectorizerSplitFailed {

  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = SparkSession.builder()
        .appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", 1, "Positive"),
        RowFactory.create("fantastic morning match", 1, "Positive"),
        RowFactory.create("good morning", 1, "Positive"),
        RowFactory.create("boring evening", 5, "Negative"),
        RowFactory.create("tragic evening event", 5, "Negative"),
        RowFactory.create("today is bad ", 5, "Negative")
    );
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning", 1),
        RowFactory.create("bad evening", 5)
    );
    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
    });
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("wordfeatures")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);

    VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"wordfeatures", "time"})
        .setOutputCol("features");

    Dataset<Row> words = tokenizer.transform(trainData);
    CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);

    LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001);

    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predicted")
        .setLabels(labelIndexerModel.labels());

    countVectorizerModel.setMinTF(1);

    Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{
            labelIndexerModel, tokenizer, countVectorizerModel, vectorAssembler,
            lr, labelConverter});
    ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.1, 0.01})
        .addGrid(lr.fitIntercept())
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();

    MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
    evaluator.setLabelCol("label");
    evaluator.setPredictionCol("prediction");

    TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.7);

    // Fit the pipeline to training documents.
    TrainValidationSplitModel trainValidationSplitModel = trainValidationSplit.fit(trainData);

    trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");

    TrainValidationSplitModel _loadedModel = TrainValidationSplitModel.load("/tmp/CountSplit.model");
    PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();

    //Test on non-streaming data
    Dataset<Row> predicted = loadedModel.transform(testData);
    predicted.show();
    List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
    for (Row r : _rows) {
      System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
    }

    //Test on streaming data
    Dataset<Row> lines = sparkSession.readStream()
        .option("sep", ",")
        .schema(testSchema)
        .option("header", "true")
        .option("inferSchema", "true")
        .format("com.databricks.spark.csv")
        .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");

    StreamingQuery query = loadedModel.transform(lines).writeStream()
        .outputMode("append")
        .format("console")
        .start();

    query.awaitTermination();
  }
}

*##Test data csv file*
tweet,time
Today is a bright sunny day,2
How is everyone feeling in office?,2
I want beef cake. Where is it?,2
The weather sucks today,2
I like Vat69.,5
I don't care,5
Wassup,5
Skyfall sucks!,5


*Output*
*--------*


+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
|  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+

[sunny morning], prediction=Positive
[bad evening], prediction=Negative
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
        at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
        at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
        at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
        at StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)






Re: HashingTFModel/IDFModel in Structured Streaming

Bago Amirbekian
There is a known issue with VectorAssembler which causes it to fail in streaming if any of the input columns are of VectorType and don't have size information: https://issues.apache.org/jira/browse/SPARK-22346.

This can be fixed by adding size information to the vector columns. I've made a PR to add a transformer to Spark to help with this: https://github.com/apache/spark/pull/19746. It would be awesome if you could take a look and see if this would fix your issue.
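
To make the suggestion concrete, here is a minimal sketch of where such a size-hint transformer would slot in (assuming the transformer from the PR above is available as org.apache.spark.ml.feature.VectorSizeHint, and using the column and variable names from Davis's pipeline). It declares the size of the CountVectorizer output column up front, so VectorAssembler no longer has to inspect the first row of the streaming data; the setter style matches the working example posted later in this thread.

// Sketch only; assumes the VectorSizeHint transformer from the linked PR.
VectorSizeHint sizeHint = new VectorSizeHint();
sizeHint.setInputCol("wordfeatures");   // the CountVectorizer output column
sizeHint.setSize(3);                    // must match the CountVectorizer vocab size

Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{
    labelIndexerModel, tokenizer, countVectorizerModel,
    sizeHint,                           // declared just before the VectorAssembler
    vectorAssembler, lr, labelConverter});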



Re: HashingTFModel/IDFModel in Structured Streaming

Jorge Sánchez
Hi,

After seeing that IDF needed refactoring to use ML vectors instead of MLlib ones, I have created a Jira ticket at https://issues.apache.org/jira/browse/SPARK-22531 and submitted a PR for it. If anyone can have a look and suggest any changes, it would be really appreciated.
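
For reference, the stages from the thread title can be wired up as follows; this is a minimal sketch (column names and the feature dimension are illustrative, and the imports come from org.apache.spark.ml.feature as in the other examples here), showing the kind of pipeline that hits the streaming limitation the refactoring above targets:

// HashingTF + IDF feature extraction, as referenced in the thread title.
Tokenizer tok = new Tokenizer().setInputCol("tweet").setOutputCol("words");
HashingTF tf = new HashingTF().setInputCol("words").setOutputCol("tf").setNumFeatures(1000);
IDF idf = new IDF().setInputCol("tf").setOutputCol("features");
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{tok, tf, idf});
// After fit() on static data, calling transform() on a streaming DataFrame is
// where the error from the start of this thread appears.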

Thank you.




Re: HashingTFModel/IDFModel in Structured Streaming

Davis Varghese
Since we are on Spark 2.2, I backported/fixed it. Here is the diff, comparing against
https://github.com/apache/spark/blob/73fe1d8087cfc2d59ac5b9af48b4cf5f5b86f920/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala:

24c24
< import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
---
> import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, IntParam}
44c44,46
<   val size = new Param[Int](this, "size", "Size of vectors in column.", {s: Int => s >= 0})
---
>   val size: IntParam =
>     new IntParam(this, "size", "Size of vectors in column.", ParamValidators.gt(0))
>
57c59
<   @Since("2.3.0")
---
> /*  @Since("2.3.0")
64c66
<     ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids))
---
>     ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids))*/
134c136
<   override def copy(extra: ParamMap): VectorAssembler = defaultCopy(extra)
---
>   override def copy(extra: ParamMap): VectorSizeHint = defaultCopy(extra)



The first 2 changes are required so that the model is saved with the VectorSizeHint info.
The 3rd one is required because the overridden method is final in Spark 2.2.
The 4th one fixes wrong code that was throwing a ClassCastException.


Here is the working code after using this new transformer:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Working version of the tweet-sentiment pipeline: a VectorSizeHint stage is
 * added before the VectorAssembler so the model can be applied to a stream.
 */
public class StreamingIssueCountVectorizerSplitFailed {

  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = SparkSession.builder()
        .appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", 1, "Positive"),
        RowFactory.create("fantastic morning match", 1, "Positive"),
        RowFactory.create("good morning", 1, "Positive"),
        RowFactory.create("boring evening", 5, "Negative"),
        RowFactory.create("tragic evening event", 5, "Negative"),
        RowFactory.create("today is bad ", 5, "Negative")
    );
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning", 1),
        RowFactory.create("bad evening", 5)
    );
    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
    });
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("wordfeatures")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);

    // Declare the size of the CountVectorizer output column so the
    // VectorAssembler does not have to inspect the (streaming) data.
    VectorSizeHint wordfeatures = new VectorSizeHint();
    wordfeatures.setInputCol("wordfeatures");
    wordfeatures.setSize(3);

    VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"wordfeatures", "time"})
        .setOutputCol("features");

    Dataset<Row> words = tokenizer.transform(trainData);
    CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);

    LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001);

    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predicted")
        .setLabels(labelIndexerModel.labels());

    countVectorizerModel.setMinTF(1);

    Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{
            labelIndexerModel, tokenizer, countVectorizerModel, wordfeatures,
            vectorAssembler, lr, labelConverter});
    ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.1, 0.01})
        .addGrid(lr.fitIntercept())
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();

    MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
    evaluator.setLabelCol("label");
    evaluator.setPredictionCol("prediction");

    TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.7);

    // Fit the pipeline to training documents.
    TrainValidationSplitModel trainValidationSplitModel = trainValidationSplit.fit(trainData);

    trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");

    TrainValidationSplitModel _loadedModel = TrainValidationSplitModel.load("/tmp/CountSplit.model");
    PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();

    //Test on non-streaming data
    Dataset<Row> predicted = loadedModel.transform(testData);
    predicted.show();
    List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
    for (Row r : _rows) {
      System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
    }

    //Test on streaming data
    Dataset<Row> lines = sparkSession.readStream()
        .option("sep", ",")
        .schema(testSchema)
        .option("header", "true")
        .option("inferSchema", "true")
        .format("com.databricks.spark.csv")
        .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");

    StreamingQuery query = loadedModel.transform(lines).writeStream()
        .outputMode("append")
        .format("console")
        .start();

    query.awaitTermination();
  }
}




