spark-user mailing list archives

From muru <mmur...@gmail.com>
Subject Re: Use case advice
Date Fri, 15 Jan 2021 01:45:51 GMT
You need to make sure the delta-core_2.11-0.6.1.jar file is in your
$SPARK_HOME/jars folder.
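A minimal sketch for checking this from Python before starting a session, assuming `SPARK_HOME` is set (or passed in). `delta_jar_in_spark_home` is a hypothetical helper name, and it only checks that the file exists, not that a running driver has loaded it:

```python
import os


def delta_jar_in_spark_home(spark_home=None, jar_name="delta-core_2.11-0.6.1.jar"):
    """Return True if jar_name is present in <spark_home>/jars.

    Hypothetical helper: it only checks that the file exists, not that a
    running Spark driver has actually loaded it.
    """
    # Fall back to the SPARK_HOME environment variable when no path is given.
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        return False
    # The Spark launcher puts the jars in $SPARK_HOME/jars on the driver
    # classpath when the JVM starts, which is why dropping the jar there helps.
    return os.path.isfile(os.path.join(spark_home, "jars", jar_name))
```

If it returns False, copy the jar into that folder and restart the notebook, since the classpath is only read when the driver JVM starts.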

On Thu, Jan 14, 2021 at 4:59 AM András Kolbert <kolbertandras@gmail.com>
wrote:

> Sorry, I missed out a bit. I have added it, highlighted in yellow.
>
> On Thu, 14 Jan 2021 at 13:54, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> Thanks, Muru, very helpful suggestion! Delta Lake is amazing; it has
>> completely changed a few of my projects!
>>
>> One question regarding that.
>> When I use the following commands, everything works fine and I can use
>> Delta properly in the Spark context that Jupyter initiates automatically.
>>
>> export PYSPARK_DRIVER_PYTHON=jupyter
>> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>>
>> pyspark \
>>         --master yarn \
>>         --deploy-mode client \
>>         --driver-memory 4g \
>>         --executor-memory 16G \
>>         --executor-cores 1 \
>>         --num-executors 8 \
>>         --conf spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
>>         --jars hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar
>>
>>
>> However, I would like to start with a local PySpark session by default,
>> and only connect to YARN when a specific notebook is configured to do so.
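One way to keep a single notebook template that runs locally by default and switches to YARN on request is to build the settings in one place. This is a sketch under assumptions: `delta_session_settings` and its `use_yarn` flag are hypothetical names; it uses `spark.master=yarn` with `spark.submit.deployMode=client` (the non-deprecated form of `yarn-client`), and it does not set `spark.yarn.jars`, which is meant to list the Spark runtime jars shipped to YARN containers rather than extra libraries. Settings such as `spark.jars.packages` and `spark.driver.memory` only take effect if applied before the driver JVM starts.

```python
# Delta artifact taken from the configuration used in this thread.
DELTA_PACKAGE = "io.delta:delta-core_2.11:0.6.1"


def delta_session_settings(use_yarn=False, app_name="Delta Demo"):
    """Return (key, value) pairs suitable for SparkConf().setAll().

    Hypothetical helper: local mode by default, YARN client mode on request.
    """
    settings = [
        ("spark.app.name", app_name),
        # Resolved by spark-submit when the driver JVM launches.
        ("spark.jars.packages", DELTA_PACKAGE),
    ]
    if use_yarn:
        settings += [
            ("spark.master", "yarn"),
            ("spark.submit.deployMode", "client"),
            ("spark.driver.memory", "4g"),
            ("spark.executor.memory", "16g"),
            ("spark.executor.instances", "8"),
            ("spark.executor.cores", "1"),
        ]
    else:
        # Use all local cores when no cluster is requested.
        settings.append(("spark.master", "local[*]"))
    return settings
```

In a kernel where no session exists yet, the pairs could be passed as `SparkSession.builder.config(conf=SparkConf().setAll(delta_session_settings(use_yarn=True))).getOrCreate()`.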
>>
>> 1)
>>
>> export PYSPARK_DRIVER_PYTHON=jupyter
>> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>>
>> pyspark
>>
>> 2)
>> from pyspark.sql import SparkSession
>>
>> conf = spark.sparkContext._conf.setAll([
>>     ('spark.app.name', 'Delta Demo'),
>>     ('spark.yarn.jars', 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
>>     ('spark.master', 'yarn-client'),
>>     ('spark.executor.memory', '16g'),
>>     ('spark.executor.instances', '8'),
>>     ('spark.executor.cores', '1'),
>>     ('spark.driver.memory', '4g'),
>>     ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
>>     ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
>>     ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>>     ])
>> spark.sparkContext.stop()
>>
>> spark = SparkSession \
>>     .builder \
>>     .config(conf=conf) \
>>     .getOrCreate()
>> sc = spark.sparkContext
>>
>>
>> spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
>> from delta.tables import *
>> delta_path = "/data/delta-table"
>> data = spark.range(0, 5)
>> data.show()
>> data.write.format("delta").mode("overwrite").save(delta_path)
>>
>>
>> With this setup, I keep getting the
>> 'java.lang.ClassNotFoundException: Failed to find data source: delta.'
>> error message.
>>
>> What did I miss in my configuration/env variables?
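A likely explanation, consistent with Muru's answer: when the notebook is started through `pyspark`, the driver JVM is already running without the Delta jar, and stopping the SparkContext and building a new one in the same JVM does not put `spark.jars.packages` on the driver's classpath, so the `delta` data source cannot be found there. A common workaround in a kernel started with plain `jupyter notebook` is to set `PYSPARK_SUBMIT_ARGS` before the first session is created, so the options reach spark-submit at JVM launch. The sketch below only builds that string; `build_pyspark_submit_args` is a hypothetical helper, and the trailing `pyspark-shell` token is what PySpark's gateway launcher expects.

```python
import shlex


def build_pyspark_submit_args(master="local[*]", deploy_mode=None,
                              packages=(), jars=(), conf=None):
    """Build a PYSPARK_SUBMIT_ARGS value (hypothetical helper)."""
    args = ["--master", master]
    if deploy_mode:
        args += ["--deploy-mode", deploy_mode]
    if packages:
        # Maven coordinates, resolved by spark-submit at launch.
        args += ["--packages", ",".join(packages)]
    if jars:
        args += ["--jars", ",".join(jars)]
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    # PySpark expects the argument list to end with "pyspark-shell".
    args.append("pyspark-shell")
    # Quote each token so values like local[*] survive shell-style splitting.
    return " ".join(shlex.quote(a) for a in args)
```

Usage would be `os.environ["PYSPARK_SUBMIT_ARGS"] = build_pyspark_submit_args(master="yarn", deploy_mode="client", packages=["io.delta:delta-core_2.11:0.6.1"])` before creating the first `SparkSession`; changing it once the JVM is up has no effect.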
>>
>> Thanks
>> Andras
>>
>>
>>
>> On Sun, 10 Jan 2021, 3:33 am muru, <mmuru98@gmail.com> wrote:
>>
>>> You could try Delta Lake or Apache Hudi for this use case.
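For the use case in this thread (updating roughly 5k of 200k rows per batch), Delta Lake's merge lets each batch upsert only the changed rows into a table on storage instead of holding the whole DataFrame in memory. A minimal sketch, assuming a Delta table at `delta_path` keyed by a hypothetical `user_id` column and the `DeltaTable` Python API shipped in the delta-core jar; `merge_condition` and `upsert_batch` are hypothetical helpers, and `delta.tables` is imported inside the function so the module loads where Delta is not on the path:

```python
def merge_condition(keys, target="t", source="s"):
    """Join condition matching target rows to source rows on the key columns."""
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def upsert_batch(spark, delta_path, updates_df, keys=("user_id",)):
    """Upsert one batch of updated rows into the Delta table at delta_path."""
    # Imported lazily: requires the Delta jar on the driver classpath.
    from delta.tables import DeltaTable

    (DeltaTable.forPath(spark, delta_path).alias("t")
        .merge(updates_df.alias("s"), merge_condition(keys))
        .whenMatchedUpdateAll()     # existing users: take the new values
        .whenNotMatchedInsertAll()  # unseen users: insert a new row
        .execute())
```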
>>>
>>> On Sat, Jan 9, 2021 at 12:32 PM András Kolbert <kolbertandras@gmail.com>
>>> wrote:
>>>
>>>> Sorry if my terminology is misleading.
>>>>
>>>> What I meant by "driver only" is to use a local pandas DataFrame
>>>> (collecting the data to the driver), and keep updating that instead of
>>>> dealing with a distributed Spark DataFrame for holding this data.
>>>>
>>>> For example, we have a DataFrame with all users and their corresponding
>>>> latest activity timestamps. After each streaming batch, aggregations are
>>>> performed and the results are collected to the driver to update the
>>>> latest activity timestamps of a subset of users.
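The driver-side option described here (keeping the latest timestamp per user on the driver and applying each batch's collected aggregates) can be sketched in plain Python with a dict. `update_latest_activity` is a hypothetical helper, and the `(user_id, timestamp)` rows stand in for whatever `collect()` returns from the batch aggregation. With about 200k users a dict is small, but the state lives only in driver memory, so it is lost on a driver restart unless persisted separately.

```python
from datetime import datetime


def update_latest_activity(state, batch_rows):
    """Merge (user_id, timestamp) rows into state, keeping the latest per user.

    Returns the number of users whose entry was inserted or advanced.
    """
    changed = 0
    for user_id, ts in batch_rows:
        current = state.get(user_id)
        # Only overwrite when the user is new or the timestamp moves forward.
        if current is None or ts > current:
            state[user_id] = ts
            changed += 1
    return changed


if __name__ == "__main__":
    latest = {"u1": datetime(2021, 1, 1)}
    update_latest_activity(latest, [("u1", datetime(2021, 1, 9)),
                                    ("u2", datetime(2021, 1, 9))])
    print(latest)
```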
>>>>
>>>>
>>>>
>>>> On Sat, 9 Jan 2021, 6:18 pm Artemis User, <artemis@dtechspace.com>
>>>> wrote:
>>>>
>>>>> Could you please clarify what you mean by 1)? The driver is only
>>>>> responsible for submitting Spark jobs, not performing them.
>>>>>
>>>>> -- ND
>>>>>
>>>>> On 1/9/21 9:35 AM, András Kolbert wrote:
>>>>> > Hi,
>>>>> > I would like to get your advice on my use case.
>>>>> > I have a few Spark Streaming applications where I need to keep
>>>>> > updating a DataFrame after each batch. Each batch probably affects a
>>>>> > small fraction of the DataFrame (5k out of 200k records).
>>>>> >
>>>>> > The options I have been considering so far:
>>>>> > 1) keep the DataFrame on the driver, and update it after each batch
>>>>> > 2) keep the DataFrame distributed, and use checkpointing to truncate
>>>>> > the lineage
>>>>> >
>>>>> > I solved previous use cases with option 2, but I am not sure it is
>>>>> > the optimal approach, as checkpointing is relatively expensive. I also
>>>>> > considered HBase or some sort of fast in-memory store; however, it is
>>>>> > currently not in my stack.
>>>>> >
>>>>> > Curious to hear your thoughts
>>>>> >
>>>>> > Andras
>>>>> >
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>
>>>>>
