spark-user mailing list archives

From Dhaval Patel <mailto.dhava...@gmail.com>
Subject Re: Spark Kafka Streaming making progress but there is no data to be consumed
Date Thu, 12 Sep 2019 02:03:00 GMT
Hi Charles,

Can you check whether any of the cases related to the output directory and
checkpoint location mentioned in the link below apply to your situation?

https://kb.databricks.com/streaming/file-sink-streaming.html
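
In case it helps, a minimal sketch of where those two locations are configured
on a file-sink query (everything below is a placeholder example, not your job;
the "rate" source just stands in for Kafka so the snippet runs on its own):

    import org.apache.spark.sql.SparkSession

    // The file sink's output directory and the query's checkpoint location are
    // two separate settings; the cases in the KB article revolve around them.
    val spark = SparkSession.builder.appName("file-sink-sketch").getOrCreate()

    val query = spark.readStream
      .format("rate")                   // built-in test source, stands in for Kafka
      .load()
      .writeStream
      .format("parquet")
      .option("path", "s3://some-bucket/output/example")                    // output directory
      .option("checkpointLocation", "s3://some-bucket/checkpoints/example") // checkpoint location
      .start()

    query.awaitTermination()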

Regards
Dhaval

On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz <brkyvz@gmail.com> wrote:

> Hey Charles,
> If you are using maxOffsetsPerTrigger, you will likely reset the offsets
> every micro-batch, because:
>  1. Spark will figure out a range of offsets to process (let's call them x
> and y).
>  2. If those offsets have fallen out of the retention period, Spark will
> still try to start from x, but the earliest offset Kafka actually retains is
> some z, where z > y > x.
>  3. Since z > y, Spark will not process any of the data.
>  4. Goto 1.
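>
> To make the loop concrete, a rough sketch of the bookkeeping (illustrative
> Scala only, not Spark's actual internals; all names below are made up):
>
>   // x..y is the range the micro-batch plans to read; z is the earliest
>   // offset Kafka still retains. If retention has moved past the plan
>   // (z > y), the whole range is gone and the batch reads nothing.
>   def plannedRows(x: Long,                      // last committed offset
>                   maxOffsetsPerTrigger: Long,   // caps the planned end offset
>                   latest: Long,                 // newest offset in Kafka
>                   z: Long): Long = {            // earliest offset still available
>     val y = math.min(x + maxOffsetsPerTrigger, latest)
>     if (z > y) 0L                               // steps 2-3 above: empty batch
>     else y - math.max(x, z)
>   }
>
>   plannedRows(x = 100L, maxOffsetsPerTrigger = 50L, latest = 10000L, z = 5000L)
>   // => 0 rows, and the next trigger plans the same kind of range again (step 4)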
>
> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh <mig.flanker@gmail.com>
> wrote:
>
>> Hi Sandish,
>>
>> As I have said, if the offset reset happened only once that would make
>> sense. But I am not sure how to explain why the offset reset is happening
>> for every micro-batch.
>> Ideally, once the offset reset happens, the app should move to a valid
>> offset and start consuming data. But in my case the offset is getting reset
>> for every batch and no data is ever getting generated.
>>
>> Thanks,
>> Charles
>>
>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN <sanysandish@gmail.com>
>> wrote:
>>
>>> You can see this kind of error if the consumer lag is greater than the Kafka
>>> retention period.
>>> You will not see any failures if the option below is not set.
>>>
>>> Set the failOnDataLoss=true option to see failures.
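>>>
>>> On the Kafka source that looks roughly like this (broker and topic are
>>> placeholders, and it assumes an existing SparkSession named spark):
>>>
>>>   // Sketch: make the query fail loudly when offsets have been aged out,
>>>   // instead of the source quietly skipping over the lost range.
>>>   val df = spark.readStream
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", "broker1:9092")
>>>     .option("subscribe", "my_kafka_topic")
>>>     .option("failOnDataLoss", "true")
>>>     .load()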
>>>
>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh <mig.flanker@gmail.com>
>>> wrote:
>>>
>>>> The only forms of rate limiting I have set are *maxOffsetsPerTrigger*
>>>> and *fetch.message.max.bytes*.
>>>>
>>>> *"*may be that you are trying to process records that have passed the
>>>> retention period within Kafka.*"*
>>>> If the above is true then I should have my offsets reset only once
>>>> ideally when my application starts. But mu offsets are resetting for every
>>>> batch. if my application is using offsets that are no longer available in
>>>> Kafka it will reset to earliest or latest offset available in Kafka and the
>>>> next request made to Kafka should provide proper data. But in case for all
>>>> micro-batches the offsets are getting reseted and the batch is producing
no
>>>> data.
>>>>
>>>>
>>>>
>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brkyvz@gmail.com> wrote:
>>>>
>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>> trying to process records that have passed the retention period within
>>>>> Kafka.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flanker@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run a Spark application ingesting data from Kafka
>>>>>> using Spark Structured Streaming and the library
>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>>>>> issue where, during the execution of all my micro-batches, the Kafka
>>>>>> consumer is not able to fetch the offsets and is having its offsets reset,
>>>>>> as shown below in this log:
>>>>>>
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>>
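>>>>>> The job is roughly the following shape (simplified sketch; the brokers,
>>>>>> trigger interval, checkpoint location and option values here are placeholders
>>>>>> rather than my exact configuration):
>>>>>>
>>>>>>   import org.apache.spark.sql.SparkSession
>>>>>>   import org.apache.spark.sql.streaming.Trigger
>>>>>>
>>>>>>   val spark = SparkSession.builder.appName("kafka-to-s3").getOrCreate()
>>>>>>
>>>>>>   val kafkaDf = spark.readStream
>>>>>>     .format("kafka")                                    // spark-sql-kafka-0-10
>>>>>>     .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
>>>>>>     .option("subscribe", "my_kafka_topic")
>>>>>>     .option("maxOffsetsPerTrigger", "100000")           // rate limiting (placeholder value)
>>>>>>     .load()
>>>>>>
>>>>>>   kafkaDf.selectExpr("CAST(value AS STRING) AS value")
>>>>>>     .writeStream
>>>>>>     .format("parquet")
>>>>>>     .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")
>>>>>>     .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic")
>>>>>>     .trigger(Trigger.ProcessingTime("1 minute"))
>>>>>>     .start()
>>>>>>     .awaitTermination()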
>>>>>>
>>>>>> It would be reasonable if this resetting happened once in the application,
>>>>>> due to the fact that the offsets stored in my checkpoint are no longer valid
>>>>>> and the offsets have to be reset to a new value. But I am seeing this reset
>>>>>> happening for every micro-batch execution in my streaming job. And at the
>>>>>> end, the streaming query progress emits the following:
>>>>>>
>>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>>>   "name" : null,
>>>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>>>   "batchId" : 189,
>>>>>>   "numInputRows" : 0,
>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>   "durationMs" : {
>>>>>>     "addBatch" : 127,
>>>>>>     "getBatch" : 0,
>>>>>>     "getEndOffset" : 0,
>>>>>>     "queryPlanning" : 24,
>>>>>>     "setOffsetRange" : 36,
>>>>>>     "triggerExecution" : 1859,
>>>>>>     "walCommit" : 1032
>>>>>>   },
>>>>>>   "stateOperators" : [ ],
>>>>>>   "sources" : [ {
>>>>>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>>>>>     "startOffset" : {
>>>>>>       "my_kafka_topic" : {
>>>>>>         "23" : 1206926686,
>>>>>>         "8" : 1158514946,
>>>>>>         "17" : 1258387219,
>>>>>>         "11" : 1263091642,
>>>>>>         "2" : 1226741128,
>>>>>>         "20" : 1229560889,
>>>>>>         "5" : 1170304913,
>>>>>>         "14" : 1207333901,
>>>>>>         "4" : 1274242728,
>>>>>>         "13" : 1336386658,
>>>>>>         "22" : 1260210993,
>>>>>>         "7" : 1288639296,
>>>>>>         "16" : 1247462229,
>>>>>>         "10" : 1093157103,
>>>>>>         "1" : 1219904858,
>>>>>>         "19" : 1116269615,
>>>>>>         "9" : 1238935018,
>>>>>>         "18" : 1069224544,
>>>>>>         "12" : 1256018541,
>>>>>>         "3" : 1251150202,
>>>>>>         "21" : 1256774117,
>>>>>>         "15" : 1170591375,
>>>>>>         "6" : 1185108169,
>>>>>>         "24" : 1202342095,
>>>>>>         "0" : 1165356330
>>>>>>       }
>>>>>>     },
>>>>>>     "endOffset" : {
>>>>>>       "my_kafka_topic" : {
>>>>>>         "23" : 1206928043,
>>>>>>         "8" : 1158516721,
>>>>>>         "17" : 1258389219,
>>>>>>         "11" : 1263093490,
>>>>>>         "2" : 1226743225,
>>>>>>         "20" : 1229562962,
>>>>>>         "5" : 1170307882,
>>>>>>         "14" : 1207335736,
>>>>>>         "4" : 1274245585,
>>>>>>         "13" : 1336388570,
>>>>>>         "22" : 1260213582,
>>>>>>         "7" : 1288641384,
>>>>>>         "16" : 1247464311,
>>>>>>         "10" : 1093159186,
>>>>>>         "1" : 1219906407,
>>>>>>         "19" : 1116271435,
>>>>>>         "9" : 1238936994,
>>>>>>         "18" : 1069226913,
>>>>>>         "12" : 1256020926,
>>>>>>         "3" : 1251152579,
>>>>>>         "21" : 1256776910,
>>>>>>         "15" : 1170593216,
>>>>>>         "6" : 1185110032,
>>>>>>         "24" : 1202344538,
>>>>>>         "0" : 1165358262
>>>>>>       }
>>>>>>     },
>>>>>>     "numInputRows" : 0,
>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>   } ],
>>>>>>   "sink" : {
>>>>>>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> In the above StreamingQueryProgress event the numInputRows field is
>>>>>> zero, and this is the case for all micro-batch executions: no data is
>>>>>> being produced whatsoever, even though the endOffset is ahead of the
>>>>>> startOffset for every partition (for partition 23, for example, it is
>>>>>> 1,357 offsets ahead). So basically for each batch my offsets are being
>>>>>> reset and each batch is producing zero rows. Since there is no work being
>>>>>> done, and since dynamic allocation is enabled, all my executors are getting
>>>>>> killed. I have tried deleting my checkpoint and starting my application from
>>>>>> scratch and I am still facing the same issue. What could possibly be wrong
>>>>>> with this? What lines of investigation should I take? If you are interested
>>>>>> in getting Stack Overflow points, you can answer my question on SO here
>>>>>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Charles
>>>>>>
>>>>>>
>>>>> --
>>> Sent from Gmail Mobile
>>>
>>
