1. Project Clover database Tue Dec 20 2016 21:24:09 CET
  2. Package org.xwiki.job.internal

File DefaultJobExecutor.java

 

Coverage histogram

../../../../img/srcFileCovDistChart10.png
0% of files have more coverage

Code metrics

32
86
15
3
329
224
32
0.37
5.73
5
2.13

Classes

Class Line # Actions
DefaultJobExecutor 63 54 0% 21 7
0.9176470691.8%
DefaultJobExecutor.JobGroupExecutor 65 24 0% 7 2
0.941176594.1%
DefaultJobExecutor.JobThreadExecutor 135 8 0% 4 0
1.0100%
 

Contributing tests

This file is covered by 107 tests. .

Source view

1    /*
2    * See the NOTICE file distributed with this work for additional
3    * information regarding copyright ownership.
4    *
5    * This is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU Lesser General Public License as
7    * published by the Free Software Foundation; either version 2.1 of
8    * the License, or (at your option) any later version.
9    *
10    * This software is distributed in the hope that it will be useful,
11    * but WITHOUT ANY WARRANTY; without even the implied warranty of
12    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13    * Lesser General Public License for more details.
14    *
15    * You should have received a copy of the GNU Lesser General Public
16    * License along with this software; if not, write to the Free
17    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19    */
20    package org.xwiki.job.internal;
21   
22    import java.util.List;
23    import java.util.Map;
24    import java.util.Queue;
25    import java.util.concurrent.BlockingQueue;
26    import java.util.concurrent.ConcurrentHashMap;
27    import java.util.concurrent.ConcurrentLinkedQueue;
28    import java.util.concurrent.Executors;
29    import java.util.concurrent.LinkedBlockingQueue;
30    import java.util.concurrent.RejectedExecutionException;
31    import java.util.concurrent.SynchronousQueue;
32    import java.util.concurrent.ThreadFactory;
33    import java.util.concurrent.ThreadPoolExecutor;
34    import java.util.concurrent.TimeUnit;
35   
36    import javax.inject.Inject;
37    import javax.inject.Named;
38    import javax.inject.Provider;
39    import javax.inject.Singleton;
40   
41    import org.xwiki.component.annotation.Component;
42    import org.xwiki.component.manager.ComponentLifecycleException;
43    import org.xwiki.component.manager.ComponentLookupException;
44    import org.xwiki.component.manager.ComponentManager;
45    import org.xwiki.component.phase.Disposable;
46    import org.xwiki.component.phase.Initializable;
47    import org.xwiki.component.phase.InitializationException;
48    import org.xwiki.job.GroupedJob;
49    import org.xwiki.job.Job;
50    import org.xwiki.job.JobException;
51    import org.xwiki.job.JobExecutor;
52    import org.xwiki.job.JobGroupPath;
53    import org.xwiki.job.Request;
54   
55    /**
56    * Default implementation of {@link JobExecutor}.
57    *
58    * @version $Id: 4145ceab1e59d6346961af4e321e82926ee65632 $
59    * @since 6.1M2
60    */
61    @Component
62    @Singleton
 
63    public class DefaultJobExecutor implements JobExecutor, Initializable, Disposable
64    {
 
65    private class JobGroupExecutor extends JobThreadExecutor implements ThreadFactory
66    {
67    private final ThreadFactory threadFactory = Executors.defaultThreadFactory();
68   
69    private final JobGroupPath path;
70   
71    private Job currentJob;
72   
73    private String groupThreadName;
74   
 
75  165 toggle JobGroupExecutor(JobGroupPath path)
76    {
77  165 super(1, 36000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
78   
79  165 setThreadFactory(this);
80   
81  165 this.path = path;
82  165 this.groupThreadName = this.path + " job group daemon thread";
83    }
84   
 
85  285 toggle @Override
86    protected void beforeExecute(Thread t, Runnable r)
87    {
88  285 DefaultJobExecutor.this.lockTree.lock(this.path);
89   
90  285 this.currentJob = (Job) r;
91   
92  285 Thread.currentThread().setName(this.groupThreadName + " - " + this.currentJob);
93   
94  285 super.beforeExecute(t, r);
95    }
96   
 
97  285 toggle @Override
98    protected void afterExecute(Runnable r, Throwable t)
99    {
100  285 Thread.currentThread().setName(this.groupThreadName);
101   
102  285 DefaultJobExecutor.this.lockTree.unlock(this.path);
103   
104  285 this.currentJob = null;
105   
106  285 super.afterExecute(r, t);
107   
108  285 Job job = (Job) r;
109   
110  285 List<String> jobId = job.getRequest().getId();
111  285 if (jobId != null) {
112  88 synchronized (DefaultJobExecutor.this.groupedJobs) {
113  88 Queue<Job> jobQueue = DefaultJobExecutor.this.groupedJobs.get(jobId);
114  88 if (jobQueue != null) {
115  88 if (jobQueue.peek() == job) {
116  88 jobQueue.poll();
117    }
118    }
119    }
120    }
121    }
122   
 
123  168 toggle @Override
124    public Thread newThread(Runnable r)
125    {
126  168 Thread thread = this.threadFactory.newThread(r);
127   
128  168 thread.setDaemon(true);
129  168 thread.setName(this.groupThreadName);
130   
131  168 return thread;
132    }
133    }
134   
 
135    private class JobThreadExecutor extends ThreadPoolExecutor
136    {
 
137  316 toggle JobThreadExecutor(int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
138    {
139  316 super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
140    }
141   
 
142  320 toggle @Override
143    protected void afterExecute(Runnable r, Throwable t)
144    {
145  320 Job job = (Job) r;
146   
147  320 List<String> jobId = job.getRequest().getId();
148  320 if (jobId != null) {
149  123 synchronized (DefaultJobExecutor.this.jobs) {
150  123 Job storedJob = DefaultJobExecutor.this.jobs.get(jobId);
151  123 if (storedJob == job) {
152  35 DefaultJobExecutor.this.jobs.remove(jobId);
153    }
154    }
155    }
156    }
157    }
158   
159    /**
160    * Used to lookup {@link Job} implementations.
161    */
162    @Inject
163    @Named("context")
164    private Provider<ComponentManager> componentManager;
165   
166    private final Map<List<String>, Queue<Job>> groupedJobs = new ConcurrentHashMap<List<String>, Queue<Job>>();
167   
168    private final Map<List<String>, Job> jobs = new ConcurrentHashMap<List<String>, Job>();
169   
170    /**
171    * Handle care of hierarchical locking for grouped jobs.
172    */
173    private final JobGroupPathLockTree lockTree = new JobGroupPathLockTree();
174   
175    /**
176    * Map<groupname, group executor>.
177    */
178    private final Map<JobGroupPath, JobGroupExecutor> groupExecutors =
179    new ConcurrentHashMap<JobGroupPath, JobGroupExecutor>();
180   
181    /**
182    * Execute non grouped jobs.
183    */
184    private JobThreadExecutor jobExecutor;
185   
186    private volatile boolean disposed;
187   
 
188  151 toggle @Override
189    public void initialize() throws InitializationException
190    {
191  151 this.jobExecutor =
192    new JobThreadExecutor(Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
193    }
194   
 
195  151 toggle @Override
196    public void dispose() throws ComponentLifecycleException
197    {
198  151 synchronized (this) {
199  151 this.disposed = true;
200   
201  151 this.jobExecutor.shutdownNow();
202  151 for (JobGroupExecutor executor : this.groupExecutors.values()) {
203  165 executor.shutdownNow();
204    }
205    }
206    }
207   
208    // JobManager
209   
 
210  30 toggle @Override
211    public Job getCurrentJob(JobGroupPath path)
212    {
213  30 JobGroupExecutor executor = this.groupExecutors.get(path);
214   
215  30 return executor != null ? executor.currentJob : null;
216    }
217   
 
218  4185 toggle @Override
219    public Job getJob(List<String> id)
220    {
221    // Is it a standalone job
222  4185 Job job = this.jobs.get(id);
223  4185 if (job != null) {
224  69 return job;
225    }
226   
227    // Is it in a group
228  4116 Queue<Job> jobQueue = this.groupedJobs.get(id);
229  4116 if (jobQueue != null) {
230  492 job = jobQueue.peek();
231  492 if (job != null) {
232  72 return job;
233    }
234    }
235   
236  4044 return null;
237    }
238   
239    /**
240    * @param jobType the job id
241    * @param request the request
242    * @return a new job
243    * @throws JobException failed to create a job for the provided type
244    */
 
245  316 toggle private Job createJob(String jobType, Request request) throws JobException
246    {
247  316 Job job;
248  316 try {
249  316 job = this.componentManager.get().getInstance(Job.class, jobType);
250    } catch (ComponentLookupException e) {
251  0 throw new JobException("Failed to lookup any Job for role hint [" + jobType + "]", e);
252    }
253   
254  316 job.initialize(request);
255   
256  316 return job;
257    }
258   
 
259  316 toggle @Override
260    public Job execute(String jobType, Request request) throws JobException
261    {
262  316 Job job = createJob(jobType, request);
263   
264  316 execute(job);
265   
266  316 return job;
267    }
268   
 
269  320 toggle @Override
270    public void execute(Job job)
271    {
272  320 if (!this.disposed) {
273  320 if (job instanceof GroupedJob) {
274  285 executeGroupedJob((GroupedJob) job);
275    } else {
276  35 executeSingleJob(job);
277    }
278    } else {
279  0 throw new RejectedExecutionException("The job executor is disposed");
280    }
281    }
282   
 
283  35 toggle private void executeSingleJob(Job job)
284    {
285  35 this.jobExecutor.execute(job);
286   
287  35 List<String> jobId = job.getRequest().getId();
288  35 if (jobId != null) {
289  35 synchronized (this.jobs) {
290  35 this.jobs.put(jobId, job);
291    }
292    }
293    }
294   
 
295  285 toggle private void executeGroupedJob(GroupedJob job)
296    {
297  285 synchronized (this.groupExecutors) {
298  285 JobGroupPath path = job.getGroupPath();
299   
300    // If path is null execute as non grouped job
301  285 if (path == null) {
302  0 executeSingleJob(job);
303   
304  0 return;
305    }
306   
307  285 JobGroupExecutor groupExecutor = this.groupExecutors.get(path);
308   
309  285 if (groupExecutor == null) {
310  165 groupExecutor = new JobGroupExecutor(path);
311  165 this.groupExecutors.put(path, groupExecutor);
312    }
313   
314  285 groupExecutor.execute(job);
315   
316  285 List<String> jobId = job.getRequest().getId();
317  285 if (jobId != null) {
318  88 synchronized (this.groupedJobs) {
319  88 Queue<Job> jobQueue = this.groupedJobs.get(jobId);
320  88 if (jobQueue == null) {
321  59 jobQueue = new ConcurrentLinkedQueue<Job>();
322  59 this.groupedJobs.put(jobId, jobQueue);
323    }
324  88 jobQueue.offer(job);
325    }
326    }
327    }
328    }
329    }