

Hi Spark devs,
The change might be specific to SQLAppStatusListener, but since it may change the metric values shown in the UI, I'd like to hear some opinions on this.
When we aggregate a SQL metric across tasks, we apply "sum", "min", "median", and "max", all of which can be accumulated incrementally except "median". Median is preferred over "average" because it helps get rid of outliers, but if that's the only purpose, we may not strictly need its exact value.
I'm not sure how much meaning the value would lose as a representative value, but if it doesn't hurt much, what about taking the median of medians? For example, take the median of each group of 10 consecutive tasks, store it as one of the group medians, and finally take the median of those medians. If I calculate correctly, that would require only 11% of the slots when there are 100 tasks, and would replace sorting 100 elements with sorting 10 elements 11 times. The difference grows as the number of tasks increases.
Just a rough idea, so any feedback is appreciated.
Thanks, Jungtaek Lim (HeartSaVioR)
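The median-of-medians idea in the first message could be sketched in Scala roughly as follows. This is a minimal, hypothetical helper (MedianOfMedians is not an existing Spark class), assuming a fixed group size and that the median of a group is the element at index size / 2 after sorting. It keeps at most groupSize buffered values plus one value per completed group, and the result is only an approximation of the true median.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not Spark code: approximates the median of a stream of
// Long values as the median of per-group medians.
final class MedianOfMedians(groupSize: Int) {
  require(groupSize > 0, "groupSize must be positive")

  private val buffer = new Array[Long](groupSize) // values of the group being filled
  private var buffered = 0                         // number of filled slots in buffer
  private val medians = ArrayBuffer.empty[Long]    // one median per completed group

  // Median taken as the element at index size / 2 after sorting
  // (the upper of the two middle elements for even sizes).
  private def medianOf(values: Array[Long]): Long = {
    val sorted = values.sorted
    sorted(sorted.length / 2)
  }

  def add(value: Long): Unit = {
    buffer(buffered) = value
    buffered += 1
    if (buffered == groupSize) { // group complete: keep only its median
      medians += medianOf(buffer)
      buffered = 0
    }
  }

  // Approximate median of everything added so far; a partially filled
  // last group contributes its own median.
  def result: Option[Long] = {
    val all =
      if (buffered > 0) medians.toArray :+ medianOf(buffer.take(buffered))
      else medians.toArray
    if (all.isEmpty) None else Some(medianOf(all))
  }
}
```

For example, adding the values 1 to 100 in order with a group size of 10 gives group medians 6, 16, ..., 96 and a result of Some(56), while the exact median at index n / 2 is 51. How large the error gets depends on how values are spread across groups, so it would need checking against real task metrics.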


Another option could be to use a sketch to get an approximate median (extendable to other quantiles as well). With few tasks, the sketch would still give an accurate value, and with a large number of tasks the benefit would be significant.
Regards, Mayur Rustagi
Ph: +1 (650) 937 9673
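Mayur's sketch suggestion could look roughly like the following Scala sketch. It is a simplified version of the logarithmic-bucket technique used by DDSketch, not Spark's existing approximate quantile code, and LogBucketSketch is a hypothetical name. Assumptions: metric values are non-negative Longs, and a reported quantile only needs to be within a relative error (the relativeAccuracy parameter) of the true value at that rank. Memory depends on the number of occupied buckets, which grows with the logarithm of the value range rather than with the number of tasks, and sketches from different tasks can be merged by adding counts.

```scala
import scala.collection.mutable

// Hypothetical helper, not Spark code: a simplified DDSketch-style sketch.
// Positive values are counted in logarithmic buckets; zeros are counted exactly.
final class LogBucketSketch(relativeAccuracy: Double) {
  require(relativeAccuracy > 0 && relativeAccuracy < 1, "relativeAccuracy must be in (0, 1)")

  private val gamma = (1 + relativeAccuracy) / (1 - relativeAccuracy)
  private val logGamma = math.log(gamma)
  private val buckets = mutable.HashMap.empty[Int, Long] // bucket index -> count
  private var zeroCount = 0L
  private var total = 0L

  // Bucket i holds values in (gamma^(i-1), gamma^i].
  private def indexOf(value: Long): Int =
    math.ceil(math.log(value.toDouble) / logGamma).toInt

  // Representative value whose relative distance to any value in bucket i
  // is at most relativeAccuracy (up to floating-point rounding).
  private def valueOf(index: Int): Double =
    2 * math.pow(gamma, index) / (gamma + 1)

  def add(value: Long): Unit = {
    require(value >= 0, "only non-negative metric values are supported")
    if (value == 0) zeroCount += 1
    else {
      val i = indexOf(value)
      buckets(i) = buckets.getOrElse(i, 0L) + 1
    }
    total += 1
  }

  // Sketches built from different tasks combine by adding counts, so the
  // individual values never need to be kept.
  def merge(other: LogBucketSketch): Unit = {
    require(other.gamma == gamma, "sketches must use the same relativeAccuracy")
    other.buckets.foreach { case (i, c) => buckets(i) = buckets.getOrElse(i, 0L) + c }
    zeroCount += other.zeroCount
    total += other.total
  }

  // Approximates the value at 0-based rank floor(q * (n - 1)) in sorted order.
  def quantile(q: Double): Option[Double] = {
    require(q >= 0 && q <= 1, "q must be in [0, 1]")
    if (total == 0) None
    else {
      val rank = (q * (total - 1)).toLong
      if (rank < zeroCount) Some(0.0)
      else {
        val sorted = buckets.toSeq.sortBy(_._1)
        // Running count of values up to and including each bucket.
        val cumulative = sorted.map(_._2).scanLeft(zeroCount)(_ + _).tail
        sorted.map(_._1).zip(cumulative).collectFirst {
          case (i, seen) if seen > rank => valueOf(i)
        }
      }
    }
  }
}
```

For instance, with relativeAccuracy = 0.01 and the values 1 to 100, quantile(0.5) targets the value 50 (rank 49) and returns a value within about 1% of it. A production version would also need a policy for capping the number of buckets, which is left out here.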


How big is the overhead, at scale?
If it has a nontrivial effect for most jobs, I could imagine reusing
the existing approximate quantile support to more efficiently find a
pretty close median.
On Wed, Nov 27, 2019 at 3:55 AM Jungtaek Lim
<[hidden email]> wrote:

To unsubscribe email: [hidden email]


Thanks all for the input! Maybe I wasn't clear about my intention.
The issue I'm focusing on is this: many metrics are defined in a SQL stage, and each metric has a value for each task, which is grouped later to calculate the aggregated values. (e.g. the "elapsed time" metric is shown in the UI as sum, min, med, max, whose source values come from each task.)
Because the exact "median" can't be calculated by accumulation, we currently store all values of all metrics until the end of the stage. Given that the default of spark.sql.shuffle.partitions is 200, a stage has 200 tasks when it involves a shuffle (grouping, join, etc.). If we have 50 metrics in a stage, 10,000 Long values are kept on the driver side, whereas ideally only 50 * (number of aggregations, at most 4) values would be needed if all aggregations supported accumulation. So I'm looking for something that supports accumulation and stays close to the median. (I guess not taking the average here is intentional, so...)
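The storage numbers above can be checked with a few lines of Scala, alongside a hypothetical accumulator (RunningAggregate) showing why sum, min and max need only constant state per metric. The task and metric counts are the ones from this message; the object and class names are illustrative only.

```scala
// Driver-side slot counts for one stage, using the figures from the message above.
object MetricStorage {
  // Exact median: every task's value is kept until the stage finishes.
  def exactMedianSlots(tasks: Int, metrics: Int): Long = tasks.toLong * metrics

  // Accumulable aggregations: a fixed number of slots per metric, regardless of tasks.
  def accumulableSlots(metrics: Int, aggregations: Int): Long = metrics.toLong * aggregations
}

// Hypothetical accumulator for sum, min and max: constant state per metric,
// updated as each task's value arrives without revisiting earlier values.
final class RunningAggregate {
  private var count = 0L
  private var sum = 0L
  private var min = Long.MaxValue
  private var max = Long.MinValue

  def add(value: Long): Unit = {
    count += 1
    sum += value
    min = math.min(min, value)
    max = math.max(max, value)
  }

  // (sum, min, max), or None if no task has reported a value yet.
  def result: Option[(Long, Long, Long)] =
    if (count == 0) None else Some((sum, min, max))
}
```

Here exactMedianSlots(200, 50) is 10000 and accumulableSlots(50, 4) is 200. RunningAggregate covers three of the four aggregations; the median still needs either all values or an approximate structure, which may itself take more than one slot per metric.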
What's more, in SQLAppStatusListener they're all calculated together at the end of the SQL execution, which may contain multiple jobs. (Oh wait... Hmm... looks like I missed another optimization opportunity here which might mitigate the issue considerably... so please treat my idea as a rough one, just a possible optimization.)
But again, it's a very rough idea, and it won't make sense if the resulting value isn't acceptable as a representative value.
Jungtaek Lim (HeartSaVioR)


Yep, that's clear. That's a reasonable case. There are already
approximate median computations implemented in Spark that can be done
cumulatively, as you say. I think it's reasonable to consider this
for performance, as it can be faster with just a small error
tolerance. But yeah, it's up to you if you have better ideas.
On Wed, Nov 27, 2019 at 7:57 PM Jungtaek Lim
<[hidden email]> wrote:
> On Wed, Nov 27, 2019 at 11:25 PM Sean Owen <[hidden email]> wrote:



Ah yes, right, I forgot that it exists. Thanks!
I'm aware of some implementations of approximate calculations (I guess what we call an approximate median is the approximate 50th percentile), but I didn't know implementation details such as whether they support accumulation. Given that the current source values of a metric are highly optimized (stored in an Array[Long], with the median calculated with just one sort), we may need to measure the overhead of using the existing data structure. I'd expect it to pay off when the number of tasks is higher; I'm just not sure how high it needs to be before it gives clear benefits.
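As a baseline for measuring that overhead, here is a minimal Scala sketch of exact aggregation over an Array[Long] with a single sort. It assumes the median is the element at index n / 2 of the sorted array; ExactMetricAggregation is a hypothetical name and this is not copied from Spark's implementation.

```scala
// Hypothetical helper, not Spark's actual implementation: exact sum, min,
// median and max of per-task values using a single sort.
object ExactMetricAggregation {
  def exactAggregates(values: Array[Long]): Option[(Long, Long, Long, Long)] =
    if (values.isEmpty) None
    else {
      val sorted = values.sorted // sorts a copy in O(n log n); the input array is untouched
      Some((sorted.sum, sorted(0), sorted(sorted.length / 2), sorted(sorted.length - 1)))
    }
}
```

For example, exactAggregates(Array(7L, 3L, 9L, 1L)) returns Some((20, 1, 7, 9)). Timing this against an approximate structure's insert and query cost for a range of task counts is one way to find the point where the approximation starts to pay off.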
Maybe it's worth exploring after trying out possible optimizations on lower-hanging fruit. Still, I'm curious whether we agree that an approximate median is viable here anyway.

