spark-user mailing list archives

From Amit Joshi <mailtojoshia...@gmail.com>
Subject Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]
Date Mon, 07 Dec 2020 21:14:00 GMT
Hi Gabor,

Please find the logs attached. These are truncated logs.

Command used:
spark-submit --verbose \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.typesafe:config:1.4.0 \
  --master yarn --deploy-mode cluster --class com.stream.Main \
  --num-executors 2 --driver-memory 2g --executor-cores 1 --executor-memory 4g \
  --files gs://x/jars_application.conf,gs://x/log4j.properties \
  gs://x/a-synch-r-1.0-SNAPSHOT.jar
For this I used a snapshot jar, not a fat jar.


Regards
Amit

On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
wrote:

> Well, I can't do miracles without access to the cluster and logs.
> What I don't understand is why you need a fat jar?! Spark libraries normally
> need provided scope because they must exist on all machines...
> I would take a look at the driver and executor logs, which contain the
> consumer configs + I would take a look at the exact version of the consumer
> (this is also printed in the same log).
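> 
> As a quick check, something like the following (just a sketch: it assumes
> YARN log aggregation is enabled, the usual kafka-clients INFO logging, and
> <your_application_id> is a placeholder) should show the effective consumer
> config and the kafka-clients version that is actually on the classpath:
> 
>   yarn logs -applicationId <your_application_id> \
>     | grep -iE 'ConsumerConfig values|Kafka version|partition.assignment.strategy'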
>
> G
>
>
> On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi <mailtojoshiamit@gmail.com>
> wrote:
>
>> Hi Gabor,
>>
>> The code is very simple Kafka consumption of data.
>> I guess it may be the cluster.
>> Can you please point out the possible problem to look for in the cluster?
>>
>> Regards
>> Amit
>>
>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somogyi@gmail.com>
>> wrote:
>>
>>> + Adding back user list.
>>>
>>> I've had a look at the Spark code and it's not
>>> modifying "partition.assignment.strategy" so the problem
>>> must be either in your application or in your cluster setup.
>>>
>>> G
>>>
>>>
>>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>> wrote:
>>>
>>>> It's super interesting because that field has a default value:
>>>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>>>
>>>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, <mailtojoshiamit@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the reply.
>>>>> I did try removing the client version,
>>>>> but got the same exception.
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Monday, December 7, 2020, Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> +1 on the mentioned change; Spark uses the following kafka-clients
>>>>>> library:
>>>>>>
>>>>>> <kafka.version>2.4.1</kafka.version>
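>>>>>>
>>>>>> If you want to keep an explicit kafka-clients entry in the pom at all,
>>>>>> a sketch of aligning it with what Spark 3.0.1 ships would be:
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>     <!-- matches the version bundled by spark-sql-kafka-0-10_2.12:3.0.1 -->
>>>>>>     <version>2.4.1</version>
>>>>>> </dependency>
>>>>>>
>>>>>> but dropping the dependency, as suggested below, is cleaner since
>>>>>> spark-sql-kafka-0-10 already pulls it in transitively.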
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>>>> gschiavonspark@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>>>> comes with <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>>
>>>>>>>
>>>>>>> I'd try removing the kafka-clients and see if it works
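>>>>>>>
>>>>>>> As a quick sanity check (just a sketch, assuming a standard Maven build),
>>>>>>> you can see which kafka-clients version actually wins on the classpath with:
>>>>>>>
>>>>>>>   mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients
>>>>>>>
>>>>>>> With the explicit 2.1.0 entry removed, only the 2.4.1 pulled in by
>>>>>>> spark-sql-kafka-0-10_2.12 should show up.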
>>>>>>>
>>>>>>>
>>>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi <mailtojoshiamit@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am running Spark Structured Streaming with Kafka.
>>>>>>>> Below is the pom.xml:
>>>>>>>>
>>>>>>>> <properties>
>>>>>>>>     <maven.compiler.source>1.8</maven.compiler.source>
>>>>>>>>     <maven.compiler.target>1.8</maven.compiler.target>
>>>>>>>>     <encoding>UTF-8</encoding>
>>>>>>>>     <!-- Put the Scala version of the cluster -->
>>>>>>>>     <scalaVersion>2.12.10</scalaVersion>
>>>>>>>>     <sparkVersion>3.0.1</sparkVersion>
>>>>>>>> </properties>
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>>     <artifactId>kafka-clients</artifactId>
>>>>>>>>     <version>2.1.0</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>     <artifactId>spark-core_2.12</artifactId>
>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>     <scope>provided</scope>
>>>>>>>> </dependency>
>>>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>     <artifactId>spark-sql_2.12</artifactId>
>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>>     <scope>provided</scope>
>>>>>>>> </dependency>
>>>>>>>> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>     <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
>>>>>>>>     <version>${sparkVersion}</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> Building the fat jar with the shade plugin. The jar runs
>>>>>>>> as expected in my local setup with the command:
>>>>>>>>
>>>>>>>> spark-submit --master local[*] --class com.stream.Main --num-executors 3 \
>>>>>>>>   --driver-memory 2g --executor-cores 2 --executor-memory 3g prism-event-synch-rta.jar
>>>>>>>>
>>>>>>>> But when I try to run the same jar on the Spark cluster using
>>>>>>>> YARN with the command:
>>>>>>>>
>>>>>>>> spark-submit --master yarn --deploy-mode cluster --class com.stream.Main \
>>>>>>>>   --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g \
>>>>>>>>   gs://jars/prism-event-synch-rta.jar
>>>>>>>>
>>>>>>>> I am getting this exception:
>>>>>>>>
>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>>>>>>>> Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
>>>>>>>> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>>>>>>>
>>>>>>>> I have tried setting "partition.assignment.strategy" explicitly,
>>>>>>>> but it is still not working.
>>>>>>>>
>>>>>>>> Please help.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>> Amit Joshi
>>>>>>>>
>>>>>>>>
