View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.launch.support;
17  
18  import java.util.ArrayList;
19  import java.util.Date;
20  import java.util.LinkedHashMap;
21  import java.util.LinkedHashSet;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.TreeSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.springframework.batch.core.BatchStatus;
30  import org.springframework.batch.core.Job;
31  import org.springframework.batch.core.JobExecution;
32  import org.springframework.batch.core.JobInstance;
33  import org.springframework.batch.core.JobParameters;
34  import org.springframework.batch.core.JobParametersIncrementer;
35  import org.springframework.batch.core.JobParametersInvalidException;
36  import org.springframework.batch.core.StepExecution;
37  import org.springframework.batch.core.UnexpectedJobExecutionException;
38  import org.springframework.batch.core.configuration.JobRegistry;
39  import org.springframework.batch.core.configuration.ListableJobLocator;
40  import org.springframework.batch.core.converter.DefaultJobParametersConverter;
41  import org.springframework.batch.core.converter.JobParametersConverter;
42  import org.springframework.batch.core.explore.JobExplorer;
43  import org.springframework.batch.core.launch.JobExecutionNotRunningException;
44  import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
45  import org.springframework.batch.core.launch.JobLauncher;
46  import org.springframework.batch.core.launch.JobOperator;
47  import org.springframework.batch.core.launch.JobParametersNotFoundException;
48  import org.springframework.batch.core.launch.NoSuchJobException;
49  import org.springframework.batch.core.launch.NoSuchJobExecutionException;
50  import org.springframework.batch.core.launch.NoSuchJobInstanceException;
51  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
52  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
53  import org.springframework.batch.core.repository.JobRepository;
54  import org.springframework.batch.core.repository.JobRestartException;
55  import org.springframework.batch.support.PropertiesConverter;
56  import org.springframework.beans.factory.InitializingBean;
57  import org.springframework.transaction.annotation.Transactional;
58  import org.springframework.util.Assert;
59  
60  /**
61   * Simple implementation of the JobOperator interface.  Due to the amount of
62   * functionality the implementation is combining, the following dependencies
63   * are required:
64   *
65   * <ul>
66   * 	<li> {@link JobLauncher}
67   *  <li> {@link JobExplorer}
68   *  <li> {@link JobRepository}
69   *  <li> {@link JobRegistry}
70   * </ul>
71   *
72   * @author Dave Syer
73   * @author Lucas Ward
74   * @since 2.0
75   */
76  public class SimpleJobOperator implements JobOperator, InitializingBean {
77  
78  	/**
79  	 *
80  	 */
81  	private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): "
82  			+ "%s with name=%s and parameters=%s";
83  
84  	private ListableJobLocator jobRegistry;
85  
86  	private JobExplorer jobExplorer;
87  
88  	private JobLauncher jobLauncher;
89  
90  	private JobRepository jobRepository;
91  
92  	private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
93  
94  	private final Log logger = LogFactory.getLog(getClass());
95  
96  	/**
97  	 * Check mandatory properties.
98  	 *
99  	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
100 	 */
101 	@Override
102 	public void afterPropertiesSet() throws Exception {
103 		Assert.notNull(jobLauncher, "JobLauncher must be provided");
104 		Assert.notNull(jobRegistry, "JobLocator must be provided");
105 		Assert.notNull(jobExplorer, "JobExplorer must be provided");
106 		Assert.notNull(jobRepository, "JobRepository must be provided");
107 	}
108 
109 	/**
110 	 * Public setter for the {@link JobParametersConverter}.
111 	 * @param jobParametersConverter the {@link JobParametersConverter} to set
112 	 */
113 	public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
114 		this.jobParametersConverter = jobParametersConverter;
115 	}
116 
117 	/**
118 	 * Public setter for the {@link ListableJobLocator}.
119 	 * @param jobRegistry the {@link ListableJobLocator} to set
120 	 */
121 	public void setJobRegistry(ListableJobLocator jobRegistry) {
122 		this.jobRegistry = jobRegistry;
123 	}
124 
125 	/**
126 	 * Public setter for the {@link JobExplorer}.
127 	 * @param jobExplorer the {@link JobExplorer} to set
128 	 */
129 	public void setJobExplorer(JobExplorer jobExplorer) {
130 		this.jobExplorer = jobExplorer;
131 	}
132 
133 	public void setJobRepository(JobRepository jobRepository) {
134 		this.jobRepository = jobRepository;
135 	}
136 
137 	/**
138 	 * Public setter for the {@link JobLauncher}.
139 	 * @param jobLauncher the {@link JobLauncher} to set
140 	 */
141 	public void setJobLauncher(JobLauncher jobLauncher) {
142 		this.jobLauncher = jobLauncher;
143 	}
144 
145 	/*
146 	 * (non-Javadoc)
147 	 *
148 	 * @see org.springframework.batch.core.launch.JobOperator#getExecutions(java.lang.Long)
149 	 */
150 	@Override
151 	public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException {
152 		JobInstance jobInstance = jobExplorer.getJobInstance(instanceId);
153 		if (jobInstance == null) {
154 			throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId));
155 		}
156 		List<Long> list = new ArrayList<Long>();
157 		for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) {
158 			list.add(jobExecution.getId());
159 		}
160 		return list;
161 	}
162 
163 	/*
164 	 * (non-Javadoc)
165 	 *
166 	 * @see org.springframework.batch.core.launch.JobOperator#getJobNames()
167 	 */
168 	@Override
169 	public Set<String> getJobNames() {
170 		return new TreeSet<String>(jobRegistry.getJobNames());
171 	}
172 
173 	/*
174 	 * (non-Javadoc)
175 	 *
176 	 * @see JobOperator#getLastInstances(String, int, int)
177 	 */
178 	@Override
179 	public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException {
180 		List<Long> list = new ArrayList<Long>();
181 		for (JobInstance jobInstance : jobExplorer.getJobInstances(jobName, start, count)) {
182 			list.add(jobInstance.getId());
183 		}
184 		if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
185 			throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
186 		}
187 		return list;
188 	}
189 
190 	/*
191 	 * (non-Javadoc)
192 	 *
193 	 * @see
194 	 * org.springframework.batch.core.launch.JobOperator#getParameters(java.
195 	 * lang.Long)
196 	 */
197 	@Override
198 	public String getParameters(long executionId) throws NoSuchJobExecutionException {
199 		JobExecution jobExecution = findExecutionById(executionId);
200 
201 		return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution
202 				.getJobInstance().getJobParameters()));
203 	}
204 
205 	/*
206 	 * (non-Javadoc)
207 	 *
208 	 * @see
209 	 * org.springframework.batch.core.launch.JobOperator#getRunningExecutions
210 	 * (java.lang.String)
211 	 */
212 	@Override
213 	public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException {
214 		Set<Long> set = new LinkedHashSet<Long>();
215 		for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) {
216 			set.add(jobExecution.getId());
217 		}
218 		if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
219 			throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
220 		}
221 		return set;
222 	}
223 
224 	/*
225 	 * (non-Javadoc)
226 	 *
227 	 * @see
228 	 * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries
229 	 * (java.lang.Long)
230 	 */
231 	@Override
232 	public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException {
233 		JobExecution jobExecution = findExecutionById(executionId);
234 
235 		Map<Long, String> map = new LinkedHashMap<Long, String>();
236 		for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
237 			map.put(stepExecution.getId(), stepExecution.toString());
238 		}
239 		return map;
240 	}
241 
242 	/*
243 	 * (non-Javadoc)
244 	 *
245 	 * @see
246 	 * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang
247 	 * .Long)
248 	 */
249 	@Override
250 	public String getSummary(long executionId) throws NoSuchJobExecutionException {
251 		JobExecution jobExecution = findExecutionById(executionId);
252 		return jobExecution.toString();
253 	}
254 
255 	/*
256 	 * (non-Javadoc)
257 	 *
258 	 * @see
259 	 * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long)
260 	 */
261 	@Override
262 	public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
263 
264 		logger.info("Checking status of job execution with id=" + executionId);
265 
266 		JobExecution jobExecution = findExecutionById(executionId);
267 
268 		String jobName = jobExecution.getJobInstance().getJobName();
269 		Job job = jobRegistry.getJob(jobName);
270 		JobParameters parameters = jobExecution.getJobInstance().getJobParameters();
271 
272 		logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters));
273 		try {
274 			return jobLauncher.run(job, parameters).getId();
275 		}
276 		catch (JobExecutionAlreadyRunningException e) {
277 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
278 					jobName, parameters), e);
279 		}
280 
281 	}
282 
283 	/*
284 	 * (non-Javadoc)
285 	 *
286 	 * @see
287 	 * org.springframework.batch.core.launch.JobOperator#start(java.lang.String,
288 	 * java.lang.String)
289 	 */
290 	@Override
291 	public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
292 
293 		logger.info("Checking status of job with name=" + jobName);
294 
295 		JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter
296 				.stringToProperties(parameters));
297 
298 		if (jobRepository.isJobInstanceExists(jobName, jobParameters)) {
299 			throw new JobInstanceAlreadyExistsException(String.format(
300 					"Cannot start a job instance that already exists with name=%s and parameters=%s", jobName,
301 					parameters));
302 		}
303 
304 		Job job = jobRegistry.getJob(jobName);
305 
306 		logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
307 		try {
308 			return jobLauncher.run(job, jobParameters).getId();
309 		}
310 		catch (JobExecutionAlreadyRunningException e) {
311 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
312 					jobName, parameters), e);
313 		}
314 		catch (JobRestartException e) {
315 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
316 					parameters), e);
317 		}
318 		catch (JobInstanceAlreadyCompleteException e) {
319 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName,
320 					parameters), e);
321 		}
322 
323 	}
324 
325 	/*
326 	 * (non-Javadoc)
327 	 *
328 	 * @see JobOperator#startNextInstance(String )
329 	 */
330 	@Override
331 	public Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException,
332 	UnexpectedJobExecutionException, JobParametersInvalidException {
333 
334 		logger.info("Locating parameters for next instance of job with name=" + jobName);
335 
336 		Job job = jobRegistry.getJob(jobName);
337 		List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobName, 0, 1);
338 
339 		JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
340 		if (incrementer == null) {
341 			throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName);
342 		}
343 
344 		JobParameters parameters;
345 		if (lastInstances.isEmpty()) {
346 			parameters = incrementer.getNext(new JobParameters());
347 			if (parameters == null) {
348 				throw new JobParametersNotFoundException("No bootstrap parameters found for job=" + jobName);
349 			}
350 		}
351 		else {
352 			parameters = incrementer.getNext(lastInstances.get(0).getJobParameters());
353 		}
354 
355 		logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
356 		try {
357 			return jobLauncher.run(job, parameters).getId();
358 		}
359 		catch (JobExecutionAlreadyRunningException e) {
360 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName,
361 					parameters), e);
362 		}
363 		catch (JobRestartException e) {
364 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
365 					parameters), e);
366 		}
367 		catch (JobInstanceAlreadyCompleteException e) {
368 			throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete",
369 					jobName, parameters), e);
370 		}
371 
372 	}
373 
374 	/*
375 	 * (non-Javadoc)
376 	 *
377 	 * @see
378 	 * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long)
379 	 */
380 	@Override
381 	@Transactional
382 	public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
383 
384 		JobExecution jobExecution = findExecutionById(executionId);
385 		// Indicate the execution should be stopped by setting it's status to
386 		// 'STOPPING'. It is assumed that
387 		// the step implementation will check this status at chunk boundaries.
388 		BatchStatus status = jobExecution.getStatus();
389 		if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
390 			throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
391 		}
392 		jobExecution.setStatus(BatchStatus.STOPPING);
393 		jobRepository.update(jobExecution);
394 
395 		return true;
396 	}
397 
398 	@Override
399 	public JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException {
400 		JobExecution jobExecution = findExecutionById(jobExecutionId);
401 
402 		if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) {
403 			throw new JobExecutionAlreadyRunningException(
404 					"JobExecution is running or complete and therefore cannot be aborted");
405 		}
406 
407 		logger.info("Aborting job execution: " + jobExecution);
408 		jobExecution.upgradeStatus(BatchStatus.ABANDONED);
409 		jobExecution.setEndTime(new Date());
410 		jobRepository.update(jobExecution);
411 
412 		return jobExecution;
413 	}
414 
415 	private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException {
416 		JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
417 
418 		if (jobExecution == null) {
419 			throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]");
420 		}
421 		return jobExecution;
422 
423 	}
424 }