1. Project Clover database Tue Dec 20 2016 21:24:09 CET
  2. Package org.xwiki.search.solr.internal

File DefaultSolrIndexer.java

 

Coverage histogram

../../../../../img/srcFileCovDistChart8.png
54% of files have more coverage

Code metrics

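Branches: 38
Statements: 131
Methods: 18
Classes: 4
LOC: 572
NCLOC: 320
Total complexity: 49
Complexity density: 0.37
Statements/Method: 7.28
Methods/Class: 4.5
Average method complexity: 2.72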

Classes

Class Line # Actions
DefaultSolrIndexer 76 84 0% 33 18
85.4% (0.85365856)
DefaultSolrIndexer.IndexQueueEntry 83 19 0% 6 16
27.3% (0.27272728)
DefaultSolrIndexer.ResolveQueueEntry 149 3 0% 1 0
100% (1.0)
DefaultSolrIndexer.Resolver 184 25 0% 9 7
81.6% (0.81578946)
 

Contributing tests

No tests hitting this source file were found.

Source view

1    /*
2    * See the NOTICE file distributed with this work for additional
3    * information regarding copyright ownership.
4    *
5    * This is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU Lesser General Public License as
7    * published by the Free Software Foundation; either version 2.1 of
8    * the License, or (at your option) any later version.
9    *
10    * This software is distributed in the hope that it will be useful,
11    * but WITHOUT ANY WARRANTY; without even the implied warranty of
12    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13    * Lesser General Public License for more details.
14    *
15    * You should have received a copy of the GNU Lesser General Public
16    * License along with this software; if not, write to the Free
17    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19    */
20    package org.xwiki.search.solr.internal;
21   
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;

import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.xwiki.component.annotation.Component;
import org.xwiki.component.annotation.DisposePriority;
import org.xwiki.component.manager.ComponentLifecycleException;
import org.xwiki.component.manager.ComponentLookupException;
import org.xwiki.component.manager.ComponentManager;
import org.xwiki.component.phase.Disposable;
import org.xwiki.component.phase.Initializable;
import org.xwiki.component.phase.InitializationException;
import org.xwiki.context.Execution;
import org.xwiki.context.ExecutionContext;
import org.xwiki.context.ExecutionContextException;
import org.xwiki.context.ExecutionContextManager;
import org.xwiki.job.JobException;
import org.xwiki.job.JobExecutor;
import org.xwiki.model.EntityType;
import org.xwiki.model.reference.EntityReference;
import org.xwiki.search.solr.internal.api.SolrConfiguration;
import org.xwiki.search.solr.internal.api.SolrIndexer;
import org.xwiki.search.solr.internal.api.SolrIndexerException;
import org.xwiki.search.solr.internal.api.SolrInstance;
import org.xwiki.search.solr.internal.job.IndexerJob;
import org.xwiki.search.solr.internal.job.IndexerRequest;
import org.xwiki.search.solr.internal.metadata.LengthSolrInputDocument;
import org.xwiki.search.solr.internal.metadata.SolrMetadataExtractor;
import org.xwiki.search.solr.internal.reference.SolrReferenceResolver;

import com.xpn.xwiki.util.AbstractXWikiRunnable;
59   
60    /**
61    * Default implementation of {@link SolrIndexer}.
62    * <p>
63    * This implementation does not directly process the given leaf-references, but adds them to a processing queue, in the
64    * order they were received. The {@link Runnable} part of this implementation is the one that sequentially reads and
65    * processes the queue.
66    *
67    * @version $Id: c54022bd64bcbfa613d96438a2112dda8c5f56d5 $
68    * @since 5.1M2
69    */
70    @Component
71    @Singleton
72    // We start the disposal a bit earlier because we want the resolver & indexer threads to finish before the Solr client
73    // is shutdown. We can't stop the threads immediately because the resolve & index queues may have entries that are being
74    // processed.
75    @DisposePriority(500)
 
76    public class DefaultSolrIndexer implements SolrIndexer, Initializable, Disposable, Runnable
77    {
78    /**
79    * Index queue entry.
80    *
81    * @version $Id: c54022bd64bcbfa613d96438a2112dda8c5f56d5 $
82    */
 
83    private static class IndexQueueEntry
84    {
85    /**
86    * The reference of the entity to index.
87    */
88    public EntityReference reference;
89   
90    /**
91    * The query used to filter entries to delete.
92    */
93    public String deleteQuery;
94   
95    /**
96    * The indexing operation to perform.
97    */
98    public IndexOperation operation;
99   
100    /**
101    * @param indexReference the reference of the entity to index.
102    * @param operation the indexing operation to perform.
103    */
 
104  4469 toggle public IndexQueueEntry(EntityReference indexReference, IndexOperation operation)
105    {
106  4469 this.reference = indexReference;
107  4469 this.operation = operation;
108    }
109   
110    /**
111    * @param deleteQuery the query used to filter entries to delete.
112    * @param operation the indexing operation to perform.
113    */
 
114  3 toggle public IndexQueueEntry(String deleteQuery, IndexOperation operation)
115    {
116  3 this.deleteQuery = deleteQuery;
117  3 this.operation = operation;
118    }
119   
 
120  0 toggle @Override
121    public String toString()
122    {
123  0 String str;
124   
125  0 switch (operation) {
126  0 case INDEX:
127  0 str = "INDEX " + this.reference;
128  0 break;
129  0 case DELETE:
130  0 str = "DELETE " + this.deleteQuery;
131  0 break;
132  0 case STOP:
133  0 str = "STOP";
134  0 break;
135  0 default:
136  0 str = "";
137  0 break;
138    }
139   
140  0 return str;
141    }
142    }
143   
144    /**
145    * Resolve queue entry.
146    *
147    * @version $Id: c54022bd64bcbfa613d96438a2112dda8c5f56d5 $
148    */
 
149    private static class ResolveQueueEntry
150    {
151    /**
152    * The reference of the entity to index.
153    */
154    public EntityReference reference;
155   
156    /**
157    * Also apply operation to reference children.
158    */
159    public boolean recurse;
160   
161    /**
162    * The indexing operation to perform.
163    */
164    public IndexOperation operation;
165   
166    /**
167    * @param reference the reference of the entity to index.
168    * @param recurse also apply operation to reference children.
169    * @param operation the indexing operation to perform.
170    */
 
171  1972 toggle public ResolveQueueEntry(EntityReference reference, boolean recurse, IndexOperation operation)
172    {
173  1972 this.reference = reference;
174  1972 this.recurse = recurse;
175  1972 this.operation = operation;
176    }
177    }
178   
179    /**
180    * Extract children references from passed references and dispatch them to the index queue.
181    *
182    * @version $Id: c54022bd64bcbfa613d96438a2112dda8c5f56d5 $
183    */
 
184    private class Resolver extends AbstractXWikiRunnable
185    {
 
186  3 toggle @Override
187    public void runInternal()
188    {
189  3 logger.debug("Start SOLR resolver thread");
190   
191  1972 while (!Thread.interrupted()) {
192  1972 ResolveQueueEntry queueEntry;
193  1972 try {
194  1972 queueEntry = resolveQueue.take();
195    } catch (InterruptedException e) {
196  0 logger.warn("The SOLR resolve thread has been interrupted", e);
197  0 queueEntry = RESOLVE_QUEUE_ENTRY_STOP;
198    }
199   
200  1972 if (queueEntry == RESOLVE_QUEUE_ENTRY_STOP) {
201    // Stop the index thread: clear the queue and send the stop signal without blocking.
202  3 indexQueue.clear();
203  3 indexQueue.offer(INDEX_QUEUE_ENTRY_STOP);
204  3 break;
205    }
206   
207  1969 try {
208  1969 if (queueEntry.operation == IndexOperation.INDEX) {
209  1536 Iterable<EntityReference> references;
210  1536 if (queueEntry.recurse) {
211  482 references = solrRefereceResolver.getReferences(queueEntry.reference);
212    } else {
213  1054 references = Arrays.asList(queueEntry.reference);
214    }
215   
216  1536 for (EntityReference reference : references) {
217  4036 indexQueue.put(new IndexQueueEntry(reference, queueEntry.operation));
218    }
219    } else {
220  433 if (queueEntry.recurse) {
221  0 indexQueue.put(new IndexQueueEntry(solrRefereceResolver.getQuery(queueEntry.reference),
222    queueEntry.operation));
223  433 } else if (queueEntry.reference != null) {
224  433 indexQueue.put(new IndexQueueEntry(queueEntry.reference, queueEntry.operation));
225    }
226    }
227    } catch (Throwable e) {
228  0 logger.warn("Failed to apply operation [{}] on root reference [{}]", queueEntry.operation,
229    queueEntry.reference, e);
230    }
231    }
232   
233  3 logger.debug("Stop SOLR resolver thread");
234    }
235    }
236   
237    /**
238    * Stop resolver thread.
239    */
240    private static final ResolveQueueEntry RESOLVE_QUEUE_ENTRY_STOP =
241    new ResolveQueueEntry(null, false, IndexOperation.STOP);
242   
243    /**
244    * Stop indexer thread.
245    */
246    private static final IndexQueueEntry INDEX_QUEUE_ENTRY_STOP =
247    new IndexQueueEntry((String) null, IndexOperation.STOP);
248   
249    /**
250    * Logging framework.
251    */
252    @Inject
253    private Logger logger;
254   
255    /**
256    * Component manager used to get metadata extractors.
257    */
258    @Inject
259    private ComponentManager componentManager;
260   
261    /**
262    * The Solr configuration source.
263    */
264    @Inject
265    private SolrConfiguration configuration;
266   
267    /**
268    * Communication with the Solr instance.
269    */
270    @Inject
271    private Provider<SolrInstance> solrInstanceProvider;
272   
273    /**
274    * Extract contained indexable references.
275    */
276    @Inject
277    private SolrReferenceResolver solrRefereceResolver;
278   
279    @Inject
280    private Execution execution;
281   
282    @Inject
283    private ExecutionContextManager ecim;
284   
285    @Inject
286    private JobExecutor jobs;
287   
288    /**
289    * The queue of index operation to perform.
290    */
291    private BlockingQueue<IndexQueueEntry> indexQueue;
292   
293    /**
294    * The queue of resolve references and add them to the index queue.
295    */
296    private BlockingQueue<ResolveQueueEntry> resolveQueue;
297   
298    /**
299    * Thread in which the indexUpdater will be executed.
300    */
301    private Thread indexThread;
302   
303    /**
304    * Thread in which the provided references children will be resolved.
305    */
306    private Thread resolveThread;
307   
308    /**
309    * Indicate of the component has been disposed.
310    */
311    private boolean disposed;
312   
313    /**
314    * The size of the not yet sent batch.
315    */
316    private volatile int batchSize;
317   
 
318  3 toggle @Override
319    public void initialize() throws InitializationException
320    {
321    // Initialize the queues before starting the threads.
322  3 this.resolveQueue = new LinkedBlockingQueue<>();
323  3 this.indexQueue = new LinkedBlockingQueue<>(this.configuration.getIndexerQueueCapacity());
324   
325    // Launch the resolve thread
326  3 this.resolveThread = new Thread(new Resolver());
327  3 this.resolveThread.setName("XWiki Solr resolve thread");
328  3 this.resolveThread.setDaemon(true);
329  3 this.resolveThread.start();
330  3 this.resolveThread.setPriority(Thread.NORM_PRIORITY - 1);
331   
332    // Launch the index thread
333  3 this.indexThread = new Thread(this);
334  3 this.indexThread.setName("XWiki Solr index thread");
335  3 this.indexThread.setDaemon(true);
336  3 this.indexThread.start();
337  3 this.indexThread.setPriority(Thread.NORM_PRIORITY - 1);
338    }
339   
 
340  3 toggle @Override
341    public void dispose() throws ComponentLifecycleException
342    {
343    // Mark the component as disposed
344  3 this.disposed = true;
345   
346    // Stop the resolve thread. Clear the queue and send the stop signal without blocking. We know that the resolve
347    // queue will remain empty after the clear call because we set the disposed flag above.
348  3 this.resolveQueue.clear();
349  3 this.resolveQueue.offer(RESOLVE_QUEUE_ENTRY_STOP);
350   
351    // Stop the index thread. Clear the queue and send the stop signal without blocking. There should be enough
352    // space in the index queue before the special stop entry is added as long the the index queue capacity is
353    // greater than 1. In the worse case, the clear call will unblock the resolve thread (which was waiting because
354    // the index queue was full) and just one entry will be added to the queue before the special stop entry.
355  3 this.indexQueue.clear();
356  3 this.indexQueue.offer(INDEX_QUEUE_ENTRY_STOP);
357    }
358   
 
359  3 toggle @Override
360    public void run()
361    {
362  3 this.logger.debug("Start SOLR indexer thread");
363   
364  96 while (!Thread.interrupted()) {
365    // Block until there is at least one entry in the queue
366  96 IndexQueueEntry queueEntry = null;
367  96 try {
368  96 queueEntry = this.indexQueue.take();
369    } catch (InterruptedException e) {
370  0 this.logger.warn("The SOLR index thread has been interrupted", e);
371   
372  0 queueEntry = INDEX_QUEUE_ENTRY_STOP;
373    }
374   
375    // Add to the batch until either the batch size is achieved, the queue gets emptied or the
376    // INDEX_QUEUE_ENTRY_STOP is retrieved from the queue.
377  96 if (!processBatch(queueEntry)) {
378  3 break;
379    }
380    }
381   
382  3 this.logger.debug("Stop SOLR indexer thread");
383    }
384   
385    /**
386    * Process a batch of operations that were just read from the index operations queue. This method also commits the
387    * batch when it finishes to process it.
388    *
389    * @param queueEntry the batch to process
390    * @return {@code true} to wait for another batch, {@code false} to stop the indexing thread
391    */
 
392  96 toggle private boolean processBatch(IndexQueueEntry queueEntry)
393    {
394  96 SolrInstance solrInstance = this.solrInstanceProvider.get();
395   
396  96 int length = 0;
397   
398  4565 for (IndexQueueEntry batchEntry = queueEntry; batchEntry != null; batchEntry = this.indexQueue.poll()) {
399  4472 if (batchEntry == INDEX_QUEUE_ENTRY_STOP) {
400    // Discard the current batch and stop the indexing thread.
401  3 return false;
402    }
403   
404  4469 IndexOperation operation = batchEntry.operation;
405   
406    // For the current contiguous operations queue, group the changes
407  4469 try {
408  4469 this.ecim.initialize(new ExecutionContext());
409   
410  4469 if (IndexOperation.INDEX.equals(operation)) {
411  4036 LengthSolrInputDocument solrDocument = getSolrDocument(batchEntry.reference);
412  4036 if (solrDocument != null) {
413  4031 solrInstance.add(solrDocument);
414  4031 length += solrDocument.getLength();
415  4031 ++this.batchSize;
416    }
417  433 } else if (IndexOperation.DELETE.equals(operation)) {
418  433 if (batchEntry.reference == null) {
419  0 solrInstance.deleteByQuery(batchEntry.deleteQuery);
420    } else {
421  433 solrInstance.delete(this.solrRefereceResolver.getId(batchEntry.reference));
422    }
423   
424  433 ++this.batchSize;
425    }
426    } catch (Throwable e) {
427  0 this.logger.error("Failed to process entry [{}]", batchEntry, e);
428    } finally {
429  4469 this.execution.removeContext();
430    }
431   
432    // Commit the index changes so that they become available to queries. This is a costly operation and that is
433    // the reason why we perform it at the end of the batch.
434  4469 if (shouldCommit(length, this.batchSize)) {
435  235 commit();
436  235 length = 0;
437    }
438    }
439   
440    // Commit what's left
441  93 if (this.batchSize > 0) {
442  93 commit();
443    }
444   
445  93 return true;
446    }
447   
448    /**
449    * Commit.
450    */
 
451  328 toggle private void commit()
452    {
453  328 SolrInstance solrInstance = this.solrInstanceProvider.get();
454   
455  328 try {
456  328 solrInstance.commit();
457    } catch (Exception e) {
458  0 this.logger.error("Failed to commit index changes to the Solr server. Rolling back.", e);
459   
460  0 try {
461  0 solrInstance.rollback();
462    } catch (Exception ex) {
463    // Just log the failure.
464  0 this.logger.error("Failed to rollback index changes.", ex);
465    }
466    }
467   
468  328 this.batchSize = 0;
469    }
470   
471    /**
472    * Check various constraints to know if the batch should be committed.
473    *
474    * @param length the current length
475    * @param size the current size
476    * @return true if the batch should be sent
477    */
 
478  4469 toggle private boolean shouldCommit(int length, int size)
479    {
480    // If the length is above the configured maximum
481  4469 if (length >= this.configuration.getIndexerBatchMaxLengh()) {
482  235 return true;
483    }
484   
485    // If the size is above the configured maximum
486  4234 return size >= this.configuration.getIndexerBatchSize();
487    }
488   
489    /**
490    * @param reference the reference to extract metadata from.
491    * @return the {@link SolrInputDocument} containing extracted metadata from the passed reference; {@code null} if
492    * the reference type is not supported.
493    * @throws SolrIndexerException if problems occur.
494    * @throws IllegalArgumentException if there is an incompatibility between a reference and the assigned extractor.
495    * @throws ExecutionContextException
496    */
 
497  4036 toggle private LengthSolrInputDocument getSolrDocument(EntityReference reference)
498    throws SolrIndexerException, IllegalArgumentException, ExecutionContextException
499    {
500  4036 SolrMetadataExtractor metadataExtractor = getMetadataExtractor(reference.getType());
501   
502    // If the entity type is supported, use the extractor to get the SolrInputDocuent.
503  4036 if (metadataExtractor != null) {
504  4036 return metadataExtractor.getSolrDocument(reference);
505    }
506   
507  0 return null;
508    }
509   
510    /**
511    * @param entityType the entity type
512    * @return the metadata extractor that is registered for the specified type or {@code null} if none exists.
513    */
 
514  4036 toggle private SolrMetadataExtractor getMetadataExtractor(EntityType entityType)
515    {
516  4036 SolrMetadataExtractor result = null;
517  4036 try {
518  4036 result = this.componentManager.getInstance(SolrMetadataExtractor.class, entityType.name().toLowerCase());
519    } catch (ComponentLookupException e) {
520  0 this.logger.warn("Unsupported entity type: [{}]", entityType.toString(), e);
521    }
522   
523  4036 return result;
524    }
525   
 
526  1536 toggle @Override
527    public void index(EntityReference reference, boolean recurse)
528    {
529  1536 addToQueue(reference, recurse, IndexOperation.INDEX);
530    }
531   
 
532  433 toggle @Override
533    public void delete(EntityReference reference, boolean recurse)
534    {
535  433 addToQueue(reference, recurse, IndexOperation.DELETE);
536    }
537   
538    /**
539    * Add a list of references to the index queue, all having the same operation.
540    *
541    * @param reference the references to add
542    * @param recurse also apply operation to children
543    * @param operation the operation to assign to the given references
544    */
 
545  1969 toggle private void addToQueue(EntityReference reference, boolean recurse, IndexOperation operation)
546    {
547  1969 if (!this.disposed) {
548    // Don't block because the capacity of the resolver queue is not limited.
549  1969 try {
550  1969 this.resolveQueue.put(new ResolveQueueEntry(reference, recurse, operation));
551    } catch (InterruptedException e) {
552  0 this.logger.error("Failed to add reference [{}] to Solr indexing queue", reference, e);
553    }
554    }
555    }
556   
 
557  4821 toggle @Override
558    public int getQueueSize()
559    {
560  4821 return this.indexQueue.size() + this.resolveQueue.size() + this.batchSize;
561    }
562   
 
563  3 toggle @Override
564    public IndexerJob startIndex(IndexerRequest request) throws SolrIndexerException
565    {
566  3 try {
567  3 return (IndexerJob) this.jobs.execute(IndexerJob.JOBTYPE, request);
568    } catch (JobException e) {
569  0 throw new SolrIndexerException("Failed to start index job", e);
570    }
571    }
572    }