kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: Camus Issue about Output File EOF Issue
Date Tue, 03 Mar 2015 01:19:40 GMT
Hi Gwen,

We are using the MapR distribution (sorry, not Cloudera).


I suspect it is a code issue. I am in the process of reviewing the code of the
EtlMultiOutputFormat class:

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35

I suspect that, due to some concurrency issue, an older writer is being
replaced by a new one without the old writer ever being closed. The
problematic files (the ones with the EOF error) are usually small and
contain very little data.
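To make the suspected failure mode concrete, here is a minimal sketch, assuming writers are cached per output key (e.g. topic/partition) in a map. `ClosingWriterCache` is a hypothetical name, not a Camus class, and plain `java.io.Closeable` stands in for the Hadoop writers. It closes any writer it evicts and synchronizes access, so a concurrent replacement cannot silently drop a writer that was never closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Camus): one open writer per output key.
// Replacing a writer always closes the previous one first, so no file is
// left without its trailing data.
class ClosingWriterCache<W extends Closeable> {
    private final Map<String, W> writers = new HashMap<>();

    // synchronized so two threads cannot both swap in a writer for the
    // same key and lose the reference to one of them.
    public synchronized void put(String key, W writer) throws IOException {
        W previous = writers.put(key, writer);
        if (previous != null && previous != writer) {
            previous.close(); // close the writer that was just evicted
        }
    }

    public synchronized W get(String key) {
        return writers.get(key);
    }

    public synchronized int size() {
        return writers.size();
    }
}
```

If the real code replaces writers without a step like `previous.close()`, the evicted writer's file would stay unclosed, which would match the small, truncated files described above.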

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91


Based on the code above, do you think an output file could be left
unclosed? My plan is to add an isClose() API to each writer. If you have
time, could you quickly review the changes and send me your suggestions or
feedback about the unclosed files? By the way, we are on the Hadoop 1.0.3
API, so I was thinking about using
http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
and making sure that all file writers are closed there as a final clean-up.
Let me know whether this is a good approach or not.
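The "close all file writers as a final clean-up" idea could look roughly like this minimal sketch. `WriterRegistry` is a hypothetical class, `java.io.Closeable` stands in for the Hadoop writers, and wiring `closeAll()` into `MapReduceBase.close()` or another task clean-up hook is an assumption. It keeps closing the remaining writers even if one fails, then rethrows the first failure with any later ones attached as suppressed exceptions.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry: every writer the task opens is registered here,
// and closeAll() is called once from the task's clean-up hook.
class WriterRegistry {
    private final List<Closeable> open = new ArrayList<>();

    public synchronized <W extends Closeable> W register(W writer) {
        open.add(writer);
        return writer;
    }

    // Close every registered writer even if some fail; rethrow the first
    // failure with later failures attached as suppressed exceptions.
    public synchronized void closeAll() throws IOException {
        IOException first = null;
        for (Closeable c : open) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        open.clear();
        if (first != null) {
            throw first;
        }
    }

    public synchronized int openCount() {
        return open.size();
    }
}
```

Continuing past a failed close matters here: stopping at the first exception would leave every later file unclosed, which is the exact symptom being chased.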


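{code}
// org.apache.hadoop.mapreduce.RecordWriter is an abstract class, not an
// interface, so this has to be an abstract class as well.
public abstract class RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {

    /**
     * Returns whether close() has been called on this writer, i.e. whether
     * the underlying file has been closed.
     *
     * @return true once the writer has been closed
     */
    public abstract boolean isClose();
}
{code}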

Each writer will then be able to report its close status at any time, e.g.:

{code}
      // Anonymous writer returned by SequenceFileRecordWriterProvider.
      // `writer` (the SequenceFile.Writer) and `log` come from the enclosing code.
      return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {

            private volatile boolean close;

            @Override
            public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
                /*
                 * What if the file is already closed? Should we create a new one here?
                 */

                // `record` is the message string built from `data`, as in the
                // existing provider code (declaration not shown here).
                // Use the timestamp from the EtlKey as the key for this record.
                // TODO: Is there a better key to use here?
                writer.append(new LongWritable(key.getTime()), new Text(record));
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                writer.close();
                close = true;
            }

            @Override
            protected void finalize() throws Throwable {
                // Last-resort clean-up: only act if close() was never called.
                if (!this.close) {
                    log.error("This file was not closed, so trying to close it during JVM finalize()...");
                    try {
                        writer.close();
                    } catch (Throwable th) {
                        log.error("File close error during finalize()", th);
                    }
                }
                super.finalize();
            }

            @Override
            public boolean isClose() {
                return close;
            }
        };
{code}
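Since `finalize()` is not guaranteed to run at all (and is deprecated since Java 9), an explicit, idempotent close path is a more reliable place to track close status. Below is a minimal sketch of that idea, assuming a plain `java.io.Closeable` delegate in place of the `SequenceFile.Writer`; `CloseTrackingWriter` and `ensureOpen()` are hypothetical names. An `AtomicBoolean` makes the status check and the close a single atomic step.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: tracks close status for any Closeable writer and
// makes close() idempotent, so a second close() (e.g. from a clean-up pass)
// is a no-op.
class CloseTrackingWriter implements Closeable {
    private final Closeable delegate;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CloseTrackingWriter(Closeable delegate) {
        this.delegate = delegate;
    }

    // Fail fast instead of writing to a writer whose file is already closed.
    void ensureOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("writer already closed");
        }
    }

    @Override
    public void close() throws IOException {
        // compareAndSet ensures the delegate is closed at most once,
        // even if two threads call close() concurrently.
        if (closed.compareAndSet(false, true)) {
            delegate.close();
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

One design choice to weigh: the flag is set before the delegate's close runs, so a close that throws is not retried later. Setting it only after a successful close (as the snippet above does with `close = true`) allows a retry but loses the at-most-once guarantee.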


Thanks for your quick input and response.


Thanks,

Bhavesh

On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshapira@cloudera.com> wrote:

> Do you have the command you used to run Camus, and the config files?
>
> Also, I noticed your file is on maprfs - you may want to check with
> your vendor... I doubt Camus was extensively tested on that particular
> FS.
>
> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
> <mistry.p.bhavesh@gmail.com> wrote:
> > Hi Kafka User Team,
> >
> > I have been encountering two issues with the Camus Kafka ETL job:
> >
> > 1) End Of File (unclosed files)
> >
> > 2) Not SequenceFile Error
> >
> > The details of the issues can be found at
> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
> >
> > If you have faced a similar issue, please let me know how to go about
> > solving it.
> >
> > Thanks,
> >
> > Bhavesh
>
