Fwd: Check

Aniket Khandelwal
Hi all,

I am stuck on a problem I faced recently. The problem statement is:
An Event bean consists of eventId, eventTag, text, .....
We need to run a Spark job that aggregates on the eventTag column and picks the top K1 tags by count.
Additionally, for each such eventTag, we need the list of eventIds (the first K2 of them) that contain that eventTag.
K1, K2 < 100

*Example: top 1 eventTag*
K1 = 1, K2 = 2
id_1   eventTag1  text1
id_2   eventTag2  text2
id_3   eventTag3  text3
id_4   eventTag1  text4

Answer would be:   eventTag1 -> id_1, id_4

The job would run over the last 30 days of data (iterating over 200 million * 30 records).
The records are flattened and saved in Parquet.
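For concreteness, here is a minimal local Python sketch (outside Spark, using the hypothetical sample rows above) of the result the job should produce: count the tags, take the top K1, and keep the first K2 eventIds per tag:

```python
from collections import Counter

K1, K2 = 1, 2

# Hypothetical flattened records: (eventId, eventTag, text)
rows = [
    ("id_1", "eventTag1", "text1"),
    ("id_2", "eventTag2", "text2"),
    ("id_3", "eventTag3", "text3"),
    ("id_4", "eventTag1", "text4"),
]

# Count occurrences of each eventTag and take the top K1 tags.
counts = Counter(tag for _, tag, _ in rows)
top_tags = [tag for tag, _ in counts.most_common(K1)]

# For each top tag, collect the first K2 eventIds that carry it.
result = {tag: [] for tag in top_tags}
for event_id, tag, _ in rows:
    if tag in result and len(result[tag]) < K2:
        result[tag].append(event_id)

print(result)  # {'eventTag1': ['id_1', 'id_4']}
```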

What could be a solution here?
We can group by eventTag, sum the counts, and order by count with limit K1 (the aggregate task).
Then we can run a map task and filter out rows whose eventTag is not in the top K1.
After that I can aggregate with a User Defined Aggregate Function (UDAF) to build a column of eventIds.
One problem:
We need to run the operation over all the files multiple times (twice here).
After we find the top K1 elements, is it possible to re-read only the rows that are already in memory?
I can persist the initial dataset, but it will be large.
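The two-pass shape can be sketched locally like this; each call to the hypothetical `scan()` stands in for a full read of the Parquet files, which makes the double-scan cost explicit:

```python
from collections import Counter

K1, K2 = 1, 2
SCANS = {"n": 0}  # count how many times the source is fully read


def scan():
    """Stand-in for reading all Parquet files; each call is one full pass."""
    SCANS["n"] += 1
    yield from [
        ("id_1", "eventTag1"), ("id_2", "eventTag2"),
        ("id_3", "eventTag3"), ("id_4", "eventTag1"),
    ]


# Pass 1: aggregate counts and keep the top K1 tags.
counts = Counter(tag for _, tag in scan())
top_tags = {tag for tag, _ in counts.most_common(K1)}

# Pass 2: filter to the top tags and collect the first K2 ids per tag.
ids = {tag: [] for tag in top_tags}
for event_id, tag in scan():
    if tag in ids and len(ids[tag]) < K2:
        ids[tag].append(event_id)

print(ids, SCANS["n"])  # {'eventTag1': ['id_1', 'id_4']} 2
```

In Spark terms, avoiding the second `scan()` is exactly what persisting (caching) the initial dataset would buy, at the memory cost described above.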

Another idea: we write a custom UDAF, and in the reduce and merge phases I add up the counts of the grouped eventTags.
During the final phase, if I can somehow broadcast all the counts and push them into a priority queue of size K1, I can then decide in the last phase whether a given tag is needed by setting a boolean flag. We can also collect the eventIds inside the UDAF if we want, and for the groups whose flag is true, keep them and produce the result.
Two problems:
How would I achieve the broadcast? Can we use accumulators?
For each eventTag, I will end up holding K2 eventIds (a waste of memory).
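The size-K1 priority queue part of this idea can be sketched locally as follows: merge the per-partition partial counts (what the UDAF's merge phase would produce; the sample data is hypothetical), then keep only the K1 largest with a bounded heap:

```python
import heapq
from collections import Counter

K1 = 1

# Hypothetical partial counts produced by each partition's UDAF buffer.
partials = [
    Counter({"eventTag1": 1, "eventTag2": 1}),
    Counter({"eventTag1": 1, "eventTag3": 1}),
]

# Merge phase: sum the partial counts per eventTag.
merged = Counter()
for p in partials:
    merged.update(p)

# Keep only the K1 largest entries using a heap-based selection.
top = heapq.nlargest(K1, merged.items(), key=lambda kv: kv[1])
print(top)  # [('eventTag1', 2)]
```

The open question from the post remains how to make this merged top-K1 set visible to all tasks in Spark; accumulators only flow executor-to-driver, so a driver-side collect followed by a broadcast variable is the usual pattern.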

A third option: we write a custom UDAF that accumulates the count and the eventIds for each eventTag, and emits a single string column, with some delimiter, containing the count and the K2 eventIds.
We then split the column on the delimiter, order by the count column, and pick K1 rows.
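The delimiter-based encoding could look like this locally (the delimiter choice and the per-tag aggregation state are assumptions; in Spark the split step would be a `split` over the UDAF's string output):

```python
K1, K2 = 1, 2

# Hypothetical per-tag aggregation state: (count, first K2 eventIds).
agg = {
    "eventTag1": (2, ["id_1", "id_4"]),
    "eventTag2": (1, ["id_2"]),
    "eventTag3": (1, ["id_3"]),
}

# Encode each tag's state as a single delimited string column.
SEP = "|"
encoded = {tag: SEP.join([str(count)] + ids[:K2])
           for tag, (count, ids) in agg.items()}

# Decode: split on the delimiter, order by the count field, pick K1 rows.
decoded = [(tag, s.split(SEP)) for tag, s in encoded.items()]
top = sorted(decoded, key=lambda kv: int(kv[1][0]), reverse=True)[:K1]
result = {tag: parts[1:] for tag, parts in top}
print(result)  # {'eventTag1': ['id_1', 'id_4']}
```

Note the delimiter must be a character guaranteed never to appear in an eventId, otherwise the split is ambiguous.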

Can someone help with a better approach?
1. I want to reduce the number of times all the files are listed, their footers read, and their data brought into memory.
2. If possible, avoid collecting K2 eventIds for every eventTag.

