flink-issues mailing list archives

From "Robert Bradshaw (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-10566) Flink Planning is exponential in the number of stages
Date Tue, 16 Oct 2018 13:40:00 GMT
Robert Bradshaw created FLINK-10566:
---------------------------------------

             Summary: Flink Planning is exponential in the number of stages
                 Key: FLINK-10566
                 URL: https://issues.apache.org/jira/browse/FLINK-10566
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 1.5.4
            Reporter: Robert Bradshaw
         Attachments: chart.png

Planning time grows exponentially with the number of stages, which makes it nearly impossible
to run graphs with 100 or more stages. (Execution itself is still sub-second, but job
submission takes increasingly long.)

I can reproduce this with the following pipeline, which resembles my real-world workloads
(with depth up to 10 and width up to, and beyond, 50). On Flink, going beyond width 10
seems to be problematic (it times out after hours). Note the log scale on the time axis
of the attached chart.

 
{code:java}
  // Imports needed by the methods below.
  import java.util.Collection;
  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.configuration.Configuration;

  public static void runPipeline(int depth, int width) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> input = env.fromElements("a", "b", "c");
    DataSet<String> stats = null;

    for (int i = 0; i < depth; i++) {
      stats = analyze(input, stats, width / (i + 1) + 1);
    }

    stats.writeAsText("out.txt");
    env.execute("depth " + depth + " width " + width);
  }

  public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats,
      int branches) {
    System.out.println("analyze " + branches);
    for (int i = 0; i < branches; i++) {
      final int ii = i;

      if (stats != null) {
        input = input.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
              Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
            }
            @Override
            public String map(String value) throws Exception {
              return value;
            }
          }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
      }

      DataSet<String> branch = input
                               .map(s -> new Tuple2<Integer, String>(0, s + ii))
                               .groupBy(0)
                               .minBy(1)
                               .map(kv -> kv.f1);
      if (stats == null) {
        stats = branch;
      } else {
        stats = stats.union(branch);
      }
    }
    return stats.map(s -> "(" + s + ").stats");
  }

{code}
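
To give a sense of how large the submitted graph is for a given depth and width, the sketch below simply reproduces the `width / (i + 1) + 1` arithmetic from `runPipeline` and sums the branches over all levels. It does not use Flink at all; the class name `BranchCount` and its two helper methods are hypothetical names introduced only for this illustration, and the count covers branches, not individual operators.

```java
public class BranchCount {
    // Branches requested at a given level; mirrors `width / (i + 1) + 1` from runPipeline.
    public static int branchesAt(int level, int width) {
        return width / (level + 1) + 1; // integer division, as in the original loop
    }

    // Total number of branches summed over all levels of the pipeline.
    public static int totalBranches(int depth, int width) {
        int total = 0;
        for (int i = 0; i < depth; i++) {
            total += branchesAt(i, width);
        }
        return total;
    }

    public static void main(String[] args) {
        int depth = 10;
        int width = 50;
        for (int i = 0; i < depth; i++) {
            System.out.println("level " + i + ": " + branchesAt(i, width) + " branches");
        }
        System.out.println("total: " + totalBranches(depth, width) + " branches");
    }
}
```

For depth 10 and width 50 this comes to 154 branches in total, and for depth 10 and width 10 to 37. Each branch adds several operators to the plan (the maps, the grouped `minBy`, and from the second branch on a union plus the broadcast maps), so the number of operators handed to the optimizer is a multiple of these figures.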



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
