flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
Date Thu, 06 Dec 2018 06:32:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711035#comment-16711035
] 

ASF GitHub Bot commented on FLINK-11010:
----------------------------------------

lzqdename edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927
 
 
   Let me show how the wrong result is produced.
   
   ---
   **Background**: processing time in a tumbling window, Flink 1.5.0
   
   The call stack is as follows:
     [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
     [2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
     [3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
     [4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
     [5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
     [6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
     [7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
     [8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
     [9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
     [10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
     [11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
     [12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
     [13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
     [14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
     [15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
     [16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
     [17] java.lang.Thread.run (Thread.java:748)
   
   Now we are at frame [1], org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747),
   
   and the code is as follows:
   `  public static Timestamp internalToTimestamp(long v)
     {
       return new Timestamp(v - LOCAL_TZ.getOffset(v));
     }
   `
   Print v when it holds windowStart:
   print v
    v = 1544074830000
   Print v when it holds windowEnd:
   print v
    v = 1544074833000
   
   
   
   
   
   After this, step back up to
   [1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51)
   
   
   Next, the following code executes:
   `   
    if (windowStartOffset.isDefined) {
         output.setField(
           lastFieldPos + windowStartOffset.get,
           SqlFunctions.internalToTimestamp(windowStart))
       }
   
   if (windowEndOffset.isDefined) {
         output.setField(
           lastFieldPos + windowEndOffset.get,
           SqlFunctions.internalToTimestamp(windowEnd))
       }
   `
   
   Before execution, the output is
    output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
   After execution, the output is
    output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"
   
   So, do you think it is right that
   long value 1544074830000 is translated to 2018-12-06 05:40:30.0 and
   long value 1544074833000 is translated to 2018-12-06 05:40:33.0?
   
   I am in China (UTC+8), so I think the timestamps should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.
   
   
   
   Okay, let us continue.
   
   Now the data will be written to Kafka. Before it is written, it is serialized.
   Let us see what happens.
   
   The call stack is as follows:
   ` [1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
     [2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
     [3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
     [4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
     [5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
     [6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
     [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
     [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
     [9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102)
     [10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51)
     [11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46)
     [12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355)
     [13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56)
     [14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
     [15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
     [16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
     [17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
     [18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
     [19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41)
     [20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
     [21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
     [22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
     [23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
     [24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
     [25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
     [26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
     [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
     [28] DataStreamCalcRule$88.processElement (null)
     [29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
     [30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
     [31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
     [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
     [33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
     [34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
     [35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
     [36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
     [37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
     [38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65)
     [39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
     [40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
     [41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
     [42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
     [43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
     [44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
     [45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
     [46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
     [47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
     [48] java.util.concurrent.FutureTask.run (FutureTask.java:266)
     [49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
     [50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
     [51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
     [52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
     [53] java.lang.Thread.run (Thread.java:748)
   `
   The serializer code is as follows:
   `  protected long _timestamp(Date value)
     {
       return value == null ? 0L : value.getTime();
     }`
   
   Here, taking windowEnd as an example, the value is
   value = "2018-12-06 05:40:33.0"
    value.getTime() = 1544046033000
   
   As you can see, the initial value is 1544074833000 and the final value is 1544046033000.
   
   The difference is 28800000 ms, i.e. 8 hours, because I am in China (UTC+8).
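   The 8-hour difference can be checked with a few lines of plain Java. This is only a sketch under assumptions: `SerializedOffsetCheck` and `serializedMillis` are hypothetical names, and `Asia/Shanghai` stands in for the JVM default zone. It replays the two steps seen in the traces: the offset subtraction, followed by the `getTime()` call that the JSON date serializer makes.

```java
import java.sql.Timestamp;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

public class SerializedOffsetCheck {
    // Step 1: subtract the zone offset (as internalToTimestamp does).
    // Step 2: read the epoch millis back (as DateSerializer._timestamp does for non-null values).
    static long serializedMillis(long windowMillis, TimeZone zone) {
        Timestamp ts = new Timestamp(windowMillis - zone.getOffset(windowMillis));
        return ts.getTime();
    }

    public static void main(String[] args) {
        // Assumption: the reporting machine runs in UTC+8.
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        long windowEnd = 1544074833000L;

        long serialized = serializedMillis(windowEnd, shanghai);
        long lostMillis = windowEnd - serialized;

        System.out.println(serialized);                                // 1544046033000
        System.out.println(lostMillis);                                // 28800000
        System.out.println(TimeUnit.MILLISECONDS.toHours(lostMillis)); // 8

        // With a UTC default zone the offset is zero, so the value survives unchanged.
        System.out.println(serializedMillis(windowEnd, TimeZone.getTimeZone("UTC")) == windowEnd); // true
    }
}
```

   The last line suggests why the problem does not show up on machines whose default zone is UTC.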
   ------------------------------------------------------------------------------------------
   
   Why? The key cause is SqlFunctions.internalToTimestamp:
     public static Timestamp internalToTimestamp(long v)
     {
       return new Timestamp(v - LOCAL_TZ.getOffset(v));
     }
   
   The code subtracts the LOCAL_TZ offset. I think this subtraction is redundant!
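   To make the suggestion concrete, here is a sketch of a conversion without the subtraction. It is an illustration of the idea only, not the change made in the pull request and not Calcite's API: `UnshiftedConversion`, `toTimestampUnshifted` and `wallClock` are hypothetical names, and `Asia/Shanghai` is again an assumed zone.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class UnshiftedConversion {
    // Hypothetical variant: wrap the epoch millis directly, with no offset subtracted.
    static Timestamp toTimestampUnshifted(long v) {
        return new Timestamp(v);
    }

    // Wall-clock reading of a Timestamp in an explicit zone (independent of the JVM default).
    static LocalDateTime wallClock(Timestamp ts, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts.getTime()), zone);
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L;
        Timestamp ts = toTimestampUnshifted(windowEnd);

        // The epoch value that a serializer would read via getTime() is unchanged.
        System.out.println(ts.getTime() == windowEnd);                  // true
        // In UTC+8 this is the wall-clock time I expect.
        System.out.println(wallClock(ts, ZoneId.of("Asia/Shanghai"))); // 2018-12-06T13:40:33
    }
}
```

   Whether other callers rely on the shifted value is not covered by this sketch.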
   
   
   
   
   
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> ----------------------------------------------------------------
>
>                 Key: FLINK-11010
>                 URL: https://issues.apache.org/jira/browse/FLINK-11010
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.2
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Major
>              Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is obtained simply by invoking System.currentTimeMillis(), but the long value is automatically wrapped in a Timestamp by the following statement:
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
