spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: [Spark CSV]: Use Custom TextInputFormat to Prevent Exceptions
Date Wed, 15 Mar 2017 17:11:01 GMT
Hi,

The Spark CSV parser has different parsing modes (selected with the "mode" option):
* permissive (default) tries to read everything: missing tokens are interpreted as null
and extra tokens are ignored
* dropmalformed drops lines that have more or fewer tokens than expected
* failfast throws a RuntimeException if there is a malformed line
Obviously, none of this handles malformed gzip files (you may want to ask the sender of the
gzip files to fix their application).
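To make the three modes concrete, here is a small plain-Java sketch of how each one treats a row whose token count does not match the expected number of columns. It only mimics the behaviour described above and is not Spark's implementation. The class and method names (ParseModeSketch, parseLine, parseAll) are made up for illustration. In Spark itself the mode is set on the reader, e.g. .option("mode", "DROPMALFORMED").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of the CSV parse modes; not Spark's code. */
public class ParseModeSketch {

    enum Mode { PERMISSIVE, DROPMALFORMED, FAILFAST }

    /** Splits one line and reconciles it with the expected column count; null means "drop". */
    static String[] parseLine(String line, char delimiter, int expectedColumns, Mode mode) {
        // limit -1 keeps trailing empty tokens, e.g. "a,b," -> ["a", "b", ""]
        String[] tokens = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        if (tokens.length == expectedColumns) {
            return tokens;
        }
        switch (mode) {
            case PERMISSIVE:
                // copyOf pads missing tokens with null and truncates extra tokens
                return Arrays.copyOf(tokens, expectedColumns);
            case DROPMALFORMED:
                return null;
            case FAILFAST:
            default:
                throw new RuntimeException("Malformed line: " + line);
        }
    }

    /** Parses all lines with a comma delimiter, skipping rows that parseLine drops. */
    static List<String[]> parseAll(List<String> lines, int expectedColumns, Mode mode) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines) {
            String[] row = parseLine(line, ',', expectedColumns, mode);
            if (row != null) {
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "d,e", "f,g,h,i");
        for (Mode mode : new Mode[] { Mode.PERMISSIVE, Mode.DROPMALFORMED }) {
            System.out.println(mode + ":");
            for (String[] row : parseAll(lines, 3, mode)) {
                System.out.println("  " + Arrays.toString(row));
            }
        }
        try {
            parseAll(lines, 3, Mode.FAILFAST);
        } catch (RuntimeException e) {
            System.out.println("FAILFAST: " + e.getMessage());
        }
    }
}
```

Running main prints [a, b, c], [d, e, null] and [f, g, h] for PERMISSIVE and only [a, b, c] for DROPMALFORMED. FAILFAST stops with an exception at "d,e". None of the modes helps when the gzip stream itself is broken, because decompression fails before any line reaches the parser.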

You can adapt the line you mentioned (I am not sure which Spark version this is), but you may
not want to, because that would mean maintaining your own Spark build.

You could write your own data source (i.e. in a different namespace than Spark CSV). Then you
could also consider a lot of optimisations compared to the Spark CSV parser, which, depending
on the CSV files and your analysis needs, can make processing much faster.

You could also add a new compression codec that ignores broken gzip files. In that case you
would not need your own data source.
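The core idea behind a codec (or input format) that ignores broken gzips can be sketched with plain java.util.zip. The sketch decompresses line by line and treats EOFException or ZipException as the end of the file rather than as a job failure. This shows only that logic, under that assumption. Using it from Spark would still require wrapping it in a Hadoop CompressionCodec or a custom RecordReader, which is not shown here. TolerantGzipSketch, readLinesTolerantly and gzip are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;

/** Hypothetical sketch: read gzip text, stopping quietly at truncation or corruption. */
public class TolerantGzipSketch {

    static List<String> readLinesTolerantly(InputStream raw) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(raw), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (EOFException | ZipException e) {
            // Truncated stream (EOFException) or bad header/data (ZipException):
            // keep the complete lines read so far instead of failing.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    /** Demo helper: gzip-compresses a string in memory. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] full = gzip("a,b,c\nd,e,f\n");
        byte[] truncated = Arrays.copyOf(full, full.length / 2);
        System.out.println(readLinesTolerantly(new ByteArrayInputStream(full)));
        // Does not throw; returns whatever complete lines precede the damage
        System.out.println(readLinesTolerantly(new ByteArrayInputStream(truncated)));
    }
}
```

Note the trade-off: a broken file silently yields fewer lines, and any partially decoded last line is lost. In practice you would probably want to log the file name whenever the exception path is taken.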

Best regards

> On 15 Mar 2017, at 16:56, Nathan Case <ncase@gravyanalytics.com> wrote:
> 
> Accidentally sent this to the dev mailing list, meant to send it here. 
> 
> I have a Spark Java application that in the past has used the hadoopFile interface to
> specify a custom TextInputFormat to be used when reading files. This custom class would
> gracefully handle exceptions like EOF exceptions caused by corrupt gzip files in the input
> data. I recently switched to using the CSV parser built into Spark, but now any time a bad
> input file is encountered, my whole job fails.
> 
> My code to load the data using csv is:
> Dataset<Row> csv = sparkSession.read()
>         .option("delimiter", parseSettings.getDelimiter().toString())
>         .option("quote", parseSettings.getQuote())
>         .option("parserLib", "UNIVOCITY")
>         .csv(paths);
> Previously I would load the data using:
> JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
>         LongWritable.class, Text.class, sc.hadoopConfiguration())
>         .values()
>         .map(Text::toString);
> 
> Looking at CSVFileFormat.scala, it looks like if I could override the spot in the private
> readText method where it passes TextInputFormat to the hadoopFile method, and pass my custom
> format instead, I would be able to achieve what I want.
> 
> private def readText(
>     sparkSession: SparkSession,
>     options: CSVOptions,
>     location: String): RDD[String] = {
>   if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
>     sparkSession.sparkContext.textFile(location)
>   } else {
>     val charset = options.charset
>     sparkSession.sparkContext
>       // This is where I would want to be able to specify my
>       // input format instead of TextInputFormat
>       .hadoopFile[LongWritable, Text, TextInputFormat](location)
>       .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
>   }
> }
> 
> Does anyone know of another way to prevent corrupt files from failing my job, or could
> someone help make the required changes to make the TextInputFormat customizable? I have
> only just started looking at Scala.
> 
> Thanks,
> Nathan
> 
