Spark - Kinesis integration needs improvements


Yash Sharma
Hello fellow Spark devs, hope you are doing fabulous,

Dropping a brain dump here about the Spark Kinesis integration. I am able to get Spark Kinesis to work perfectly under ideal conditions, but I see a lot of loose ends when conditions are less than ideal. Many of these are specific to Kinesis and need attention.

Unlike Kafka, which works flawlessly with Spark, Kinesis has its own set of issues involving throttling and recovery. I would like to volunteer to fix a few of the issues we have faced. Here are some critical improvements that I think would make Kinesis more reliable with Spark:

1. Kinesis retries: Right now the Kinesis retries are hard-coded with a very small wait of 100 ms. While this is OK for normal errors while fetching, it is not OK for throttling errors. Kinesis throttles are a lot more frequent in a recovery situation (more on this in point 3). Kinesis throttling limits are per second, so a retry after 100 ms will not get us very far. We should make these values configurable to have more control over the retries. https://github.com/apache/spark/pull/17467
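
To make the idea concrete, here is a rough sketch in Python of a fetch loop with a configurable wait and retry count. It is not the Spark code; `ThrottledError`, `fetch_with_retries`, `max_retries` and `wait_ms` are made-up names for illustration only.

```python
import time


class ThrottledError(Exception):
    """Stand-in for a Kinesis throttling error (hypothetical name)."""


def fetch_with_retries(fetch, max_retries=3, wait_ms=100, sleep=time.sleep):
    """Call fetch() and retry on throttling, waiting wait_ms between attempts.

    wait_ms stands in for the 100 ms that is hard-coded today; both knobs
    are hypothetical parameters, not existing Spark settings.
    """
    for attempt in range(max_retries + 1):
        try:
            return fetch()
        except ThrottledError:
            if attempt == max_retries:
                raise  # retries exhausted, let the caller (the task) fail
            sleep(wait_ms / 1000.0)
```

With something like this, a job that is recovering could be started with `wait_ms=1000` or more so that each retry lands in a new one-second throttling window, while normal jobs keep the short default.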

2. Kinesis recovery from a provided timestamp: The Kinesis client library can resume from a TIMESTAMP, which is very useful when we want to start from a specified time in the past rather than reading from the INITIAL position. We have a few scenarios where we need to read a huge stream and filter out all data before a timestamp. This change would push the filtering down to the Kinesis client so that we fetch only the relevant data. I have a half-baked patch for this. https://issues.apache.org/jira/browse/SPARK-20168
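
For reference, the low-level Kinesis GetShardIterator API exposes the same idea through the `AT_TIMESTAMP` iterator type. The sketch below only builds the request arguments in Python; it is not the client library integration discussed above, and the helper name, stream name and shard id are invented for the example.

```python
from datetime import datetime, timezone


def shard_iterator_request(stream_name, shard_id, start_time=None):
    """Build the arguments for a Kinesis GetShardIterator call.

    Without start_time the iterator starts at the oldest record
    (TRIM_HORIZON); with it, Kinesis itself skips everything older.
    """
    request = {"StreamName": stream_name, "ShardId": shard_id}
    if start_time is None:
        request["ShardIteratorType"] = "TRIM_HORIZON"
    else:
        request["ShardIteratorType"] = "AT_TIMESTAMP"
        request["Timestamp"] = start_time
    return request


# Hypothetical stream and shard names, purely for illustration.
req = shard_iterator_request(
    "events", "shardId-000000000000",
    datetime(2017, 3, 1, tzinfo=timezone.utc),
)
# Assuming boto3 is installed and configured, this could be passed as
# boto3.client("kinesis").get_shard_iterator(**req)
```

The point is that the cut-off travels with the request, so the old records never reach Spark at all instead of being read and then filtered out.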

3. Kinesis recovery: This issue is a bit tricky and I would need the help of experienced contributors for a proper design.
In the ideal scenario the Kinesis integration uses the Kinesis consumer library, and the library takes care of the requests made to the shards. But during recovery the Kinesis consumer makes direct requests to Kinesis from all the tasks. Thousands of tasks send requests to all the shards (multiple requests per shard). Unlike Kafka, where requests are only network bound, Kinesis read requests are throttled if too many are made within a second. I am not sure what the best way to organize these requests would be, but we should have some way of limiting the number of requests per shard. Our workaround is to raise the task failure tolerance to a ridiculously high value, so that the job completes after a thousand failures. Point 1 above can also be used in this workaround to delay the retries so that the request volume per shard decreases. I don't have a ticket for this yet but will open one soon.
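
As a starting point for discussion, limiting requests per shard could look roughly like the Python sketch below. It is a single-process illustration only (Spark tasks run on many executors, so a real design would need some coordination), and `ShardRateLimiter` and `try_acquire` are hypothetical names. The default of 5 reflects the documented Kinesis limit of five read transactions per second per shard.

```python
import time
from collections import defaultdict, deque


class ShardRateLimiter:
    """Allow at most max_per_second requests per shard in any one-second window."""

    def __init__(self, max_per_second=5, clock=time.monotonic):
        self.max_per_second = max_per_second
        self.clock = clock
        # shard id -> timestamps of the requests made in the last second
        self._recent = defaultdict(deque)

    def try_acquire(self, shard_id):
        """Return True if a request to shard_id may be sent right now."""
        now = self.clock()
        recent = self._recent[shard_id]
        # Forget requests that are at least one second old.
        while recent and now - recent[0] >= 1.0:
            recent.popleft()
        if len(recent) < self.max_per_second:
            recent.append(now)
            return True
        return False
```

A task would call `try_acquire(shard_id)` before each read and wait when it returns False, instead of sending the request and failing on the throttle.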

4. Spark doesn't work with AWS temporary tokens: Currently Spark just reads the AWS tokens using various strategies. Temporary session tokens are more secure and (probably) the encouraged way of using AWS credentials. Temporary tokens are also useful for accessing cross-account AWS resources. Not many use cases rely on temporary tokens today, but this will soon be a common scenario. We are already facing some minor challenges and resorting to workarounds until this is supported in Spark. We should have a mechanism to make Spark work with temporary AWS tokens. Thoughts?
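
The main difference with temporary tokens is that they carry a session token and an expiry, so whatever holds them has to refresh them. A minimal Python sketch of that idea follows; the class names, the `fetch` callback and the five-minute margin are my own assumptions for illustration, not anything that exists in Spark.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class TemporaryCredentials:
    """The four fields AWS STS returns for temporary credentials."""
    access_key_id: str
    secret_access_key: str
    session_token: str
    expiration: datetime


class RefreshingCredentials:
    """Cache temporary credentials and refetch them shortly before expiry."""

    def __init__(self, fetch, refresh_margin=timedelta(minutes=5), now=None):
        self._fetch = fetch            # hypothetical callback that obtains new credentials
        self._margin = refresh_margin  # assumed safety margin before expiry
        self._now = now or (lambda: datetime.now(timezone.utc))
        self._current = None

    def get(self):
        """Return valid credentials, fetching new ones when close to expiry."""
        if (self._current is None
                or self._now() >= self._current.expiration - self._margin):
            self._current = self._fetch()
        return self._current
```

Long-running streaming jobs are where this matters most, since a job can easily outlive the token it was started with.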

I am trying to get familiar with Spark's code base along the way. I will be focusing primarily on the spark-kinesis bit for now. I would appreciate your valuable input on these.