[ https://issues.apache.org/jira/browse/FLUME-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227148#comment-17227148
]
Fabien Carrion commented on FLUME-3279:
---------------------------------------
Hello, I patched this issue with the following change:
{code:java}
flume ((release-1.9.0-rc3))$ git diff
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index 7106696a..10746b96 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -171,6 +171,8 @@ class HiveWriter {
       throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
     } catch (TimeoutException e) {
       throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
+    } catch (IllegalStateException e) {
+      throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
     }
   }
@@ -284,6 +286,9 @@ class HiveWriter {
     } catch (TimeoutException e) {
       LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e);
       return;
+    } catch (IllegalStateException e) {
+      LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e);
+      return;
     }
   }
{code}
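To illustrate why wrapping the unchecked exception might help, here is a minimal, self-contained sketch. It does not use the Flume or Hive classes: `StaleWriterDemo`, its nested `Writer`, `Sink` and `WriteException`, and the `activeWriters` map are all hypothetical stand-ins. It assumes the sink only discards a cached writer when it sees its own checked failure type, so an `IllegalStateException` that escapes unwrapped leaves the closed writer cached and every later delivery fails the same way.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleWriterDemo {

  /** Hypothetical stand-in for the sink's checked write failure. */
  static class WriteException extends Exception {
    WriteException(Throwable cause) {
      super(cause);
    }
  }

  /** Simulated writer whose transaction batch can be closed underneath it. */
  static class Writer {
    private final boolean wrapIllegalState;
    boolean batchClosed = false;

    Writer(boolean wrapIllegalState) {
      this.wrapIllegalState = wrapIllegalState;
    }

    void write(String event) throws WriteException {
      try {
        if (batchClosed) {
          // Mirrors the message seen in the log; the check itself is simulated.
          throw new IllegalStateException("TransactionBatch has been closed()");
        }
      } catch (IllegalStateException e) {
        if (wrapIllegalState) {
          // Patched behaviour: surface the problem as the checked failure type.
          throw new WriteException(e);
        }
        throw e; // Unpatched behaviour: the unchecked exception escapes as-is.
      }
    }
  }

  /** Simulated sink that caches one writer per endpoint. */
  static class Sink {
    final Map<String, Writer> activeWriters = new HashMap<>();
    private final boolean wrapIllegalState;

    Sink(boolean wrapIllegalState) {
      this.wrapIllegalState = wrapIllegalState;
    }

    /** Returns true if the event was delivered, false if delivery failed. */
    boolean process(String endPoint, String event) {
      Writer writer =
          activeWriters.computeIfAbsent(endPoint, k -> new Writer(wrapIllegalState));
      try {
        writer.write(event);
        return true;
      } catch (WriteException e) {
        // Known failure type: drop the writer so the next call builds a fresh one.
        activeWriters.remove(endPoint);
        return false;
      } catch (RuntimeException e) {
        // Unexpected failure: the cached (closed) writer is left in place.
        return false;
      }
    }
  }

  /** Counts failed deliveries out of `attempts` after the batch is closed once. */
  static int failuresAfterClose(boolean wrapIllegalState, int attempts) {
    Sink sink = new Sink(wrapIllegalState);
    sink.process("ep", "warm-up");                   // creates and caches the writer
    sink.activeWriters.get("ep").batchClosed = true; // simulate the batch being closed
    int failures = 0;
    for (int i = 0; i < attempts; i++) {
      if (!sink.process("ep", "event-" + i)) {
        failures++;
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    System.out.println("without wrapping: " + failuresAfterClose(false, 5) + " failures");
    System.out.println("with wrapping:    " + failuresAfterClose(true, 5) + " failures");
  }
}
```

In this simulation, every attempt fails when the exception is not wrapped, while with wrapping only the first attempt fails and the following ones go through a fresh writer. Whether the real sink recovers the same way depends on how it handles `WriteException`, which this sketch only assumes.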
> hive sink cannot work after a HeartBeatFailure
> ----------------------------------------------
>
> Key: FLUME-3279
> URL: https://issues.apache.org/jira/browse/FLUME-3279
> Project: Flume
> Issue Type: Bug
> Environment: system:CentOS Linux release 7.3.1611 (Core)
> flume:apache-flume-1.8.0-bin
> hive:hive-2.1.1
> Reporter: xiaomajia700
> Priority: Major
> Attachments: error.txt
>
>
> Flume throws a HeartBeatFailure after running for a few hours. After that, Flume stops working normally and throws errors continuously.
> Looking at the code, it seems the Hive transaction is closed but not removed from the active writers.
> Log below:
> 2018-11-02 04:04:19,493 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)]
Closed file: /home/lmode/logs/rs_ab_shunt/rs_ab_shunt.log, inode: 76496002, pos: 444761
> 2018-11-02 04:04:22,975 (Log-BackgroundWorker-c1) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:138)]
Checkpoint not required
> 2018-11-02 04:04:33,938 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
Checking file:/home/lmode/soft/apache-flume-1.8.0-bin/conf/hive.conf for changes
> 2018-11-02 04:04:36,497 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)]
Opening file: /home/lmode/logs/rs_ab_shunt/rs_ab_shunt.log, inode: 76496002, pos: 444761
> 2018-11-02 04:04:40,732 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:40,732 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:40,732 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:40,732 (hive-k1-call-runner-0) [INFO - org.apache.flume.sink.hive.HiveWriter$2.call(HiveWriter.java:236)]
Sending heartbeat on batch TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083',
database='test', table='flume_test', partitionVals=[] }
> 2018-11-02 04:04:40,765 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hive.HiveWriter.heartBeat(HiveWriter.java:244)]
Unable to send heartbeat on Txn Batch TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083',
database='test', table='flume_test', partitionVals=[] }
> org.apache.hive.hcatalog.streaming.HeartBeatFailure: Heart beat error. InvalidTxns: [].
AbortedTxns: [17290, 17291, 17292, 17293, 17294, 17295, 17296, 17297, 17298, 17299, 17300]
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.heartbeat(HiveEndPoint.java:953)
> at org.apache.flume.sink.hive.HiveWriter$2.call(HiveWriter.java:237)
> at org.apache.flume.sink.hive.HiveWriter$2.call(HiveWriter.java:233)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2018-11-02 04:04:40,771 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.commitTxn(HiveWriter.java:337)]
Committing Txn 17290 on EndPoint: {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] }
> 2018-11-02 04:04:40,811 (hive-k1-call-runner-0) [ERROR - org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.markDead(HiveEndPoint.java:756)]
Fatal error on TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083',
database='test', table='flume_test', partitionVals=[] }; cause Unable to abort invalid transaction
id : 17290: No such transaction txnid:17290
> org.apache.hive.hcatalog.streaming.TransactionError: Unable to abort invalid transaction
id : 17290: No such transaction txnid:17290
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abortImpl(HiveEndPoint.java:936)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abort(HiveEndPoint.java:894)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.markDead(HiveEndPoint.java:753)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:853)
> at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:343)
> at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:340)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: NoSuchTxnException(message:No such transaction txnid:17290)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result$abort_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result$abort_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result.read(ThriftHiveMetastore.java)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_abort_txn(ThriftHiveMetastore.java:4477)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.abort_txn(ThriftHiveMetastore.java:4464)
> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.rollbackTxn(HiveMetaStoreClient.java:2093)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:150)
> at com.sun.proxy.$Proxy7.rollbackTxn(Unknown Source)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abortImpl(HiveEndPoint.java:923)
> ... 10 more
> 2018-11-02 04:04:40,836 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:323)]
k1 : Commit of Txn 17290 failed on EndPoint: {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] }
> org.apache.flume.sink.hive.HiveWriter$CommitException: Commit of Txn 17290 failed on
EndPoint: {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test', partitionVals=[]
}
> at org.apache.flume.sink.hive.HiveWriter.commitTxn(HiveWriter.java:348)
> at org.apache.flume.sink.hive.HiveWriter.flush(HiveWriter.java:199)
> at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:316)
> at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.TransactionError: Aborted transaction cannot
be committed: Transaction txnid:17290 already aborted
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:866)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:834)
> at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:343)
> at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:340)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: TxnAbortedException(message:Transaction txnid:17290 already aborted)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result$commit_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result$commit_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result.read(ThriftHiveMetastore.java)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_commit_txn(ThriftHiveMetastore.java:4523)
> at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.commit_txn(ThriftHiveMetastore.java:4510)
> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.commitTxn(HiveMetaStoreClient.java:2099)
> at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:150)
> at com.sun.proxy.$Proxy7.commitTxn(Unknown Source)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:860)
> ... 8 more
> 2018-11-02 04:04:40,837 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.abortTxn(HiveWriter.java:353)]
Aborting Txn id 17290 on End Point {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] }
> 2018-11-02 04:04:40,839 (hive-k1-call-runner-0) [INFO - org.apache.flume.sink.hive.HiveWriter$4.call(HiveWriter.java:297)]
Aborted txn 17290
> 2018-11-02 04:04:40,840 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.Log.rollback(Log.java:736)]
Rolling back 1541064961501
> 2018-11-02 04:04:40,843 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)]
Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch
TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] } has been closed()
> at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[17201...17300] on
endPoint = {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test', partitionVals=[]
} has been closed()
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:739)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.beginNextTransaction(HiveEndPoint.java:650)
> at org.apache.flume.sink.hive.HiveWriter$3.call(HiveWriter.java:275)
> at org.apache.flume.sink.hive.HiveWriter$3.call(HiveWriter.java:272)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> 2018-11-02 04:04:45,844 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:45,845 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:45,845 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:45,845 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.Log.rollback(Log.java:736)]
Rolling back 1541064961502
> 2018-11-02 04:04:45,846 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)]
Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch
TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] } has been closed()
> at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[17201...17300] on
endPoint = {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test', partitionVals=[]
} has been closed()
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:739)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:778)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735)
> at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:50)
> at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158)
> at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> 2018-11-02 04:04:50,847 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:50,847 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:50,847 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:50,847 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:298)]
k1 : Writing event to {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test',
partitionVals=[] }
> 2018-11-02 04:04:50,848 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.Log.rollback(Log.java:736)]
Rolling back 1541064961504
> 2018-11-02 04:04:50,848 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)]
Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch
TxnIds=[17201...17300] on endPoint = {metaStoreUri='thrift://query-01:9083', database='test',
table='flume_test', partitionVals=[] } has been closed()
> at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[17201...17300] on
endPoint = {metaStoreUri='thrift://query-01:9083', database='test', table='flume_test', partitionVals=[]
} has been closed()
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:739)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:778)
> at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735)
> at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:50)
> at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158)
> at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152)
> at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more