Spark main thread quits, but the driver doesn't crash on a standalone cluster

Spark main thread quits, but the driver doesn't crash on a standalone cluster

John Fang
My Spark main thread creates some background threads, such as a timer thread. When the Spark application throws an exception, the main thread quits, but the driver JVM does not crash on a standalone cluster. The problem does not occur on a YARN cluster, because there the ApplicationMaster monitors the application's main thread, while the standalone cluster does not.
For example:
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Timer thread. Note: new Thread(r, name) is non-daemon by default,
// so this thread keeps the JVM alive after the main thread exits.
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
    def run(): Unit = {
      try {
        System.out.println("runnable")
      } catch {
        case e: Exception =>
          System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
      }
    }
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)

// WordReceiver is a custom receiver defined elsewhere in this application.
val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 10)
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException // exception that terminates the streaming job
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    System.out.println("end!!!!!")
    throw e
}
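
The JVM behavior here can be reproduced without Spark: a JVM only exits once every remaining thread is a daemon thread, and a thread created with new Thread(r, name) is non-daemon by default. A minimal sketch (object name hypothetical):

object NonDaemonKeepAlive {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = while (true) Thread.sleep(1000)
    }, "keep-alive-thread")
    // t.setDaemon(true) // uncommenting this lets the JVM exit when main throws
    t.start()
    // The main thread dies on the next line, but the non-daemon
    // thread above keeps the JVM running.
    throw new RuntimeException("main thread is gone")
  }
}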




Re: Spark main thread quits, but the driver doesn't crash on a standalone cluster

Liang-Chi Hsieh

Can you just cancel the `ScheduledFuture` returned by `scheduleAtFixedRate` when you catch the exception in the main thread of the Spark driver?
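
For example, something like this sketch based on the code above (only the scheduledFuture handle and the shutdown call are additions):

import java.util.concurrent.{ScheduledFuture, TimeUnit}

// Keep the handle returned by scheduleAtFixedRate so the task can be cancelled.
val scheduledFuture: ScheduledFuture[_] = scheduledExecutorService.scheduleAtFixedRate(
  new Runnable {
    def run(): Unit = System.out.println("runnable")
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    scheduledFuture.cancel(true)        // stop the periodic task
    scheduledExecutorService.shutdown() // let the worker thread finish so the JVM can exit
    throw e
}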

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/