spark-user mailing list archives

From Brian Wylie <>
Subject Python UDF to convert timestamps (performance question)
Date Wed, 30 Aug 2017 18:45:40 GMT
Hi All,

I'm using structured streaming in Spark 2.2.

I'm using PySpark and I have data (from a Kafka publisher) where the
timestamp is a float that looks like this:  1379288667.631940

So here's my code (which is working fine):

# SUBSCRIBE: Setup connection to Kafka Stream
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()

# ETL: Hardcoded Schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType, \
    IntegerType, FloatType
from pyspark.sql.functions import from_json, to_json, col, struct

dns_schema = StructType() \
    .add('ts', FloatType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType())

# ETL: Convert raw data into parsed and proper typed data
from pyspark.sql.functions import col, length, to_timestamp

parsed_data = raw_data \
  .select(from_json(col("value").cast("string"), dns_schema).alias('data'))

# Convert Bro IDS time to an actual TimeStamp type
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
import datetime

my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)), TimestampType())
parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
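As a quick sanity check outside Spark, this is what that lambda produces for the sample timestamp (fromtimestamp with no tz argument is local-timezone dependent, so I also compute the UTC version to see the actual value):

```python
import datetime

# Sample Bro IDS timestamp from the Kafka stream
ts = 1379288667.631940

# What the UDF's lambda does: epoch seconds -> naive datetime in the local timezone
local_dt = datetime.datetime.fromtimestamp(ts)

# Timezone-independent variant, for checking the value itself
utc_dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)

print(utc_dt)  # mid-September 2013, with the sub-second part kept as microseconds
```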

# Then a writestream later...

Okay, so all this code works fine (the 'dt' field has exactly what I
want)... but I'll be streaming in a lot of data, so here are my questions:

- Will the creation of a new dataframe with withColumn basically kill performance?
- Should I move my UDF into the select part?
- Can my UDF be done by spark.sql directly? (I tried to_timestamp but
without luck.)

Any suggestions/pointers are greatly appreciated.

-Brian Wylie
