View Javadoc
1   /*
2    * Copyright 2009-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.admin.launch;
17  
18  import java.util.Date;
19  import java.util.HashSet;
20  import java.util.Set;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.aspectj.lang.annotation.AfterReturning;
25  import org.aspectj.lang.annotation.Aspect;
26  import org.aspectj.lang.annotation.Before;
27  import org.springframework.batch.core.BatchStatus;
28  import org.springframework.batch.core.ExitStatus;
29  import org.springframework.batch.core.Job;
30  import org.springframework.batch.core.JobExecution;
31  import org.springframework.batch.core.explore.JobExplorer;
32  import org.springframework.batch.core.launch.JobLauncher;
33  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
34  import org.springframework.batch.core.repository.JobRepository;
35  import org.springframework.beans.factory.InitializingBean;
36  import org.springframework.jmx.export.annotation.ManagedAttribute;
37  import org.springframework.jmx.export.annotation.ManagedOperation;
38  import org.springframework.jmx.export.annotation.ManagedResource;
39  import org.springframework.util.Assert;
40  
41  /**
42   * Wrapper for a {@link JobLauncher} that synchronizes jobs globally so that
43   * only one execution of a given Job can be active at once.
44   * 
45   * @author Dave Syer
46   * 
47   */
48  @Aspect
49  @ManagedResource
50  public class JobLauncherSynchronizer implements InitializingBean {
51  
52  	private static final Log logger = LogFactory.getLog(JobLauncherSynchronizer.class);
53  
54  	private JobExplorer jobExplorer;
55  
56  	private JobRepository jobRepository;
57  
58  	private Set<String> jobNames = new HashSet<String>();
59  
60  	/**
61  	 * The {@link JobExplorer} to use to inspect existing executions.
62  	 * 
63  	 * @param jobExplorer a {@link JobExplorer}
64  	 */
65  	public void setJobExplorer(JobExplorer jobExplorer) {
66  		this.jobExplorer = jobExplorer;
67  	}
68  
69  	/**
70  	 * The {@link JobRepository} needed for updates to execution data.
71  	 * 
72  	 * @param jobRepository a {@link JobRepository}
73  	 */
74  	public void setJobRepository(JobRepository jobRepository) {
75  		this.jobRepository = jobRepository;
76  	}
77  
78  	/**
79  	 * Set of job names that will be synchronized. Others are ignored.
80  	 * 
81  	 * @param jobNames the job names
82  	 */
83  	public void setJobNames(Set<String> jobNames) {
84  		this.jobNames = jobNames;
85  	}
86  	
87  	/**
88  	 * A job name that will be synchronized.
89  	 * 
90  	 * @param jobName the job name
91  	 */
92  	@ManagedOperation
93  	public void addJobName(String jobName) {
94  		this.jobNames.add(jobName);
95  	}
96  	
97  	/**
98  	 * Remove a job name from the list to synchronize.
99  	 * 
100 	 * @param jobName the job name
101 	 */
102 	@ManagedOperation
103 	public void removeJobName(String jobName) {
104 		this.jobNames.remove(jobName);
105 	}
106 	
107 	/**
108 	 * @return the jobNames
109 	 */
110 	@ManagedAttribute
111 	public Set<String> getJobNames() {
112 		return jobNames;
113 	}
114 
115 	public void afterPropertiesSet() throws Exception {
116 		Assert.notNull(jobExplorer, "A JobExplorer must be provided");
117 		Assert.notNull(jobRepository, "A JobRepository must be provided");
118 	}
119 
120 	@Before("execution(* org.springframework.batch..JobLauncher+.*(..)) && args(job,..)")
121 	public void checkJobBeforeLaunch(Job job) throws JobExecutionAlreadyRunningException {
122 		String jobName = job.getName();
123 		logger.debug("Checking for synchronization on Job: " + jobName);
124 		if (!jobNames.contains(jobName)) {
125 			logger.debug("Not synchronizing Job: " + jobName);
126 			return;
127 		}
128 		Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
129 		if (!running.isEmpty()) {
130 			throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
131 		}
132 		logger.debug("Job checked and no duplicates detected: " + jobName);
133 	}
134 
135 	@AfterReturning(value = "execution(* org.springframework.batch..JobRepository+.createJobExecution(..)) && args(jobName,..)", returning = "jobExecution")
136 	public void checkJobDuringLaunch(String jobName, JobExecution jobExecution)
137 			throws JobExecutionAlreadyRunningException {
138 		logger.debug("Re-checking for synchronization on JobExecution: " + jobExecution);
139 		if (!jobNames.contains(jobName)) {
140 			logger.debug("Not re-checking for synchronization of Job: " + jobName);
141 			return;
142 		}
143 		Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
144 		if (running.size() > 1) {
145 			jobExecution.setEndTime(new Date());
146 			jobExecution.upgradeStatus(BatchStatus.ABANDONED);
147 			jobExecution.setExitStatus(jobExecution.getExitStatus().and(ExitStatus.NOOP).addExitDescription(
148 					"Not executed because another execution was detected for the same Job."));
149 			jobRepository.update(jobExecution);
150 			throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
151 		}
152 	}
153 }