beam-commits mailing list archives

From "Rick Lin (JIRA)" <j...@apache.org>
Subject [jira] [Created] (BEAM-3770) The problem of kafkaIO sdk for data latency
Date Fri, 02 Mar 2018 01:05:00 GMT
Rick Lin created BEAM-3770:
------------------------------

             Summary: The problem of kafkaIO sdk for data latency
                 Key: BEAM-3770
                 URL: https://issues.apache.org/jira/browse/BEAM-3770
             Project: Beam
          Issue Type: Improvement
          Components: io-java-kafka
    Affects Versions: 2.0.0
         Environment: To reproduce the issue, my environment is:
OS: Ubuntu 14.04.3 LTS
JAVA: JDK 1.7
Beam 2.0.0 (with Direct runner)
Kafka 2.10-0.10.1.1
Maven 3.5.0, with the following dependencies in pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.0.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
            Reporter: Rick Lin
            Assignee: Raghu Angadi
             Fix For: 2.0.0


Dear all,

I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).

With this SDK I am seeing a *data latency* problem, which I describe below.

The data arrive from Kafka at a fixed rate: 100 records per second.

I apply a fixed window of 1 second with no delay. The number of records per window turns out
to be 70, 80, 104, or more than 104.

After about one day of running, the latency appears, and each window contains only 10
records.

To explain this clearly, my code is given below.

" PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
  p.apply(KafkaIO.<String, String>read()
     .withBootstrapServers("127.0.0.1:9092")
     .withTopic("kafkasink")
     .withKeyDeserializer(StringDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     .withoutMetadata())
   .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
        @ProcessElement
        public void test(ProcessContext c) throws ParseException {
            String element = c.element().getValue();
            try {
              JsonNode arrNode = new ObjectMapper().readTree(element);
              String t = arrNode.path("v").findValue("Timestamp").textValue();
              DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
              LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
              java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
              Instant timestamp = new Instant(java_instant.toEpochMilli());
              c.output(TimestampedValue.of(c.element(), timestamp));
            } catch (JsonGenerationException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }}));

PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
      Window.<TimestampedValue<KV<String, String>>>into(FixedWindows.of(Duration.standardSeconds(1))
          .withOffset(Duration.ZERO))
          .triggering(AfterWatermark.pastEndOfWindow()
             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
               .plusDelayOf(Duration.ZERO)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());"
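
For anyone reproducing this, here is a minimal sketch in plain Java (no Beam dependency) of how a timestamp in the format parsed above maps to a one-second fixed window with zero offset. The class name FixedWindowSketch, the helper names, the sample timestamp string, and the use of UTC instead of the system default zone are assumptions made only for illustration. The arithmetic follows the usual fixed-window rule (window start = timestamp rounded down to a multiple of the window size, last timestamp in the window = start + size - 1 ms), which matches the ".999" window times in the output.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class FixedWindowSketch {
    // Same pattern string as in the pipeline code above.
    static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");

    // Parse a record timestamp to epoch milliseconds.
    // Assumption: UTC is used here so the result does not depend on the machine's zone.
    static long parseEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMATTER).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the fixed window (zero offset) that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Largest timestamp that still belongs to that window (end minus 1 ms).
    static long windowMaxTimestamp(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis - 1;
    }

    public static void main(String[] args) {
        // Hypothetical input string, chosen to match the first record of the output.
        long ts = parseEpochMillis("02/27/2018 02:00:49.1170");
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 1000L)));        // 2018-02-27T02:00:49Z
        System.out.println(Instant.ofEpochMilli(windowMaxTimestamp(ts, 1000L))); // 2018-02-27T02:00:49.999Z
    }
}
```

With a healthy pipeline, the window time of a record should therefore stay within one second of the record's own timestamp.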

The output is shown below.

"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=78
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one day:

data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999 "
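
To put a number on the latency, the gap between coming-data-time and window-time can be computed directly from the logged values. The sketch below is plain Java; the class name WindowLagSketch and the helper lag are hypothetical, and the format string is an assumption inferred from how the log lines above are printed.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class WindowLagSketch {
    // Assumed layout of the timestamps printed in the output above.
    static final DateTimeFormatter LOG_FORMAT =
        DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    // Gap between the record's own time and the time of the window that reported it.
    static Duration lag(String comingDataTime, String windowTime) {
        LocalDateTime data = LocalDateTime.parse(comingDataTime, LOG_FORMAT);
        LocalDateTime window = LocalDateTime.parse(windowTime, LOG_FORMAT);
        return Duration.between(data, window);
    }

    public static void main(String[] args) {
        // First record of the run: under one second.
        System.out.println(lag("2018-02-27 02:00:49.117", "2018-02-27 02:00:49.999")); // PT0.882S
        // Record after one day of running: more than 32 hours.
        System.out.println(lag("2018-02-28 02:05:48.217", "2018-03-01 10:35:16.999")); // PT32H29M28.782S
    }
}
```

For the values above, the gap grows from under one second at the start of the run to roughly 32.5 hours after one day.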

If you have any idea what causes this data latency, I look forward to hearing from you.

Thanks

Rick



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
