spark-issues mailing list archives

From "Egor Pahomov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19523) Spark streaming+ insert into table leaves bunch of trash in table directory
Date Thu, 09 Feb 2017 19:17:44 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860025#comment-15860025 ]

Egor Pahomov commented on SPARK-19523:
--------------------------------------

Probably my mistake. I have {code}rdd.sparkContext{code} inside the lambda, but I do not have {code}rdd.sqlContext{code}. Where can I get it?
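
One possible answer to the question above, as a minimal sketch rather than a confirmed fix: in Spark 2.x an RDD exposes only {{sparkContext}}, but a shared SQL entry point can be obtained from it with {{SparkSession.builder().getOrCreate()}}, which returns the existing session instead of constructing a new {{HiveContext}} on every batch. The object name {{SessionHelper}}, the method {{sessionFor}}, and the {{useHive}} flag are hypothetical names introduced for illustration. Whether reusing one session changes the staging-directory behaviour reported in this issue is not established here.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of the original report.
object SessionHelper {
  // Returns the JVM-wide SparkSession, creating it on first use.
  // getOrCreate() hands back the already existing session on later calls,
  // so calling this once per batch does not build a new context each time.
  def sessionFor(rdd: RDD[_], useHive: Boolean = true): SparkSession = {
    val builder = SparkSession.builder().config(rdd.sparkContext.getConf)
    // enableHiveSupport() needs the spark-hive classes on the classpath;
    // it throws IllegalArgumentException if they are missing.
    val configured = if (useHive) builder.enableHiveSupport() else builder
    configured.getOrCreate()
  }
}
```

Inside {{foreachRDD}} this would be used as {{val spark = SessionHelper.sessionFor(rdd)}} followed by {{import spark.implicits._}}, after which {{toDF()}} and {{spark.sql(...)}} are available.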

> Spark streaming+ insert into table leaves bunch of trash in table directory
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-19523
>                 URL: https://issues.apache.org/jira/browse/SPARK-19523
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams, SQL
>    Affects Versions: 2.0.2
>            Reporter: Egor Pahomov
>            Priority: Minor
>
> I have very simple code that transforms incoming JSON files into a Parquet table:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
> import org.apache.spark.sql.SaveMode
> object Client_log {
>   def main(args: Array[String]): Unit = {
>     val resultCols = new HiveContext(Spark.ssc.sparkContext).sql(s"select * from temp.x_streaming where year=2015 and month=12 and day=1").dtypes
>     var columns = resultCols.filter(x => !Commons.stopColumns.contains(x._1)).map({ case (name, types) => {
>       s"""cast (get_json_object(s, '""" + '$' + s""".properties.${name}') as ${Commons.mapType(types)}) as $name"""
>     }
>     })
>     columns ++= List("'streaming' as sourcefrom")
>     def f(path:Path): Boolean = {
>       true
>     }
>     val client_log_d_stream = Spark.ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/egor/test2", f _ , newFilesOnly = false)
>     client_log_d_stream.foreachRDD(rdd => {
>       val localHiveContext = new HiveContext(rdd.sparkContext)
>       import localHiveContext.implicits._
>       var input = rdd.map(x => Record(x._2.toString)).toDF()
>       input = input.selectExpr(columns: _*)
>       input =
>         SmallOperators.populate(input, resultCols)
>       input
>         .write
>         .mode(SaveMode.Append)
>         .format("parquet")
>         .insertInto("temp.x_streaming")
>     })
>     Spark.ssc.start()
>     Spark.ssc.awaitTermination()
>   }
>   case class Record(s: String)
> }
> {code}
> This code generates a lot of trash directories in the result table's directory, like:
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_298_7130707897870357017-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_309_6225285476054854579-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_305_2185311414031328806-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_309_6331022557673464922-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_334_1333065569942957405-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_387_3622176537686712754-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_339_1008134657443203932-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_421_3284019142681396277-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_291_5985064758831763168-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_300_6751765745457248879-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_314_2987765230093671316-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_331_2746678721907502111-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_311_1466065813702202959-1
> drwxrwxrwt   3 egor   nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_317_7079974647544197072-1
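
To make the pattern in the listing above easier to see, here is a small sketch that counts leftover staging directories per timestamp. The name layout it parses ({{.hive-staging_hive_<date>_<time>_<millis>_<id>-<n>}}) is an assumption inferred only from the entries shown in this report, and {{StagingDirs}} and {{countPerSecond}} are hypothetical names. It works on plain strings and does not touch the file system.

```scala
// Hypothetical helper, not part of the original report.
object StagingDirs {
  // Layout assumed from the listing in this issue:
  // .hive-staging_hive_<yyyy-MM-dd>_<HH-mm-ss>_<millis>_<id>-<n>
  private val Staging =
    """\.hive-staging_hive_(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_(\d+)_(\d+)-(\d+)""".r

  // Groups directory names by their date and second-resolution time and
  // returns how many staging directories share each timestamp.
  // Names that do not match the assumed layout are ignored.
  def countPerSecond(names: Seq[String]): Map[String, Int] =
    names
      .collect { case Staging(date, time, _, _, _) => s"${date}_$time" }
      .groupBy(identity)
      .map { case (stamp, hits) => stamp -> hits.size }
}
```

Applied to the names in the listing, each timestamp appears twice, one pair roughly every six seconds.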


