Bumping. All input appreciated.
From: Sharma, Praneet
Sent: Friday, August 23, 2019 11:24 AM
To: 'user@spark.apache.org' <user@spark.apache.org>
Subject: Classloading issues when using connectors with Uber jars with improper Shading in single Spark job
Hi Guys
When using connectors packaged as uber jars, we are hitting classloading issues in Spark 2.3.0. On investigation, we found that these issues were caused by improper shading of certain classes in the uber jars. The aim of this email is to start a discussion on whether such issues can be mitigated or avoided in Spark core, and if so, how.
Issue Summary
We have built a Spark job using Spark 2.3.0 that reads from Azure Cosmos DB. To read from Cosmos DB, we rely on an uber jar provided by Azure, azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar. To add this jar to the Spark driver and executor classpaths, we use the properties spark.driver.extraClassPath and spark.executor.extraClassPath, respectively. When this Spark job is run, we hit the following error:
ERROR ApplicationMaster: User class threw exception: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:    org/apache/spark/metrics/sink/MetricsServlet.<init>(Ljava/util/Properties;Lcom/codahale/metrics/MetricRegistry;Lorg/apache/spark/SecurityManager;)V @116: invokevirtual
  Reason:    Type 'com/codahale/metrics/json/MetricsModule' (current frame, stack[2]) is not assignable to 'com/fasterxml/jackson/databind/Module'
We have analyzed this issue on our side; the details are in the section below.
Issue Details
We have a Spark 2.3.0 setup working with a Cloudera cluster in yarn-cluster mode. By default, Spark's driver and executor classpaths include, among others, jackson-databind-2.6.5.jar, metrics-json-3.1.2.jar and spark-core_2.11-2.3.1.jar.
To work with Cosmos DB, the jar we explicitly add to the classpaths is azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar. This uber jar is shaded with the prefix “cosmosdb_connector_shaded”, but the shading is improper: some classes are left with their original fully qualified names, and this is the origin of the classloading issue mentioned above. The table below lists the classes of interest, the jars they are present in, and the order in which Spark's MutableURLClassLoader might attempt to load them:
Order  Jar                                              Class of interest
1      azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar   com.codahale.metrics.json.MetricsModule
2      jackson-databind-2.6.5.jar                       com.fasterxml.jackson.databind.Module
3      metrics-json-3.1.2.jar                           com.codahale.metrics.json.MetricsModule
4      spark-core_2.11-2.3.1.jar                        org.apache.spark.metrics.sink.MetricsServlet
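The loading order in the table can be checked on a live JVM. Below is a minimal Java sketch (WhichJars and locationsOf are hypothetical names, not part of Spark) that asks a classloader for every classpath location providing a given class, using the standard ClassLoader.getResources. Because getResources lists the parent's matches before the loader's own, and a URLClassLoader searches its URLs in order, the first entry is normally the copy a parent-first loader would define. It assumes it is run with the same classpath as the driver, or that locationsOf is called with the loader in question.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Spark.
class WhichJars {

    /** Returns every classpath URL that provides the given class, in the order the loader reports them. */
    public static List<URL> locationsOf(String className, ClassLoader loader) throws IOException {
        // "a.b.C" -> "a/b/C.class", the resource name a URLClassLoader looks up
        String resource = className.replace('.', '/') + ".class";
        // getResources lists the parent's matches first, then this loader's own URLs in order
        return Collections.list(loader.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Default to the two classes named in the VerifyError; override via command-line arguments
        String[] names = args.length > 0 ? args : new String[] {
            "com.codahale.metrics.json.MetricsModule",
            "com.fasterxml.jackson.databind.Module"
        };
        for (String name : names) {
            System.out.println(name);
            List<URL> found = locationsOf(name, loader);
            if (found.isEmpty()) {
                System.out.println("  <not found>");
            }
            for (URL url : found) {
                System.out.println("  " + url);
            }
        }
    }
}
```

With the uber jar first on the classpath, one would expect MetricsModule to be reported twice, once from the uber jar and once from metrics-json-3.1.2.jar.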
Please note that classes from the cosmosdb-spark third-party library (TPL) are loaded first, because this jar is added at the top of MutableURLClassLoader's URL classpath. To reiterate, the error we see is: “Type 'com/codahale/metrics/json/MetricsModule' (current frame, stack[2]) is not assignable to 'com/fasterxml/jackson/databind/Module'”. According to our analysis, the following is the reason why this occurs:
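One way to check this kind of analysis on a running JVM is to resolve both class names through the same classloader and print where each class in MetricsModule's superclass chain was loaded from. The sketch below is illustrative only: AssignabilityCheck and its methods are hypothetical names, not a Spark or Jackson API, and it uses nothing beyond java.lang.Class reflection. It assumes the relevant jars are on the classpath of the loader passed in. If MetricsModule turns out to extend a Module class other than the one MetricsServlet expects, isAssignable returns false, which is the reflective counterpart of the VerifyError above.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Spark or Jackson.
class AssignabilityCheck {

    /** Describes a class together with its defining loader and, if known, the jar it came from. */
    public static String describe(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // Bootstrap (JDK) classes have no code source
        String where = (src == null || src.getLocation() == null)
                ? "<bootstrap or unknown>" : src.getLocation().toString();
        return c.getName() + " [loader=" + c.getClassLoader() + ", from=" + where + "]";
    }

    /** Prints the superclass chain and directly implemented interfaces, with their origins. */
    public static void printHierarchy(Class<?> c) {
        for (Class<?> k = c; k != null; k = k.getSuperclass()) {
            System.out.println("class " + describe(k));
            for (Class<?> i : k.getInterfaces()) {
                System.out.println("  implements " + describe(i));
            }
        }
    }

    /** True if `candidate` can be used where `expected` is required, both resolved via `loader`. */
    public static boolean isAssignable(String candidate, String expected, ClassLoader loader)
            throws ClassNotFoundException {
        // initialize=false: only load the classes, do not run their static initializers
        Class<?> cand = Class.forName(candidate, false, loader);
        Class<?> exp = Class.forName(expected, false, loader);
        return exp.isAssignableFrom(cand);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String candidate = args.length > 1 ? args[0] : "com.codahale.metrics.json.MetricsModule";
        String expected = args.length > 1 ? args[1] : "com.fasterxml.jackson.databind.Module";
        printHierarchy(Class.forName(candidate, false, loader));
        System.out.println("assignable: " + isAssignable(candidate, expected, loader));
    }
}
```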
This is one example of the issue. We have seen similar issues when using both the Cosmos DB and Snowflake connectors in a single Spark job. Please note that we only use TPLs, and TPL versions, that the vendors officially announce as supported for a particular Spark version.
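Because the same kind of conflict shows up when several connectors are combined, it can help to scan the connector jars up front for classes that more than one jar provides under the same fully qualified name. The following Java sketch (DuplicateClassFinder is a hypothetical helper) does this with java.util.jar.JarFile. It only compares entry names, not bytecode, so identical copies are reported as well.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical diagnostic helper: reports class names that appear in more than one jar.
class DuplicateClassFinder {

    /** Maps each class name to the jars (in argument order) containing it; keeps names found in 2+ jars. */
    public static Map<String, List<String>> findDuplicates(List<File> jars) throws IOException {
        Map<String, List<String>> owners = new TreeMap<>();
        for (File jar : jars) {
            try (JarFile jf = new JarFile(jar)) {
                Enumeration<JarEntry> entries = jf.entries();
                while (entries.hasMoreElements()) {
                    String name = entries.nextElement().getName();
                    // Skip non-class files, META-INF content and module descriptors
                    if (!name.endsWith(".class") || name.startsWith("META-INF/")
                            || name.endsWith("module-info.class")) {
                        continue;
                    }
                    // "a/b/C.class" -> "a.b.C"
                    String className = name.substring(0, name.length() - ".class".length()).replace('/', '.');
                    owners.computeIfAbsent(className, k -> new ArrayList<>()).add(jar.getName());
                }
            }
        }
        // Keep only classes that more than one jar provides
        owners.values().removeIf(list -> list.size() < 2);
        return owners;
    }

    public static void main(String[] args) throws IOException {
        List<File> jars = new ArrayList<>();
        for (String a : args) {
            jars.add(new File(a));
        }
        findDuplicates(jars).forEach((cls, where) -> System.out.println(cls + " -> " + where));
    }
}
```

Going by the table above, passing the Cosmos DB uber jar together with metrics-json-3.1.2.jar should list com.codahale.metrics.json.MetricsModule.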
How could the above error have been avoided?
Child-first Classloader as a solution for wrapping connectors in Spark?
We are able to resolve these issues on the Spark driver by wrapping DataFrame load and save calls in their own child-first classloaders and ensuring the uber TPLs are present only in these child-first classloaders. For example, the child-first classloader (a child of MutableURLClassLoader) wrapping Cosmos DB's load method has only azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar in its classpath. However, we cannot do the same on the Spark executors, because we have little control over what runs there. To achieve this for the Cosmos DB connector's executor code, we would have to modify its open-source code and wrap the lambda passed to mapPartitionsWithIndex in a child-first classloader. We would then have to do this individually for every connector we want to use, which is not viable.
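The driver-side workaround described above can be sketched in Java roughly as follows. This is a minimal illustration under stated assumptions, not the code we run and not a Spark API: ChildFirstClassLoader and run are hypothetical names. The loader looks in its own URLs (for example, only the connector's uber jar) before delegating to its parent, but always delegates JDK classes and any caller-supplied "parent-first" prefixes, so that types crossing the boundary, such as Spark's own classes, are defined exactly once. run temporarily installs the loader as the thread context classloader around a call.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

/**
 * Hypothetical child-first loader: searches its own URLs before the parent,
 * except for JDK classes and caller-supplied parent-first package prefixes.
 */
class ChildFirstClassLoader extends URLClassLoader {
    static {
        registerAsParallelCapable();
    }

    private final String[] parentFirstPrefixes;

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String... parentFirstPrefixes) {
        super(urls, parent);
        this.parentFirstPrefixes = parentFirstPrefixes;
    }

    private boolean isParentFirst(String name) {
        // JDK (and javax.*) classes always come from the parent
        if (name.startsWith("java.") || name.startsWith("javax.")
                || name.startsWith("jdk.") || name.startsWith("sun.")) {
            return true;
        }
        // Shared API types (e.g. Spark, Scala) must be the parent's copies
        for (String p : parentFirstPrefixes) {
            if (name.startsWith(p)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null && !isParentFirst(name)) {
                try {
                    c = findClass(name); // look in this loader's own URLs first
                } catch (ClassNotFoundException ignored) {
                    // not in the connector jar: fall back to the parent below
                }
            }
            if (c == null) {
                c = super.loadClass(name, false); // standard parent-first delegation
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    /** Runs `body` with this loader as the thread context classloader, restoring the previous one. */
    public <T> T run(Callable<T> body) throws Exception {
        Thread t = Thread.currentThread();
        ClassLoader previous = t.getContextClassLoader();
        t.setContextClassLoader(this);
        try {
            return body.call();
        } finally {
            t.setContextClassLoader(previous);
        }
    }
}
```

A driver-side call would then look something like cl.run(() -> spark.read().format(...).load()), with "org.apache.spark." and "scala." passed as parent-first prefixes. Whether a given connector actually picks up its classes through the context classloader depends on the connector and on how Spark looks up the data source, so this needs to be verified per connector. Note also that only class loading is child-first here; getResource still follows the default parent-first order.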
We are therefore reaching out to figure out how such issues can be tackled. Is there a way to achieve isolated classloading for the reading and writing portions of connectors in Spark? If so, how? If not, what other options do we have? What does the Spark community recommend to support the use cases described here?