1. Project Clover database Sat Feb 2 2019 06:45:20 CET
  2. Package org.xwiki.job.internal

File DefaultJobExecutor.java

 

Coverage histogram

../../../../img/srcFileCovDistChart10.png
0% of files have more coverage

Code metrics

32
88
16
3
339
230
33
0.38
5.5
5.33
2.06

Classes

Class Line # Actions
DefaultJobExecutor 63 54 0% 21 4
0.952941295.3%
DefaultJobExecutor.JobGroupExecutor 65 24 0% 7 2
0.941176594.1%
DefaultJobExecutor.JobThreadExecutor 135 10 0% 5 0
1.0100%
 

Contributing tests

This file is covered by 128 tests. .

Source view

1    /*
2    * See the NOTICE file distributed with this work for additional
3    * information regarding copyright ownership.
4    *
5    * This is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU Lesser General Public License as
7    * published by the Free Software Foundation; either version 2.1 of
8    * the License, or (at your option) any later version.
9    *
10    * This software is distributed in the hope that it will be useful,
11    * but WITHOUT ANY WARRANTY; without even the implied warranty of
12    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13    * Lesser General Public License for more details.
14    *
15    * You should have received a copy of the GNU Lesser General Public
16    * License along with this software; if not, write to the Free
17    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19    */
20    package org.xwiki.job.internal;
21   
22    import java.util.List;
23    import java.util.Map;
24    import java.util.Queue;
25    import java.util.concurrent.BlockingQueue;
26    import java.util.concurrent.ConcurrentHashMap;
27    import java.util.concurrent.ConcurrentLinkedQueue;
28    import java.util.concurrent.Executors;
29    import java.util.concurrent.LinkedBlockingQueue;
30    import java.util.concurrent.RejectedExecutionException;
31    import java.util.concurrent.SynchronousQueue;
32    import java.util.concurrent.ThreadFactory;
33    import java.util.concurrent.ThreadPoolExecutor;
34    import java.util.concurrent.TimeUnit;
35   
36    import javax.inject.Inject;
37    import javax.inject.Named;
38    import javax.inject.Provider;
39    import javax.inject.Singleton;
40   
41    import org.xwiki.component.annotation.Component;
42    import org.xwiki.component.manager.ComponentLifecycleException;
43    import org.xwiki.component.manager.ComponentLookupException;
44    import org.xwiki.component.manager.ComponentManager;
45    import org.xwiki.component.phase.Disposable;
46    import org.xwiki.component.phase.Initializable;
47    import org.xwiki.component.phase.InitializationException;
48    import org.xwiki.job.GroupedJob;
49    import org.xwiki.job.Job;
50    import org.xwiki.job.JobException;
51    import org.xwiki.job.JobExecutor;
52    import org.xwiki.job.JobGroupPath;
53    import org.xwiki.job.Request;
54   
55    /**
56    * Default implementation of {@link JobExecutor}.
57    *
58    * @version $Id: 5636dfe24d0adf16b32fd90d2f13a917f225fe4a $
59    * @since 6.1M2
60    */
61    @Component
62    @Singleton
 
63    public class DefaultJobExecutor implements JobExecutor, Initializable, Disposable
64    {
 
65    private class JobGroupExecutor extends JobThreadExecutor implements ThreadFactory
66    {
67    private final ThreadFactory threadFactory = Executors.defaultThreadFactory();
68   
69    private final JobGroupPath path;
70   
71    private Job currentJob;
72   
73    private String groupThreadName;
74   
 
75  258 toggle JobGroupExecutor(JobGroupPath path)
76    {
77  258 super(1, 36000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
78   
79  258 setThreadFactory(this);
80   
81  258 this.path = path;
82  258 this.groupThreadName = this.path + " job group daemon thread";
83    }
84   
 
85  426 toggle @Override
86    protected void beforeExecute(Thread t, Runnable r)
87    {
88  426 DefaultJobExecutor.this.lockTree.lock(this.path);
89   
90  426 this.currentJob = (Job) r;
91   
92  426 Thread.currentThread().setName(this.groupThreadName + " - " + this.currentJob);
93   
94  426 super.beforeExecute(t, r);
95    }
96   
 
97  426 toggle @Override
98    protected void afterExecute(Runnable r, Throwable t)
99    {
100  426 Thread.currentThread().setName(this.groupThreadName);
101   
102  426 DefaultJobExecutor.this.lockTree.unlock(this.path);
103   
104  426 this.currentJob = null;
105   
106  426 super.afterExecute(r, t);
107   
108  426 Job job = (Job) r;
109   
110  426 List<String> jobId = job.getRequest().getId();
111  426 if (jobId != null) {
112  222 synchronized (DefaultJobExecutor.this.groupedJobs) {
113  222 Queue<Job> jobQueue = DefaultJobExecutor.this.groupedJobs.get(jobId);
114  222 if (jobQueue != null) {
115  222 if (jobQueue.peek() == job) {
116  222 jobQueue.poll();
117    }
118    }
119    }
120    }
121    }
122   
 
123  282 toggle @Override
124    public Thread newThread(Runnable r)
125    {
126  282 Thread thread = this.threadFactory.newThread(r);
127   
128  282 thread.setDaemon(true);
129  282 thread.setName(this.groupThreadName);
130   
131  282 return thread;
132    }
133    }
134   
 
135    private class JobThreadExecutor extends ThreadPoolExecutor
136    {
 
137  463 toggle JobThreadExecutor(int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
138    {
139  463 super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
140    }
141   
 
142  21670 toggle @Override
143    protected void beforeExecute(Thread t, Runnable r)
144    {
145    // Set a custom thread name corresponding to the job to make debugging easier
146  21671 Thread.currentThread().setName(r.toString());
147    }
148   
 
149  21670 toggle @Override
150    protected void afterExecute(Runnable r, Throwable t)
151    {
152  21670 Job job = (Job) r;
153   
154  21670 List<String> jobId = job.getRequest().getId();
155  21668 if (jobId != null) {
156  21464 synchronized (DefaultJobExecutor.this.jobs) {
157  21467 Job storedJob = DefaultJobExecutor.this.jobs.get(jobId);
158  21467 if (storedJob == job) {
159  21245 DefaultJobExecutor.this.jobs.remove(jobId);
160    }
161    }
162    }
163   
164    // Reset thread name since it's not used anymore
165  21671 Thread.currentThread().setName("Unused job pool thread");
166    }
167    }
168   
169    /**
170    * Used to lookup {@link Job} implementations.
171    */
172    @Inject
173    @Named("context")
174    private Provider<ComponentManager> componentManager;
175   
176    private final Map<List<String>, Queue<Job>> groupedJobs = new ConcurrentHashMap<List<String>, Queue<Job>>();
177   
178    private final Map<List<String>, Job> jobs = new ConcurrentHashMap<List<String>, Job>();
179   
180    /**
181    * Handle care of hierarchical locking for grouped jobs.
182    */
183    private final JobGroupPathLockTree lockTree = new JobGroupPathLockTree();
184   
185    /**
186    * Map<groupname, group executor>.
187    */
188    private final Map<JobGroupPath, JobGroupExecutor> groupExecutors =
189    new ConcurrentHashMap<JobGroupPath, JobGroupExecutor>();
190   
191    /**
192    * Execute non grouped jobs.
193    */
194    private JobThreadExecutor jobExecutor;
195   
196    private volatile boolean disposed;
197   
 
198  205 toggle @Override
199    public void initialize() throws InitializationException
200    {
201  205 this.jobExecutor =
202    new JobThreadExecutor(Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
203    }
204   
 
205  196 toggle @Override
206    public void dispose() throws ComponentLifecycleException
207    {
208  196 synchronized (this) {
209  196 this.disposed = true;
210   
211  196 this.jobExecutor.shutdownNow();
212  196 for (JobGroupExecutor executor : this.groupExecutors.values()) {
213  228 executor.shutdownNow();
214    }
215    }
216    }
217   
218    // JobManager
219   
 
220  54 toggle @Override
221    public Job getCurrentJob(JobGroupPath path)
222    {
223  54 JobGroupExecutor executor = this.groupExecutors.get(path);
224   
225  54 return executor != null ? executor.currentJob : null;
226    }
227   
 
228  62007 toggle @Override
229    public Job getJob(List<String> id)
230    {
231    // Is it a standalone job
232  62010 Job job = this.jobs.get(id);
233  62010 if (job != null) {
234  13 return job;
235    }
236   
237    // Is it in a group
238  61998 Queue<Job> jobQueue = this.groupedJobs.get(id);
239  61998 if (jobQueue != null) {
240  1518 job = jobQueue.peek();
241  1518 if (job != null) {
242  542 return job;
243    }
244    }
245   
246  61454 return null;
247    }
248   
249    /**
250    * @param jobType the job id
251    * @param request the request
252    * @return a new job
253    * @throws JobException failed to create a job for the provided type
254    */
 
255  21667 toggle private Job createJob(String jobType, Request request) throws JobException
256    {
257  21667 Job job;
258  21667 try {
259  21667 job = this.componentManager.get().getInstance(Job.class, jobType);
260    } catch (ComponentLookupException e) {
261  0 throw new JobException("Failed to lookup any Job for role hint [" + jobType + "]", e);
262    }
263   
264  21667 job.initialize(request);
265   
266  21666 return job;
267    }
268   
 
269  21667 toggle @Override
270    public Job execute(String jobType, Request request) throws JobException
271    {
272  21667 Job job = createJob(jobType, request);
273   
274  21666 execute(job);
275   
276  21667 return job;
277    }
278   
 
279  21670 toggle @Override
280    public void execute(Job job)
281    {
282  21670 if (!this.disposed) {
283  21670 if (job instanceof GroupedJob) {
284  442 executeGroupedJob((GroupedJob) job);
285    } else {
286  21228 executeSingleJob(job);
287    }
288    } else {
289  0 throw new RejectedExecutionException("The job executor is disposed");
290    }
291    }
292   
 
293  21244 toggle private void executeSingleJob(Job job)
294    {
295  21244 this.jobExecutor.execute(job);
296   
297  21245 List<String> jobId = job.getRequest().getId();
298  21245 if (jobId != null) {
299  21245 synchronized (this.jobs) {
300  21245 this.jobs.put(jobId, job);
301    }
302    }
303    }
304   
 
305  442 toggle private void executeGroupedJob(GroupedJob job)
306    {
307  442 synchronized (this.groupExecutors) {
308  442 JobGroupPath path = job.getGroupPath();
309   
310    // If path is null execute as non grouped job
311  442 if (path == null) {
312  16 executeSingleJob(job);
313   
314  16 return;
315    }
316   
317  426 JobGroupExecutor groupExecutor = this.groupExecutors.get(path);
318   
319  426 if (groupExecutor == null) {
320  258 groupExecutor = new JobGroupExecutor(path);
321  258 this.groupExecutors.put(path, groupExecutor);
322    }
323   
324  426 groupExecutor.execute(job);
325   
326  426 List<String> jobId = job.getRequest().getId();
327  426 if (jobId != null) {
328  222 synchronized (this.groupedJobs) {
329  222 Queue<Job> jobQueue = this.groupedJobs.get(jobId);
330  222 if (jobQueue == null) {
331  191 jobQueue = new ConcurrentLinkedQueue<Job>();
332  191 this.groupedJobs.put(jobId, jobQueue);
333    }
334  222 jobQueue.offer(job);
335    }
336    }
337    }
338    }
339    }