ignite-dev mailing list archives

From Dmitriy Pavlov <dpavlov....@gmail.com>
Subject Re: IGNITE-7482 Cursor in TextQuery fetches all data in first call to next() or hasNext()
Date Tue, 11 Sep 2018 09:36:16 GMT
Hi Tam Manh Nguyen,

I've added you to the list of contributors, so now you can assign an issue
to yourself.

Welcome to the Apache Ignite Community!

Sincerely,
Dmitriy Pavlov

On Mon, Sep 10, 2018 at 23:23, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:

> Hi,
>
> I just registered. Here is my jira account: nguyenmanhtam123@gmail.com
>
> Thank you,
> Tamnm
>
> On Mon, Sep 10, 2018 at 3:25 PM Alexey Goncharuk <alexey.goncharuk@gmail.com> wrote:
>
> > Hi,
> >
> > Please send your jira account ID so we can add you to the contributors
> > list. Then you will be able to assign tickets to yourself and contribute
> > to the project according to the process.
> >
> > You can get more info here:
> >
> > https://cwiki.apache.org/confluence/display/IGNITE/Development+Process
> > https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute
> >
> > --AG
> >
> > On Mon, Sep 10, 2018 at 9:16, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> >
> > > Hi,
> > > I have not been assigned yet, but I would really like to be.
> > >
> > > On Fri, Sep 7, 2018 at 4:13 PM Ilya Kasnacheev <ilya.kasnacheev@gmail.com> wrote:
> > >
> > > > Hello!
> > > >
> > > > Can you please submit it as a GitHub pull request, as per our process? Do
> > > > you have a ticket for that?
> > > >
> > > > Regards,
> > > > --
> > > > Ilya Kasnacheev
> > > >
> > > >
> > > > On Fri, Sep 7, 2018 at 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > > >
> > > > >
> > > > >
> > > >
> > >
> >
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> > > > > ```java
> > > > > /*
> > > > >  * Licensed to the Apache Software Foundation (ASF) under one or
> more
> > > > >  * contributor license agreements.  See the NOTICE file distributed
> > > with
> > > > >  * this work for additional information regarding copyright
> > ownership.
> > > > >  * The ASF licenses this file to You under the Apache License,
> > Version
> > > > 2.0
> > > > >  * (the "License"); you may not use this file except in compliance
> > with
> > > > >  * the License.  You may obtain a copy of the License at
> > > > >  *
> > > > >  *      http://www.apache.org/licenses/LICENSE-2.0
> > > > >  *
> > > > >  * Unless required by applicable law or agreed to in writing,
> > software
> > > > >  * distributed under the License is distributed on an "AS IS"
> BASIS,
> > > > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> > > > implied.
> > > > >  * See the License for the specific language governing permissions
> > and
> > > > >  * limitations under the License.
> > > > >  */
> > > > >
> > > > > package org.apache.ignite.internal.processors.query.h2.opt;
> > > > >
> > > > > import java.io.IOException;
> > > > > import java.util.Collection;
> > > > > import java.util.concurrent.atomic.AtomicLong;
> > > > > import org.apache.ignite.IgniteCheckedException;
> > > > > import org.apache.ignite.internal.GridKernalContext;
> > > > > import org.apache.ignite.internal.processors.cache.CacheObject;
> > > > > import
> > org.apache.ignite.internal.processors.cache.CacheObjectContext;
> > > > > import
> > > > >
> org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> > > > > import
> > > > >
> org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> > > > > import
> > > > org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> > > > > import org.apache.ignite.internal.util.GridAtomicLong;
> > > > > import
> org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> > > > > import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> > > > > import
> > org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> > > > > import org.apache.ignite.internal.util.typedef.internal.U;
> > > > > import org.apache.ignite.lang.IgniteBiTuple;
> > > > > import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> > > > > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> > > > > import org.apache.lucene.analysis.standard.StandardAnalyzer;
> > > > > import org.apache.lucene.document.Document;
> > > > > import org.apache.lucene.document.Field;
> > > > > import org.apache.lucene.document.LongField;
> > > > > import org.apache.lucene.document.StoredField;
> > > > > import org.apache.lucene.document.StringField;
> > > > > import org.apache.lucene.document.TextField;
> > > > > import org.apache.lucene.index.DirectoryReader;
> > > > > import org.apache.lucene.index.IndexReader;
> > > > > import org.apache.lucene.index.IndexWriter;
> > > > > import org.apache.lucene.index.IndexWriterConfig;
> > > > > import org.apache.lucene.index.Term;
> > > > > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> > > > > import org.apache.lucene.search.BooleanClause;
> > > > > import org.apache.lucene.search.BooleanQuery;
> > > > > import org.apache.lucene.search.IndexSearcher;
> > > > > import org.apache.lucene.search.NumericRangeQuery;
> > > > > import org.apache.lucene.search.Query;
> > > > > import org.apache.lucene.search.ScoreDoc;
> > > > > import org.apache.lucene.search.TopDocs;
> > > > > import org.apache.lucene.util.BytesRef;
> > > > > import org.h2.util.JdbcUtils;
> > > > > import org.jetbrains.annotations.Nullable;
> > > > >
> > > > > import static
> > > > >
> > org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> > > > > import static
> > > > >
> > org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
> > > > >
> > > > > /**
> > > > >  * Lucene fulltext index.
> > > > >  */
> > > > > public class GridLuceneIndex implements AutoCloseable {
> > > > >     /** Field name for string representation of value. */
> > > > >     public static final String VAL_STR_FIELD_NAME =
> "_gg_val_str__";
> > > > >
> > > > >     /** Field name for value version. */
> > > > >     public static final String VER_FIELD_NAME = "_gg_ver__";
> > > > >
> > > > >     /** Field name for value expiration time. */
> > > > >     public static final String EXPIRATION_TIME_FIELD_NAME =
> > > > > "_gg_expires__";
> > > > >
> > > > >     /** */
> > > > >     private final String cacheName;
> > > > >
> > > > >     /** */
> > > > >     private final GridQueryTypeDescriptor type;
> > > > >
> > > > >     /** */
> > > > >     private final IndexWriter writer;
> > > > >
> > > > >     /** */
> > > > >     private final String[] idxdFields;
> > > > >
> > > > >     /** */
> > > > >     private final AtomicLong updateCntr = new GridAtomicLong();
> > > > >
> > > > >     /** */
> > > > >     private final GridLuceneDirectory dir;
> > > > >
> > > > >     /** */
> > > > >     private final GridKernalContext ctx;
> > > > >
> > > > >     /**
> > > > >      * Constructor.
> > > > >      *
> > > > >      * @param ctx Kernal context.
> > > > >      * @param cacheName Cache name.
> > > > >      * @param type Type descriptor.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public GridLuceneIndex(GridKernalContext ctx, @Nullable String
> > > > > cacheName, GridQueryTypeDescriptor type)
> > > > >         throws IgniteCheckedException {
> > > > >         this.ctx = ctx;
> > > > >         this.cacheName = cacheName;
> > > > >         this.type = type;
> > > > >
> > > > >         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
> > > > >
> > > > >         try {
> > > > >             writer = new IndexWriter(dir, new IndexWriterConfig(new
> > > > > StandardAnalyzer()));
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         GridQueryIndexDescriptor idx = type.textIndex();
> > > > >
> > > > >         if (idx != null) {
> > > > >             Collection<String> fields = idx.fields();
> > > > >
> > > > >             idxdFields = new String[fields.size() + 1];
> > > > >
> > > > >             fields.toArray(idxdFields);
> > > > >         }
> > > > >         else {
> > > > >             assert type.valueTextIndex() || type.valueClass() ==
> > > > > String.class;
> > > > >
> > > > >             idxdFields = new String[1];
> > > > >         }
> > > > >
> > > > >         idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * @return Cache object context.
> > > > >      */
> > > > >     private CacheObjectContext objectContext() {
> > > > >         if (ctx == null)
> > > > >             return null;
> > > > >
> > > > >         return
> > > > >
> ctx.cache().internalCache(cacheName).context().cacheObjectContext();
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Stores given data in this fulltext index.
> > > > >      *
> > > > >      * @param k Key.
> > > > >      * @param v Value.
> > > > >      * @param ver Version.
> > > > >      * @param expires Expiration time.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     @SuppressWarnings("ConstantConditions")
> > > > >     public void store(CacheObject k, CacheObject v,
> GridCacheVersion
> > > ver,
> > > > > long expires) throws IgniteCheckedException {
> > > > >         CacheObjectContext coctx = objectContext();
> > > > >
> > > > >         Object key = k.isPlatformType() ? k.value(coctx, false) :
> k;
> > > > >         Object val = v.isPlatformType() ? v.value(coctx, false) :
> v;
> > > > >
> > > > >         Document doc = new Document();
> > > > >
> > > > >         boolean stringsFound = false;
> > > > >
> > > > >         if (type.valueTextIndex() || type.valueClass() ==
> > > String.class) {
> > > > >             doc.add(new TextField(VAL_STR_FIELD_NAME,
> val.toString(),
> > > > > Field.Store.YES));
> > > > >
> > > > >             stringsFound = true;
> > > > >         }
> > > > >
> > > > >         for (int i = 0, last = idxdFields.length - 1; i < last;
> i++)
> > {
> > > > >             Object fieldVal = type.value(idxdFields[i], key, val);
> > > > >
> > > > >             if (fieldVal != null) {
> > > > >                 doc.add(new TextField(idxdFields[i],
> > > fieldVal.toString(),
> > > > > Field.Store.YES));
> > > > >
> > > > >                 stringsFound = true;
> > > > >             }
> > > > >         }
> > > > >
> > > > >         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
> > > > >
> > > > >         try {
> > > > >             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
> > > > >
> > > > >             if (!stringsFound) {
> > > > >                 writer.deleteDocuments(term);
> > > > >
> > > > >                 return; // We did not find any strings to be
> indexed,
> > > > will
> > > > > not store data at all.
> > > > >             }
> > > > >
> > > > >             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
> > > > > Field.Store.YES));
> > > > >
> > > > >             if (type.valueClass() != String.class)
> > > > >                 doc.add(new StoredField(VAL_FIELD_NAME,
> > > > > v.valueBytes(coctx)));
> > > > >
> > > > >             doc.add(new StoredField(VER_FIELD_NAME,
> > > > > ver.toString().getBytes()));
> > > > >
> > > > >             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME,
> > expires,
> > > > > Field.Store.YES));
> > > > >
> > > > >             // Next implies remove than add atomically operation.
> > > > >             writer.updateDocument(term, doc);
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >         finally {
> > > > >             updateCntr.incrementAndGet();
> > > > >         }
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Removes entry for given key from this index.
> > > > >      *
> > > > >      * @param key Key.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public void remove(CacheObject key) throws
> > IgniteCheckedException {
> > > > >         try {
> > > > >             writer.deleteDocuments(new Term(KEY_FIELD_NAME,
> > > > >                 new BytesRef(key.valueBytes(objectContext()))));
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >         finally {
> > > > >             updateCntr.incrementAndGet();
> > > > >         }
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Runs lucene fulltext query over this index.
> > > > >      *
> > > > >      * @param qry Query.
> > > > >      * @param filters Filters over result.
> > > > >      * @param pageSize Size of batch
> > > > >      * @return Query result.
> > > > >      * @throws IgniteCheckedException If failed.
> > > > >      */
> > > > >     public <K, V> GridCloseableIterator<IgniteBiTuple<K,
V>>
> > > query(String
> > > > > qry, IndexingQueryFilter filters, int pageSize) throws
> > > > > IgniteCheckedException {
> > > > >         IndexReader reader;
> > > > >
> > > > >         try {
> > > > >             long updates = updateCntr.get();
> > > > >
> > > > >             if (updates != 0) {
> > > > >                 writer.commit();
> > > > >
> > > > >                 updateCntr.addAndGet(-updates);
> > > > >             }
> > > > >
> > > > >             //We can cache reader\searcher and change this to
> > > > > 'openIfChanged'
> > > > >             reader = DirectoryReader.open(writer, true);
> > > > >         }
> > > > >         catch (IOException e) {
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         IndexSearcher searcher;
> > > > >
> > > > >         Query query;
> > > > >
> > > > >         try {
> > > > >             searcher = new IndexSearcher(reader);
> > > > >
> > > > >             MultiFieldQueryParser parser = new
> > > > > MultiFieldQueryParser(idxdFields,
> > > > >                 writer.getAnalyzer());
> > > > >
> > > > > //            parser.setAllowLeadingWildcard(true);
> > > > >
> > > > >             // Filter expired items.
> > > > >             Query filter =
> > > > > NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> > > > > U.currentTimeMillis(),
> > > > >                 null, false, false);
> > > > >
> > > > >             query = new BooleanQuery.Builder()
> > > > >                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
> > > > >                 .add(filter, BooleanClause.Occur.FILTER)
> > > > >                 .build();
> > > > >         }
> > > > >         catch (Exception e) {
> > > > >             U.closeQuiet(reader);
> > > > >
> > > > >             throw new IgniteCheckedException(e);
> > > > >         }
> > > > >
> > > > >         IndexingQueryCacheFilter fltr = null;
> > > > >
> > > > >         if (filters != null)
> > > > >             fltr = filters.forCache(cacheName);
> > > > >
> > > > >         return new It<>(reader, searcher, query, fltr, pageSize);
> > > > >     }
> > > > >
> > > > >     /** {@inheritDoc} */
> > > > >     @Override public void close() {
> > > > >         U.closeQuiet(writer);
> > > > >         U.close(dir, ctx.log(GridLuceneIndex.class));
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Key-value iterator over fulltext search result.
> > > > >      */
> > > > >     private class It<K, V> extends
> > > > > GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
> > > > >         private final int BatchPosBeforeHead = -1;
> > > > >
> > > > >         /** */
> > > > >         private static final long serialVersionUID = 0L;
> > > > >
> > > > >         /** */
> > > > >         private final int pageSize;
> > > > >
> > > > >         /** */
> > > > >         private final IndexReader reader;
> > > > >
> > > > >         /** */
> > > > >         private final Query query;
> > > > >
> > > > >         /** */
> > > > >         private final IndexSearcher searcher;
> > > > >
> > > > >         /** current batch docs*/
> > > > >         private ScoreDoc[] batch;
> > > > >
> > > > >         /** current position in batch*/
> > > > >         private int batchPos = BatchPosBeforeHead;
> > > > >
> > > > >         /** */
> > > > >         private final IndexingQueryCacheFilter filters;
> > > > >
> > > > >         /** */
> > > > >         private IgniteBiTuple<K, V> curr;
> > > > >
> > > > >         /** */
> > > > >         private CacheObjectContext coctx;
> > > > >
> > > > >         /**
> > > > >          * Constructor.
> > > > >          *
> > > > >          * @param reader Reader.
> > > > >          * @param searcher Searcher.
> > > > >          * @param filters Filters over result.
> > > > >          * @throws IgniteCheckedException if failed.
> > > > >          */
> > > > >         private It(IndexReader reader, IndexSearcher searcher,
> Query
> > > > query,
> > > > > IndexingQueryCacheFilter filters, int pageSize)
> > > > >             throws IgniteCheckedException {
> > > > >             this.reader = reader;
> > > > >             this.searcher = searcher;
> > > > >             this.filters = filters;
> > > > >             this.query = query;
> > > > >             this.pageSize = pageSize;
> > > > >
> > > > >             coctx = objectContext();
> > > > >
> > > > >             findNext();
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * @param bytes Bytes.
> > > > >          * @param ldr Class loader.
> > > > >          * @return Object.
> > > > >          * @throws IgniteCheckedException If failed.
> > > > >          */
> > > > >         @SuppressWarnings("unchecked")
> > > > >         private <Z> Z unmarshall(byte[] bytes, ClassLoader
ldr)
> > throws
> > > > > IgniteCheckedException {
> > > > >             if (coctx == null) // For tests.
> > > > >                 return (Z)JdbcUtils.deserialize(bytes, null);
> > > > >
> > > > >             return
> > > > (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> > > > > bytes, ldr);
> > > > >         }
> > > > >
> > > > >         /**
> > > > >          * Finds next element.
> > > > >          *
> > > > >          * @throws IgniteCheckedException If failed.
> > > > >          */
> > > > >         @SuppressWarnings("unchecked")
> > > > >         private void findNext() throws IgniteCheckedException {
> > > > >             curr = null;
> > > > >
> > > > >             if(isClosed())
> > > > >                 throw new IgniteCheckedException("Iterator already
> > > > > closed");
> > > > >
> > > > >             if (shouldRequestNextBatch()) {
> > > > >                 try {
> > > > >                     requestNextBatch();
> > > > >                 } catch (IOException e) {
> > > > >                     close();
> > > > >                     throw new IgniteCheckedException(e);
> > > > >                 }
> > > > >             }
> > > > >
> > > > >             if(batch == null)
> > > > >                 return;
> > > > >
> > > > >             while (batchPos < batch.length) {
> > > > >                 Document doc;
> > > > >                 ScoreDoc scoreDoc =batch[batchPos++];
> > > > >
> > > > >                 try {
> > > > >                     doc = searcher.doc(scoreDoc.doc);
> > > > >                 }
> > > > >                 catch (IOException e) {
> > > > >                     throw new IgniteCheckedException(e);
> > > > >                 }
> > > > >
> > > > >                 ClassLoader ldr = null;
> > > > >
> > > > >                 if (ctx != null && ctx.deploy().enabled())
> > > > >                     ldr =
> > > > >
> > ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
> > > > >
> > > > >                 K k =
> > > > unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
> > > > > ldr);
> > > > >
> > > > >                 if (filters != null && !filters.apply(k))
> > > > >                     continue;
> > > > >
> > > > >                 V v = type.valueClass() == String.class ?
> > > > >                     (V)doc.get(VAL_STR_FIELD_NAME) :
> > > > >
> > > > > this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes,
ldr);
> > > > >
> > > > >                 assert v != null;
> > > > >
> > > > >                 curr = new IgniteBiTuple<>(k, v);
> > > > >
> > > > >                 break;
> > > > >             }
> > > > >         }
> > > > >
> > > > >         private boolean shouldRequestNextBatch()  {
> > > > >             if(batch == null){
> > > > >                 // should request for first batch
> > > > >                 return (batchPos == BatchPosBeforeHead) ;
> > > > >             } else {
> > > > >                 // should request when reached to the end of batch
> > > > >                 return (batchPos  == batch.length);
> > > > >             }
> > > > >         }
> > > > >
> > > > >         private void requestNextBatch() throws IOException {
> > > > >             TopDocs docs;
> > > > >
> > > > >             if (batch == null) {
> > > > >                 docs = searcher.search(query, pageSize);
> > > > >             } else {
> > > > >                 docs = searcher.searchAfter(batch[batch.length -
> 1],
> > > > query,
> > > > > pageSize);
> > > > >             }
> > > > >
> > > > >             if(docs.scoreDocs.length ==0) {
> > > > >                 batch = null;
> > > > >             }else{
> > > > >                 batch = docs.scoreDocs;
> > > > >             }
> > > > >
> > > > >             batchPos = 0;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected IgniteBiTuple<K, V> onNext() throws
> > > > > IgniteCheckedException {
> > > > >             IgniteBiTuple<K, V> res = curr;
> > > > >
> > > > >             findNext();
> > > > >
> > > > >             return res;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected boolean onHasNext() throws
> > > > > IgniteCheckedException {
> > > > >             return curr != null;
> > > > >         }
> > > > >
> > > > >         /** {@inheritDoc} */
> > > > >         @Override protected void onClose() throws
> > > IgniteCheckedException
> > > > {
> > > > >             U.closeQuiet(reader);
> > > > >         }
> > > > >     }
> > > > > }
> > > > > ```
> > > > >
> > > > > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam123@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > > I tried to implement an iterator for GridLuceneIndex. Could you
> > > > > > > please help to review it?
> > > > > >
> > > > > > --
> > > > > > Thanks & Best Regards
> > > > > >
> > > > > > Tam, Nguyen Manh
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > > Thanks & Best Regards
> > > > >
> > > > > Tam, Nguyen Manh
> > > > >
> > > >
> > >
> > >
> > > --
> > > Thanks & Best Regards
> > >
> > > Tam, Nguyen Manh
> > >
> >
>
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>
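
For readers following the thread, the page-at-a-time idea behind the quoted iterator can be sketched without Lucene or Ignite. This is only an illustration under stated assumptions: `PageSource`, `PagedIterator` and `PagedIteratorDemo` are hypothetical names that do not exist in Ignite, and a plain integer offset stands in for Lucene's `searchAfter` cursor. The sketch fetches one page when the iterator is created and further pages only on demand. It also keeps fetching when every item of a page is rejected by the filter, a case worth checking in any iterator of this kind.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical page source: returns at most pageSize items starting at offset. */
interface PageSource<T> {
    List<T> fetch(int offset, int pageSize);
}

/** Iterates over a paged source lazily; items are assumed to be non-null. */
final class PagedIterator<T> implements Iterator<T> {
    private final PageSource<T> src;
    private final Predicate<T> filter;
    private final int pageSize;

    private List<T> batch = Collections.emptyList(); // current page
    private int batchPos;                            // position inside the current page
    private int fetched;                             // items fetched so far, used as next offset
    private boolean exhausted;                       // true once a short (or empty) page was seen
    private T curr;                                  // prefetched next element, null if none

    PagedIterator(PageSource<T> src, Predicate<T> filter, int pageSize) {
        if (pageSize <= 0)
            throw new IllegalArgumentException("pageSize must be positive");

        this.src = src;
        this.filter = filter;
        this.pageSize = pageSize;

        advance(); // fetches the first page only
    }

    /** Moves to the next item accepted by the filter, fetching pages as needed. */
    private void advance() {
        curr = null;

        while (true) {
            if (batchPos == batch.size()) {
                if (exhausted)
                    return;

                batch = src.fetch(fetched, pageSize);
                batchPos = 0;
                fetched += batch.size();
                exhausted = batch.size() < pageSize;

                if (batch.isEmpty())
                    return;
            }

            T item = batch.get(batchPos++);

            if (filter.test(item)) {
                curr = item;

                return;
            }
        }
    }

    @Override public boolean hasNext() {
        return curr != null;
    }

    @Override public T next() {
        if (curr == null)
            throw new NoSuchElementException();

        T res = curr;

        advance();

        return res;
    }
}

class PagedIteratorDemo {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();

        for (int i = 0; i < 10; i++)
            data.add(i);

        // In-memory stand-in for a search backend.
        PageSource<Integer> src = (off, size) ->
            data.subList(Math.min(off, data.size()), Math.min(off + size, data.size()));

        // Pages of 3 items, keeping only even numbers.
        Iterator<Integer> it = new PagedIterator<>(src, x -> x % 2 == 0, 3);

        while (it.hasNext())
            System.out.println(it.next());
    }
}
```

With a page size of 3 over ten items, creating the iterator triggers a single fetch, and a full pass triggers four. The memory held at any moment is bounded by one page rather than by the whole result set.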
