flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.
Date Mon, 01 Oct 2018 11:59:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633902#comment-16633902 ]

ASF GitHub Bot commented on FLINK-10447:
----------------------------------------

SuXingLee closed pull request #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index 592db4b5106..55425277cbc 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -65,6 +65,15 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project, won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -105,6 +114,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java
new file mode 100644
index 00000000000..4c2899bd172
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSink.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An {@link AppendStreamTableSink} that emits the rows of an append-only table to a
+ * {@link BucketingSink}.
+ *
+ * <p>Create instances with {@link #builder(String)}. {@link #configure(String[], TypeInformation[])}
+ * returns a new sink that wraps a copy of the underlying {@link BucketingSink}, so a configured
+ * sink never shares its BucketingSink object with the instance it was created from.
+ */
+public class BucketingTableSink implements AppendStreamTableSink<Row> {
+
+	private final BucketingSink<Row> sink;
+	private String[] fieldNames;
+	private TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * Creates a new {@code BucketingTableSink} that writes table rows to the given {@link BucketingSink}.
+	 *
+	 * @param sink The BucketingSink to which to write the table rows; must not be null.
+	 */
+	public BucketingTableSink(BucketingSink<Row> sink) {
+		this.sink = Preconditions.checkNotNull(sink, "BucketingSink must not be null.");
+	}
+
+	/**
+	 * Returns a builder to configure and build a {@code BucketingTableSink}.
+	 *
+	 * @param basePath The directory to which to write the bucket files.
+	 * @return a new {@link BucketingTableSinkBuilder} for the given base path
+	 */
+	public static BucketingTableSinkBuilder builder(String basePath) {
+		return new BucketingTableSinkBuilder(basePath);
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		// fieldNames is only assigned on sinks returned by configure(), so it may be null here.
+		dataStream.addSink(sink).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
+	}
+
+	/**
+	 * Returns the row type described by the configured field names and types.
+	 *
+	 * @throws IllegalStateException if this sink has not been configured via {@link #configure}
+	 */
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		checkConfigured();
+		return new RowTypeInfo(fieldTypes, fieldNames);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	/**
+	 * Returns a copy of this sink configured with the given table schema.
+	 *
+	 * <p>The underlying {@link BucketingSink} is copied with {@link InstantiationUtil#clone}, so the
+	 * returned sink does not share its BucketingSink object with this instance.
+	 *
+	 * @param fieldNames the names of the table fields
+	 * @param fieldTypes the types of the table fields, in the same order as {@code fieldNames}
+	 * @return a new, configured {@code BucketingTableSink}
+	 * @throws IllegalArgumentException if either array is null or their lengths differ
+	 */
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		Preconditions.checkArgument(fieldNames != null && fieldTypes != null,
+			"Field names and field types must not be null.");
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of field names (%s) does not match number of field types (%s).",
+			fieldNames.length, fieldTypes.length);
+
+		BucketingTableSink copy;
+		try {
+			copy = new BucketingTableSink(InstantiationUtil.clone(sink));
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Could not copy the BucketingSink.", e);
+		}
+		copy.fieldNames = fieldNames;
+		copy.fieldTypes = fieldTypes;
+		return copy;
+	}
+
+	/**
+	 * Throws an {@link IllegalStateException} if field names and types have not been set via {@link #configure}.
+	 */
+	private void checkConfigured() {
+		Preconditions.checkState(fieldNames != null && fieldTypes != null,
+			"BucketingTableSink has not been configured with field names and types.");
+	}
+}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java
new file mode 100644
index 00000000000..87ee877f3d1
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A builder to configure and build a {@link BucketingTableSink}.
+ *
+ * <p>Each setter forwards its argument to the corresponding setter of an internal
+ * {@link BucketingSink}. {@link #build()} hands a copy of that sink to the new table sink, so
+ * changes made through this builder afterwards do not affect sinks that were already built.
+ */
+public class BucketingTableSinkBuilder {
+
+	private final BucketingSink<Row> sink;
+
+	/**
+	 * Creates a builder for a sink that writes below the given base path.
+	 *
+	 * @param basePath the directory to which to write the bucket files; must not be null
+	 */
+	public BucketingTableSinkBuilder(String basePath) {
+		Preconditions.checkNotNull(basePath, "basePath must not be null.");
+		this.sink = new BucketingSink<>(basePath);
+	}
+
+	/**
+	 * Sets the batch size of the underlying sink.
+	 *
+	 * @param batchSize the value passed to {@link BucketingSink#setBatchSize(long)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setBatchSize(long batchSize) {
+		this.sink.setBatchSize(batchSize);
+		return this;
+	}
+
+	/**
+	 * Sets the batch rollover interval of the underlying sink.
+	 *
+	 * @param batchRolloverInterval the value passed to {@link BucketingSink#setBatchRolloverInterval(long)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setBatchRolloverInterval(long batchRolloverInterval) {
+		this.sink.setBatchRolloverInterval(batchRolloverInterval);
+		return this;
+	}
+
+	/**
+	 * Sets the inactive-bucket check interval of the underlying sink.
+	 *
+	 * @param interval the value passed to {@link BucketingSink#setInactiveBucketCheckInterval(long)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setInactiveBucketCheckInterval(long interval) {
+		this.sink.setInactiveBucketCheckInterval(interval);
+		return this;
+	}
+
+	/**
+	 * Sets the inactive-bucket threshold of the underlying sink.
+	 *
+	 * @param threshold the value passed to {@link BucketingSink#setInactiveBucketThreshold(long)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setInactiveBucketThreshold(long threshold) {
+		this.sink.setInactiveBucketThreshold(threshold);
+		return this;
+	}
+
+	/**
+	 * Sets the bucketer of the underlying sink.
+	 *
+	 * @param bucketer the value passed to {@link BucketingSink#setBucketer(Bucketer)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setBucketer(Bucketer<Row> bucketer) {
+		this.sink.setBucketer(bucketer);
+		return this;
+	}
+
+	/**
+	 * Sets the writer of the underlying sink.
+	 *
+	 * @param writer the value passed to {@link BucketingSink#setWriter(Writer)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setWriter(Writer<Row> writer) {
+		this.sink.setWriter(writer);
+		return this;
+	}
+
+	/**
+	 * Sets the in-progress suffix of the underlying sink.
+	 *
+	 * @param inProgressSuffix the value passed to {@link BucketingSink#setInProgressSuffix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setInProgressSuffix(String inProgressSuffix) {
+		this.sink.setInProgressSuffix(inProgressSuffix);
+		return this;
+	}
+
+	/**
+	 * Sets the in-progress prefix of the underlying sink.
+	 *
+	 * @param inProgressPrefix the value passed to {@link BucketingSink#setInProgressPrefix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setInProgressPrefix(String inProgressPrefix) {
+		this.sink.setInProgressPrefix(inProgressPrefix);
+		return this;
+	}
+
+	/**
+	 * Sets the pending suffix of the underlying sink.
+	 *
+	 * @param pendingSuffix the value passed to {@link BucketingSink#setPendingSuffix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setPendingSuffix(String pendingSuffix) {
+		this.sink.setPendingSuffix(pendingSuffix);
+		return this;
+	}
+
+	/**
+	 * Sets the pending prefix of the underlying sink.
+	 *
+	 * @param pendingPrefix the value passed to {@link BucketingSink#setPendingPrefix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setPendingPrefix(String pendingPrefix) {
+		this.sink.setPendingPrefix(pendingPrefix);
+		return this;
+	}
+
+	/**
+	 * Sets the valid-length suffix of the underlying sink.
+	 *
+	 * @param validLengthSuffix the value passed to {@link BucketingSink#setValidLengthSuffix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setValidLengthSuffix(String validLengthSuffix) {
+		this.sink.setValidLengthSuffix(validLengthSuffix);
+		return this;
+	}
+
+	/**
+	 * Sets the valid-length prefix of the underlying sink.
+	 *
+	 * @param validLengthPrefix the value passed to {@link BucketingSink#setValidLengthPrefix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setValidLengthPrefix(String validLengthPrefix) {
+		this.sink.setValidLengthPrefix(validLengthPrefix);
+		return this;
+	}
+
+	/**
+	 * Sets the part suffix of the underlying sink.
+	 *
+	 * @param partSuffix the value passed to {@link BucketingSink#setPartSuffix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setPartSuffix(String partSuffix) {
+		this.sink.setPartSuffix(partSuffix);
+		return this;
+	}
+
+	/**
+	 * Sets the part prefix of the underlying sink.
+	 *
+	 * @param partPrefix the value passed to {@link BucketingSink#setPartPrefix(String)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setPartPrefix(String partPrefix) {
+		this.sink.setPartPrefix(partPrefix);
+		return this;
+	}
+
+	/**
+	 * Sets whether the underlying sink uses truncate.
+	 *
+	 * @param useTruncate the value passed to {@link BucketingSink#setUseTruncate(boolean)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setUseTruncate(boolean useTruncate) {
+		this.sink.setUseTruncate(useTruncate);
+		return this;
+	}
+
+	/**
+	 * Sets the async timeout of the underlying sink.
+	 *
+	 * @param timeout the value passed to {@link BucketingSink#setAsyncTimeout(long)}
+	 * @return this builder
+	 */
+	public BucketingTableSinkBuilder setAsyncTimeout(long timeout) {
+		this.sink.setAsyncTimeout(timeout);
+		return this;
+	}
+
+	/**
+	 * Builds a {@link BucketingTableSink} from the current configuration.
+	 *
+	 * <p>The table sink receives a copy of the configured {@link BucketingSink}, made with
+	 * {@link InstantiationUtil#clone}, so later calls on this builder do not modify it.
+	 *
+	 * @return a new BucketingTableSink
+	 * @throws RuntimeException if the configured BucketingSink cannot be copied
+	 */
+	public BucketingTableSink build() {
+		try {
+			return new BucketingTableSink(InstantiationUtil.clone(sink));
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Could not copy the configured BucketingSink.", e);
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java
new file mode 100644
index 00000000000..4753369af1f
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingTableSinkTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+/**
+ * Test for BucketingTableSinkTest.
+ */
+public class BucketingTableSinkTest {
+	private static final String[] FIELD_NAMES = new String[] { "foo" };
+	private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO
};
+	private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+	@Test
+	public void testBucketingTableSink() {
+		BucketingTableSink sink = BucketingTableSink.builder("/data/tmp/foo")
+				.setBatchSize(384 * 1024 * 1024)
+				.setBatchRolloverInterval(5 * 60 * 1000)
+				.setBucketer(new DateTimeBucketer<Row>("yyyy/MM/dd"))
+				.build();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
+		sink.emitDataStream(ds);
+
+		Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
+		assertEquals(1, sinkIds.size());
+
+		int sinkId = sinkIds.iterator().next();
+		StreamSink planSink = (StreamSink) env.getStreamGraph().getStreamNode(sinkId).getOperator();
+		assertTrue(planSink.getUserFunction() instanceof BucketingSink<?>);
+	}
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Create Bucketing Table Sink.
> ----------------------------
>
>                 Key: FLINK-10447
>                 URL: https://issues.apache.org/jira/browse/FLINK-10447
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Suxing Lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> It would be nice to integrate the Table API with the HDFS connectors so that the rows
> in the tables can be pushed directly into HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message