flink-issues mailing list archives

From "Zheng Hu (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-14941) The AbstractTableInputFormat#nextRecord in hbase connector will handle the same rowkey twice once encountered any exception
Date Wed, 27 Nov 2019 01:40:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983073#comment-16983073 ]

Zheng Hu commented on FLINK-14941:
----------------------------------

> but I agree with you we should only re-scan if it is an HBase exception. 
+1, I will update the patch. Thanks [~modavis] & [~jark].

> The AbstractTableInputFormat#nextRecord in hbase connector will handle the same rowkey twice once encountered any exception
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14941
>                 URL: https://issues.apache.org/jira/browse/FLINK-14941
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: Zheng Hu
>            Assignee: Zheng Hu
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> On the mailing list [1], a user reports seeing the same row twice whenever an HBase exception is encountered.
> The problem is in the following method:
> {code}
> public T nextRecord(T reuse) throws IOException {
> 		if (resultScanner == null) {
> 			throw new IOException("No table result scanner provided!");
> 		}
> 		try {
> 			Result res = resultScanner.next();
> 			if (res != null) {
> 				scannedRows++;
> 				currentRow = res.getRow();
> 				return mapResultToOutType(res);
> 			}
> 		} catch (Exception e) {
> 			resultScanner.close();
> 			//workaround for timeout on scan
> 			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
> 			scan.setStartRow(currentRow);
> 			resultScanner = table.getScanner(scan);
> 			Result res = resultScanner.next();
> 			if (res != null) {
> 				scannedRows++;
> 				currentRow = res.getRow();
> 				return mapResultToOutType(res);
> 			}
> 		}
> 		endReached = true;
> 		return null;
> 	}
> {code}
> The restarted scan uses currentRow as its start row, but currentRow has already been returned to the caller, so that row is seen twice. The fix is to replace scan.setStartRow(currentRow) with scan.withStartRow(currentRow, false); the false argument excludes currentRow from the new scan.
> [1]. http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataSet-API-HBase-ScannerTimeoutException-and-double-Result-processing-td31174.html
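
The difference between an inclusive and an exclusive restart, as described in the quoted issue, can be sketched without an HBase cluster. The example below is only a toy model: it uses a plain java.util.TreeMap as a stand-in for the table, and the class ScanRestartSketch, the rescan helper, and the row keys are hypothetical names invented for illustration. It is not the patch itself. Passing true models scan.setStartRow(currentRow), and passing false models scan.withStartRow(currentRow, false).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ScanRestartSketch {

    // Hypothetical stand-in for an HBase table: row keys kept in sorted order.
    static final TreeMap<String, String> TABLE = new TreeMap<>();

    static {
        TABLE.put("row1", "a");
        TABLE.put("row2", "b");
        TABLE.put("row3", "c");
    }

    // Returns the row keys a restarted scan would emit when it begins at startRow.
    // inclusive = true  models scan.setStartRow(currentRow)
    // inclusive = false models scan.withStartRow(currentRow, false)
    static List<String> rescan(String startRow, boolean inclusive) {
        return new ArrayList<>(TABLE.tailMap(startRow, inclusive).keySet());
    }

    public static void main(String[] args) {
        // Assume "row2" was the last row returned before the scanner failed.
        String currentRow = "row2";

        // Inclusive restart: row2 is emitted again, so it is processed twice.
        System.out.println(rescan(currentRow, true));   // prints [row2, row3]

        // Exclusive restart: the scan resumes at the first row after row2.
        System.out.println(rescan(currentRow, false));  // prints [row3]
    }
}
```

With the exclusive start, the row that was already handed to the caller is skipped, which is the behavior the proposed change aims for.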



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
