flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
Date Wed, 25 Nov 2015 17:09:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15027162#comment-15027162 ]

ASF GitHub Bot commented on FLINK-2837:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1398#discussion_r45892911
  
    --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
---
    @@ -15,75 +16,474 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
     package org.apache.flink.storm.api;
     
    +import backtype.storm.generated.ComponentCommon;
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.generated.Grouping;
     import backtype.storm.generated.StormTopology;
    +import backtype.storm.topology.IRichBolt;
    +import backtype.storm.topology.IRichSpout;
    +import backtype.storm.topology.IRichStateSpout;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.base.Preconditions;
     import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.storm.util.SplitStreamMapper;
    +import org.apache.flink.storm.util.SplitStreamType;
    +import org.apache.flink.storm.util.StormStreamSelector;
    +import org.apache.flink.storm.wrappers.BoltWrapper;
    +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
    +import org.apache.flink.storm.wrappers.SpoutWrapper;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.datastream.SplitStream;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
     
     /**
    - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
    - * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
    - * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
    - * {@link FlinkClient}.
    + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
    + * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
      */
    -public class FlinkTopology extends StreamExecutionEnvironment {
    +public class FlinkTopology {
    +
    +	/** All declared streams and output schemas by operator ID */
    +	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
    +	/** All spouts&bolts declarers by their ID */
    +	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
    +
    +	private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
    +			new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
    +
    +	final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
     
    -	/** The number of declared tasks for the whole program (ie, sum over all dops) */
    -	private int numberOfTasks = 0;
    +	private final TopologyBuilder builder;
     
    -	public FlinkTopology() {
    -		// Set default parallelism to 1, to mirror Storm default behavior
    -		super.setParallelism(1);
    +	// needs to be a class member for internal testing purpose
    +	private final StormTopology stormTopology;
    +
    +	private final Map<String, IRichSpout> spouts;
    +	private final Map<String, IRichBolt> bolts;
    +
    +	private final StreamExecutionEnvironment env;
    +
    +	private FlinkTopology(TopologyBuilder builder) {
    +		this.builder = builder;
    +		this.stormTopology = builder.createTopology();
    +		// extract the spouts and bolts
    +		this.spouts = getPrivateField("_spouts");
    +		this.bolts = getPrivateField("_bolts");
    +
    +		this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		// Kick off the translation immediately
    +		translateTopology();
     	}
     
     	/**
    -	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
    -	 * FlinkClient}.
     	 *
    -	 * @throws UnsupportedOperationException
    -	 * 		at every invocation
    +	 * Creates a Flink program that uses the specified spouts and bolts.
    +	 * @param stormBuilder The storm topology builder to use for creating the Flink topology.
    +	 * @return A Flink Topology which may be executed.
     	 */
    -	@Override
    -	public JobExecutionResult execute() throws Exception {
    -		throw new UnsupportedOperationException(
    -				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter,
or FlinkClient " +
    -				"instead.");
    +	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
    +		return new FlinkTopology(stormBuilder);
     	}
     
     	/**
    -	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
    -	 * FlinkClient}.
    -	 *
    -	 * @throws UnsupportedOperationException
    -	 * 		at every invocation
    +	 * Returns the underlying Flink ExecutionEnvironment for the Storm topology.
    +	 * @return The contextual environment.
     	 */
    -	@Override
    -	public JobExecutionResult execute(final String jobName) throws Exception {
    -		throw new UnsupportedOperationException(
    -				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter,
or FlinkClient " +
    -				"instead.");
    +	public StreamExecutionEnvironment getExecutionEnvironment() {
    +		return this.env;
     	}
     
     	/**
    -	 * Increased the number of declared tasks of this program by the given value.
    -	 *
    -	 * @param dop
    -	 * 		The dop of a new operator that increases the number of overall tasks.
    +	 * Directly executes the Storm topology based on the current context (local when in IDE and
    +	 * remote when executed through ./bin/flink).
    +	 * @return The execution result
    +	 * @throws Exception
     	 */
    --- End diff --
    
    Yes, execute just throws an Exception. Nothing to explain here. StreamExecutionEnvironment says
    > 	 * @throws Exception which occurs during job execution.
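
For reference, here is a minimal usage sketch of the API introduced in this diff. It is not taken from the PR: createTopology() and getExecutionEnvironment() appear above, the execute() signature is assumed from the quoted Javadoc, and FiniteRandomSpout/MergerBolt are the classes used in the issue example below (their imports are omitted).

{noformat}
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.storm.api.FlinkTopology;

public class FlinkTopologyUsageSketch {
	public static void main(String[] args) throws Exception {
		// plain Storm builder; spout and bolt classes come from the issue example
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("source", new FiniteRandomSpout(0, 10));
		builder.setBolt("merger", new MergerBolt()).shuffleGrouping("source");

		// translation to a Flink program happens eagerly inside createTopology()
		FlinkTopology topology = FlinkTopology.createTopology(builder);

		// runs locally in the IDE, remotely when submitted through ./bin/flink;
		// the signature is assumed here to be "JobExecutionResult execute() throws Exception"
		JobExecutionResult result = topology.execute();
		System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
	}
}
{noformat}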



> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
>                 Key: FLINK-2837
>                 URL: https://issues.apache.org/jira/browse/FLINK-2837
>             Project: Flink
>          Issue Type: Bug
>          Components: Storm Compatibility
>            Reporter: Matthias J. Sax
>            Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead of unioning the incoming streams, it replicates the consuming bolt, and each (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
> 	.shuffleGrouping(spoutId1)
> 	.shuffleGrouping(spoutId2)
> 	.shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
> 	.shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.
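
For comparison, a minimal DataStream sketch (illustrative only, not the PR's code) of the semantics the translation is expected to produce: the inputs declared by the multiple shuffleGrouping calls should be unioned into a single stream that feeds one logical bolt, rather than the bolt being replicated per input. Generated sequences stand in for the FiniteRandomSpouts.

{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSemanticsSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// stand-ins for the three FiniteRandomSpouts from the example above
		DataStream<Long> spout1 = env.generateSequence(0, 9);
		DataStream<Long> spout2 = env.generateSequence(10, 17);
		DataStream<Long> spout3 = env.generateSequence(20, 32);

		// expected behavior: union all declared inputs so that a single logical
		// bolt instance sees the records of every source
		DataStream<Long> merged = spout1.union(spout2, spout3);

		merged.print();  // prints records from all three sources, not just one
		env.execute("union-semantics-sketch");
	}
}
{noformat}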



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
