flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maidangdang <maidangdan...@126.com>
Subject FlinkSQL fails when rowtime meets dirty data
Date Wed, 15 May 2019 14:49:49 GMT
I use FlinkSQL to process Kafka data in the following format:
|  id |  server_time |
|  1  | 2019-05-15 10:00:00 |
|  2  | 2019-05-15 10:00:00 |
.......


and I define rowtime from the  server_time field:
new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
       .rowtime(new Rowtime().timestampsFromField("server_time"))
    .field("id", Types.String)
    .field("server_time", Types.String)


when dirty data arrives, such as :
|  id   |  server_time |
|  99  | 11.22.33.44  |


My FlinkSQL job fails with exception:
java.lang.NumberFormatException: For input string: "11.22.33.44"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
at DataStreamSourceConversion$288.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)


Because my flink job use EXACTLY_ONCE, so the job is re-executed from the last checkpoint,
consumes dirty data again, fails again, and keeps looping like this.I would like to ask if
there are any good ways to solve this situation?


The Flink version I used was flink-1.7.2
Mime
View raw message