spark-user mailing list archives

From alz2 <>
Subject Re: Writing custom Structured Streaming receiver
Date Tue, 05 Jun 2018 15:55:02 GMT
I'm implementing a simple Structured Streaming Source with the V2 API in
Java. I've taken the Offset logic (startOffset, endOffset,
lastOffsetCommitted, etc.) from the socket source and also from your receivers.

However, upon startup, Spark for some reason reports that the initial offset
of -1 is immediately available. Because -1 is available but not committed, my
streaming query gets triggered with an empty data buffer. After the query
runs, -1 is added to the StreamExecution's committedOffsets. The issue gets
worse from here: as new data is pushed into the internal data buffer, my
currentOffset gets immediately committed (it appears in the
StreamExecution's committedOffsets). So as my currentOffset advances with
new data pushed into the buffer, it appears in both availableOffsets
and committedOffsets, so no new batches ever run.

The interesting thing is that my commit function never appears to run:
printing from inside it produces no output, and even replacing it with an
empty body doesn't change the behavior.

Any ideas where or why my Offsets are getting committed?

Any help would be appreciated!

Here are the relevant code snippets:

// instance var declarations
private Offset startOffset = null;
private Offset endOffset = null;
private static volatile SocketOffset currentOffset = new SocketOffset(-1);
private SocketOffset lastOffsetCommitted = new SocketOffset(-1);

// getStartOffset is the same, except it checks and returns startOffset
public Offset getEndOffset() {
  if (this.endOffset == null) throw new IllegalStateException("endOffset is null");
  return this.endOffset;
}

public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
  this.startOffset = start.orElse(new SocketOffset(-1));
  this.endOffset = end.orElse(currentOffset);
}
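To make the bookkeeping concrete, here is a minimal, Spark-free sketch of the offset/buffer model I'm describing (class and method names here are my own stand-ins, not the V2 API; SocketOffset just wraps a long starting at -1, meaning "no data yet", and commit is only supposed to trim data at or below the committed offset):

```java
import java.util.ArrayList;
import java.util.List;

// Spark-free model of the offset bookkeeping described above.
// Names (OffsetModel, receive, buffered) are illustrative only.
public class OffsetModel {
  static class SocketOffset {
    final long value;
    SocketOffset(long value) { this.value = value; }
  }

  private final List<String> buffer = new ArrayList<>();
  private SocketOffset currentOffset = new SocketOffset(-1);
  private SocketOffset lastOffsetCommitted = new SocketOffset(-1);

  // Each received record advances currentOffset by one.
  void receive(String record) {
    buffer.add(record);
    currentOffset = new SocketOffset(currentOffset.value + 1);
  }

  // commit(end) trims everything up to and including `end`;
  // in a real source, Spark -- not the source itself -- decides when
  // this is invoked, which is exactly the part that isn't happening here.
  void commit(SocketOffset end) {
    long toDrop = end.value - lastOffsetCommitted.value;
    for (long i = 0; i < toDrop && !buffer.isEmpty(); i++) {
      buffer.remove(0);
    }
    lastOffsetCommitted = end;
  }

  long current() { return currentOffset.value; }
  int buffered() { return buffer.size(); }
}
```

In this model, offsets only ever move from available to committed through commit(); my confusion is that in the real query, committedOffsets advances without commit() ever being called.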
