spark-user mailing list archives

From Stelios Philippou <stevo...@gmail.com>
Subject Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext
Date Mon, 06 Sep 2021 13:07:10 GMT
Yes. In local mode it works, both from IntelliJ and via spark-submit, on my
machine and on a Windows machine.

I have noticed the following error when adding this to the above
spark-submit for K8s:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \


:: resolving dependencies ::
org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0

confs: [default]

Exception in thread "main" java.io.FileNotFoundException:
/opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
(No such file or directory)



Is there some way to verify that the K8s installation is correct?

Other Spark jobs that do not involve streaming do work correctly.
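
For reference, the FileNotFoundException under /opt/spark/.ivy2 is commonly
reported when the user inside the container cannot write to the default Ivy
cache used by --packages. A frequently suggested workaround is to point Ivy at
a writable directory; the sketch below is untested here, and the /tmp/.ivy2
path is an assumption:

```shell
# Sketch: redirect the Ivy cache used by --packages to a directory the
# container user can write to. /tmp/.ivy2 is an assumed writable path.
spark-submit \
  --master k8s://https://url:port \
  --deploy-mode cluster \
  --conf spark.jars.ivy=/tmp/.ivy2 \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
  target/SparkStream.jar consume
```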

On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

>
> Hi,
>
>
> Have you tried this on local mode as opposed to Kubernetes to see if it
> works?
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <stevoo82@gmail.com>
> wrote:
>
>> Hello Jacek,
>>
>> Yes, this is a Spark Streaming app.
>> I have removed all code and created a new project with just the base
>> code that is enough to open a stream and loop over it, to see what I am
>> doing wrong.
>>
>> Not adding the packages results in the following error:
>>
>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>> This should not really be the case, because that class should be included
>> in the Kubernetes pod. Is there any way I can confirm this?
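
One way to confirm what the image actually ships is to list the jars inside a
running pod. This is a sketch: the pod name is a placeholder, and the spark
namespace and /opt/spark/jars directory are assumptions based on the paths
seen in this thread:

```shell
# Sketch: list Kafka-related jars baked into the Spark image.
# <driver-pod> is a placeholder for an actual pod name.
kubectl -n spark exec <driver-pod> -- ls /opt/spark/jars | grep -i kafka
```

Note that the spark-streaming-kafka-0-10 connector is not part of the standard
Spark distribution, so it would not normally appear in such a listing; it has
to be shipped with the application.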
>>
>>
>> So my simple class is as follows:
>>
>>
>> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>>
>> stream = KafkaUtils.createDirectStream(streamingContext,
>>    LocationStrategies.PreferConsistent(),
>>    ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>
>> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>>    try {
>>       rdd.foreachPartition(partition -> {
>>          while (partition.hasNext()) {
>>             ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>>             LOGGER.info("WORKING " + consumerRecord.topic() + consumerRecord.partition()
>>                + ": " + consumerRecord.offset());
>>          }
>>       });
>>    } catch (Exception e) {
>>       e.printStackTrace();
>>    }
>> });
>>
>> streamingContext.start();
>> try {
>>    streamingContext.awaitTermination();
>> } catch (InterruptedException e) {
>>    e.printStackTrace();
>> } finally {
>>    streamingContext.stop();
>>    javaSparkContext.stop();
>> }
>>
>>
>> That is all there is to the class, which is a Spring Boot @Component.
>>
>> My pom is as follows:
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>
>>   <groupId>com.kafka</groupId>
>>   <artifactId>SimpleKafkaStream</artifactId>
>>   <version>1.0</version>
>>
>>   <packaging>jar</packaging>
>>
>>   <properties>
>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>>     <maven.compiler.source>8</maven.compiler.source>
>>     <maven.compiler.target>8</maven.compiler.target>
>>     <start-class>com.kafka.Main</start-class>
>>   </properties>
>>
>>   <parent>
>>     <groupId>org.springframework.boot</groupId>
>>     <artifactId>spring-boot-starter-parent</artifactId>
>>     <version>2.4.2</version>
>>     <relativePath/>
>>   </parent>
>>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.springframework.boot</groupId>
>>       <artifactId>spring-boot-starter</artifactId>
>>       <exclusions>
>>         <exclusion>
>>           <groupId>org.springframework.boot</groupId>
>>           <artifactId>spring-boot-starter-logging</artifactId>
>>         </exclusion>
>>       </exclusions>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-core_2.12</artifactId>
>>       <version>3.1.2</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>>       <version>3.1.2</version>
>>       <scope>provided</scope>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming_2.12</artifactId>
>>       <version>3.1.2</version>
>>     </dependency>
>>
>>   </dependencies>
>>
>>   <build>
>>     <plugins>
>>       <plugin>
>>         <groupId>org.springframework.boot</groupId>
>>         <artifactId>spring-boot-maven-plugin</artifactId>
>>       </plugin>
>>
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-compiler-plugin</artifactId>
>>         <version>3.8.1</version>
>>         <configuration>
>>           <source>1.8</source>
>>           <target>1.8</target>
>>         </configuration>
>>       </plugin>
>>
>>     </plugins>
>>   </build>
>>
>> </project>
>>
>> A simple pom; whether the spark-streaming-kafka-0-10_2.12 scope is
>> provided or not, it still gives the same error.
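
One point worth noting: the spark-streaming-kafka-0-10 connector is not
bundled in the Spark distribution image, so marking it provided keeps it out
of the application jar while nothing else supplies it, which is consistent
with the KafkaRDDPartition ClassNotFoundException above. A sketch of letting
spark-submit resolve and distribute it instead (untested here; the jar name is
inferred from the pom's artifactId and version):

```shell
# Sketch: ship the connector via --packages rather than marking it
# 'provided' in the pom. Version matches the pom above.
spark-submit \
  --master k8s://https://url:port \
  --deploy-mode cluster \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
  target/SimpleKafkaStream-1.0.jar
```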
>>
>> I have also tried to build an uber jar to test with, but I was
>> still unable to make it work:
>>
>> <build>
>>   <plugins>
>>     <plugin>
>>       <groupId>org.springframework.boot</groupId>
>>       <artifactId>spring-boot-maven-plugin</artifactId>
>>       <configuration>
>>         <fork>true</fork>
>>         <mainClass>com.kafka.Main</mainClass>
>>       </configuration>
>>       <executions>
>>         <execution>
>>           <goals>
>>             <goal>repackage</goal>
>>           </goals>
>>         </execution>
>>       </executions>
>>     </plugin>
>>     <plugin>
>>       <artifactId>maven-assembly-plugin</artifactId>
>>       <version>3.2.0</version>
>>       <configuration>
>>         <descriptorRefs>
>>           <descriptorRef>dependencies</descriptorRef>
>>         </descriptorRefs>
>>         <archive>
>>           <manifest>
>>             <addClasspath>true</addClasspath>
>>             <mainClass>com.kafka.Main</mainClass>
>>           </manifest>
>>         </archive>
>>       </configuration>
>>       <executions>
>>         <execution>
>>           <id>make-assembly</id>
>>           <phase>package</phase>
>>           <goals>
>>             <goal>single</goal>
>>           </goals>
>>         </execution>
>>       </executions>
>>     </plugin>
>>
>>     <plugin>
>>       <groupId>org.apache.maven.plugins</groupId>
>>       <artifactId>maven-compiler-plugin</artifactId>
>>       <version>3.8.1</version>
>>       <configuration>
>>         <source>1.8</source>
>>         <target>1.8</target>
>>       </configuration>
>>     </plugin>
>>
>>   </plugins>
>>
>> </build>
>>
>> I am open to any suggestions as to why this is not
>> working and what needs to be done.
>>
>>
>> Thank you for your time,
>>
>> Stelios
>>
>> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <jacek@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> No idea still, but noticed
>>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
>>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>>> \" that bothers me quite a lot.
>>>
>>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>>> Correct? Please upgrade at your earliest convenience since it's no longer
>>> in active development (if supported at all).
>>>
>>> Secondly, why are these jars listed explicitly since they're part of
>>> Spark? You should not really be doing such risky config changes (unless
>>> you've got no other choice and you know what you're doing).
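
Concretely, a trimmed submit along these lines drops both the Spark-internal
jars and userClassPathFirst. This is a sketch assembled from the options
quoted earlier in the thread, untested here:

```shell
# Sketch: same job without Spark's own jars on --jars and without
# spark.driver.userClassPathFirst; only the Kafka connector is added.
spark-submit \
  --master k8s://https://url:port \
  --deploy-mode cluster \
  --name consume-data \
  --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
  --files "application-dev.properties,keystore.jks,truststore.jks" \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
  --conf spark.kubernetes.container.image=url/spark:spark-submit \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=spark \
  target/SparkStream.jar consume
```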
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <stevoo82@gmail.com>
>>> wrote:
>>>
>>>> Yes you are right.
>>>> I am using Spring Boot for this.
>>>>
>>>> The same setup does work for the job that does not involve any Kafka
>>>> events. But then I am not sending out extra jars, so nothing is
>>>> replaced and we are using the default ones.
>>>>
>>>> If I do not use userClassPathFirst, which forces the service to
>>>> use the newer version, I end up with the same problem.
>>>>
>>>> We are using protobuf v3+, and as such we need to push that version,
>>>> since Spark core uses an older version.
>>>>
>>>> So all we should really need is the following: --jars
>>>> "protobuf-java-3.17.3.jar" \
>>>> and there we need userClassPathFirst=true in order to use the latest
>>>> version.
>>>>
>>>>
>>>> Using only this jar (as works on local), or with no jars defined, we
>>>> ended up with the following error.
>>>>
>>>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager:
>>>> Lost task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>>>> java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>
>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.Class.forName0(Native Method)
>>>>
>>>> at java.base/java.lang.Class.forName(Unknown Source)
>>>>
>>>> at
>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>>
>>>>
>>>>
>>>>
>>>> Which can be resolved by passing more jars.
>>>>
>>>>
>>>> Any idea about this error?
>>>>
>>>> K8s does not seem to like this. Spring Boot should be the one
>>>> responsible for the version, but it seems K8s does not like these versions.
>>>>
>>>> Perhaps a misconfiguration on K8s?
>>>>
>>>> I haven't set that up, so I am not aware of what was done there.
>>>>
>>>>
>>>>
>>>> Downgrading to Java 8 on my K8s might not be so easy. I want to
>>>> explore whether there is something else before doing that, as we
>>>> would need to spin up new instances of K8s to check it.
>>>>
>>>>
>>>>
>>>> Thank you for the time taken
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <jacek@japila.pl> wrote:
>>>>
>>>>> Hi Stelios,
>>>>>
>>>>> I've never seen this error before, but a couple of things caught
>>>>> my attention that I would look at closer to chase the root cause of the
>>>>> issue.
>>>>>
>>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>>> (which I know almost nothing about, so take the following with a pinch
>>>>> of salt :))
>>>>>
>>>>> Spring Boot manages the classpath by itself and together with another
>>>>> interesting option in your
>>>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>>>> much this exception:
>>>>>
>>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> could be due to casting compatible types from two different
>>>>> classloaders?
>>>>>
>>>>> Just a thought but wanted to share as I think it's worth investigating.
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> <https://twitter.com/jaceklaskowski>
>>>>>
>>>>>
>>>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <stevoo82@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have been facing this issue for some time now and I was
>>>>>> wondering if someone might have some insight on how I can resolve
>>>>>> the following.
>>>>>>
>>>>>> The code (Java 11) works correctly on my local machine, but
>>>>>> whenever I try to launch it on K8s I get the following error.
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have a Spark job that monitors some directories and handles the
>>>>>> data accordingly.
>>>>>>
>>>>>> That part is working correctly on K8 and the SparkContext has no
>>>>>> issue being initialized there.
>>>>>>
>>>>>>
>>>>>> This is the spark-submit for that
>>>>>>
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name a-name \
>>>>>> --conf spark.driver.userClassPathFirst=true \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 525m --executor-memory 525m \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar continuous-merge
>>>>>>
>>>>>>
>>>>>> My issue comes when I try to launch the service that listens to
>>>>>> Kafka events and stores them in HDFS.
>>>>>>
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name consume-data \
>>>>>> --conf spark.driver.userClassPathFirst=true \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 1g --executor-memory 1g \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar consume
>>>>>>
>>>>>>
>>>>>> It could be that I am launching the application wrongly, or
>>>>>> perhaps my K8s is not configured correctly?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have stripped my code down to the bare bones and still end up
>>>>>> with the following issue:
>>>>>>
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
>>>>>> Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
>>>>>> Source)
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>>>
>>>>>>
>>>>>> 21/08/31 07:28:42 WARN  org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>>>> Exception encountered during context initialization - cancelling refresh
>>>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>>> Application run failed
>>>>>>
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> It could be that I am launching the application for Kafka wrongly,
>>>>>> with all the extra jars added?
>>>>>>
>>>>>> It is just that those seem to be needed; I get other errors when
>>>>>> not including them.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any help will be greatly appreciated.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Stelios
>>>>>>
>>>>>>
>>>>>>
>>>>>>
