ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com>
Subject Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()
Date Mon, 10 Sep 2018 20:23:03 GMT
Hi,

I just registered. Here is my JIRA account: nguyenmanhtam123@gmail.com

Thank you,
Tamnm

On Mon, Sep 10, 2018 at 3:25 PM Alexey Goncharuk <alexey.goncharuk@gmail.com>
wrote:

> Hi,
>
> Please send your jira account ID so we can add you to the contributors
> list. Then you will be able to assign tickets to yourself and contribute to
> the project according to the process.
>
> You can get more info here:
>
> https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
> https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute
>
> --AG
>
> пн, 10 сент. 2018 г. в 9:16, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com>:
>
> > Hi,
> > I have not been assigned yet, but I really want to.
> >
> > On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <
> ilya.kasnacheev@gmail.com>
> > wrote:
> >
> > > Hello!
> > >
> > > Can you please frame it as a GitHub pull request, as per our process? Do
> > > you have a ticket for that?
> > >
> > > Regards,
> > > --
> > > Ilya Kasnacheev
> > >
> > >
> > > пт, 7 сент. 2018 г. в 5:08, Tâm Nguyễn Mạnh <
> nguyenmanhtam123@gmail.com
> > >:
> > >
> > > >
> > > >
> > >
> >
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > > ```java
> > > > /*
> > > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > > >  * contributor license agreements.  See the NOTICE file distributed
> > with
> > > >  * this work for additional information regarding copyright
> ownership.
> > > >  * The ASF licenses this file to You under the Apache License,
> Version
> > > 2.0
> > > >  * (the "License"); you may not use this file except in compliance
> with
> > > >  * the License.  You may obtain a copy of the License at
> > > >  *
> > > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > > >  *
> > > >  * Unless required by applicable law or agreed to in writing,
> software
> > > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> > > implied.
> > > >  * See the License for the specific language governing permissions
> and
> > > >  * limitations under the License.
> > > >  */
> > > >
> > > > package org.apache.ignite.internal.processors.query.h2.opt;
> > > >
> > > > import java.io.IOException;
> > > > import java.util.Collection;
> > > > import java.util.concurrent.atomic.AtomicLong;
> > > > import org.apache.ignite.IgniteCheckedException;
> > > > import org.apache.ignite.internal.GridKernalContext;
> > > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > > import
> org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > > import
> > > > org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > > import
> > > > org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > > import
> > > org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > > import
> org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > > import org.apache.ignite.lang.IgniteBiTuple;
> > > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > > import org.apache.lucene.document.Document;
> > > > import org.apache.lucene.document.Field;
> > > > import org.apache.lucene.document.LongField;
> > > > import org.apache.lucene.document.StoredField;
> > > > import org.apache.lucene.document.StringField;
> > > > import org.apache.lucene.document.TextField;
> > > > import org.apache.lucene.index.DirectoryReader;
> > > > import org.apache.lucene.index.IndexReader;
> > > > import org.apache.lucene.index.IndexWriter;
> > > > import org.apache.lucene.index.IndexWriterConfig;
> > > > import org.apache.lucene.index.Term;
> > > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > > import org.apache.lucene.search.BooleanClause;
> > > > import org.apache.lucene.search.BooleanQuery;
> > > > import org.apache.lucene.search.IndexSearcher;
> > > > import org.apache.lucene.search.NumericRangeQuery;
> > > > import org.apache.lucene.search.Query;
> > > > import org.apache.lucene.search.ScoreDoc;
> > > > import org.apache.lucene.search.TopDocs;
> > > > import org.apache.lucene.util.BytesRef;
> > > > import org.h2.util.JdbcUtils;
> > > > import org.jetbrains.annotations.Nullable;
> > > >
> > > > import static
> > > >
> org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > > import static
> > > >
> org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > > >
> > > > /**
> > > >  * Lucene fulltext index.
> > > >  */
> > > > public class GridLuceneIndex implements AutoCloseable {
> > > >     /** Field name for string representation of value. */
> > > >     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
> > > >
> > > >     /** Field name for value version. */
> > > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > > >
> > > >     /** Field name for value expiration time. */
> > > >     public static final String EXPIRATION_TIME_FIELD_NAME =
> > > > "_gg_expires__";
> > > >
> > > >     /** */
> > > >     private final String cacheName;
> > > >
> > > >     /** */
> > > >     private final GridQueryTypeDescriptor type;
> > > >
> > > >     /** */
> > > >     private final IndexWriter writer;
> > > >
> > > >     /** */
> > > >     private final String[] idxdFields;
> > > >
> > > >     /** */
> > > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > > >
> > > >     /** */
> > > >     private final GridLuceneDirectory dir;
> > > >
> > > >     /** */
> > > >     private final GridKernalContext ctx;
> > > >
> > > >     /**
> > > >      * Constructor.
> > > >      *
> > > >      * @param ctx Kernal context.
> > > >      * @param cacheName Cache name.
> > > >      * @param type Type descriptor.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String
> > > > cacheName, GridQueryTypeDescriptor type)
> > > >         throws IgniteCheckedException {
> > > >         this.ctx = ctx;
> > > >         this.cacheName = cacheName;
> > > >         this.type = type;
> > > >
> > > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > > >
> > > >         try {
> > > >             writer = new IndexWriter(dir, new IndexWriterConfig(new
> > > > StandardAnalyzer()));
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         GridQueryIndexDescriptor idx = type.textIndex();
> > > >
> > > >         if (idx != null) {
> > > >             Collection<String> fields = idx.fields();
> > > >
> > > >             idxdFields = new String[fields.size() + 1];
> > > >
> > > >             fields.toArray(idxdFields);
> > > >         }
> > > >         else {
> > > >             assert type.valueTextIndex() || type.valueClass() ==
> > > > String.class;
> > > >
> > > >             idxdFields = new String[1];
> > > >         }
> > > >
> > > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > > >     }
> > > >
> > > >     /**
> > > >      * @return Cache object context.
> > > >      */
> > > >     private CacheObjectContext objectContext() {
> > > >         if (ctx == null)
> > > >             return null;
> > > >
> > > >         return
> > > > ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > > >     }
> > > >
> > > >     /**
> > > >      * Stores given data in this fulltext index.
> > > >      *
> > > >      * @param k Key.
> > > >      * @param v Value.
> > > >      * @param ver Version.
> > > >      * @param expires Expiration time.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     @SuppressWarnings("ConstantConditions")
> > > >     public void store(CacheObject k, CacheObject v, GridCacheVersion
> > ver,
> > > > long expires) throws IgniteCheckedException {
> > > >         CacheObjectContext coctx = objectContext();
> > > >
> > > >         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
> > > >         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
> > > >
> > > >         Document doc = new Document();
> > > >
> > > >         boolean stringsFound = false;
> > > >
> > > >         if (type.valueTextIndex() || type.valueClass() ==
> > String.class) {
> > > >             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(),
> > > > Field.Store.YES));
> > > >
> > > >             stringsFound = true;
> > > >         }
> > > >
> > > >         for (int i = 0, last = idxdFields.length - 1; i < last; i++)
> {
> > > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > > >
> > > >             if (fieldVal != null) {
> > > >                 doc.add(new TextField(idxdFields[i],
> > fieldVal.toString(),
> > > > Field.Store.YES));
> > > >
> > > >                 stringsFound = true;
> > > >             }
> > > >         }
> > > >
> > > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > > >
> > > >         try {
> > > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > > >
> > > >             if (!stringsFound) {
> > > >                 writer.deleteDocuments(term);
> > > >
> > > >                 return; // We did not find any strings to be indexed,
> > > will
> > > > not store data at all.
> > > >             }
> > > >
> > > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
> > > > Field.Store.YES));
> > > >
> > > >             if (type.valueClass() != String.class)
> > > >                 doc.add(new StoredField(VAL_FIELD_NAME,
> > > > v.valueBytes(coctx)));
> > > >
> > > >             doc.add(new StoredField(VER_FIELD_NAME,
> > > > ver.toString().getBytes()));
> > > >
> > > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME,
> expires,
> > > > Field.Store.YES));
> > > >
> > > >             // Next implies remove than add atomically operation.
> > > >             writer.updateDocument(term, doc);
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >         finally {
> > > >             updateCntr.incrementAndGet();
> > > >         }
> > > >     }
> > > >
> > > >     /**
> > > >      * Removes entry for given key from this index.
> > > >      *
> > > >      * @param key Key.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public void remove(CacheObject key) throws
> IgniteCheckedException {
> > > >         try {
> > > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > > >                 new BytesRef(key.valueBytes(objectContext()))));
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >         finally {
> > > >             updateCntr.incrementAndGet();
> > > >         }
> > > >     }
> > > >
> > > >     /**
> > > >      * Runs lucene fulltext query over this index.
> > > >      *
> > > >      * @param qry Query.
> > > >      * @param filters Filters over result.
> > > >      * @param pageSize Size of batch
> > > >      * @return Query result.
> > > >      * @throws IgniteCheckedException If failed.
> > > >      */
> > > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>>
> > query(String
> > > > qry, IndexingQueryFilter filters, int pageSize) throws
> > > > IgniteCheckedException {
> > > >         IndexReader reader;
> > > >
> > > >         try {
> > > >             long updates = updateCntr.get();
> > > >
> > > >             if (updates != 0) {
> > > >                 writer.commit();
> > > >
> > > >                 updateCntr.addAndGet(-updates);
> > > >             }
> > > >
> > > >             //We can cache reader\searcher and change this to
> > > > 'openIfChanged'
> > > >             reader = DirectoryReader.open(writer, true);
> > > >         }
> > > >         catch (IOException e) {
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         IndexSearcher searcher;
> > > >
> > > >         Query query;
> > > >
> > > >         try {
> > > >             searcher = new IndexSearcher(reader);
> > > >
> > > >             MultiFieldQueryParser parser = new
> > > > MultiFieldQueryParser(idxdFields,
> > > >                 writer.getAnalyzer());
> > > >
> > > > //            parser.setAllowLeadingWildcard(true);
> > > >
> > > >             // Filter expired items.
> > > >             Query filter =
> > > > NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> > > > U.currentTimeMillis(),
> > > >                 null, false, false);
> > > >
> > > >             query = new BooleanQuery.Builder()
> > > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > > >                 .add(filter, BooleanClause.Occur.FILTER)
> > > >                 .build();
> > > >         }
> > > >         catch (Exception e) {
> > > >             U.closeQuiet(reader);
> > > >
> > > >             throw new IgniteCheckedException(e);
> > > >         }
> > > >
> > > >         IndexingQueryCacheFilter fltr = null;
> > > >
> > > >         if (filters != null)
> > > >             fltr = filters.forCache(cacheName);
> > > >
> > > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > > >     }
> > > >
> > > >     /** {@inheritDoc} */
> > > >     @Override public void close() {
> > > >         U.closeQuiet(writer);
> > > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > > >     }
> > > >
> > > >     /**
> > > >      * Key-value iterator over fulltext search result.
> > > >      */
> > > >     private class It<K, V> extends
> > > > GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > > >         private final int BatchPosBeforeHead = -1;
> > > >
> > > >         /** */
> > > >         private static final long serialVersionUID = 0L;
> > > >
> > > >         /** */
> > > >         private final int pageSize;
> > > >
> > > >         /** */
> > > >         private final IndexReader reader;
> > > >
> > > >         /** */
> > > >         private final Query query;
> > > >
> > > >         /** */
> > > >         private final IndexSearcher searcher;
> > > >
> > > >         /** current batch docs*/
> > > >         private ScoreDoc[] batch;
> > > >
> > > >         /** current position in batch*/
> > > >         private int batchPos = BatchPosBeforeHead;
> > > >
> > > >         /** */
> > > >         private final IndexingQueryCacheFilter filters;
> > > >
> > > >         /** */
> > > >         private IgniteBiTuple<K, V> curr;
> > > >
> > > >         /** */
> > > >         private CacheObjectContext coctx;
> > > >
> > > >         /**
> > > >          * Constructor.
> > > >          *
> > > >          * @param reader Reader.
> > > >          * @param searcher Searcher.
> > > >          * @param filters Filters over result.
> > > >          * @throws IgniteCheckedException if failed.
> > > >          */
> > > >         private It(IndexReader reader, IndexSearcher searcher, Query
> > > query,
> > > > IndexingQueryCacheFilter filters, int pageSize)
> > > >             throws IgniteCheckedException {
> > > >             this.reader = reader;
> > > >             this.searcher = searcher;
> > > >             this.filters = filters;
> > > >             this.query = query;
> > > >             this.pageSize = pageSize;
> > > >
> > > >             coctx = objectContext();
> > > >
> > > >             findNext();
> > > >         }
> > > >
> > > >         /**
> > > >          * @param bytes Bytes.
> > > >          * @param ldr Class loader.
> > > >          * @return Object.
> > > >          * @throws IgniteCheckedException If failed.
> > > >          */
> > > >         @SuppressWarnings("unchecked")
> > > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr)
> throws
> > > > IgniteCheckedException {
> > > >             if (coctx == null) // For tests.
> > > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > > >
> > > >             return
> > > (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> > > > bytes, ldr);
> > > >         }
> > > >
> > > >         /**
> > > >          * Finds next element.
> > > >          *
> > > >          * @throws IgniteCheckedException If failed.
> > > >          */
> > > >         @SuppressWarnings("unchecked")
> > > >         private void findNext() throws IgniteCheckedException {
> > > >             curr = null;
> > > >
> > > >             if(isClosed())
> > > >                 throw new IgniteCheckedException("Iterator already
> > > > closed");
> > > >
> > > >             if (shouldRequestNextBatch()) {
> > > >                 try {
> > > >                     requestNextBatch();
> > > >                 } catch (IOException e) {
> > > >                     close();
> > > >                     throw new IgniteCheckedException(e);
> > > >                 }
> > > >             }
> > > >
> > > >             if(batch == null)
> > > >                 return;
> > > >
> > > >             while (batchPos < batch.length) {
> > > >                 Document doc;
> > > >                 ScoreDoc scoreDoc =batch[batchPos++];
> > > >
> > > >                 try {
> > > >                     doc = searcher.doc(scoreDoc.doc);
> > > >                 }
> > > >                 catch (IOException e) {
> > > >                     throw new IgniteCheckedException(e);
> > > >                 }
> > > >
> > > >                 ClassLoader ldr = null;
> > > >
> > > >                 if (ctx != null && ctx.deploy().enabled())
> > > >                     ldr =
> > > >
> ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > > >
> > > >                 K k =
> > > unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
> > > > ldr);
> > > >
> > > >                 if (filters != null && !filters.apply(k))
> > > >                     continue;
> > > >
> > > >                 V v = type.valueClass() == String.class ?
> > > >                     (V)doc.get(VAL_STR_FIELD_NAME) :
> > > >
> > > > this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
> > > >
> > > >                 assert v != null;
> > > >
> > > >                 curr = new IgniteBiTuple<>(k, v);
> > > >
> > > >                 break;
> > > >             }
> > > >         }
> > > >
> > > >         private boolean shouldRequestNextBatch()  {
> > > >             if(batch == null){
> > > >                 // should request for first batch
> > > >                 return (batchPos == BatchPosBeforeHead) ;
> > > >             } else {
> > > >                 // should request when reached to the end of batch
> > > >                 return (batchPos  == batch.length);
> > > >             }
> > > >         }
> > > >
> > > >         private void requestNextBatch() throws IOException {
> > > >             TopDocs docs;
> > > >
> > > >             if (batch == null) {
> > > >                 docs = searcher.search(query, pageSize);
> > > >             } else {
> > > >                 docs = searcher.searchAfter(batch[batch.length - 1],
> > > query,
> > > > pageSize);
> > > >             }
> > > >
> > > >             if(docs.scoreDocs.length ==0) {
> > > >                 batch = null;
> > > >             }else{
> > > >                 batch = docs.scoreDocs;
> > > >             }
> > > >
> > > >             batchPos = 0;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected IgniteBiTuple<K, V> onNext() throws
> > > > IgniteCheckedException {
> > > >             IgniteBiTuple<K, V> res = curr;
> > > >
> > > >             findNext();
> > > >
> > > >             return res;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected boolean onHasNext() throws
> > > > IgniteCheckedException {
> > > >             return curr != null;
> > > >         }
> > > >
> > > >         /** {@inheritDoc} */
> > > >         @Override protected void onClose() throws
> > IgniteCheckedException
> > > {
> > > >             U.closeQuiet(reader);
> > > >         }
> > > >     }
> > > > }
> > > > ```
> > > >
> > > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <
> > > nguyenmanhtam123@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I tried to implement an iterator for GridLuceneIndex. Could you please
> > > > > help review it?
> > > > >
> > > > > --
> > > > > Thanks & Best Regards
> > > > >
> > > > > Tam, Nguyen Manh
> > > > >
> > > > >
> > > >
> > > > --
> > > > Thanks & Best Regards
> > > >
> > > > Tam, Nguyen Manh
> > > >
> > >
> >
> >
> > --
> > Thanks & Best Regards
> >
> > Tam, Nguyen Manh
> >
>


-- 
Thanks & Best Regards

Tam, Nguyen Manh

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message