ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexey Goncharuk <alexey.goncha...@gmail.com>
Subject Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()
Date Mon, 10 Sep 2018 08:25:32 GMT
Hi,

Please send your Jira account ID so we can add you to the contributors
list. Then you will be able to assign tickets to yourself and contribute to
the project according to the process.

You can get more info here:

https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute

--AG

пн, 10 сент. 2018 г. в 9:16, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com>:

> Hi,
> I have not been assigned yet, but I really want to be.
>
> On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <ilya.kasnacheev@gmail.com>
> wrote:
>
> > Hello!
> >
> > Can you please frame it as a GitHub pull request as per our process? Do you
> > have a ticket for that?
> >
> > Regards,
> > --
> > Ilya Kasnacheev
> >
> >
> > пт, 7 сент. 2018 г. в 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com
> >:
> >
> > >
> > >
> >
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > ```java
> > > /*
> > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > >  * contributor license agreements.  See the NOTICE file distributed
> with
> > >  * this work for additional information regarding copyright ownership.
> > >  * The ASF licenses this file to You under the Apache License, Version
> > 2.0
> > >  * (the "License"); you may not use this file except in compliance with
> > >  * the License.  You may obtain a copy of the License at
> > >  *
> > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > >  *
> > >  * Unless required by applicable law or agreed to in writing, software
> > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> > implied.
> > >  * See the License for the specific language governing permissions and
> > >  * limitations under the License.
> > >  */
> > >
> > > package org.apache.ignite.internal.processors.query.h2.opt;
> > >
> > > import java.io.IOException;
> > > import java.util.Collection;
> > > import java.util.concurrent.atomic.AtomicLong;
> > > import org.apache.ignite.IgniteCheckedException;
> > > import org.apache.ignite.internal.GridKernalContext;
> > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > import
> > > org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > import
> > > org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > import
> > org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > import org.apache.ignite.lang.IgniteBiTuple;
> > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > import org.apache.lucene.document.Document;
> > > import org.apache.lucene.document.Field;
> > > import org.apache.lucene.document.LongField;
> > > import org.apache.lucene.document.StoredField;
> > > import org.apache.lucene.document.StringField;
> > > import org.apache.lucene.document.TextField;
> > > import org.apache.lucene.index.DirectoryReader;
> > > import org.apache.lucene.index.IndexReader;
> > > import org.apache.lucene.index.IndexWriter;
> > > import org.apache.lucene.index.IndexWriterConfig;
> > > import org.apache.lucene.index.Term;
> > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > import org.apache.lucene.search.BooleanClause;
> > > import org.apache.lucene.search.BooleanQuery;
> > > import org.apache.lucene.search.IndexSearcher;
> > > import org.apache.lucene.search.NumericRangeQuery;
> > > import org.apache.lucene.search.Query;
> > > import org.apache.lucene.search.ScoreDoc;
> > > import org.apache.lucene.search.TopDocs;
> > > import org.apache.lucene.util.BytesRef;
> > > import org.h2.util.JdbcUtils;
> > > import org.jetbrains.annotations.Nullable;
> > >
> > > import static
> > > org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > import static
> > > org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > >
> > > /**
> > >  * Lucene fulltext index.
> > >  */
> > > public class GridLuceneIndex implements AutoCloseable {
> > >     /** Field name for string representation of value. */
> > >     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
> > >
> > >     /** Field name for value version. */
> > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > >
> > >     /** Field name for value expiration time. */
> > >     public static final String EXPIRATION_TIME_FIELD_NAME =
> > > "_gg_expires__";
> > >
> > >     /** */
> > >     private final String cacheName;
> > >
> > >     /** */
> > >     private final GridQueryTypeDescriptor type;
> > >
> > >     /** */
> > >     private final IndexWriter writer;
> > >
> > >     /** */
> > >     private final String[] idxdFields;
> > >
> > >     /** */
> > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > >
> > >     /** */
> > >     private final GridLuceneDirectory dir;
> > >
> > >     /** */
> > >     private final GridKernalContext ctx;
> > >
> > >     /**
> > >      * Constructor.
> > >      *
> > >      * @param ctx Kernal context.
> > >      * @param cacheName Cache name.
> > >      * @param type Type descriptor.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String
> > > cacheName, GridQueryTypeDescriptor type)
> > >         throws IgniteCheckedException {
> > >         this.ctx = ctx;
> > >         this.cacheName = cacheName;
> > >         this.type = type;
> > >
> > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > >
> > >         try {
> > >             writer = new IndexWriter(dir, new IndexWriterConfig(new
> > > StandardAnalyzer()));
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         GridQueryIndexDescriptor idx = type.textIndex();
> > >
> > >         if (idx != null) {
> > >             Collection<String> fields = idx.fields();
> > >
> > >             idxdFields = new String[fields.size() + 1];
> > >
> > >             fields.toArray(idxdFields);
> > >         }
> > >         else {
> > >             assert type.valueTextIndex() || type.valueClass() ==
> > > String.class;
> > >
> > >             idxdFields = new String[1];
> > >         }
> > >
> > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > >     }
> > >
> > >     /**
> > >      * @return Cache object context.
> > >      */
> > >     private CacheObjectContext objectContext() {
> > >         if (ctx == null)
> > >             return null;
> > >
> > >         return
> > > ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > >     }
> > >
> > >     /**
> > >      * Stores given data in this fulltext index.
> > >      *
> > >      * @param k Key.
> > >      * @param v Value.
> > >      * @param ver Version.
> > >      * @param expires Expiration time.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     @SuppressWarnings("ConstantConditions")
> > >     public void store(CacheObject k, CacheObject v, GridCacheVersion
> ver,
> > > long expires) throws IgniteCheckedException {
> > >         CacheObjectContext coctx = objectContext();
> > >
> > >         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
> > >         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
> > >
> > >         Document doc = new Document();
> > >
> > >         boolean stringsFound = false;
> > >
> > >         if (type.valueTextIndex() || type.valueClass() ==
> String.class) {
> > >             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(),
> > > Field.Store.YES));
> > >
> > >             stringsFound = true;
> > >         }
> > >
> > >         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
> > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > >
> > >             if (fieldVal != null) {
> > >                 doc.add(new TextField(idxdFields[i],
> fieldVal.toString(),
> > > Field.Store.YES));
> > >
> > >                 stringsFound = true;
> > >             }
> > >         }
> > >
> > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > >
> > >         try {
> > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > >
> > >             if (!stringsFound) {
> > >                 writer.deleteDocuments(term);
> > >
> > >                 return; // We did not find any strings to be indexed,
> > will
> > > not store data at all.
> > >             }
> > >
> > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
> > > Field.Store.YES));
> > >
> > >             if (type.valueClass() != String.class)
> > >                 doc.add(new StoredField(VAL_FIELD_NAME,
> > > v.valueBytes(coctx)));
> > >
> > >             doc.add(new StoredField(VER_FIELD_NAME,
> > > ver.toString().getBytes()));
> > >
> > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires,
> > > Field.Store.YES));
> > >
> > >             // Next implies remove than add atomically operation.
> > >             writer.updateDocument(term, doc);
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >         finally {
> > >             updateCntr.incrementAndGet();
> > >         }
> > >     }
> > >
> > >     /**
> > >      * Removes entry for given key from this index.
> > >      *
> > >      * @param key Key.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public void remove(CacheObject key) throws IgniteCheckedException {
> > >         try {
> > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > >                 new BytesRef(key.valueBytes(objectContext()))));
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >         finally {
> > >             updateCntr.incrementAndGet();
> > >         }
> > >     }
> > >
> > >     /**
> > >      * Runs lucene fulltext query over this index.
> > >      *
> > >      * @param qry Query.
> > >      * @param filters Filters over result.
> > >      * @param pageSize Size of batch
> > >      * @return Query result.
> > >      * @throws IgniteCheckedException If failed.
> > >      */
> > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>>
> query(String
> > > qry, IndexingQueryFilter filters, int pageSize) throws
> > > IgniteCheckedException {
> > >         IndexReader reader;
> > >
> > >         try {
> > >             long updates = updateCntr.get();
> > >
> > >             if (updates != 0) {
> > >                 writer.commit();
> > >
> > >                 updateCntr.addAndGet(-updates);
> > >             }
> > >
> > >             //We can cache reader\searcher and change this to
> > > 'openIfChanged'
> > >             reader = DirectoryReader.open(writer, true);
> > >         }
> > >         catch (IOException e) {
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         IndexSearcher searcher;
> > >
> > >         Query query;
> > >
> > >         try {
> > >             searcher = new IndexSearcher(reader);
> > >
> > >             MultiFieldQueryParser parser = new
> > > MultiFieldQueryParser(idxdFields,
> > >                 writer.getAnalyzer());
> > >
> > > //            parser.setAllowLeadingWildcard(true);
> > >
> > >             // Filter expired items.
> > >             Query filter =
> > > NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> > > U.currentTimeMillis(),
> > >                 null, false, false);
> > >
> > >             query = new BooleanQuery.Builder()
> > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > >                 .add(filter, BooleanClause.Occur.FILTER)
> > >                 .build();
> > >         }
> > >         catch (Exception e) {
> > >             U.closeQuiet(reader);
> > >
> > >             throw new IgniteCheckedException(e);
> > >         }
> > >
> > >         IndexingQueryCacheFilter fltr = null;
> > >
> > >         if (filters != null)
> > >             fltr = filters.forCache(cacheName);
> > >
> > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > >     }
> > >
> > >     /** {@inheritDoc} */
> > >     @Override public void close() {
> > >         U.closeQuiet(writer);
> > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > >     }
> > >
> > >     /**
> > >      * Key-value iterator over fulltext search result.
> > >      */
> > >     private class It<K, V> extends
> > > GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > >         private final int BatchPosBeforeHead = -1;
> > >
> > >         /** */
> > >         private static final long serialVersionUID = 0L;
> > >
> > >         /** */
> > >         private final int pageSize;
> > >
> > >         /** */
> > >         private final IndexReader reader;
> > >
> > >         /** */
> > >         private final Query query;
> > >
> > >         /** */
> > >         private final IndexSearcher searcher;
> > >
> > >         /** current batch docs*/
> > >         private ScoreDoc[] batch;
> > >
> > >         /** current position in batch*/
> > >         private int batchPos = BatchPosBeforeHead;
> > >
> > >         /** */
> > >         private final IndexingQueryCacheFilter filters;
> > >
> > >         /** */
> > >         private IgniteBiTuple<K, V> curr;
> > >
> > >         /** */
> > >         private CacheObjectContext coctx;
> > >
> > >         /**
> > >          * Constructor.
> > >          *
> > >          * @param reader Reader.
> > >          * @param searcher Searcher.
> > >          * @param filters Filters over result.
> > >          * @throws IgniteCheckedException if failed.
> > >          */
> > >         private It(IndexReader reader, IndexSearcher searcher, Query
> > query,
> > > IndexingQueryCacheFilter filters, int pageSize)
> > >             throws IgniteCheckedException {
> > >             this.reader = reader;
> > >             this.searcher = searcher;
> > >             this.filters = filters;
> > >             this.query = query;
> > >             this.pageSize = pageSize;
> > >
> > >             coctx = objectContext();
> > >
> > >             findNext();
> > >         }
> > >
> > >         /**
> > >          * @param bytes Bytes.
> > >          * @param ldr Class loader.
> > >          * @return Object.
> > >          * @throws IgniteCheckedException If failed.
> > >          */
> > >         @SuppressWarnings("unchecked")
> > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws
> > > IgniteCheckedException {
> > >             if (coctx == null) // For tests.
> > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > >
> > >             return
> > (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> > > bytes, ldr);
> > >         }
> > >
> > >         /**
> > >          * Finds next element.
> > >          *
> > >          * @throws IgniteCheckedException If failed.
> > >          */
> > >         @SuppressWarnings("unchecked")
> > >         private void findNext() throws IgniteCheckedException {
> > >             curr = null;
> > >
> > >             if(isClosed())
> > >                 throw new IgniteCheckedException("Iterator already
> > > closed");
> > >
> > >             if (shouldRequestNextBatch()) {
> > >                 try {
> > >                     requestNextBatch();
> > >                 } catch (IOException e) {
> > >                     close();
> > >                     throw new IgniteCheckedException(e);
> > >                 }
> > >             }
> > >
> > >             if(batch == null)
> > >                 return;
> > >
> > >             while (batchPos < batch.length) {
> > >                 Document doc;
> > >                 ScoreDoc scoreDoc =batch[batchPos++];
> > >
> > >                 try {
> > >                     doc = searcher.doc(scoreDoc.doc);
> > >                 }
> > >                 catch (IOException e) {
> > >                     throw new IgniteCheckedException(e);
> > >                 }
> > >
> > >                 ClassLoader ldr = null;
> > >
> > >                 if (ctx != null && ctx.deploy().enabled())
> > >                     ldr =
> > > ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > >
> > >                 K k =
> > unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
> > > ldr);
> > >
> > >                 if (filters != null && !filters.apply(k))
> > >                     continue;
> > >
> > >                 V v = type.valueClass() == String.class ?
> > >                     (V)doc.get(VAL_STR_FIELD_NAME) :
> > >
> > > this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
> > >
> > >                 assert v != null;
> > >
> > >                 curr = new IgniteBiTuple<>(k, v);
> > >
> > >                 break;
> > >             }
> > >         }
> > >
> > >         private boolean shouldRequestNextBatch()  {
> > >             if(batch == null){
> > >                 // should request for first batch
> > >                 return (batchPos == BatchPosBeforeHead) ;
> > >             } else {
> > >                 // should request when reached to the end of batch
> > >                 return (batchPos  == batch.length);
> > >             }
> > >         }
> > >
> > >         private void requestNextBatch() throws IOException {
> > >             TopDocs docs;
> > >
> > >             if (batch == null) {
> > >                 docs = searcher.search(query, pageSize);
> > >             } else {
> > >                 docs = searcher.searchAfter(batch[batch.length - 1],
> > query,
> > > pageSize);
> > >             }
> > >
> > >             if(docs.scoreDocs.length ==0) {
> > >                 batch = null;
> > >             }else{
> > >                 batch = docs.scoreDocs;
> > >             }
> > >
> > >             batchPos = 0;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected IgniteBiTuple<K, V> onNext() throws
> > > IgniteCheckedException {
> > >             IgniteBiTuple<K, V> res = curr;
> > >
> > >             findNext();
> > >
> > >             return res;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected boolean onHasNext() throws
> > > IgniteCheckedException {
> > >             return curr != null;
> > >         }
> > >
> > >         /** {@inheritDoc} */
> > >         @Override protected void onClose() throws
> IgniteCheckedException
> > {
> > >             U.closeQuiet(reader);
> > >         }
> > >     }
> > > }
> > > ```
> > >
> > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <
> > nguyenmanhtam123@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I tried to implement an iterator for GridLuceneIndex. Could you please
> > > > help to review it?
> > > >
> > > > --
> > > > Thanks & Best Regards
> > > >
> > > > Tam, Nguyen Manh
> > > >
> > > >
> > >
> > > --
> > > Thanks & Best Regards
> > >
> > > Tam, Nguyen Manh
> > >
> >
>
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message