flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
Date Mon, 03 Dec 2018 02:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706598#comment-16706598
] 

ASF GitHub Bot commented on FLINK-10134:
----------------------------------------

XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for TextInputFormat
bug
URL: https://github.com/apache/flink/pull/7157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..e13560d4823 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
@@ -62,6 +64,23 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/**
+	 * The charset of bom in the file to process.
+	 */
+	private transient Charset bomIdentifiedCharset;
+	/**
+	 * This is the charset that is configured via setCharset().
+	 */
+	private transient Charset configuredCharset;
+	/**
+	 * The Map to record the BOM encoding of all files.
+	 */
+	private transient final Map<String, Charset> fileBomCharsetMap;
+	/**
+	 * The bytes to BOM check.
+	 */
+	byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte)
0xBB, (byte) 0xBF};
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -184,6 +203,7 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration)
{
 			configuration = GlobalConfiguration.loadConfiguration();
 		}
 		loadConfigParameters(configuration);
+		this.fileBomCharsetMap = new LRUCache<>(1024);
 	}
 
 	/**
@@ -195,12 +215,25 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration)
{
 	 */
 	@PublicEvolving
 	public Charset getCharset() {
-		if (this.charset == null) {
+		if (this.configuredCharset != null) {
+			this.charset = this.configuredCharset;
+		} else if (this.bomIdentifiedCharset != null) {
+			this.charset = this.bomIdentifiedCharset;
+		} else {
 			this.charset = Charset.forName(charsetName);
 		}
 		return this.charset;
 	}
 
+	/**
+	 * get the charsetName.
+	 *
+	 * @return the charsetName
+	 */
+	public String getCharsetName() {
+		return charsetName;
+	}
+
 	/**
 	 * Set the name of the character set used for the row delimiter. This is
 	 * also used by subclasses to interpret field delimiters, comment strings,
@@ -214,7 +247,7 @@ public Charset getCharset() {
 	@PublicEvolving
 	public void setCharset(String charset) {
 		this.charsetName = Preconditions.checkNotNull(charset);
-		this.charset = null;
+		this.configuredCharset = getSpecialCharset(charset);
 
 		if (this.delimiterString != null) {
 			this.delimiter = delimiterString.getBytes(getCharset());
@@ -472,6 +505,7 @@ public void open(FileInputSplit split) throws IOException {
 
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);
 			this.stream.seek(offset);
 			readLine();
 			// if the first partial record already pushes the stream over
@@ -481,6 +515,7 @@ public void open(FileInputSplit split) throws IOException {
 			}
 		} else {
 			fillBuffer(0);
+			setBomFileCharset(split);
 		}
 	}
 
@@ -536,6 +571,71 @@ public void close() throws IOException {
 		super.close();
 	}
 
+	/**
+	 * Special default processing for utf-16 and utf-32 is performed.
+	 *
+	 * @param charsetName
+	 * @return
+	 */
+	private Charset getSpecialCharset(String charsetName) {
+		Charset charset;
+		switch (charsetName.toUpperCase()) {
+			case "UTF-16":
+				charset = Charset.forName("UTF-16BE");
+				break;
+			case "UTF-32":
+				charset = Charset.forName("UTF-32BE");
+				break;
+			default:
+				charset = Charset.forName(charsetName);
+				break;
+		}
+		return charset;
+	}
+	/**
+	 * Set file bom encoding.
+	 *
+	 * @param split
+	 */
+	private void setBomFileCharset(FileInputSplit split) {
+		try {
+			if (configuredCharset == null) {
+				String filePath = split.getPath().toString();
+				if (this.fileBomCharsetMap.containsKey(filePath)) {
+					this.bomIdentifiedCharset = this.fileBomCharsetMap.get(filePath);
+				} else {
+					byte[] bomBuffer = new byte[4];
+					if (this.splitStart != 0) {
+						this.stream.seek(0);
+						this.stream.read(bomBuffer, 0, bomBuffer.length);
+						this.stream.seek(split.getStart());
+					} else {
+						System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 4);
+					}
+					if ((bomBuffer[0] == bomBytes[0]) && (bomBuffer[1] == bomBytes[0]) &&
(bomBuffer[2] == bomBytes[1])
+						&& (bomBuffer[3] == bomBytes[2])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-32BE");
+					} else if ((bomBuffer[0] == bomBytes[2]) && (bomBuffer[1] == bomBytes[1]) &&
(bomBuffer[2] == bomBytes[0])
+						&& (bomBuffer[3] == bomBytes[0])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-32LE");
+					} else if ((bomBuffer[0] == bomBytes[3]) && (bomBuffer[1] == bomBytes[4]) &&
(bomBuffer[2] == bomBytes[5])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-8");
+					} else if ((bomBuffer[0] == bomBytes[1]) && (bomBuffer[1] == bomBytes[2])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-16BE");
+					} else if ((bomBuffer[0] == bomBytes[2]) && (bomBuffer[1] == bomBytes[1])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-16LE");
+					} else {
+						this.bomIdentifiedCharset = Charset.forName(charsetName);
+					}
+					this.fileBomCharsetMap.put(filePath, this.bomIdentifiedCharset);
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to get file bom encoding.");
+			this.bomIdentifiedCharset = Charset.forName(charsetName);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	protected final boolean readLine() throws IOException {
diff --git a/flink-core/src/main/java/org/apache/flink/util/LRUCache.java b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
new file mode 100644
index 00000000000..ee7aa2516ca
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A LRUCache by LinkedHashMap.
+ */
+public class LRUCache<K, V> extends LinkedHashMap<K, V> implements java.io.Serializable
{
+
+	private final int maxCacheSize;
+
+	public LRUCache(int cacheSize) {
+		super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
+		maxCacheSize = cacheSize;
+	}
+
+	@Override
+	protected boolean removeEldestEntry(Map.Entry eldest) {
+		return size() > maxCacheSize;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<K, V> entry : entrySet()) {
+			sb.append(String.format("%s:%s ", entry.getKey(), entry.getValue()));
+		}
+		return sb.toString();
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index 82793adc137..6c21743e859 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
@@ -58,16 +59,30 @@ public TextInputFormat(Path filePath) {
 
 	// --------------------------------------------------------------------------------------------
 
-	public String getCharsetName() {
-		return charsetName;
-	}
-
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("Charset must not be null.");
 		}
 
 		this.charsetName = charsetName;
+		this.setCharset(charsetName);
+	}
+
+	/**
+	 * Processing for Delimiter special cases.
+	 */
+	private void setSpecialDelimiter() {
+		String delimiterString = "\n";
+		if (this.getDelimiter() != null && this.getDelimiter().length == 1
+			&& this.getDelimiter()[0] == (byte) '\n') {
+			this.setDelimiter(delimiterString);
+		}
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		setSpecialDelimiter();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -92,7 +107,7 @@ public String readRecord(String reusable, byte[] bytes, int offset, int
numBytes
 			numBytes -= 1;
 		}
 
-		return new String(bytes, offset, numBytes, this.charsetName);
+		return new String(bytes, offset, numBytes, this.getCharset());
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index e78232ac1e5..6de946b8585 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -25,16 +25,20 @@
 
 import org.junit.Test;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -215,4 +219,243 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter)
{
 		}
 	}
 
+	/**
+	 * Test different file encodings,for example:UTF-8, UTF-8 with bom, UTF-16LE, UTF-16BE,
UTF-32LE, UTF-32BE.
+	 */
+	@Test
+	public void testFileCharset() {
+		String data = "Hello|ハロー|при\\вет|Bon^*|\\|<>|jour|Сайн. байна
уу|안녕*하세요.";
+		// Default separator
+		testAllFileCharset(data);
+		// Specified separator
+		testAllFileCharset(data, "^*|\\|<>|");
+	}
+
+	private void testAllFileCharset(String data) {
+		testAllFileCharset(data, "");
+	}
+
+	private void testAllFileCharset(String data, String delimiter) {
+		try {
+			// test UTF-8, no bom, UTF-8
+			testFileCharset(data, "UTF-8", false, "UTF-8", delimiter);
+			// test UTF-8, have bom, UTF-8
+			testFileCharset(data, "UTF-8", true, "UTF-8", delimiter);
+			// test UTF-16BE, no, UTF-16
+			testFileCharset(data, "UTF-16BE", false, "UTF-16", delimiter);
+			// test UTF-16BE, yes, UTF-16
+			testFileCharset(data, "UTF-16BE", true, "UTF-16", delimiter);
+			// test UTF-16LE, no, UTF-16LE
+			testFileCharset(data, "UTF-16LE", false, "UTF-16LE", delimiter);
+			// test UTF-16LE, yes, UTF-16
+			testFileCharset(data, "UTF-16LE", true, "UTF-16", delimiter);
+			// test UTF-16BE, no, UTF-16BE
+			testFileCharset(data, "UTF-16BE", false, "UTF-16BE", delimiter);
+			// test UTF-16BE, yes, UTF-16LE
+			testFileCharset(data, "UTF-16BE", true, "UTF-16LE", delimiter);
+			// test UTF-16LE, yes, UTF-16BE
+			testFileCharset(data, "UTF-16LE", true, "UTF-16BE", delimiter);
+			// test UTF-32BE, no, UTF-32
+			testFileCharset(data, "UTF-32BE", false, "UTF-32", delimiter);
+			// test UTF-32BE, yes, UTF-32
+			testFileCharset(data, "UTF-32BE", true, "UTF-32", delimiter);
+			// test UTF-32LE, yes, UTF-32
+			testFileCharset(data, "UTF-32LE", true, "UTF-32", delimiter);
+			// test UTF-32LE, no, UTF-32LE
+			testFileCharset(data, "UTF-32LE", false, "UTF-32LE", delimiter);
+			// test UTF-32BE, no, UTF-32BE
+			testFileCharset(data, "UTF-32BE", false, "UTF-32BE", delimiter);
+			// test UTF-32BE, yes, UTF-32LE
+			testFileCharset(data, "UTF-32BE", true, "UTF-32LE", delimiter);
+			// test UTF-32LE, yes, UTF-32BE
+			testFileCharset(data, "UTF-32LE", true, "UTF-32BE", delimiter);
+			//------------------Don't set the charset------------------------
+			// test UTF-8, have bom, Don't set the charset
+			testFileCharset(data, "UTF-8", true, delimiter);
+			// test UTF-8, no bom, Don't set the charset
+			testFileCharset(data, "UTF-8", false, delimiter);
+			// test UTF-16BE, no bom, Don't set the charset
+			testFileCharset(data, "UTF-16BE", false, delimiter);
+			// test UTF-16BE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-16BE", true, delimiter);
+			// test UTF-16LE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-16LE", true, delimiter);
+			// test UTF-32BE, no bom, Don't set the charset
+			testFileCharset(data, "UTF-32BE", false, delimiter);
+			// test UTF-32BE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-32BE", true, delimiter);
+			// test UTF-32LE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-32LE", true, delimiter);
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	/**
+	 * To create UTF EncodedFile.
+	 *
+	 * @param data
+	 * @param fileCharset
+	 * @param hasBom
+	 * @return
+	 */
+	private File createUTFEncodedFile(String data, String fileCharset, boolean hasBom) throws
Exception {
+		BufferedWriter bw = null;
+		OutputStreamWriter osw = null;
+		FileOutputStream fos = null;
+
+		byte[] bom = new byte[]{};
+		if (hasBom) {
+			switch (fileCharset) {
+				case "UTF-8":
+					bom = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+					break;
+				case "UTF-16":
+					bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-16LE":
+					bom = new byte[]{(byte) 0xFF, (byte) 0xFE};
+					break;
+				case "UTF-16BE":
+					bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-32":
+					bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-32LE":
+					bom = new byte[]{(byte) 0xFF, (byte) 0xFE, (byte) 0x00, (byte) 0x00};
+					break;
+				case "UTF-32BE":
+					bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+					break;
+				default:
+					throw new Exception("can not find the utf code");
+			}
+		}
+
+		// create input file
+		File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+		fos = new FileOutputStream(tempFile, true);
+
+		if (tempFile.length() < 1) {
+			if (hasBom) {
+				fos.write(bom);
+			}
+		}
+
+		osw = new OutputStreamWriter(fos, fileCharset);
+		bw = new BufferedWriter(osw);
+		bw.write(data);
+		bw.newLine();
+
+		bw.close();
+		fos.close();
+
+		return tempFile;
+	}
+
+	private void testFileCharset(String data, String fileCharset, Boolean hasBom, String delimiter)
{
+		testFileCharset(data, fileCharset, hasBom, null, delimiter);
+	}
+
+	private void testFileCharset(String data, String fileCharset, Boolean hasBom, String specifiedCharset,
String delimiter) {
+		try {
+			int offset = 0;
+			String carriageReturn = java.security.AccessController.doPrivileged(
+				new sun.security.action.GetPropertyAction("line.separator"));
+			String delimiterString = delimiter.isEmpty() ? carriageReturn : delimiter;
+			byte[] delimiterBytes = delimiterString.getBytes(fileCharset);
+			String[] utfArray = {"UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE"};
+			if (hasBom) {
+				if (Arrays.asList(utfArray).contains(fileCharset)) {
+					offset = 1;
+				}
+			}
+
+			File tempFile = createUTFEncodedFile(data, fileCharset, hasBom);
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			if (specifiedCharset != null) {
+				inputFormat.setCharsetName(specifiedCharset);
+			}
+			if (delimiterBytes.length > 0) {
+				inputFormat.setDelimiter(delimiterBytes);
+			}
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(1);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			inputFormat.open(splits[0]);
+
+			String result = "";
+			int i = 0;
+			data = data + carriageReturn;
+			String delimiterStr = new String(delimiterBytes, 0, delimiterBytes.length, fileCharset);
+			String[] strArr = data.split(delimiterStr
+				.replace("\\", "\\\\")
+				.replace("^", "\\^")
+				.replace("|", "\\|")
+				.replace("[", "\\[")
+				.replace("*", "\\*")
+				.replace(".", "\\.")
+			);
+
+			while (!inputFormat.reachedEnd()) {
+				if (i < strArr.length) {
+					result = inputFormat.nextRecord("");
+					if (i == 0) {
+						result = result.substring(offset);
+					}
+					if (Charset.forName(fileCharset) != inputFormat.getCharset()) {
+						assertNotEquals(strArr[i], result);
+					} else {
+						assertEquals(strArr[i], result);
+					}
+					i++;
+				} else {
+					inputFormat.nextRecord("");
+				}
+			}
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	@Test
+	public void testFileCharsetReadByMultiSplits() {
+		String carriageReturn = java.security.AccessController.doPrivileged(
+			new sun.security.action.GetPropertyAction("line.separator"));
+		final String data = "First line" + carriageReturn + "Second line";
+		try {
+			File tempFile = createUTFEncodedFile(data, "UTF-16", false);
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			inputFormat.setCharsetName("UTF-32");
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(3);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			String result = "";
+			for (FileInputSplit split : splits) {
+				inputFormat.open(split);
+				result = inputFormat.nextRecord("");
+			}
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> UTF-16 support for TextInputFormat
> ----------------------------------
>
>                 Key: FLINK-10134
>                 URL: https://issues.apache.org/jira/browse/FLINK-10134
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.4.2
>            Reporter: David Dreyfus
>            Priority: Critical
>              Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular,
it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16
file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets
TextInputFormat.charsetName and then modifies the previously set delimiterString to construct
the proper byte string encoding of the delimiter. This same charsetName is also used in
TextInputFormat.readRecord() to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a
Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not
contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual
byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence
with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian,
which may not always be correct. [1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would have to start
by reading the BOM from the file when a Split is opened and then using that BOM to modify
the specified encoding to a BOM specific one when the caller doesn't specify one, and to overwrite
the caller's specification if the BOM is in conflict with the caller's specification. That
is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite
the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or misreading the
code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message