spark-user mailing list archives

From Amit Joshi <mailtojoshia...@gmail.com>
Subject Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]
Date Tue, 08 Dec 2020 11:57:32 GMT
Hi All,

Can someone please help with this.

Thanks

On Tuesday, December 8, 2020, Amit Joshi <mailtojoshiamit@gmail.com> wrote:

> Hi Gabor,
>
> Please find the logs attached. These are truncated logs.
>
> Command used:
> spark-submit --verbose \
>   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.typesafe:config:1.4.0 \
>   --master yarn --deploy-mode cluster --class com.stream.Main \
>   --num-executors 2 --driver-memory 2g --executor-cores 1 --executor-memory 4g \
>   --files gs://x/jars_application.conf,gs://x/log4j.properties \
>   gs://x/a-synch-r-1.0-SNAPSHOT.jar
> For this I used a snapshot jar, not a fat jar.
>
>
> Regards
> Amit
>
> On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
> wrote:
>
>> Well, I can't do miracles without cluster and log access.
>> What I don't understand is why you need a fat jar?! Spark libraries normally
>> need provided scope because they must exist on all machines...
>> I would take a look at the driver and executor logs, which contain the
>> consumer configs, and at the exact version of the consumer
>> (this is also printed in the same log).
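>>
>> E.g. something along these lines against the YARN application id, then looking for the ConsumerConfig dump and the "Kafka version" line (the application id is a placeholder):
>>
>> yarn logs -applicationId <application id> | grep -A 40 "ConsumerConfig values"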
>>
>> G
>>
>>
>> On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi <mailtojoshiamit@gmail.com>
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> The code is very simple Kafka consumption of data.
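>>>
>>> Roughly it is just something like this (simplified sketch; the brokers, topic, sink and checkpoint dir here are placeholders, not the real values):
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> val spark = SparkSession.builder().appName("Main").getOrCreate()
>>>
>>> // read from Kafka ("kafka."-prefixed options go to the underlying consumer)
>>> val df = spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "<brokers>")   // placeholder
>>>   .option("subscribe", "<topic>")                   // placeholder
>>>   .load()
>>>
>>> // write out the raw key/value (console sink here only as a stand-in)
>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>>   .writeStream
>>>   .format("console")
>>>   .option("checkpointLocation", "<checkpoint-dir>") // placeholder
>>>   .start()
>>>   .awaitTermination()
>>>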
>>> I guess it may be the cluster.
>>> Can you please point out the possible problems to look for in the cluster?
>>>
>>> Regards
>>> Amit
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>> wrote:
>>>
>>>> + Adding back user list.
>>>>
>>>> I've had a look at the Spark code and it's not modifying "partition.assignment.strategy"
>>>> so the problem
>>>> must be either in your application or in your cluster setup.
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi <
>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>
>>>>> It's super interesting because that field has a default value:
>>>>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>>>>
>>>>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, <mailtojoshiamit@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>> I did try removing the kafka-clients dependency,
>>>>>> but got the same exception.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> +1 on the mentioned change, Spark uses the following kafka-clients
>>>>>>> library:
>>>>>>>
>>>>>>> <kafka.version>2.4.1</kafka.version>
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>>>>> gschiavonspark@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>>>>> comes with <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>>>
>>>>>>>>
>>>>>>>> I'd try removing the kafka-clients and see if it works
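>>>>>>>>
>>>>>>>> i.e. keep the spark-sql-kafka-0-10_2.12 dependency and drop the explicit kafka-clients block from your pom, roughly:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>     <!-- no separate kafka-clients entry; it is pulled in transitively -->
>>>>>>>> </dependency>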
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi <mailtojoshiamit@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am running Spark Structured Streaming along with Kafka.
>>>>>>>>> Below is the pom.xml
>>>>>>>>>
>>>>>>>>> <properties>
>>>>>>>>>     <maven.compiler.source>1.8</maven.compiler.source>
>>>>>>>>>     <maven.compiler.target>1.8</maven.compiler.target>
>>>>>>>>>     <encoding>UTF-8</encoding>
>>>>>>>>>     <!-- Put the Scala version of the cluster -->
>>>>>>>>>     <scalaVersion>2.12.10</scalaVersion>
>>>>>>>>>     <sparkVersion>3.0.1</sparkVersion>
>>>>>>>>> </properties>
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>>>>     <version>2.1.0</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>     <artifactId>spark-core_2.12</artifactId>
>>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>>     <scope>provided</scope>
>>>>>>>>> </dependency>
>>>>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>     <artifactId>spark-sql_2.12</artifactId>
>>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>>     <scope>provided</scope>
>>>>>>>>> </dependency>
>>>>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>> I am building the fat jar with the shade plugin. The jar runs as expected in my local setup with the command:
>>>>>>>>>
>>>>>>>>> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g prism-event-synch-rta.jar*
>>>>>>>>>
>>>>>>>>> But when I try to run the same jar on the Spark cluster using YARN with the command:
>>>>>>>>>
>>>>>>>>> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g gs://jars/prism-event-synch-rta.jar*
>>>>>>>>>
>>>>>>>>> I am getting this exception:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>>>>>>>>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
>>>>>>>>> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>>>>>>>>
>>>>>>>>> I have also tried setting "partition.assignment.strategy" explicitly, but it still fails with the same error.
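>>>>>>>>>
>>>>>>>>> I passed it through the "kafka."-prefixed source option, roughly like this (simplified; the brokers, topic and the assignor value shown are just example placeholders):
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>>
>>>>>>>>> val spark = SparkSession.builder().appName("Main").getOrCreate()
>>>>>>>>> val df = spark.readStream
>>>>>>>>>   .format("kafka")
>>>>>>>>>   .option("kafka.bootstrap.servers", "<brokers>")  // placeholder
>>>>>>>>>   .option("subscribe", "<topic>")                  // placeholder
>>>>>>>>>   // "kafka."-prefixed options are passed through to the underlying consumer
>>>>>>>>>   .option("kafka.partition.assignment.strategy",
>>>>>>>>>     "org.apache.kafka.clients.consumer.RangeAssignor")
>>>>>>>>>   .load()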
>>>>>>>>>
>>>>>>>>> Please help.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>> Amit Joshi
>>>>>>>>>
>>>>>>>>>
