> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> > How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
> >
> > If null is not supported (and thus not usable by user-defined implementations)
I would use that and mark it as reserved.
> >
> > Otherwise I would probably do something more to make this unlikely to collide (call me paranoid). Something like using a NUL byte as the first character and documenting that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.
>
> Jagadish Venkatraman wrote:
> Returning a null is not possible, because a null offset could mean that we don't have messages at this moment rather than end-of-stream. While we should poll again when a consumer returns null, we should not do so in the END_OF_STREAM case. Hence, I was hoping to use a special offset.
>
> I like your suggestion of using a NUL byte as the first character (and calling that
out). I'll update the RB with that.
>
> Jagadish Venkatraman wrote:
> There seem to be interoperability issues between strings in Java and strings in Scala (especially around handling NUL bytes in the string: Scala appears to strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the string. Let me know if there's a better way to handle this.
>
> Chris Pettitt wrote:
> Scala should definitely not be dropping any bytes. How could it safely do so?
>
> FWIW, you can verify:
>
> ```
> % scala
> scala> "\u0000".length
> res0: Int = 1
> ```
I managed to dig into the problem more.
Case 1: `public static final String STR = "\0END_OF_STREAM";`
When this Java constant is accessed from Scala code, it seems to be encoded as STR[0] = 65533, STR[1] = 65533, STR[2] = 'E', STR[3] = 'N', and so on. When accessed from Java, it's correctly encoded with STR[0] = '\0'.
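One possible explanation for Case 1, offered as an assumption rather than a confirmed diagnosis: 65533 is U+FFFD, the Unicode replacement character, and Java class files store string constants in modified UTF-8, where NUL is written as the two bytes 0xC0 0x80. If a reader decoded those bytes as standard UTF-8, each byte would be malformed and replaced separately, which would give exactly two 65533 values followed by 'E', 'N', and so on. The sketch below reproduces that effect with `DataOutputStream.writeUTF`, which uses the same modified UTF-8 encoding; the class and method names are hypothetical, and it does not show what the Scala compiler actually does.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper class, not part of the RB.
final class ModifiedUtf8NulSketch {

    /** Encodes s as modified UTF-8 (as writeUTF does), dropping the 2-byte length prefix. */
    static byte[] modifiedUtf8(String s) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] withLength = buffer.toByteArray();
        return Arrays.copyOfRange(withLength, 2, withLength.length);
    }

    /** Decodes modified UTF-8 bytes as if they were standard UTF-8 (the assumed mistake). */
    static String decodeAsStandardUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String decoded = decodeAsStandardUtf8(modifiedUtf8("\0END_OF_STREAM"));
        // Print the first four code units to compare with the values seen from Scala.
        for (int i = 0; i < 4; i++) {
            System.out.println("STR[" + i + "] = " + (int) decoded.charAt(i));
        }
    }
}
```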
Case 2 (the current updated RB, which solves this):
`private static final byte[] END_OF_STREAM_BYTES = "\0END_OF_STREAM".getBytes();`
`public static final String STR = new String(END_OF_STREAM_BYTES, Charset.defaultCharset());`
When `STR` is now accessed from Scala code, it seems to be encoded (correctly) as STR[0] = '\0', STR[1] = 'E', STR[2] = 'N', STR[3] = 'D', and so on.
While Case 2 fixes this, I wonder how portable the solution is. Do you have insight into what
could go on with different encodings?
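On the portability question, one thing that is certain is that both `getBytes()` with no argument and `Charset.defaultCharset()` depend on the platform default charset. A minimal sketch of a variant that pins the charset, assuming UTF-8 is acceptable here; the class, field, and method names are hypothetical and not taken from the RB. Because the value comes from a constructor call it is not a compile-time constant, which may be why Case 2 is read correctly from Scala, though that is a guess.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical holder class for the reserved offset; names are illustrative only.
final class EndOfStreamOffsetSketch {

    // Explicit charset on both sides, so the round trip does not depend on the platform default.
    private static final byte[] END_OF_STREAM_BYTES =
        "\0END_OF_STREAM".getBytes(StandardCharsets.UTF_8);

    // Built at class-initialization time, so it is not a compile-time constant.
    static final String END_OF_STREAM_OFFSET =
        new String(END_OF_STREAM_BYTES, StandardCharsets.UTF_8);

    /** Returns true only for the reserved offset; a null offset is not end-of-stream. */
    static boolean isEndOfStream(String offset) {
        return END_OF_STREAM_OFFSET.equals(offset);
    }

    public static void main(String[] args) {
        System.out.println((int) END_OF_STREAM_OFFSET.charAt(0));
        System.out.println(END_OF_STREAM_OFFSET.length());
        System.out.println(isEndOfStream(null));
    }
}
```

Keeping the null check separate from the end-of-stream check matches the earlier point that a null offset means "no messages right now" and should lead to another poll.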
- Jagadish
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------
On Sept. 6, 2016, 4:02 p.m., Jagadish Venkatraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
>
> (Updated Sept. 6, 2016, 4:02 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure),
Navina Ramesh, and Xinyu Liu.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files or snapshot files, which are not infinite, we need a notion of 'end-of-stream'.
>
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).
>
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
>
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
>
>
> Diffs
> -----
>
> build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2
> checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06
> samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a
> samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64
> samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296
> samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5
> samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652
> samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757
> samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764
>
> Diff: https://reviews.apache.org/r/51346/diff/
>
>
> Testing
> -------
>
> Unit tests covering in-order processing, out-of-order processing, and commit semantics.
>
>
> Thanks,
>
> Jagadish Venkatraman
>
>