1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.launch.support; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.Date; |
20 | import java.util.LinkedHashMap; |
21 | import java.util.LinkedHashSet; |
22 | import java.util.List; |
23 | import java.util.Map; |
24 | import java.util.Set; |
25 | import java.util.TreeSet; |
26 | |
27 | import org.apache.commons.logging.Log; |
28 | import org.apache.commons.logging.LogFactory; |
29 | import org.springframework.batch.core.BatchStatus; |
30 | import org.springframework.batch.core.Job; |
31 | import org.springframework.batch.core.JobExecution; |
32 | import org.springframework.batch.core.JobInstance; |
33 | import org.springframework.batch.core.JobParameters; |
34 | import org.springframework.batch.core.JobParametersIncrementer; |
35 | import org.springframework.batch.core.JobParametersInvalidException; |
36 | import org.springframework.batch.core.StepExecution; |
37 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
38 | import org.springframework.batch.core.configuration.JobRegistry; |
39 | import org.springframework.batch.core.configuration.ListableJobLocator; |
40 | import org.springframework.batch.core.converter.DefaultJobParametersConverter; |
41 | import org.springframework.batch.core.converter.JobParametersConverter; |
42 | import org.springframework.batch.core.explore.JobExplorer; |
43 | import org.springframework.batch.core.launch.JobExecutionNotRunningException; |
44 | import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; |
45 | import org.springframework.batch.core.launch.JobLauncher; |
46 | import org.springframework.batch.core.launch.JobOperator; |
47 | import org.springframework.batch.core.launch.JobParametersNotFoundException; |
48 | import org.springframework.batch.core.launch.NoSuchJobException; |
49 | import org.springframework.batch.core.launch.NoSuchJobExecutionException; |
50 | import org.springframework.batch.core.launch.NoSuchJobInstanceException; |
51 | import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; |
52 | import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; |
53 | import org.springframework.batch.core.repository.JobRepository; |
54 | import org.springframework.batch.core.repository.JobRestartException; |
55 | import org.springframework.batch.support.PropertiesConverter; |
56 | import org.springframework.beans.factory.InitializingBean; |
57 | import org.springframework.transaction.annotation.Transactional; |
58 | import org.springframework.util.Assert; |
59 | |
60 | /** |
61 | * Simple implementation of the JobOperator interface. Due to the amount of |
62 | * functionality the implementation is combining, the following dependencies |
63 | * are required: |
64 | * |
65 | * <ul> |
66 | * <li> {@link JobLauncher} |
67 | * <li> {@link JobExplorer} |
68 | * <li> {@link JobRepository} |
69 | * <li> {@link JobRegistry} |
70 | * </ul> |
71 | * |
72 | * @author Dave Syer |
73 | * @author Lucas Ward |
74 | * @since 2.0 |
75 | */ |
76 | public class SimpleJobOperator implements JobOperator, InitializingBean { |
77 | |
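/**
 * Message template for failures that should only be possible on a race
 * condition. The three format arguments are, in order: a short description
 * of the problem, the job name and the job parameters.
 */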
81 | private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): " |
82 | + "%s with name=%s and parameters=%s"; |
83 | |
84 | private ListableJobLocator jobRegistry; |
85 | |
86 | private JobExplorer jobExplorer; |
87 | |
88 | private JobLauncher jobLauncher; |
89 | |
90 | private JobRepository jobRepository; |
91 | |
92 | private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); |
93 | |
94 | private final Log logger = LogFactory.getLog(getClass()); |
95 | |
96 | /** |
97 | * Check mandatory properties. |
98 | * |
99 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
100 | */ |
101 | @Override |
102 | public void afterPropertiesSet() throws Exception { |
103 | Assert.notNull(jobLauncher, "JobLauncher must be provided"); |
104 | Assert.notNull(jobRegistry, "JobLocator must be provided"); |
105 | Assert.notNull(jobExplorer, "JobExplorer must be provided"); |
106 | Assert.notNull(jobRepository, "JobRepository must be provided"); |
107 | } |
108 | |
109 | /** |
110 | * Public setter for the {@link JobParametersConverter}. |
111 | * @param jobParametersConverter the {@link JobParametersConverter} to set |
112 | */ |
113 | public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { |
114 | this.jobParametersConverter = jobParametersConverter; |
115 | } |
116 | |
117 | /** |
118 | * Public setter for the {@link ListableJobLocator}. |
119 | * @param jobRegistry the {@link ListableJobLocator} to set |
120 | */ |
121 | public void setJobRegistry(ListableJobLocator jobRegistry) { |
122 | this.jobRegistry = jobRegistry; |
123 | } |
124 | |
125 | /** |
126 | * Public setter for the {@link JobExplorer}. |
127 | * @param jobExplorer the {@link JobExplorer} to set |
128 | */ |
129 | public void setJobExplorer(JobExplorer jobExplorer) { |
130 | this.jobExplorer = jobExplorer; |
131 | } |
132 | |
133 | public void setJobRepository(JobRepository jobRepository) { |
134 | this.jobRepository = jobRepository; |
135 | } |
136 | |
137 | /** |
138 | * Public setter for the {@link JobLauncher}. |
139 | * @param jobLauncher the {@link JobLauncher} to set |
140 | */ |
141 | public void setJobLauncher(JobLauncher jobLauncher) { |
142 | this.jobLauncher = jobLauncher; |
143 | } |
144 | |
145 | /* |
146 | * (non-Javadoc) |
147 | * |
148 | * @see org.springframework.batch.core.launch.JobOperator#getExecutions(java.lang.Long) |
149 | */ |
150 | @Override |
151 | public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException { |
152 | JobInstance jobInstance = jobExplorer.getJobInstance(instanceId); |
153 | if (jobInstance == null) { |
154 | throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId)); |
155 | } |
156 | List<Long> list = new ArrayList<Long>(); |
157 | for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) { |
158 | list.add(jobExecution.getId()); |
159 | } |
160 | return list; |
161 | } |
162 | |
163 | /* |
164 | * (non-Javadoc) |
165 | * |
166 | * @see org.springframework.batch.core.launch.JobOperator#getJobNames() |
167 | */ |
168 | @Override |
169 | public Set<String> getJobNames() { |
170 | return new TreeSet<String>(jobRegistry.getJobNames()); |
171 | } |
172 | |
173 | /* |
174 | * (non-Javadoc) |
175 | * |
176 | * @see JobOperator#getLastInstances(String, int, int) |
177 | */ |
178 | @Override |
179 | public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException { |
180 | List<Long> list = new ArrayList<Long>(); |
181 | for (JobInstance jobInstance : jobExplorer.getJobInstances(jobName, start, count)) { |
182 | list.add(jobInstance.getId()); |
183 | } |
184 | if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
185 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
186 | } |
187 | return list; |
188 | } |
189 | |
190 | /* |
191 | * (non-Javadoc) |
192 | * |
193 | * @see |
194 | * org.springframework.batch.core.launch.JobOperator#getParameters(java. |
195 | * lang.Long) |
196 | */ |
197 | @Override |
198 | public String getParameters(long executionId) throws NoSuchJobExecutionException { |
199 | JobExecution jobExecution = findExecutionById(executionId); |
200 | |
201 | return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution |
202 | .getJobParameters())); |
203 | } |
204 | |
205 | /* |
206 | * (non-Javadoc) |
207 | * |
208 | * @see |
209 | * org.springframework.batch.core.launch.JobOperator#getRunningExecutions |
210 | * (java.lang.String) |
211 | */ |
212 | @Override |
213 | public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException { |
214 | Set<Long> set = new LinkedHashSet<Long>(); |
215 | for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) { |
216 | set.add(jobExecution.getId()); |
217 | } |
218 | if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
219 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
220 | } |
221 | return set; |
222 | } |
223 | |
224 | /* |
225 | * (non-Javadoc) |
226 | * |
227 | * @see |
228 | * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries |
229 | * (java.lang.Long) |
230 | */ |
231 | @Override |
232 | public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException { |
233 | JobExecution jobExecution = findExecutionById(executionId); |
234 | |
235 | Map<Long, String> map = new LinkedHashMap<Long, String>(); |
236 | for (StepExecution stepExecution : jobExecution.getStepExecutions()) { |
237 | map.put(stepExecution.getId(), stepExecution.toString()); |
238 | } |
239 | return map; |
240 | } |
241 | |
242 | /* |
243 | * (non-Javadoc) |
244 | * |
245 | * @see |
246 | * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang |
247 | * .Long) |
248 | */ |
249 | @Override |
250 | public String getSummary(long executionId) throws NoSuchJobExecutionException { |
251 | JobExecution jobExecution = findExecutionById(executionId); |
252 | return jobExecution.toString(); |
253 | } |
254 | |
255 | /* |
256 | * (non-Javadoc) |
257 | * |
258 | * @see |
259 | * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long) |
260 | */ |
261 | @Override |
262 | public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException { |
263 | |
264 | logger.info("Checking status of job execution with id=" + executionId); |
265 | |
266 | JobExecution jobExecution = findExecutionById(executionId); |
267 | |
268 | String jobName = jobExecution.getJobInstance().getJobName(); |
269 | Job job = jobRegistry.getJob(jobName); |
270 | JobParameters parameters = jobExecution.getJobParameters(); |
271 | |
272 | logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters)); |
273 | try { |
274 | return jobLauncher.run(job, parameters).getId(); |
275 | } |
276 | catch (JobExecutionAlreadyRunningException e) { |
277 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
278 | jobName, parameters), e); |
279 | } |
280 | |
281 | } |
282 | |
283 | /* |
284 | * (non-Javadoc) |
285 | * |
286 | * @see |
287 | * org.springframework.batch.core.launch.JobOperator#start(java.lang.String, |
288 | * java.lang.String) |
289 | */ |
290 | @Override |
291 | public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException { |
292 | |
293 | logger.info("Checking status of job with name=" + jobName); |
294 | |
295 | JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter |
296 | .stringToProperties(parameters)); |
297 | |
298 | if (jobRepository.isJobInstanceExists(jobName, jobParameters)) { |
299 | throw new JobInstanceAlreadyExistsException(String.format( |
300 | "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName, |
301 | parameters)); |
302 | } |
303 | |
304 | Job job = jobRegistry.getJob(jobName); |
305 | |
306 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
307 | try { |
308 | return jobLauncher.run(job, jobParameters).getId(); |
309 | } |
310 | catch (JobExecutionAlreadyRunningException e) { |
311 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
312 | jobName, parameters), e); |
313 | } |
314 | catch (JobRestartException e) { |
315 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
316 | parameters), e); |
317 | } |
318 | catch (JobInstanceAlreadyCompleteException e) { |
319 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName, |
320 | parameters), e); |
321 | } |
322 | |
323 | } |
324 | |
325 | /* |
326 | * (non-Javadoc) |
327 | * |
328 | * @see JobOperator#startNextInstance(String ) |
329 | */ |
330 | @Override |
331 | public Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException, |
332 | UnexpectedJobExecutionException, JobParametersInvalidException { |
333 | |
334 | logger.info("Locating parameters for next instance of job with name=" + jobName); |
335 | |
336 | Job job = jobRegistry.getJob(jobName); |
337 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobName, 0, 1); |
338 | |
339 | JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); |
340 | if (incrementer == null) { |
341 | throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName); |
342 | } |
343 | |
344 | JobParameters parameters; |
345 | if (lastInstances.isEmpty()) { |
346 | parameters = incrementer.getNext(new JobParameters()); |
347 | if (parameters == null) { |
348 | throw new JobParametersNotFoundException("No bootstrap parameters found for job=" + jobName); |
349 | } |
350 | } |
351 | else { |
352 | List<JobExecution> lastExecutions = jobExplorer.getJobExecutions(lastInstances.get(0)); |
353 | parameters = incrementer.getNext(lastExecutions.get(0).getJobParameters()); |
354 | } |
355 | |
356 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
357 | try { |
358 | return jobLauncher.run(job, parameters).getId(); |
359 | } |
360 | catch (JobExecutionAlreadyRunningException e) { |
361 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName, |
362 | parameters), e); |
363 | } |
364 | catch (JobRestartException e) { |
365 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
366 | parameters), e); |
367 | } |
368 | catch (JobInstanceAlreadyCompleteException e) { |
369 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete", |
370 | jobName, parameters), e); |
371 | } |
372 | |
373 | } |
374 | |
375 | /* |
376 | * (non-Javadoc) |
377 | * |
378 | * @see |
379 | * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long) |
380 | */ |
381 | @Override |
382 | @Transactional |
383 | public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { |
384 | |
385 | JobExecution jobExecution = findExecutionById(executionId); |
386 | // Indicate the execution should be stopped by setting it's status to |
387 | // 'STOPPING'. It is assumed that |
388 | // the step implementation will check this status at chunk boundaries. |
389 | BatchStatus status = jobExecution.getStatus(); |
390 | if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) { |
391 | throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution); |
392 | } |
393 | jobExecution.setStatus(BatchStatus.STOPPING); |
394 | jobRepository.update(jobExecution); |
395 | |
396 | return true; |
397 | } |
398 | |
399 | @Override |
400 | public JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException { |
401 | JobExecution jobExecution = findExecutionById(jobExecutionId); |
402 | |
403 | if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) { |
404 | throw new JobExecutionAlreadyRunningException( |
405 | "JobExecution is running or complete and therefore cannot be aborted"); |
406 | } |
407 | |
408 | logger.info("Aborting job execution: " + jobExecution); |
409 | jobExecution.upgradeStatus(BatchStatus.ABANDONED); |
410 | jobExecution.setEndTime(new Date()); |
411 | jobRepository.update(jobExecution); |
412 | |
413 | return jobExecution; |
414 | } |
415 | |
416 | private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException { |
417 | JobExecution jobExecution = jobExplorer.getJobExecution(executionId); |
418 | |
419 | if (jobExecution == null) { |
420 | throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]"); |
421 | } |
422 | return jobExecution; |
423 | |
424 | } |
425 | } |