spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Python UDF to convert timestamps (performance question)
Date Wed, 30 Aug 2017 19:34:12 GMT
1. Generally, adding columns, etc. will not affect performance, because
Spark's optimizer will automatically figure out which columns are not needed
and eliminate them in the optimization step. So that should never be a
concern (if you want to verify, see the explain() sketch below).
2. Again, this is generally not a concern, as the optimizer will take care
of moving such expressions around.
3. However, using a Python UDF is baaaad for perf. In your case, if the
problem is that the timestamp is a float, you can cast the float to
timestamp type and it will be converted correctly. Something like this:
*col("ts").cast("timestamp")* (sketch below).
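
For example, a minimal sketch against your snippet below (assuming your
parsed_data frame, where ts holds epoch seconds):

from pyspark.sql.functions import col

# Cast the float epoch-seconds column straight to a timestamp.
# Spark treats a numeric-to-timestamp cast as seconds since the epoch.
parsed_data_with_dt = parsed_data.withColumn('dt', col('ts').cast('timestamp'))

That replaces the whole UDF block and keeps the conversion inside Catalyst
instead of round-tripping every row through Python.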
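
And re point 1, if you want to see the column pruning for yourself, a quick
sketch (the exact explain output varies by version):

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan
parsed_data_with_dt.explain(True)

Columns that nothing downstream reads should disappear from the optimized
logical plan.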

On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie <briford.wylie@gmail.com>
wrote:

> Hi All,
>
> I'm using structured streaming in Spark 2.2.
>
> I'm using PySpark and I have data (from a Kafka publisher) where the
> timestamp is a float that looks like this:  1379288667.631940
>
> So here's my code (which is working fine)
>
> # SUBSCRIBE: Setup connection to Kafka Stream
> raw_data = spark.readStream.format('kafka') \
>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>   .option('subscribe', 'dns') \
>   .option('startingOffsets', 'latest') \
>   .load()
>
> # ETL: Hardcoded Schema for DNS records (do this better later)
> from pyspark.sql.types import (StructType, StringType, BooleanType,
>     IntegerType, FloatType)
> from pyspark.sql.functions import from_json, to_json, col, struct
>
> dns_schema = StructType() \
>     .add('ts', FloatType()) \
>     .add('uid', StringType()) \
>     .add('id.orig_h', StringType()) \
>   ....
>
> # ETL: Convert raw data into parsed and proper typed data
> from pyspark.sql.functions import col, length, to_timestamp
>
> parsed_data = raw_data \
>   .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
>   .select('data.*')
>
> # Convert Bro IDS time to an actual TimeStamp type
> from pyspark.sql.functions import udf
> from pyspark.sql.types import TimestampType
> import datetime
> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
>              TimestampType())
> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>
> # Then a writestream later...
>
> Okay, so all this code works fine (the 'dt' field has exactly what I
> want)... but I'll be streaming in a lot of data, so here are my questions:
>
> - Will creating a new dataframe with withColumn basically kill
> performance?
> - Should I move my UDF into the parsed_data.select(...)  part?
> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
> without luck)
>
> Any suggestions/pointers are greatly appreciated.
>
> -Brian Wylie
>
>
>
