spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mike Metzger <m...@flexiblecreations.com>
Subject Re: Spark 2.0 - Insert/Update to a DataFrame
Date Sat, 27 Aug 2016 03:50:10 GMT
Pyspark example based on the data you provided (obviously your dataframes
will come from whatever source you have, not entered directly).  This uses
an intermediary dataframe with grouped data for clarity, but you could pull
this off in other ways.

-- Code --

from pyspark.sql.types import *
from pyspark.sql.functions import col

fcst_schema = StructType([
    StructField('Product', StringType(), nullable=False),
    StructField('fcst_qty', IntegerType(), nullable=False)
  ])

fcst_data = sc.parallelize([Row("A", 100),
             Row("B", 50)])

so_schema = StructType([
    StructField('OrderNum', IntegerType(), nullable=False),
    StructField('ItemNum', StringType(), nullable=False),
    StructField('Sales_qty', IntegerType(), nullable=False)
  ])
so_data = sc.parallelize([
           Row(101, "A", 10),
           Row(101, "B", 5),
           Row(102, "A", 5),
           Row(102, "B", 10)])

fcst_df = sqlContext.createDataFrame(fcst_data, fcst_schema)
so_df = sqlContext.createDataFrame(so_data, so_schema)

fcst_df.show()
so_df.show()
orderTotals_df =
so_df.groupBy('ItemNum').sum('Sales_qty').select('ItemNum',col('sum(Sales_qty)').alias('Sales_qty'))
orderTotals_df.show()

fcst_df.join(orderTotals_df, fcst_df.Product == orderTotals_df.ItemNum,
'left_outer').select(fcst_df.Product, (fcst_df.fcst_qty -
orderTotals_df.Sales_qty).alias('fcst_qty')).show()


-- Output examples (fcst_df, so_df, orderTotals_df, and the resultant df) --

+-------+--------+ |Product|fcst_qty| +-------+--------+ | A| 100| | B| 50|
+-------+--------+ +--------+-------+---------+
|OrderNum|ItemNum|Sales_qty| +--------+-------+---------+ | 101| A| 10| |
101| B| 5| | 102| A| 5| | 102| B| 10| +--------+-------+---------+
+-------+---------+ |ItemNum|Sales_qty| +-------+---------+ | B| 15| | A|
15| +-------+---------+ +-------+--------+ |Product|fcst_qty|
+-------+--------+ | B| 35| | A| 85| +-------+--------+

The other languages should work similarly. Honestly, I'd probably just
setup the dataframes and write it in SQL, possibly with a UDF, to keep
things a little more clear.

Thanks

Mike


On Fri, Aug 26, 2016 at 4:45 PM, Subhajit Purkayastha <spurkaya@p3si.net>
wrote:

> So the data in the fcst dataframe is like this
>
>
>
> Product, fcst_qty
>
> A             100
>
> B             50
>
>
>
> Sales DF has data like this
>
>
>
> Order# Item#    Sales qty
>
> 101         A             10
>
> 101         B             5
>
> 102         A             5
>
> 102         B             10
>
>
>
> I want to update the FCSt DF data, based on Product=Item#
>
>
>
> So the resultant FCST DF should have data
>
> Product, fcst_qty
>
> A             85
>
> B             35
>
>
>
> Hope it helps
>
>
>
> If I join the data between the 2 DFs (based on Product# and item#), I will
> get a cartesion join and my result will not be what I want
>
>
>
> Thanks for your help
>
>
>
>
>
> *From:* Mike Metzger [mailto:mike@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 2:12 PM
>
> *To:* Subhajit Purkayastha <spurkaya@p3si.net>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing exactly what you were wanting to accomplish, it's hard to
> say.  A Join is still probably the method I'd suggest using something like:
>
>
>
> select (FCST.quantity - SO.quantity) as quantity
>
> <other needed columns>
>
> from FCST
>
> LEFT OUTER JOIN
>
> SO ON FCST.productid = SO.productid
>
> WHERE
>
> <conditions>
>
>
>
> with specifics depending on the layout and what language you're using.
>
>
>
> Thanks
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha <spurkaya@p3si.net>
> wrote:
>
> Mike,
>
>
>
> The grains of the dataFrame are different.
>
>
>
> I need to reduce the forecast qty (which is in the FCST DF)  based on the
> sales qty (coming from the sales  order DF)
>
>
>
> Hope it helps
>
>
>
> Subhajit
>
>
>
> *From:* Mike Metzger [mailto:mike@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 1:13 PM
> *To:* Subhajit Purkayastha <spurkaya@p3si.net>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing the makeup of the Dataframes nor what your logic is for
> updating them, I'd suggest doing a join of the Forecast DF with the
> appropriate columns from the SalesOrder DF.
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha <spurkaya@p3si.net>
> wrote:
>
> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>
>
>
>
>

Mime
View raw message