samza-dev mailing list archives

From "Chris Riccomini" <criccom...@apache.org>
Subject Re: Review Request 22215: SAMZA-123
Date Thu, 05 Jun 2014 21:23:57 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review44817
-----------------------------------------------------------


I'm not wild about SSP* for class names. Thus far the convention has been SystemStreamPartition*.
I'd rather have it all one way, or all the other. If the gripe is that the SystemStreamPartition
name is too long, that's being discussed in another ticket (rename to Stream/StreamPartition),
and I believe you had said you had a completely alternative naming idea as well. Can we just
stick with the existing class name convention, and discuss the naming issue separately?

The taskName: String style feels weird to me. First, does it make sense to use a class instead
of a string? That was the point of using Partition initially, instead of just an int. Second,
it seems to me that taskId is better than taskName now, after seeing it in code. I think either
Sriram or Jay was pushing this name as well. What do you think about a TaskId class instead
of a Partition class?
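
As a minimal sketch of the TaskId idea (hypothetical; the class shape, field name, and example value below are assumptions for illustration and are not taken from the patch), a small immutable wrapper could stand in for the raw String in the same way Partition wraps an int:

```java
import java.util.Objects;

// Hypothetical sketch: an immutable wrapper used in place of a raw String,
// analogous to how Partition wraps an int. Not taken from the patch.
final class TaskId implements Comparable<TaskId> {
  private final String id;

  TaskId(String id) {
    // Reject null early so equals/hashCode/compareTo never see it.
    this.id = Objects.requireNonNull(id, "id");
  }

  String getId() {
    return id;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof TaskId)) return false;
    return id.equals(((TaskId) o).id);
  }

  @Override
  public int hashCode() {
    return id.hashCode();
  }

  @Override
  public int compareTo(TaskId other) {
    return id.compareTo(other.id);
  }

  @Override
  public String toString() {
    return "TaskId [id=" + id + "]";
  }
}

class TaskIdDemo {
  public static void main(String[] args) {
    TaskId a = new TaskId("Partition 0");
    TaskId b = new TaskId("Partition 0");
    // Two ids built from the same string compare as equal.
    System.out.println(a.equals(b));
  }
}
```

With a dedicated type, method signatures say what they expect (a task identifier, not an arbitrary string), and the identifier can safely be used as a map key.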

I'm not a fan of the way we handle the taskName/partition mapping in the checkpoint manager.
Can we just put the mapping in the checkpoint itself, rather than having distinct messages
for it?

All new classes need docs (e.g. SSPTaskNameGrouper, GroupByPartition, etc). It'd also be nice
to have some docs on the website as well. Maybe in the container section.


build.gradle
<https://reviews.apache.org/r/22215/#comment79359>

    Can we just move SSPGrouperTestBase to samza-core instead? Szczepan says that this method
of pulling in test source is not supported, and hacky, so I'd like to limit it as much as
possible.
    
    The recommended alternative is apparently to create a samza-test submodule that contains
all of the test code. Since we already have a samza-test submodule that's really integration
tests, we'd have to either move the samza-test stuff to an integration test submodule, or
create a second samza-test module (to avoid circular dependencies: samza-core -> samza-test
-> samza-core).
    
    To me, the easiest fix seems to be to just move the one class to samza-core.



check
<https://reviews.apache.org/r/22215/#comment79357>

    Add license header.
    
    Add brief docs describing what this is for.



check
<https://reviews.apache.org/r/22215/#comment79358>

    Add eclipse.



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79373>

    This all seems a bit hacky. It'd be better if we could have this passed in through the
constructor, and not have mutable methods.
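
    As a rough illustration of the constructor-only approach (a hypothetical stand-in class with plain String keys, not the actual Checkpoint from samza-api), all state is passed in once and defensively copied, and equals()/hashCode() delegate to the map rather than being written field by field:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a checkpoint. The real class keys offsets by
// Samza types; plain Strings are used here to keep the sketch self-contained.
final class ImmutableCheckpoint {
  private final Map<String, String> offsets;

  ImmutableCheckpoint(Map<String, String> offsets) {
    // Defensive copy, then wrap: later changes to the caller's map do not
    // leak in, and callers cannot modify the returned view.
    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
  }

  Map<String, String> getOffsets() {
    return offsets;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ImmutableCheckpoint)) return false;
    // Delegate to Map.equals instead of comparing entries by hand.
    return offsets.equals(((ImmutableCheckpoint) o).offsets);
  }

  @Override
  public int hashCode() {
    return offsets.hashCode();
  }
}

class ImmutableCheckpointDemo {
  public static void main(String[] args) {
    Map<String, String> source = new HashMap<>();
    source.put("kafka.topic.0", "42");
    ImmutableCheckpoint checkpoint = new ImmutableCheckpoint(source);
    source.put("kafka.topic.1", "7"); // does not affect the checkpoint
    System.out.println(checkpoint.getOffsets().size());
  }
}
```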



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79360>

    Is this code-gen'd? Want to make sure we're not manually writing equals() since it's
error-prone.



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79361>

    Is this code-gen'd? Want to make sure we're not manually writing hashCode() since it's
error-prone.



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79362>

    specified partition -> specified task



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79364>

    Why do we need this method if we can get the taskName to partition mapping from checkpoints?
Is this just a convenience method to read the mapping across all taskNames?
    
    If so, I'd rather keep this out of the interface, and just provide a Util method to handle
this.
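
    One way the Util-method alternative could look, as a sketch only: the interface stays narrow (one read per task) and a static helper assembles the full mapping. The names TaskCheckpointReader, readPartitions, and CheckpointMappingUtil are hypothetical and do not appear in the patch or in the Samza API.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical narrow interface: it only knows how to read one task's data.
interface TaskCheckpointReader {
  // Returns the partition ids recorded in the latest checkpoint for a task.
  Set<Integer> readPartitions(String taskName);
}

// Hypothetical helper holding the convenience logic outside the interface.
final class CheckpointMappingUtil {
  private CheckpointMappingUtil() {}

  // Builds the taskName -> partitions mapping by reading each task in turn.
  static Map<String, Set<Integer>> readMapping(
      TaskCheckpointReader reader, Collection<String> taskNames) {
    Map<String, Set<Integer>> mapping = new HashMap<>();
    for (String taskName : taskNames) {
      mapping.put(taskName, reader.readPartitions(taskName));
    }
    return mapping;
  }
}
```

Implementations of the interface then only need to support the per-task read, and the convenience behavior is written once.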



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79365>

    Why do we need this method if we can set the taskName to partition mapping for each checkpoint?
Is this just a convenience method to write the mapping once for all taskNames, since it's
static?
    
    If so, I'd rather keep this out of the interface, and just provide a Util method to handle
this.



samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java
<https://reviews.apache.org/r/22215/#comment79366>

    Docs are cute at the expense of readability. Can you just be direct here?



samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
<https://reviews.apache.org/r/22215/#comment79367>

    taskNames?
    
    Seems redundant to call it taskNameKeys since taskNames are keys by definition, right?



samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79368>

    lost the "set" verb in method name here.



samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79369>

    I'm a little confused here. Isn't the mapping a mapping from taskName to a set of partitions?
Why is the type a string here?



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment79370>

    I'm assuming this is still to-be-implemented. I think you mentioned this in the JIRA.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment79372>

    Can we just import scala.collection._ to make this a little more succinct?



samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/22215/#comment79377>

    I'm confused. These configs are named the same thing. Is this intentional? Seems like
maybe SSP_TASK_NAME_GROUPER_FACTORY should be deleted. It's unused.



samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
<https://reviews.apache.org/r/22215/#comment79401>

    Just so I'm clear, this is only used for the state store mapping? If a job doesn't use
state stores, this env variable doesn't really get used?



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79399>

    Can we break all of this into a separate method in the SamzaContainer object, just to try
and keep apply() no worse than it was before? The method is already kind of a mess, so I'm
trying to not make it worse.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79400>

    This log line is a little dated. There is no longer a partition manager. A more accurate
statement would be that the SystemAdmin wasn't able to find any partitions for input streams.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79404>

    Do we need the asJavaCollection stuff? We're importing JavaConversions._ already.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79407>

    Do we need the type here? Seems redundant.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment79409>

    Any way to make this a little cleaner/more succinct? Totally a nit, but these long scala.*
calls just bug me. If you feel strongly about it, I'm OK with the way it is, but just voicing my
preference.



samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79413>

    Oh dear. I feel like the better thing to do here is just to have a custom object that
doesn't have to deal with all the Scala junk.



samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79416>

    This should be plural (TaskNamesToSSPs) since it's a map from multiple taskNames to multiple
SSPs.



samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala
<https://reviews.apache.org/r/22215/#comment79415>

    Can we just use the same pattern we've been using here? Either job.num.sets or task.num.sets
or something?



samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79417>

    I think you've renamed it because it clashes with the new "taskName" nomenclature. I don't
think this is a jobName, though, right? Isn't it more of a containerName?



samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79418>

    I am having trouble following this code. I don't really have any good suggestions, but
I think it could maybe use a little more cleanup. Maybe I'm in the minority, though. Anyway...
I'm confused. :)



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment79420>

    Clean up.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment79427>

    I think I'm confused. We can assign multiple taskNames to the same task? I thought we
had a 1:1 between TaskInstance -> taskName -> SSPs?



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79429>

    Skipping review of this class. I think I need a walkthrough to understand what changes
are being made here.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
<https://reviews.apache.org/r/22215/#comment79432>

    Delete or uncomment. Not sure what you're going for here.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79437>

    Still confused. I thought task:taskName mapping was 1:1. Probably forgetting things. Can
you explain this to me again?



samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79438>

    Not too excited to delete all of these tests without corresponding tests for taskNames.


- Chris Riccomini


On June 3, 2014, 7:29 p.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated June 3, 2014, 7:29 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   build.gradle 1a1db16
>   check PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java dcf81bf
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 34f50fd
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 5aa7a8f
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 5ec6433
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507e
>   samza-api/src/test/java/org/apache/samza/container/SSPGrouperTestBase.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 7ca8af6
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySSP.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 2ed8d7d
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1b548fd
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 552f8c2
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySSP.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupInNSets.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b8c369b
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 62c91e8
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 5b9b926
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 10502a9
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala c28c9a6
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 17a96f0
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 0442580
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit.  Now moving on to function.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>

