nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From olegz <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-748 Fixed logic around handling partial qu...
Date Sun, 15 Nov 2015 14:34:57 GMT
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/123#discussion_r44872506
  
    --- Diff: nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
---
    @@ -100,101 +96,61 @@ private ProvenanceEventRecord getRecord(final Document d, final
RecordReader rea
                 }
             }
     
    -        if ( record == null ) {
    -            throw new IOException("Failed to find Provenance Event " + d);
    -        } else {
    -            return record;
    +        if (record == null) {
    +            logger.warn("Failed to read Provenance Event for '" + d + "'. The event file
may be missing or corrupted");
             }
    -    }
     
    +        return record;
    +    }
     
         public Set<ProvenanceEventRecord> read(final List<Document> docs, final
Collection<Path> allProvenanceLogFiles,
    -        final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars)
throws IOException {
    -        if (retrievalCount.get() >= maxResults) {
    -            return Collections.emptySet();
    -        }
    -
    -        LuceneUtil.sortDocsForRetrieval(docs);
    -
    -        RecordReader reader = null;
    -        String lastStorageFilename = null;
    -        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
    +            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars)
throws IOException {
     
             final long start = System.nanoTime();
    -        int logFileCount = 0;
    -
    -        final Set<String> storageFilesToSkip = new HashSet<>();
    -        int eventsReadThisFile = 0;
     
    -        try {
    -            for (final Document d : docs) {
    -                final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
    -                if ( storageFilesToSkip.contains(storageFilename) ) {
    -                    continue;
    -                }
    -
    -                try {
    -                    if (reader != null && storageFilename.equals(lastStorageFilename))
{
    -                        matchingRecords.add(getRecord(d, reader));
    -                        eventsReadThisFile++;
    -
    -                        if ( retrievalCount.incrementAndGet() >= maxResults ) {
    -                            break;
    -                        }
    -                    } else {
    -                        logger.debug("Opening log file {}", storageFilename);
    -
    -                        logFileCount++;
    -                        if (reader != null) {
    -                            reader.close();
    -                        }
    +        Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
    +        if (retrievalCount.get() >= maxResults) {
    +            return matchingRecords;
    +        }
     
    -                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename,
allProvenanceLogFiles);
    -                        if (potentialFiles.isEmpty()) {
    -                            logger.warn("Could not find Provenance Log File with basename
{} in the "
    -                                    + "Provenance Repository; assuming file has expired
and continuing without it", storageFilename);
    -                            storageFilesToSkip.add(storageFilename);
    -                            continue;
    -                        }
    +        Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
     
    -                        if (potentialFiles.size() > 1) {
    -                            throw new FileNotFoundException("Found multiple Provenance
Log Files with basename " +
    -                                    storageFilename + " in the Provenance Repository");
    -                        }
    +        int eventsReadThisFile = 0;
    +        int logFileCount = 0;
     
    -                        for (final File file : potentialFiles) {
    -                            try {
    -                                if (reader != null) {
    -                                    logger.debug("Read {} records from previous file",
eventsReadThisFile);
    -                                }
    -
    -                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles,
maxAttributeChars);
    -                                matchingRecords.add(getRecord(d, reader));
    -                                eventsReadThisFile = 1;
    -
    -                                if ( retrievalCount.incrementAndGet() >= maxResults
) {
    -                                    break;
    -                                }
    -                            } catch (final IOException e) {
    -                                throw new IOException("Failed to retrieve record " +
d + " from Provenance File " + file + " due to " + e, e);
    +        for (String storageFileName : byStorageNameDocGroups.keySet()) {
    +            File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName,
allProvenanceLogFiles);
    +            if (provenanceEventFile != null) {
    +                try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile,
allProvenanceLogFiles,
    +                        maxAttributeChars)) {
    +                    for (Document document : byStorageNameDocGroups.get(storageFileName))
{
    +                        ProvenanceEventRecord eRec = this.getRecord(document, reader);
    +                        if (eRec != null) {
    +                            matchingRecords.add(eRec);
    +                            eventsReadThisFile++;
    +
    +                            if (retrievalCount.incrementAndGet() >= maxResults) {
    +                                break;
                                 }
                             }
                         }
    -                } finally {
    -                    lastStorageFilename = storageFilename;
    +                } catch (Exception e) {
    --- End diff --
    
    Good point, but at the time I don't feel the need since we don't have specific requirements
for different behavior based on exception type. As for the close() I'd be interested to see
what's wrong. May be in the future we'll earn enough not to care, then we can change to close
quietly 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message