spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Riccardo Ferrari <ferra...@gmail.com>
Subject Re: PySpark Streaming S3 checkpointing
Date Thu, 03 Aug 2017 21:52:02 GMT
Hi Steve,

Thank you for your answer, much appreciated.

Reading the code seems that:

   - Python StreamingContext.getOrCreate
   <https://github.com/apache/spark/blob/master/python/pyspark/streaming/context.py#L119>calls
   Scala StreamingContextPythonHelper().tryRecoverFromCheckpoint(
   checkpointPath)
   <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L871>
   - tryRecoverFromCheckpoint calls CheckpointReader.read(..., new
   SparkConf(), SparkHadoopUtil.get.conf,...)
   - SparkHadoopUtil.get.conf
   <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L49>
   (when not using yarn) do:
      - sparkConf = new SparkConf(false).loadFromSystemProperties(true)
      - Configuration = newConfiguration(sparkConf)

I have to admit I have not tested (read: debug) it and might not be
completely accurate  (checkpointing is not the highest priority), however I
have the feeling I can not provide those properties via code because a new
configuration gets instantiated/read from system properties and whatever I
set to the current running context is ignored (or at least this happens in
python).

What do you (or any in the list) think?

Thanks,



On Wed, Aug 2, 2017 at 6:04 PM, Steve Loughran <stevel@hortonworks.com>
wrote:

>
> On 2 Aug 2017, at 10:34, Riccardo Ferrari <ferrarir@gmail.com> wrote:
>
> Hi list!
>
> I am working on a pyspark streaming job (ver 2.2.0) and I need to enable
> checkpointing. At high level my python script goes like this:
>
> class StreamingJob():
>
> def __init__(..):
> ...
>    sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',....)
>    sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',....)
>
> def doJob(self):
>    ssc = StreamingContext.getOrCreate('<S3-location>', <function to
> create ssc>)
>
> and I run it:
>
> myJob = StreamingJob(...)
> myJob.doJob()
>
> The problem is that StreamingContext.getOrCreate is not able to have
> access to hadoop configuration configured in the constructor and fails to
> load from checkpoint with
>
> "com.amazonaws.AmazonClientException: Unable to load AWS credentials from
> any provider in the chain"
>
> If I export AWS credentials to the system ENV before starting the script
> it works!
>
>
> Spark magically copies the env vars over for you when you launch a job
>
> I see the Scala version has an option to provide the hadoop configuration
> that is not available in python
>
> I don't have the whole Hadoop, just Spark, so I don't really want to
> configure hadoop's xmls and such
>
>
> when you set up the context, as in spark-defaults.conf
>
> spark.hadoop.fs.s3a.access.key=access key
> spark.hadoop.fs.s3a.secret.key=secret key
>
> Reminder: Do keep your secret key a secret, avoid checking it in to any
> form of revision control.
>

Mime
View raw message