samza-dev mailing list archives

From "Chris Riccomini" <criccom...@apache.org>
Subject Re: Review Request 32147: SAMZA-465
Date Thu, 19 Mar 2015 20:55:07 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review77063
-----------------------------------------------------------


* Comb over all docs and update checkpoint manager docs. 
* Add some documentation for the coordinator stream to the docs directory. This is a new component,
and fairly critical to how Samza works. We should document it clearly.
* Patch doesn't seem to apply against master.
* Will need a follow-on ticket to write a topic migrator, so we can migrate 0.8.0 checkpoint
topics to 0.9.0. This should be done automatically, without any human intervention.
* It seems like JsonSerde is deleted. Was this intentional?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124887>

    Does Checkpoint need to be in the samza-api package? Seems like we should just move it
to core to hide it from users, since they don't need it.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124888>

    I'm confused. getMessageMap always returns a message with no values?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124894>

    I think the source is usually the container ID (unless a CLI is used).



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124895>

    Might want to provide an explicit example of an SSP to show how it's serialized (as system.stream.partition,
e.g. kafka.AdViewEvent.0).
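    As an illustration of the kind of example requested, here is a minimal sketch of round-tripping
a system-stream-partition through the dotted system.stream.partition form. SspKey and its methods
are hypothetical, not Samza's SystemStreamPartition API. Parsing assumes the system name contains
no dot and the partition is the text after the last dot, so stream names containing dots still survive.

```java
import java.util.Objects;

/** Hypothetical value type used only to illustrate the system.stream.partition form. */
final class SspKey {
    final String system;
    final String stream;
    final int partition;

    SspKey(String system, String stream, int partition) {
        this.system = Objects.requireNonNull(system);
        this.stream = Objects.requireNonNull(stream);
        this.partition = partition;
    }

    /** Serializes as system.stream.partition, e.g. kafka.AdViewEvent.0 */
    String serialize() {
        return system + "." + stream + "." + partition;
    }

    /**
     * Parses the dotted form. Assumes the system name has no dot and the partition
     * is the text after the last dot, so the stream name itself may contain dots.
     */
    static SspKey parse(String s) {
        int first = s.indexOf('.');
        int last = s.lastIndexOf('.');
        if (first <= 0 || last <= first + 1 || last == s.length() - 1) {
            throw new IllegalArgumentException("Not in system.stream.partition form: " + s);
        }
        return new SspKey(s.substring(0, first), s.substring(first + 1, last),
                Integer.parseInt(s.substring(last + 1)));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SspKey)) return false;
        SspKey k = (SspKey) o;
        return system.equals(k.system) && stream.equals(k.stream) && partition == k.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(system, stream, partition);
    }
}
```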



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124891>

    I think that there might be enough messages (config, checkpoint, changelog) to warrant
distinct class files for each one, probably in a package like org.apache.samza.coordinator.stream.messages.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124896>

    Will this work? If getMessageMap is always returning a map with no values...



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124899>

    Nit: SetChangelog -> SetChangelogMapping



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124903>

    Same source comment as above.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124904>

    Use a constant for "Partition", like set-changelog.
    
    Nit: lowercase "partition" for consistency with other message constants.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124907>

    We don't care about ordering for anything right now, right?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124905>

    Nit: single-line if() {



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124906>

    Probably log at debug level here; otherwise it'll be a bit verbose, I think.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124909>

    Javadocs.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124908>

    nit: if() {
      ...
    }



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
<https://reviews.apache.org/r/32147/#comment124910>

    nit: if() {
      ...
    }



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124947>

    I think this can be deleted.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124946>

    protected



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124948>

    Map[,]()
    
    with `import scala.collection.JavaConversions._`



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124967>

    Should not be static objects. Create in apply, and pass around as needed. Given how simple
getCheckpointManager and getChangelogManager are, I think those methods should be deleted,
and everything should just be done in apply()



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124969>

    Do you need to register taskNames here? Seems like kind of a chicken-and-egg problem.
I think taskNames aren't used in changelogManager.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124974>

    Can we do:
    
        val offsetMap = systemStreamPartition
          .map(ssp => (ssp -> null))
          .toMap ++ checkpoint.getOffsets
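    The snippet above relies on Scala's ++, where entries from the right-hand map (the checkpointed
offsets) take precedence. A rough Java equivalent follows, sketched with plain String keys standing
in for SystemStreamPartition (an assumption made for brevity; OffsetMerge is a hypothetical name):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the "null for every SSP, then overlay checkpoints" merge. */
final class OffsetMerge {
    /**
     * Every partition starts with a null offset (no checkpoint yet); checkpointed
     * offsets are then laid on top, so they win, as with Scala's ++.
     * Keys are plain Strings here purely for illustration.
     */
    static Map<String, String> mergeOffsets(List<String> partitions, Map<String, String> checkpointed) {
        Map<String, String> offsets = new HashMap<>(); // HashMap permits null values
        for (String ssp : partitions) {
            offsets.put(ssp, null);
        }
        offsets.putAll(checkpointed);
        return offsets;
    }
}
```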



samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
<https://reviews.apache.org/r/32147/#comment124976>

    This is really weird. JsonSerde with TestJsonSerde class in it? Is RB messed up?



samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124983>

    See comment in CheckpointTool about making CheckpointManager injectable.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124984>

    See comment in CheckpointTool about making CheckpointManager injectable.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124980>

    I remember that we talked about this, but remind me again: why are we deleting this rather
than fixing it?



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124985>

    nit: double new line



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124986>

    nit: = {
    on single line



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/32147/#comment124987>

    I love to see a plan come together. :D



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
<https://reviews.apache.org/r/32147/#comment124868>

    Waiting for an hour seems a bit excessive. Was there any reason this was changed?



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/32147/#comment124859>

    Move below the apache docs.



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
<https://reviews.apache.org/r/32147/#comment124860>

    Does this class do anything?



samza-api/src/main/java/org/apache/samza/container/TaskName.java
<https://reviews.apache.org/r/32147/#comment124861>

    Could you add an explanation as to why it should only contain the task name?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124880>

    Can we put this in org.apache.samza.storage?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124877>

    A slightly better name for this is probably ChangelogPartitionManager, or something like
that. I worry that ChangelogManager is too close to TaskStorageManager.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124886>

    Never used. Did you mean to validate like you do in CheckpointManager?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124871>

    Checkpoints? This is the changelog manager.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124873>

    Same as above.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124874>

    Is calling register() on the coordinatorStreamConsumer multiple times OK? Since it takes
no param, it seems like you'd only need to call it once, or not at all (it could just be invoked
internally when start() is called).
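    One way to address this is sketched below. It assumes registration only needs to happen once per
consumer: register() is made idempotent and called from start(), so callers such as ChangelogManager and
CheckpointManager never need to invoke it themselves. CoordinatorConsumerSketch is a hypothetical stand-in,
not the class in the patch.

```java
/** Hypothetical stand-in for the coordinator stream consumer. */
class CoordinatorConsumerSketch {
    private boolean registered = false;
    private boolean started = false;
    private int registrations = 0; // counts actual registrations, for illustration only

    /** Safe to call any number of times; only the first call does any work. */
    synchronized void register() {
        if (registered) {
            return;
        }
        registrations++; // real code would register the coordinator stream partition here
        registered = true;
    }

    /** start() registers implicitly, so callers need not call register() at all. */
    synchronized void start() {
        if (started) {
            return;
        }
        register();
        started = true;
    }

    synchronized int registrationCount() {
        return registrations;
    }
}
```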



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124875>

    Seems like this logic should get folded into the CoordinatorStreamConsumer. CheckpointManager
and ChangelogManager both seem to have nearly the same logic.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124878>

    Nit: might want to import CoordinatorStreamMessage.SetChangelogMapping, just to make things
a bit more succinct.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124879>

    "changelogEntries", since each row results in a new message.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124882>

    Given how similar this class is to ChangelogManager, does it make sense to just have one
CoordinatorStreamManager that has read/write methods for all of the CoordinatorStreamMessage
types? Seems like it would allow us to delete a fair amount of code.
    
    Or, perhaps we should write a base class, and extend from it on a per-message-type basis?
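    Below is a minimal sketch of the base-class option. It assumes a message can be reduced to a type string
plus a key/value payload, which simplifies CoordinatorStreamMessage. The shared replay loop lives in
the base class, and each subclass only names the message type it reads. All class names are hypothetical.
The "set-checkpoint" type string is also an assumed value; only set-changelog appears in this review.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical coordinator stream message: a type, a key and a value. */
final class SketchMessage {
    final String type;
    final String key;
    final String value;

    SketchMessage(String type, String key, String value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }
}

/** Shared read logic: replay the stream and keep the latest value per key for one type. */
abstract class CoordinatorStreamManagerBase {
    protected abstract String messageType();

    Map<String, String> readLatest(List<SketchMessage> stream) {
        Map<String, String> latest = new HashMap<>();
        for (SketchMessage m : stream) {
            if (messageType().equals(m.type)) {
                latest.put(m.key, m.value); // later messages overwrite earlier ones
            }
        }
        return latest;
    }
}

/** Hypothetical checkpoint reader; the type string is an assumption. */
final class CheckpointManagerSketch extends CoordinatorStreamManagerBase {
    @Override
    protected String messageType() { return "set-checkpoint"; }
}

/** Hypothetical changelog-mapping reader. */
final class ChangelogManagerSketch extends CoordinatorStreamManagerBase {
    @Override
    protected String messageType() { return "set-changelog"; }
}
```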



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124863>

    Javadocs.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124881>

    Seems like a dupe of what's in ChangelogManager. Think this logic should be moved to CoordinatorStreamConsumer,
and used in both places. Or maybe moved to a shared base class.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124885>

    else we should probably do something nasty like throw an exception.



samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
<https://reviews.apache.org/r/32147/#comment124934>

    Did IntelliJ generate this (and hashCode)?



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment124943>

    Is this in Util, or does any other code do something similar? Seems like it belongs in
Util.



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment124944>

    Same Util question as above.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124982>

    Can you make CheckpointTool take a CheckpointManager param, and add an apply() method
to the companion object, which builds a CheckpointTool object given config and newOffsets?
That way you can inject CheckpointManagers for unit testing rather than having the setCheckpointManager
method below.
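    The requested injection pattern is sketched below in Java for illustration. The tool receives its
checkpoint store through the constructor, and a static factory plays the role of the Scala companion
object's apply(). CheckpointStore, CheckpointStoreFactory, OffsetTool and fromConfig are hypothetical names.
Production code would presumably resolve the factory from config; here it is passed in so the sketch stays
self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal interface standing in for CheckpointManager. */
interface CheckpointStore {
    void writeOffsets(Map<String, String> offsets);
}

/** Hypothetical factory, analogous in spirit to CheckpointManagerFactory. */
interface CheckpointStoreFactory {
    CheckpointStore create(Map<String, String> config);
}

final class OffsetTool {
    private final CheckpointStore store;
    private final Map<String, String> newOffsets;

    /** Dependencies are passed in, so unit tests can supply a fake store. */
    OffsetTool(CheckpointStore store, Map<String, String> newOffsets) {
        this.store = store;
        this.newOffsets = new HashMap<>(newOffsets);
    }

    /** Plays the role of a companion-object apply(): builds the store from config. */
    static OffsetTool fromConfig(Map<String, String> config, Map<String, String> newOffsets,
                                 CheckpointStoreFactory factory) {
        return new OffsetTool(factory.create(config), newOffsets);
    }

    void run() {
        store.writeOffsets(newOffsets);
    }
}
```

    With this shape, a test can hand the tool an in-memory fake instead of needing a setter such as
setCheckpointManager.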



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124949>

    filter(_._2 != null)
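    In Java terms, the suggested filter(_._2 != null) drops map entries whose value is null (presumably
partitions with no offset yet). Below is a minimal stream-based sketch with String keys assumed.
NullOffsetFilter is a hypothetical name.

```java
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring Scala's filter(_._2 != null) on an offset map. */
final class NullOffsetFilter {
    /** Keeps only entries with a non-null offset. */
    static Map<String, String> withoutNullOffsets(Map<String, String> offsets) {
        return offsets.entrySet().stream()
                .filter(e -> e.getValue() != null) // must run before toMap, which rejects null values
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```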



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/32147/#comment124965>

    nit: I think we've been using "Checkpoint" not "CheckPoint" in most places.



samza-test/src/main/config/join/common.properties
<https://reviews.apache.org/r/32147/#comment124989>

    Should this be kafka-checkpoints? Also, should we rename that system? also^2, do we need
two systems anymore?



samza-test/src/main/config/negate-number.properties
<https://reviews.apache.org/r/32147/#comment124988>

    Is this required? There's only one system defined.



samza-test/src/main/config/perf/container-performance.properties
<https://reviews.apache.org/r/32147/#comment124990>

    Does this matter? Should coordinator stream always start from the oldest offset?



samza-test/src/main/python/samza_failure_testing.py
<https://reviews.apache.org/r/32147/#comment124869>

    Convention is usually #!/.... with no space.
    
    Also, `#!/usr/bin/env python` is usually preferred, I think (the space after
env is intentional). See:
    
    http://stackoverflow.com/questions/2429511/why-do-people-write-usr-bin-env-python-on-the-first-line-of-a-python-script


- Chris Riccomini


On March 17, 2015, 11:25 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated March 17, 2015, 11:25 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-465
> 
> 
> Diffs
> -----
> 
>   build.gradle 0a268ac7a3819cf46b54a93e0e3171455371456a
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 593d11872430100e000c7d4b6edc5ef29649d8d4
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
>   samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>   samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>   samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 275eb1a924d09a0a43efe6273e0d2af9217e1c74
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
>   samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 4a1b31f025ba7b05a7b46041aa8e12074599ce24
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 7fc6d89b5d703a7c10a212aaa8d3f9231996b897
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>   samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b790be17cfe08da28220ffb381cbd618ebe25cf0
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4f15002325bc0154991f9a35312e903d15ef81e7
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 3d1e6ecbb3fd95816c722a68c4f5907120eb20d0
>   samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 4ef3551f470e77e27bd156e81ce96486f25c21bf
>   samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
>   samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
>   samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
>   samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala a8b724bf781003142e455fdf1fed2f13d6c18353
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component, the changelog manager, handles it.
> 3. Passes the checkpoint information from the job coordinator to the containers.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the starting offsets from the job coordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass; I have also changed some of the existing unit tests.
> I plan to add one more unit test to verify checkpoint persistence (a stub is present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is not currently used correctly; I need to pass the right reference to it.
> 2. The coordinator stream does not use the same consumer and producer as the "systemconsumers" and "systemproducers" we have (this will be done after SAMZA-567).
> 3. Remove checkpoint configs related to the stream, migrate them to the coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>

