spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chetan Khatri <chetan.opensou...@gmail.com>
Subject Re: Kafka Topic to Parquet HDFS with Structured Streaming
Date Mon, 10 Jun 2019 20:25:12 GMT
Hello Deng, thank you for your email.
The issue was with the Spark-Hadoop/HDFS configuration settings.

Thanks

On Mon, Jun 10, 2019 at 5:28 AM Deng Ching-Mallete <oching@apache.org>
wrote:

> Hi Chetan,
>
> It is best to check whether the user account that you're using to run the
> job has permission to write to the path in HDFS. I would suggest writing the
> Parquet files to a different path, perhaps to a project space or user home,
> rather than at the root directory.
>
> HTH,
> Deng
>
> On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri <chetan.opensource@gmail.com>
> wrote:
>
>> Hello Dear Spark Users,
>>
>> I am trying to write data from a Kafka topic to Parquet on HDFS with
>> Structured Streaming, but I am getting failures. Please help.
>>
>> val spark: SparkSession = SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
>> import spark.implicits._
>> val dataFromTopicDF = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>
>> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
>> val topicQuery = dataFromTopicDF.writeStream
>>   .format("console")
>>   .option("truncate", false)
>>   .option("checkpointLocation", "/tmp/checkpoint")
>>   .trigger(Trigger.ProcessingTime(10.seconds))
>>   .start()
>>
>> topicQuery.awaitTermination()
>> topicQuery.stop()
>>
>>
>> The above code works well, but when I try to write to Parquet on HDFS I get exceptions.
>>
>>
>> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")
>>
>> val parquetQuery = dataFromTopicDF.writeStream
>> .format("parquet")
>> .option("startingOffsets", "earliest")
>> .option("checkpointLocation", "/tmp/checkpoint")
>> .option("path", "/sample-topic")
>> .start()
>>
>> parquetQuery.awaitTermination()
>> parquetQuery.stop()
>>
>>
>> *Exception Details:*
>>
>>
>> Exception in thread "main" java.io.IOException: mkdir of /sample-topic/_spark_metadata failed
>> 	at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
>> 	at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
>> 	at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
>> 	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
>> 	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
>> 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>> 	at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
>> 	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
>> 	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66)
>> 	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
>> 	at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
>> 	at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
>> 	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
>> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
>> 	at com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
>> 	at com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
>> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
>> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
>> 	at scala.collection.immutable.List.foreach(List.scala:381)
>> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> 	at scala.App$class.main(App.scala:76)
>> 	at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
>> 	at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> Thanks
>>
>>
>

Mime
View raw message