nifi-dev mailing list archives

From ajansing <Jansing_Alexan...@bah.com>
Subject GetMongo GC Overflow
Date Tue, 01 Mar 2016 14:08:48 GMT
Running Mac OS X 10.10.5
             Apache Maven 3.3.9
             java version "1.8.0_72"
             Java(TM) SE Runtime Environment (build 1.8.0_72-b15)

I've been trying to figure out how to use the GetMongo processor to output
to a PutHDFS processor.

Some things I think I've figured out:

*Limit* acts exactly like .limit() in Mongo: all it does is give you the
first *n* elements in a collection.
*Batch* isn't a command in Mongo (that I know of), and I can't see what this
entry does for the processor.
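To make sure I understand how skip/limit paging would have to work, here is a stand-alone sketch (plain Java, no Mongo driver; the helper name `pageOffsets` is just something I made up for illustration) of the skip offsets needed to cover a whole collection in fixed-size batches via find().skip(offset).limit(batch):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of NiFi or the Mongo driver): computes the
// skip offsets needed to page through `total` documents in chunks of `batch`
// using collection.find().skip(offset).limit(batch) for each page.
public class MongoPaging {
    public static List<Integer> pageOffsets(int total, int batch) {
        List<Integer> offsets = new ArrayList<>();
        // Start at offset 0 and advance by one batch per page until the
        // whole collection is covered; the last page may be partial.
        for (int offset = 0; offset < total; offset += batch) {
            offsets.add(offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // 10 documents in batches of 4 -> pages start at offsets 0, 4, 8
        System.out.println(pageOffsets(10, 4));
    }
}
```

The point being that the first page has to start at offset 0, not at offset `batch`.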

I'm working with a collection in the millions, and I can't simply leave the
limit blank because the JVM runs out of memory. I tried to write my own
processor and got it to compile with *mvn clean install*, but when I copy
the .nar file from the '...nar/target' directory to the 'nifi-0.6.0/lib'
folder and then try 'sh nifi.sh run' (or 'start'), NiFi refuses to finish
booting up and terminates itself.

I took GetMongo.java
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java>
and its respective other files, and changed the following method:


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ProcessorLog logger = getLogger();
        final MongoCollection<Document> collection = getCollection(context);
        int count = (int) collection.count();
        int next = context.getProperty(BATCH_SIZE).asInteger();
        int current = next;
        while (count >= current) {
            try {
                final FindIterable<Document> it = collection.find()
                        .skip(current)
                        .limit(context.getProperty(LIMIT).asInteger());

                final MongoCursor<Document> cursor = it.iterator();
                try {
                    FlowFile flowFile = null;
                    while (cursor.hasNext()) {
                        flowFile = session.create();
                        flowFile = session.write(flowFile, new OutputStreamCallback() {
                            @Override
                            public void process(OutputStream out) throws IOException {
                                IOUtils.write(cursor.next().toJson(), out);
                            }
                        });

                        session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
                        session.transfer(flowFile, REL_SUCCESS);
                    }

                    session.commit();
                } finally {
                    cursor.close();
                }
            } catch (final RuntimeException e) {
                context.yield();
                session.rollback();
            }
            current = current + next;
        }
    }


I also modified the test and abstracts so Maven would compile.

Any thoughts?

I'm trying to make a processor that can traverse an entire collection with
millions of documents, and eventually one of *any* size.

If anyone has already made one and can share, that'd be great too! Thanks!



--
View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/GetMongo-GC-Overflow-tp7729.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
