1. Project Clover database Tue Dec 20 2016 21:24:09 CET
  2. Package org.xwiki.watchlist.internal

File DefaultWatchListNotificationCache.java

 

Coverage histogram

../../../../img/srcFileCovDistChart8.png
54% of files have more coverage

Code metrics

14
95
11
1
370
221
22
0.23
8.64
11
2

Classes

Class Line # Actions
DefaultWatchListNotificationCache 57 95 0% 22 28
0.7666666576.7%
 

Contributing tests

No tests hitting this source file were found.

Source view

1    /*
2    * See the NOTICE file distributed with this work for additional
3    * information regarding copyright ownership.
4    *
5    * This is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU Lesser General Public License as
7    * published by the Free Software Foundation; either version 2.1 of
8    * the License, or (at your option) any later version.
9    *
10    * This software is distributed in the hope that it will be useful,
11    * but WITHOUT ANY WARRANTY; without even the implied warranty of
12    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13    * Lesser General Public License for more details.
14    *
15    * You should have received a copy of the GNU Lesser General Public
16    * License along with this software; if not, write to the Free
17    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19    */
20    package org.xwiki.watchlist.internal;
21   
22    import java.util.ArrayList;
23    import java.util.Collection;
24    import java.util.Collections;
25    import java.util.HashMap;
26    import java.util.HashSet;
27    import java.util.Iterator;
28    import java.util.List;
29    import java.util.Map;
30    import java.util.Set;
31    import java.util.concurrent.locks.ReentrantReadWriteLock;
32   
33    import javax.inject.Inject;
34    import javax.inject.Named;
35    import javax.inject.Provider;
36    import javax.inject.Singleton;
37   
38    import org.slf4j.Logger;
39    import org.xwiki.component.annotation.Component;
40    import org.xwiki.component.phase.Initializable;
41    import org.xwiki.component.phase.InitializationException;
42    import org.xwiki.configuration.ConfigurationSource;
43    import org.xwiki.query.Query;
44    import org.xwiki.query.QueryManager;
45    import org.xwiki.watchlist.internal.documents.WatchListClassDocumentInitializer;
46    import org.xwiki.wiki.descriptor.WikiDescriptorManager;
47   
48    import com.xpn.xwiki.XWikiContext;
49   
50    /**
51    * Default implementation for {@link WatchListNotificationCache}.
52    *
53    * @version $Id: a0b4dd88e03ffe0ced2843d6c4c72d54c8b5899c $
54    */
55    @Component
56    @Singleton
 
57    public class DefaultWatchListNotificationCache implements WatchListNotificationCache, Initializable
58    {
59    /**
60    * The realtime interval ID.
61    */
62    public static final String REALTIME_INTERVAL_ID = "realtime";
63   
64    /**
65    * Context provider.
66    */
67    @Inject
68    private Provider<XWikiContext> contextProvider;
69   
70    /**
71    * Used to list the existing wikis.
72    */
73    @Inject
74    private WikiDescriptorManager wikiDescriptorManager;
75   
76    /**
77    * Used to search for subscribers.
78    */
79    @Inject
80    private QueryManager queryManager;
81   
82    /**
83    * Logging helper object.
84    */
85    @Inject
86    private Logger logger;
87   
88    /**
89    * Map of subscribers in the wiki farm.
90    */
91    private Map<String, Set<String>> intervalToSubscribersMap = new HashMap<>();
92   
93    /**
94    * Watchlist notification intervals.
95    */
96    private List<String> intervals;
97   
98    /**
99    * Lock for the subscribers map.
100    */
101    private ReentrantReadWriteLock subscribersLock = new ReentrantReadWriteLock();
102   
103    /**
104    * Lock for the intervals list.
105    */
106    private ReentrantReadWriteLock intervalsLock = new ReentrantReadWriteLock();
107   
108    /**
109    * Used to access xwiki.properties.
110    */
111    @Inject
112    @Named("xwikiproperties")
113    private ConfigurationSource xwikiProperties;
114   
115    /**
116    * Init watchlist store. Get all the intervals/jobs present in the wiki. Create the list of subscribers.
117    *
118    * @throws InitializationException if problems occur
119    */
 
120  1 toggle @Override
121    public void initialize() throws InitializationException
122    {
123  1 XWikiContext context = contextProvider.get();
124   
125    // Initialize the intervals cache.
126  1 try {
127  1 intervals = new ArrayList<String>();
128   
129  1 if (xwikiProperties.getProperty("watchlist.realtime.enabled", false)) {
130    // If the realtime notification feature is explicitly enabled (temporarily disabled by default), then
131    // propose/use it as possible notification interval option.
132  1 intervals.add(REALTIME_INTERVAL_ID);
133    }
134   
135    // Get all the watchlist job documents from the main wiki.
136  1 Query jobDocumentsQuery = queryManager.getNamedQuery("getWatchlistJobDocuments");
137    // Make double sure we run the query on the main wiki, since that is where the jobs are defined.
138  1 jobDocumentsQuery.setWiki(context.getWikiId());
139  1 List<String> jobDocumentNames = jobDocumentsQuery.execute();
140   
141    // TODO: Sort them by cron expression.
142   
143    // Add them to the list of intervals.
144  1 intervals.addAll(jobDocumentNames);
145    } catch (Exception e) {
146  0 throw new InitializationException("Failed to initialize the cache of watchlist intervals.", e);
147    }
148   
149    // Initialize the subscribers cache.
150  1 for (String jobDocumentName : intervals) {
151  2 initSubscribersCache(jobDocumentName);
152    }
153    }
154   
155    /**
156    * Retrieves all the users from all the wikis with a WatchList object in their profile.
157    *
158    * @param intervalId name of the interval to init the cache for
159    * @param context the XWiki context
160    */
 
161  5 toggle private void initSubscribersCache(String intervalId)
162    {
163    // init subscribers cache
164  5 List<Object> queryParams = new ArrayList<Object>();
165  5 queryParams.add(WatchListClassDocumentInitializer.DOCUMENT_FULL_NAME);
166  5 queryParams.add(intervalId);
167  5 queryParams.add(DefaultWatchListStore.USERS_CLASS);
168   
169  5 Set<String> subscribersForJob =
170    globalSearchDocuments(", BaseObject as obj, StringProperty as prop, BaseObject as userobj where"
171    + " doc.fullName=obj.name and obj.className=? and obj.id=prop.id.id and prop.value=?"
172    + " and doc.fullName=userobj.name and userobj.className=?", 0, 0, queryParams);
173   
174  5 subscribersLock.writeLock().lock();
175  5 try {
176  5 intervalToSubscribersMap.put(intervalId, subscribersForJob);
177    } finally {
178  5 subscribersLock.writeLock().unlock();
179    }
180    }
181   
182    /**
183    * Search documents on all the wikis by passing HQL where clause values as parameters.
184    *
185    * @param request The HQL where clause.
186    * @param nb Number of results to retrieve
187    * @param start Offset to use in the search query
188    * @param values The where clause values that replaces the question marks (?)
189    * @return a set of document names prefixed with the wiki they come from ex : xwiki:Main.WebHome
190    */
 
191  5 toggle private Set<String> globalSearchDocuments(String request, int nb, int start, List<Object> values)
192    {
193  5 Collection<String> wikiServers = new ArrayList<String>();
194  5 Set<String> results = new HashSet<>();
195   
196  5 try {
197  5 wikiServers = wikiDescriptorManager.getAllIds();
198    } catch (Exception e) {
199  0 logger.error("Failed to get the list of wikis", e);
200    }
201   
202  5 try {
203    // Create the query and set the common parameters.
204  5 Query query = queryManager.createQuery(request, Query.HQL);
205  5 query.setOffset(start);
206  5 query.setLimit(nb);
207  5 query.bindValues(values);
208   
209    // Run on each wiki.
210  5 for (String wiki : wikiServers) {
211  5 String wikiPrefix = wiki + DefaultWatchListStore.WIKI_SPACE_SEP;
212  5 try {
213  5 query.setWiki(wiki);
214  5 List<String> upDocsInWiki = query.execute();
215   
216    // Prefix the results with the wiki ID.
217  5 Iterator<String> it = upDocsInWiki.iterator();
218  5 while (it.hasNext()) {
219  0 results.add(wikiPrefix + it.next());
220    }
221    } catch (Exception e) {
222  0 logger.error("Failed to search in wiki [{}]", wiki, e);
223    }
224    }
225    } catch (Exception e) {
226  0 logger.error("Failed to create query", e);
227    }
228   
229  5 return results;
230    }
231   
232    /**
233    * Destroy subscribers cache for the given job.
234    *
235    * @param intervalId ID of the interval for which the cache must be destroyed
236    * @param context the XWiki context
237    */
 
238  0 toggle private void destroySubscribersCache(String intervalId)
239    {
240  0 subscribersLock.writeLock().lock();
241  0 try {
242  0 intervalToSubscribersMap.remove(intervalId);
243    } finally {
244  0 subscribersLock.writeLock().unlock();
245    }
246    }
247   
 
248  59 toggle @Override
249    public Collection<String> getSubscribers(String intervalId)
250    {
251  59 Set<String> result = null;
252   
253  59 subscribersLock.readLock().lock();
254  59 try {
255  59 result = intervalToSubscribersMap.get(intervalId);
256    } finally {
257  59 subscribersLock.readLock().unlock();
258    }
259   
260  59 if (result == null) {
261  0 return Collections.emptySet();
262    } else {
263  59 return new HashSet<>(result);
264    }
265    }
266   
 
267  8 toggle @Override
268    public boolean addSubscriber(String jobId, String user)
269    {
270  8 subscribersLock.writeLock().lock();
271  8 try {
272  8 Set<String> subscribersForJob = intervalToSubscribersMap.get(jobId);
273   
274  8 if (subscribersForJob != null) {
275  2 return subscribersForJob.add(user);
276    }
277   
278  6 return false;
279    } finally {
280  8 subscribersLock.writeLock().unlock();
281    }
282    }
283   
 
284  2 toggle @Override
285    public boolean moveSubscriber(String oldIntervalId, String newIntervalId, String user)
286    {
287    // Atomic operation.
288  2 subscribersLock.writeLock().lock();
289  2 try {
290    // We do not really care if the remove is successful (i.e. valid interval or existed in the first place).
291  2 removeSubscriber(oldIntervalId, user);
292   
293    // We mark the move operation success based on the add operation.
294  2 return addSubscriber(newIntervalId, user);
295    } finally {
296  2 subscribersLock.writeLock().unlock();
297    }
298    }
299   
 
300  4 toggle @Override
301    public boolean removeSubscriber(String intervalId, String user)
302    {
303  4 subscribersLock.writeLock().lock();
304  4 try {
305  4 Set<String> subscribersForJob = intervalToSubscribersMap.get(intervalId);
306   
307  4 if (subscribersForJob != null) {
308  1 return subscribersForJob.remove(user);
309    }
310    } finally {
311  4 subscribersLock.writeLock().unlock();
312    }
313   
314  3 return false;
315    }
316   
 
317  115 toggle @Override
318    public List<String> getIntervals()
319    {
320  115 intervalsLock.readLock().lock();
321  115 try {
322    // Do not return the internal reference, but copy it.
323  115 return new ArrayList<String>(intervals);
324    } finally {
325  115 intervalsLock.readLock().unlock();
326    }
327    }
328   
 
329  0 toggle @Override
330    public boolean removeInterval(String intervalId)
331    {
332    // Atomic operation
333  0 intervalsLock.writeLock().lock();
334  0 subscribersLock.writeLock().lock();
335  0 try {
336  0 if (intervals.remove(intervalId)) {
337  0 destroySubscribersCache(intervalId);
338   
339  0 return true;
340    }
341   
342  0 return false;
343    } finally {
344  0 subscribersLock.writeLock().unlock();
345  0 intervalsLock.writeLock().unlock();
346    }
347    }
348   
 
349  3 toggle @Override
350    public boolean addInterval(String jobDocument)
351    {
352    // Atomic operation
353  3 intervalsLock.writeLock().lock();
354  3 subscribersLock.writeLock().lock();
355  3 try {
356  3 if (intervals.add(jobDocument)) {
357  3 initSubscribersCache(jobDocument);
358   
359    // TODO: Re-sort the intervals by cron expression
360   
361  3 return true;
362    }
363   
364  0 return false;
365    } finally {
366  3 subscribersLock.writeLock().unlock();
367  3 intervalsLock.writeLock().unlock();
368    }
369    }
370    }