lucene index search 2 - yaokun123/php-wiki GitHub Wiki

Lucene学习总结之四:Lucene索引过程分析(2)

3、将文档加入IndexWriter

代码

writer.addDocument(doc); 
-->IndexWriter.addDocument(Document doc, Analyzer analyzer) 
     -->doFlush = docWriter.addDocument(doc, analyzer); 
          --> DocumentsWriter.updateDocument(Document, Analyzer, Term) 
注:--> 代表一级函数调用

IndexWriter继而调用DocumentsWriter.addDocument,其又调用DocumentsWriter.updateDocument。

4、将文档加入DocumentsWriter

代码:

DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm) 
-->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm); 
-->(2) DocWriter perDoc = state.consumer.processDocument(); 
-->(3) finishDocument(state, perDoc);

DocumentsWriter对象主要包含以下几部分:

  • 用于写索引文件
IndexWriter writer;

Directory directory;

Similarity similarity:分词器

String segment:当前的段名,每当flush的时候,将索引写入以此为名称的段。
IndexWriter.doFlushInternal() 
--> String segment = docWriter.getSegment();//return segment 
--> newSegment = new SegmentInfo(segment,……); 
--> docWriter.createCompoundFile(segment);//根据segment创建cfs文件。
String docStoreSegment:存储域所要写入的目标段。(在索引文件格式一文中已经详细描述)
int docStoreOffset:存储域在目标段中的偏移量。
int nextDocID:下一篇添加到此索引的文档ID号,对于同一个索引文件夹,此变量唯一,且同步访问。
DocConsumer consumer; 这是整个索引过程的核心,是IndexChain整个索引链的源头。
基本索引链:

对于一篇文档的索引过程,不是由一个对象来完成的,而是用对象组合的方式形成的一个处理链,链上的每个对象仅仅处理索引过程的一部分,称为索引
链,由于后面还有其他的索引链,所以此处的索引链我称为基本索引链。

DocConsumer consumer 类型为DocFieldProcessor,是整个索引链的源头,包含如下部分:

对索引域的处理
DocFieldConsumer consumer 类型为DocInverter,包含如下部分
  InvertedDocConsumer consumer类型为TermsHash,包含如下部分
    TermsHashConsumer consumer类型为FreqProxTermsWriter,负责写freq, prox信息
      TermsHash nextTermsHash
      TermsHashConsumer consumer类型为TermVectorsTermsWriter,负责写tvx, tvd, tvf信息
    InvertedDocEndConsumer endConsumer 类型为NormsWriter,负责写nrm信息
对存储域的处理
  FieldInfos fieldInfos = new FieldInfos();
  StoredFieldsWriter fieldsWriter负责写fnm, fdt, fdx信息
  • 删除文档 BufferedDeletes deletesInRAM = new BufferedDeletes(); BufferedDeletes deletesFlushed = new BufferedDeletes();
类BufferedDeletes包含了一下的成员变量:

HashMap<Term,Num> terms = new HashMap<Term,Num>();删除的词(Term)
HashMap<Query,Integer> queries = new HashMap<Query,Integer>();删除的查询(Query)
List<Integer> docIDs = new ArrayList<Integer>();删除的文档ID
long bytesUsed:用于判断是否应该对删除的文档写入索引文件。
由此可见,文档的删除主要有三种方式:

IndexWriter.deleteDocuments(Term term):所有包含此词的文档都会被删除。
IndexWriter.deleteDocuments(Query query):所有能满足此查询的文档都会被删除。
IndexReader.deleteDocument(int docNum):删除此文档ID
删除文档既可以用reader进行删除,也可以用writer进行删除,不同的是,reader进行删除后,此reader马上能够生效,而用writer删除后,会被缓存在deletesInRAM及deletesFlushed中,只有写入到索引文件中,当reader再次打开的时候,才能够看到。

那deletesInRAM和deletesFlushed各有什么用处呢?

此版本的Lucene对文档的删除是支持多线程的,当用IndexWriter删除文档的时候,都是缓存在deletesInRAM中的,直到flush,才将删除的文档写入到索引文件中去,我们知道flush是需要一段时间的,那么在flush的过程中,另一个线程又有文档删除怎么办呢?

一般过程是这个样子的,当flush的时候,首先在同步(synchornized)的方法pushDeletes中,将deletesInRAM全部加到deletesFlushed中,然后将deletesInRAM清空,退出同步方法,于是flush的线程程就向索引文件写deletesFlushed中的删除文档的过程,而与此同时其他线程新删除的文档则添加到新的deletesInRAM中去,直到下次flush才写入索引文件。
  • 缓存管理

为了提高索引的速度,Lucene对很多的数据进行了缓存,使一起写入磁盘,然而缓存需要进行管理,何时分配,何时回收,何时写入磁盘都需要考虑。 ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();将用于缓存词(Term)信息的空闲块 ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();将用于缓存文档号(doc id)及词频(freq),位置(prox)信息的空闲块。 ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();将存储某词的词频(freq)和位置(prox)分别在byteBlocks中的偏移量 boolean bufferIsFull;用来判断缓存是否满了,如果满了,则应该写入磁盘 long numBytesAlloc;分配的内存数量 long numBytesUsed;使用的内存数量 long freeTrigger;应该开始回收内存时的内存用量。 long freeLevel;回收内存应该回收到的内存用量。 long ramBufferSize;用户设定的内存用量。

缓存用量之间的关系如下: 
DocumentsWriter.setRAMBufferSizeMB(double mb){ 
    ramBufferSize = (long) (mb*1024*1024);//用户设定的内存用量,当使用内存大于此时,开始写入磁盘 
    waitQueuePauseBytes = (long) (ramBufferSize*0.1); 
    waitQueueResumeBytes = (long) (ramBufferSize*0.05); 
    freeTrigger = (long) (1.05 * ramBufferSize);//当分配的内存到达105%的时候开始释放freeBlocks中的内存 
    freeLevel = (long) (0.95 * ramBufferSize);//一直释放到95%

} 

DocumentsWriter.balanceRAM(){ 
    if (numBytesAlloc+deletesRAMUsed > freeTrigger) { 
    //当分配的内存加删除文档所占用的内存大于105%的时候,开始释放内存 
        while(numBytesAlloc+deletesRAMUsed > freeLevel) { 
        //一直进行释放,直到95% 
            //释放free blocks

            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); 
            numBytesAlloc -= BYTE_BLOCK_SIZE;

            freeCharBlocks.remove(freeCharBlocks.size()-1); 
            numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;

            freeIntBlocks.remove(freeIntBlocks.size()-1); 
            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; 
        } 
    } else {

        if (numBytesUsed+deletesRAMUsed > ramBufferSize){

        //当使用的内存加删除文档占有的内存大于用户指定的内存时,可以写入磁盘

              bufferIsFull = true;

        }

    } 
}

当判断是否应该写入磁盘时:

如果使用的内存大于用户指定内存时,bufferIsFull = true
当使用的内存加删除文档所占的内存加正在写入的删除文档所占的内存大于用户指定内存时 deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize
当删除的文档数目大于maxBufferedDeleteTerms时
DocumentsWriter.timeToFlushDeletes(){

    return (bufferIsFull || deletesFull()) && setFlushPending();

}

DocumentsWriter.deletesFull(){

    return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && 
        (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || 
        (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && 
        ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));

}
  • 多线程并发索引

为了支持多线程并发索引,对每一个线程都有一个DocumentsWriterThreadState,其为每一个线程根据DocConsumer consumer的索引链来创建每个线程的索引链(XXXPerThread),来进行对文档的并发处理。 DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0]; HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>(); 虽然对文档的处理过程可以并行,但是将文档写入索引文件却必须串行进行,串行写入的代码在DocumentsWriter.finishDocument中 WaitQueue waitQueue = new WaitQueue() long waitQueuePauseBytes long waitQueueResumeBytes

在Lucene中,文档是按添加的顺序编号的,DocumentsWriter中的nextDocID就是记录下一个添加的文档id。 当Lucene支持多线程的时候,就必须要有一个synchornized方法来付给文档id并且将nextDocID加一,这些是在DocumentsWriter.getThreadState这个函数里面做的。

虽然给文档付ID没有问题了。但是由Lucene索引文件格式我们知道,文档是要按照ID的顺序从小到大写到索引文件中去的,然而不同的文档处理速度不同,当一个先来的线程一处理一篇需要很长时间的大文档时,另一个后来的线程二可能已经处理了很多小的文档了,但是这些后来小文档的ID号都大于第一个线程所处理的大文档,因而不能马上写到索引文件中去,而是放到waitQueue中,仅仅当大文档处理完了之后才写入索引文件。

waitQueue中有一个变量nextWriteDocID表示下一个可以写入文件的ID,当付给大文档ID=4时,则nextWriteDocID也设为4,虽然后来的小文档5,6,7,8等都已处理结束,但是如下代码,

WaitQueue.add(){

    if (doc.docID == nextWriteDocID){ 
       ………… 
    } else { 
        waiting[loc] = doc; 
        waitingBytes += doc.sizeInBytes(); 
   }

   doPause()

}

则把5, 6, 7, 8放入waiting队列,并且记录当前等待的文档所占用的内存大小waitingBytes。

当大文档4处理完毕后,不但写入文档4,把原来等待的文档5, 6, 7, 8也一起写入。

WaitQueue.add(){

    if (doc.docID == nextWriteDocID) {

       writeDocument(doc);

       while(true) {

           doc = waiting[nextWriteLoc];

           writeDocument(doc);

       }

   } else {

      …………

   }

   doPause()

}

但是这存在一个问题:当大文档很大很大,处理的很慢很慢的时候,后来的线程二可能已经处理了很多的小文档了,这些文档都是在waitQueue中,则占有了越来越多的内存,长此以往,有内存不够的危险。

因而在finishDocuments里面,在WaitQueue.add最后调用了doPause()函数

DocumentsWriter.finishDocument(){

    doPause = waitQueue.add(docWriter);

    if (doPause) 
        waitForWaitQueue();

    notifyAll();

}

WaitQueue.doPause() { 
    return waitingBytes > waitQueuePauseBytes; 
}

当waitingBytes足够大的时候(为用户指定的内存使用量的10%),doPause返回true,于是后来的线程二会进入wait状态,不再处理另外的文档,而是等待线程一处理大文档结束。

当线程一处理大文档结束的时候,调用notifyAll唤醒等待他的线程。

DocumentsWriter.waitForWaitQueue() { 
  do { 
    try { 
      wait(); 
    } catch (InterruptedException ie) { 
      throw new ThreadInterruptedException(ie); 
    } 
  } while (!waitQueue.doResume()); 
}

WaitQueue.doResume() { 
     return waitingBytes <= waitQueueResumeBytes; 
}

当waitingBytes足够小的时候,doResume返回true, 则线程二不用再wait了,可以继续处理另外的文档。
  • 一些标志位 int maxFieldLength:一篇文档中,一个域内可索引的最大的词(Term)数。

int maxBufferedDeleteTerms:可缓存的最大的删除词(Term)数。当大于这个数的时候,就要写到文件中了。

此过程又包含如下三个子过程:

4.1、得到当前线程对应的文档集处理对象(DocumentsWriterThreadState)

代码为:

DocumentsWriterThreadState state = getThreadState(doc, delTerm);

在Lucene中,对于同一个索引文件夹,只能够有一个IndexWriter打开它,在打开后,在文件夹中,生成文件write.lock,当其他IndexWriter再试图打开此索引文件夹的时候,则会报org.apache.lucene.store.LockObtainFailedException错误。

这样就出现了这样一个问题,在同一个进程中,对同一个索引文件夹,只能有一个IndexWriter打开它,因而如果想多线程向此索引文件夹中添加文档,则必须共享一个IndexWriter,而且在以往的实现中,addDocument函数是同步的(synchronized),也即多线程的索引并不能起到提高性能的效果。

于是为了支持多线程索引,不使IndexWriter成为瓶颈,对于每一个线程都有一个相应的文档集处理对象(DocumentsWriterThreadState),这样对文档的索引过程可以多线程并行进行,从而增加索引的速度。

getThreadState函数是同步的(synchronized),DocumentsWriter有一个成员变量threadBindings,它是一个HashMap,键为线程对象(Thread.currentThread()),值为此线程对应的DocumentsWriterThreadState对象。

DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下几个过程:

根据当前线程对象,从HashMap中查找相应的DocumentsWriterThreadState对象,如果没找到,则生成一个新对象,并添加到HashMap中

DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread()); 
if (state == null) { 
    …… 
    state = new DocumentsWriterThreadState(this); 
    …… 
    threadBindings.put(Thread.currentThread(), state); 
} 

如果此线程对象正在用于处理上一篇文档,则等待,直到此线程的上一篇文档处理完。

DocumentsWriter.getThreadState() { 
    waitReady(state); 
    state.isIdle = false; 
} 

waitReady(state) { 
    while (!state.isIdle) {wait();} 
}  

显然如果state.isIdle为false,则此线程等待。 
在一篇文档处理之前,state.isIdle = false会被设定,而在一篇文档处理完毕之后,DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)中,会首先设定perThread.isIdle = true; 然后notifyAll()来唤醒等待此文档完成的线程,从而处理下一篇文档。

如果IndexWriter刚刚commit过,则新添加的文档要加入到新的段中(segment),则首先要生成新的段名。

initSegmentName(false); 
--> if (segment == null) segment = writer.newSegmentName();

将此线程的文档处理对象设为忙碌:state.isIdle = false;

4.2、用得到的文档集处理对象(DocumentsWriterThreadState)处理文档

代码为:

DocWriter perDoc = state.consumer.processDocument();

每一个文档集处理对象DocumentsWriterThreadState都有一个文档及域处理对象DocFieldProcessorPerThread,它的成员函数processDocument()被调用来对文档及域进行处理。

线程索引链(XXXPerThread):
由于要多线程进行索引,因而每个线程都要有自己的索引链,称为线程索引链。

线程索引链同基本索引链有相似的树形结构,由基本索引链中每个层次的对象调用addThreads进行创建的,负责每个线程的对文档的处理。

DocFieldProcessorPerThread是线程索引链的源头,由DocFieldProcessor.addThreads(…)创建

DocFieldProcessorPerThread对象结构如下:

对索引域进行处理
DocFieldConsumerPerThread consumer 类型为 DocInverterPerThread,由DocInverter.addThreads创建
InvertedDocConsumerPerThread consumer 类型为TermsHashPerThread,由TermsHash.addThreads创建
TermsHashConsumerPerThread consumer类型为FreqProxTermsWriterPerThread,由FreqProxTermsWriter.addThreads创建,负责每个线程的freq,prox信息处理
TermsHashPerThread nextPerThread
TermsHashConsumerPerThread consumer类型TermVectorsTermsWriterPerThread,由TermVectorsTermsWriter创建,负责每个线程的tvx,tvd,tvf信息处理
InvertedDocEndConsumerPerThread endConsumer 类型为NormsWriterPerThread,由NormsWriter.addThreads创建,负责nrm信息的处理
对存储域进行处理
StoredFieldsWriterPerThread fieldsWriter由StoredFieldsWriter.addThreads创建,负责fnm,fdx,fdt的处理。
FieldInfos fieldInfos;

DocumentsWriter.DocWriter DocFieldProcessorPerThread.processDocument()包含以下几个过程:

4.2.1、开始处理当前文档
consumer(DocInverterPerThread).startDocument(); 
fieldsWriter(StoredFieldsWriterPerThread).startDocument();

在此版的Lucene中,几乎所有的XXXPerThread的类,都有startDocument和finishDocument两个函数,因为对同一个线程,这些对象都是复用的,而非对每一篇新来的文档都创建一套,这样也提高了效率,也牵扯到数据的清理问题。一般在startDocument函数中,清理处理上篇文档遗留的数据,在finishDocument中,收集本次处理的结果数据,并返回,一直返回到DocumentsWriter.updateDocument(Document, Analyzer, Term) 然后根据条件判断是否将数据刷新到硬盘上。

4.2.2、逐个处理文档的每一个域

由于一个线程可以连续处理多个文档,而在普通的应用中,几乎每篇文档的域都是大致相同的,为每篇文档的每个域都创建一个处理对象非常低效,因而考虑到复用域处理对象DocFieldProcessorPerField,对于每一个域都有一个此对象。

那当来到一个新的域的时候,如何更快的找到此域的处理对象呢?Lucene创建了一个DocFieldProcessorPerField[] fieldHash哈希表来方便更快查找域对应的处理对象。

当处理各个域的时候,按什么顺序呢?其实是按照域名的字典顺序。因而Lucene创建了DocFieldProcessorPerField[] fields的数组来方便按顺序处理域。

因而一个域的处理对象被放在了两个地方。

对于域的处理过程如下:

https://www.cnblogs.com/forfuture1978/archive/2010/02/02/1661440.html

⚠️ **GitHub.com Fallback** ⚠️