spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19523) Spark streaming+ insert into table leaves bunch of trash in table directory
Date Thu, 09 Feb 2017 10:49:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15859342#comment-15859342 ]

Sean Owen commented on SPARK-19523:
-----------------------------------

Not sure what this means -- there is one HiveContext per driver program and you use it where
it's needed, including inside foreachRDD calls. If you are getting that error, you are not
creating one but many.
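
A minimal sketch of the pattern described above (create the HiveContext once per driver and look it up inside foreachRDD, rather than constructing a new one per batch), modeled on the SQLContext-singleton example in the Spark streaming guide; client_log_d_stream in the usage comment refers to the reporter's own variable below:

{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Lazily instantiated singleton: one HiveContext per driver program, reused for every batch.
object HiveContextSingleton {
  @transient private var instance: HiveContext = _

  def getOrCreate(sc: SparkContext): HiveContext = synchronized {
    if (instance == null) {
      instance = new HiveContext(sc)
    }
    instance
  }
}

// Usage inside the streaming job, instead of `new HiveContext(rdd.sparkContext)` per batch:
// client_log_d_stream.foreachRDD { rdd =>
//   val hiveContext = HiveContextSingleton.getOrCreate(rdd.sparkContext)
//   import hiveContext.implicits._
//   // ... build the DataFrame and insertInto as before ...
// }
{code}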

> Spark streaming+ insert into table leaves bunch of trash in table directory
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-19523
>                 URL: https://issues.apache.org/jira/browse/SPARK-19523
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams, SQL
>    Affects Versions: 2.0.2
>            Reporter: Egor Pahomov
>            Priority: Minor
>
> I have very simple code which transforms incoming json files into a parquet table:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
> import org.apache.spark.sql.SaveMode
> object Client_log {
>   def main(args: Array[String]): Unit = {
>     val resultCols = new HiveContext(Spark.ssc.sparkContext).sql(s"select * from temp.x_streaming where year=2015 and month=12 and day=1").dtypes
>     var columns = resultCols.filter(x => !Commons.stopColumns.contains(x._1)).map({ case (name, types) => {
>       s"""cast (get_json_object(s, '""" + '$' + s""".properties.${name}') as ${Commons.mapType(types)}) as $name"""
>     }
>     })
>     columns ++= List("'streaming' as sourcefrom")
>     def f(path:Path): Boolean = {
>       true
>     }
>     val client_log_d_stream = Spark.ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/egor/test2", f _ , newFilesOnly = false)
>     client_log_d_stream.foreachRDD(rdd => {
>       val localHiveContext = new HiveContext(rdd.sparkContext)
>       import localHiveContext.implicits._
>       var input = rdd.map(x => Record(x._2.toString)).toDF()
>       input = input.selectExpr(columns: _*)
>       input =
>         SmallOperators.populate(input, resultCols)
>       input
>         .write
>         .mode(SaveMode.Append)
>         .format("parquet")
>         .insertInto("temp.x_streaming")
>     })
>     Spark.ssc.start()
>     Spark.ssc.awaitTermination()
>   }
>   case class Record(s: String)
> }
> {code}
> This code generates a lot of trash directories in the result table's directory, like:
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_298_7130707897870357017-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_309_6225285476054854579-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_305_2185311414031328806-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_309_6331022557673464922-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_334_1333065569942957405-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_387_3622176537686712754-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_339_1008134657443203932-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_421_3284019142681396277-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_291_5985064758831763168-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_300_6751765745457248879-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_314_2987765230093671316-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_331_2746678721907502111-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_311_1466065813702202959-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_317_7079974647544197072-1




