View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.repository.support;
18  
19  import java.util.ArrayList;
20  import java.util.Date;
21  import java.util.List;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.springframework.batch.core.BatchStatus;
26  import org.springframework.batch.core.JobExecution;
27  import org.springframework.batch.core.JobInstance;
28  import org.springframework.batch.core.JobParameters;
29  import org.springframework.batch.core.StepExecution;
30  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
31  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
32  import org.springframework.batch.core.repository.JobRepository;
33  import org.springframework.batch.core.repository.JobRestartException;
34  import org.springframework.batch.core.repository.dao.ExecutionContextDao;
35  import org.springframework.batch.core.repository.dao.JobExecutionDao;
36  import org.springframework.batch.core.repository.dao.JobInstanceDao;
37  import org.springframework.batch.core.repository.dao.StepExecutionDao;
38  import org.springframework.batch.item.ExecutionContext;
39  import org.springframework.util.Assert;
40  
41  /**
42   *
43   * <p>
44   * Implementation of {@link JobRepository} that stores JobInstances,
45   * JobExecutions, and StepExecutions using the injected DAOs.
46   * <p>
47   *
48   * @author Lucas Ward
49   * @author Dave Syer
50   * @author Robert Kasanicky
51   *
52   * @see JobRepository
53   * @see JobInstanceDao
54   * @see JobExecutionDao
55   * @see StepExecutionDao
56   *
57   */
58  public class SimpleJobRepository implements JobRepository {
59  
60  	private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);
61  
62  	private JobInstanceDao jobInstanceDao;
63  
64  	private JobExecutionDao jobExecutionDao;
65  
66  	private StepExecutionDao stepExecutionDao;
67  
68  	private ExecutionContextDao ecDao;
69  
70  	/**
71  	 * Provide default constructor with low visibility in case user wants to use
72  	 * use aop:proxy-target-class="true" for AOP interceptor.
73  	 */
74  	SimpleJobRepository() {
75  	}
76  
77  	public SimpleJobRepository(JobInstanceDao jobInstanceDao, JobExecutionDao jobExecutionDao,
78  			StepExecutionDao stepExecutionDao, ExecutionContextDao ecDao) {
79  		super();
80  		this.jobInstanceDao = jobInstanceDao;
81  		this.jobExecutionDao = jobExecutionDao;
82  		this.stepExecutionDao = stepExecutionDao;
83  		this.ecDao = ecDao;
84  	}
85  
86  	@Override
87  	public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
88  		return jobInstanceDao.getJobInstance(jobName, jobParameters) != null;
89  	}
90  
91  	@Override
92  	public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
93  			throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
94  
95  		Assert.notNull(jobName, "Job name must not be null.");
96  		Assert.notNull(jobParameters, "JobParameters must not be null.");
97  
98  		/*
99  		 * Find all jobs matching the runtime information.
100 		 *
101 		 * If this method is transactional, and the isolation level is
102 		 * REPEATABLE_READ or better, another launcher trying to start the same
103 		 * job in another thread or process will block until this transaction
104 		 * has finished.
105 		 */
106 
107 		JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
108 		ExecutionContext executionContext;
109 
110 		// existing job instance found
111 		if (jobInstance != null) {
112 
113 			List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
114 
115 			// check for running executions and find the last started
116 			for (JobExecution execution : executions) {
117 				if (execution.isRunning()) {
118 					throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
119 							+ jobInstance);
120 				}
121 
122 				BatchStatus status = execution.getStatus();
123 				if (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED) {
124 					throw new JobInstanceAlreadyCompleteException(
125 							"A job instance already exists and is complete for parameters=" + jobParameters
126 							+ ".  If you want to run this job again, change the parameters.");
127 				}
128 			}
129 			executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
130 		}
131 		else {
132 			// no job found, create one
133 			jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
134 			executionContext = new ExecutionContext();
135 		}
136 
137 		JobExecution jobExecution = new JobExecution(jobInstance);
138 		jobExecution.setExecutionContext(executionContext);
139 		jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
140 
141 		// Save the JobExecution so that it picks up an ID (useful for clients
142 		// monitoring asynchronous executions):
143 		jobExecutionDao.saveJobExecution(jobExecution);
144 		ecDao.saveExecutionContext(jobExecution);
145 
146 		return jobExecution;
147 
148 	}
149 
150 	@Override
151 	public void update(JobExecution jobExecution) {
152 
153 		Assert.notNull(jobExecution, "JobExecution cannot be null.");
154 		Assert.notNull(jobExecution.getJobId(), "JobExecution must have a Job ID set.");
155 		Assert.notNull(jobExecution.getId(), "JobExecution must be already saved (have an id assigned).");
156 
157 		jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
158 		jobExecutionDao.updateJobExecution(jobExecution);
159 	}
160 
161 	@Override
162 	public void add(StepExecution stepExecution) {
163 		validateStepExecution(stepExecution);
164 
165 		stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
166 		stepExecutionDao.saveStepExecution(stepExecution);
167 		ecDao.saveExecutionContext(stepExecution);
168 	}
169 
170 	@Override
171 	public void update(StepExecution stepExecution) {
172 		validateStepExecution(stepExecution);
173 		Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
174 
175 		stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
176 		stepExecutionDao.updateStepExecution(stepExecution);
177 		checkForInterruption(stepExecution);
178 	}
179 
180 	private void validateStepExecution(StepExecution stepExecution) {
181 		Assert.notNull(stepExecution, "StepExecution cannot be null.");
182 		Assert.notNull(stepExecution.getStepName(), "StepExecution's step name cannot be null.");
183 		Assert.notNull(stepExecution.getJobExecutionId(), "StepExecution must belong to persisted JobExecution");
184 	}
185 
186 	@Override
187 	public void updateExecutionContext(StepExecution stepExecution) {
188 		validateStepExecution(stepExecution);
189 		Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
190 		ecDao.updateExecutionContext(stepExecution);
191 	}
192 
193 	@Override
194 	public void updateExecutionContext(JobExecution jobExecution) {
195 		ecDao.updateExecutionContext(jobExecution);
196 	}
197 
198 	@Override
199 	public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
200 		List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
201 		List<StepExecution> stepExecutions = new ArrayList<StepExecution>(jobExecutions.size());
202 		for (JobExecution jobExecution : jobExecutions) {
203 			stepExecutionDao.addStepExecutions(jobExecution);
204 			for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
205 				if (stepName.equals(stepExecution.getStepName())) {
206 					stepExecutions.add(stepExecution);
207 				}
208 			}
209 		}
210 		StepExecution latest = null;
211 		for (StepExecution stepExecution : stepExecutions) {
212 			if (latest == null) {
213 				latest = stepExecution;
214 			}
215 			if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
216 				latest = stepExecution;
217 			}
218 		}
219 		if (latest != null) {
220 			ExecutionContext executionContext = ecDao.getExecutionContext(latest);
221 			latest.setExecutionContext(executionContext);
222 		}
223 		return latest;
224 	}
225 
226 	/**
227 	 * @return number of executions of the step within given job instance
228 	 */
229 	@Override
230 	public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
231 		int count = 0;
232 		List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
233 		for (JobExecution jobExecution : jobExecutions) {
234 			stepExecutionDao.addStepExecutions(jobExecution);
235 			for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
236 				if (stepName.equals(stepExecution.getStepName())) {
237 					count++;
238 				}
239 			}
240 		}
241 		return count;
242 	}
243 
244 	/*
245 	 * Check to determine whether or not the JobExecution that is the parent of
246 	 * the provided StepExecution has been interrupted. If, after synchronizing
247 	 * the status with the database, the status has been updated to STOPPING,
248 	 * then the job has been interrupted.
249 	 *
250 	 * @param stepExecution
251 	 */
252 	private void checkForInterruption(StepExecution stepExecution) {
253 		JobExecution jobExecution = stepExecution.getJobExecution();
254 		jobExecutionDao.synchronizeStatus(jobExecution);
255 		if (jobExecution.isStopping()) {
256 			logger.info("Parent JobExecution is stopped, so passing message on to StepExecution");
257 			stepExecution.setTerminateOnly();
258 		}
259 	}
260 
261 	@Override
262 	public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
263 		JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
264 		if (jobInstance == null) {
265 			return null;
266 		}
267 		JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);
268 
269 		if (jobExecution != null) {
270 			jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
271 		}
272 		return jobExecution;
273 
274 	}
275 
276 
277 }