spark-user mailing list archives

From "ruan.answer" <>
Subject handle data skew problem when calculating word count and word dependency
Date Mon, 14 Nov 2016 07:26:28 GMT
I am planning to calculate word counts and two-word dependencies with Spark,
but the data is skewed. How can I solve this problem?
Also, do you have any suggestions for two-level data slicing?
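
One common reading of "two-level data slicing" is a two-stage aggregation over salted keys: each (topic, word) key is first split into several sub-keys by appending a small salt, partial counts are computed per sub-key, and the salt is then stripped and the partial counts are merged. The sketch below shows only that key logic in plain Java so it runs without a cluster. The class name SaltedWordCount, the "#" separator, and the salt count are assumptions for illustration, not anything from Spark itself. In a Spark job the two stages would roughly correspond to two reduceByKey calls with a mapToPair in between that removes the salt, and keying by topic plus word would avoid building one RDD per topic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Spark class): two-stage counting with salted keys.
class SaltedWordCount {
    // Assumed separator; the salt is always the part after the last separator.
    static final String SEP = "#";

    // Stage-1 key: topic, word, and a salt that spreads one hot key over several sub-keys.
    static String saltedKey(String topic, String word, int salt) {
        return topic + SEP + word + SEP + salt;
    }

    // Drops the trailing salt, leaving "topic#word".
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf(SEP));
    }

    // Stage 1: partial counts per salted key. The salt is derived from the line index,
    // so the lines of one large topic are spread over `salts` sub-keys.
    static Map<String, Integer> partialCounts(String topic, List<String> lines, int salts) {
        Map<String, Integer> partial = new HashMap<>();
        for (int i = 0; i < lines.size(); i++) {
            int salt = i % salts;
            for (String word : lines.get(i).trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // skip the empty token produced by a blank line
                }
                partial.merge(saltedKey(topic, word, salt), 1, Integer::sum);
            }
        }
        return partial;
    }

    // Stage 2: strip the salt and merge the partial counts into final counts.
    static Map<String, Integer> mergeCounts(Map<String, Integer> partial) {
        Map<String, Integer> total = new HashMap<>();
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            total.merge(unsalt(e.getKey()), e.getValue(), Integer::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a", "b c", "a");
        Map<String, Integer> partial = partialCounts("topic1", lines, 2);
        System.out.println(mergeCounts(partial));
    }
}
```

The number of salts is a tuning choice: more salts spread a large topic over more tasks, at the cost of a larger intermediate result in the second stage.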

I have some topics, and each topic corresponds to a lot of text, so I have
an RDD structured like this:
JavaPairRDD<String, Iterable<String>> topicsGroup
I then want to compute word counts and word dependencies to generate text
patterns for each topic.
Unfortunately, some topics have very little data, such as 10k, while others
have a lot, such as 5G, and this imbalance drives me crazy. So I have this:

       List<String> topics = topicsGroup.keys().collect();
       LOG.warn("topics group key: " + topics.size());
       for (String key : topics) {
           JavaRDD<String> valuesRDD = getValueRDDByKey(topicsGroup, key);
           long lineCount = valuesRDD.count();
           int minNums = (int)10;
           List<String> lines = valuesRDD.collect();
           JavaRDD<String[]> agentWordsRDD = valuesRDD.map((Function<String,
String[]>) v1 -> splitAndParseLogLine(v1, logMiningOptions)).cache();

           Map<String, Integer> dict =
agentWordsRDD.flatMap((FlatMapFunction<String[], String>) strings ->
                   .mapToPair((PairFunction<String, String, Integer>) s ->
new Tuple2<>(s, 1))
                   .reduceByKey((Function2<Integer, Integer, Integer>) (v1,
v2) -> v1 + v2)
                   .filter((Function<Tuple2<String, Integer>, Boolean>)
v1 -> {
               if (v1._2() < minNums || v1._1().length() < 1)
                   return true;
               return false;

           JavaRDD<Tuple2<List<String>, List<Integer>>>
=<String[], Tuple2<List<String>,
List<Integer>>>) words -> {
               List<String> tuple = new ArrayList<String>();
               List<Integer> vars = new ArrayList<Integer>();
               int v = 0;
               for (String word : words) {
                   if (dict.containsKey(word)) {
                       v = 0;
                   } else
               return new Tuple2<>(tuple, vars);
           List<Tuple2<List<String>, List<Integer>>> tupleVars

           LogMining logMining = new LogMining(logMiningOptions);
           Tuple2<List<Candidates>, Integer> patterns =  new
Tuple2<>(logMining.batchProcess(tupleVars, dict), logMining.getLineCount());
           putPatternsToDatabase(logFileDate, key, patterns);
But, as you can see, this still does not solve the data skew problem, and
because I generate an RDD for each key, there are a lot of redundant
operations. So my question is: "do you have any good advice or an example of
how to handle this problem?" or "when the data is skewed, how can I cut the
data by length, instead of sorting by length and then cutting it evenly like
JavaRDD<String> valuesRDDPartition =
valuesRDD.sortBy(new Function<String, Integer>() {
                    public Integer call(String v1) throws Exception {
                }, true, 16);"  it is 16, or some other numbers, not by
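
As a rough illustration of cutting by length rather than using a fixed count such as 16, the plain-Java sketch below derives the number of slices from the total text length and also shows a greedy slicing pass that keeps the input order and needs no sort. The class name LengthSlicer and the targetChars parameter are hypothetical. In Spark, the computed slice count could be passed to valuesRDD.repartition(n) once the total length has been obtained with an aggregation over the RDD; note that repartition balances element counts, not characters, so this assumes line lengths within a topic are broadly similar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Spark class): size-based slicing without sorting.
class LengthSlicer {
    // Number of slices so that each holds roughly targetChars characters (at least one slice).
    static int sliceCount(long totalChars, long targetChars) {
        if (targetChars <= 0) {
            throw new IllegalArgumentException("targetChars must be positive");
        }
        return (int) Math.max(1L, (totalChars + targetChars - 1) / targetChars);
    }

    // Greedy pass in input order: close the current slice once it reaches targetChars.
    static List<List<String>> sliceByLength(List<String> lines, long targetChars) {
        if (targetChars <= 0) {
            throw new IllegalArgumentException("targetChars must be positive");
        }
        List<List<String>> slices = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentChars = 0;
        for (String line : lines) {
            current.add(line);
            currentChars += line.length();
            if (currentChars >= targetChars) {
                slices.add(current);
                current = new ArrayList<>();
                currentChars = 0;
            }
        }
        if (!current.isEmpty()) {
            slices.add(current); // remaining lines form the last, possibly smaller slice
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("aaaa", "bb", "cc", "dddddd", "e");
        System.out.println(sliceCount(15, 4));
        System.out.println(sliceByLength(lines, 4));
    }
}
```

A single very long line still ends up in one slice of its own, so this evens out slices only down to the size of the largest line.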
