1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.launch.support; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.LinkedHashMap; |
20 | import java.util.LinkedHashSet; |
21 | import java.util.List; |
22 | import java.util.Map; |
23 | import java.util.Set; |
24 | import java.util.TreeSet; |
25 | |
26 | import org.apache.commons.logging.Log; |
27 | import org.apache.commons.logging.LogFactory; |
28 | import org.springframework.batch.core.BatchStatus; |
29 | import org.springframework.batch.core.Job; |
30 | import org.springframework.batch.core.JobExecution; |
31 | import org.springframework.batch.core.JobInstance; |
32 | import org.springframework.batch.core.JobParameters; |
33 | import org.springframework.batch.core.JobParametersIncrementer; |
34 | import org.springframework.batch.core.JobParametersInvalidException; |
35 | import org.springframework.batch.core.StepExecution; |
36 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
37 | import org.springframework.batch.core.configuration.JobRegistry; |
38 | import org.springframework.batch.core.configuration.ListableJobLocator; |
39 | import org.springframework.batch.core.converter.DefaultJobParametersConverter; |
40 | import org.springframework.batch.core.converter.JobParametersConverter; |
41 | import org.springframework.batch.core.explore.JobExplorer; |
42 | import org.springframework.batch.core.launch.JobExecutionNotRunningException; |
43 | import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; |
44 | import org.springframework.batch.core.launch.JobLauncher; |
45 | import org.springframework.batch.core.launch.JobOperator; |
46 | import org.springframework.batch.core.launch.JobParametersNotFoundException; |
47 | import org.springframework.batch.core.launch.NoSuchJobException; |
48 | import org.springframework.batch.core.launch.NoSuchJobExecutionException; |
49 | import org.springframework.batch.core.launch.NoSuchJobInstanceException; |
50 | import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; |
51 | import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; |
52 | import org.springframework.batch.core.repository.JobRepository; |
53 | import org.springframework.batch.core.repository.JobRestartException; |
54 | import org.springframework.batch.support.PropertiesConverter; |
55 | import org.springframework.beans.factory.InitializingBean; |
56 | import org.springframework.transaction.annotation.Transactional; |
57 | import org.springframework.util.Assert; |
58 | |
59 | /** |
60 | * Simple implementation of the JobOperator interface. Due to the amount of |
61 | * functionality the implementation is combining, the following dependencies |
62 | * are required: |
63 | * |
64 | * <ul> |
65 | * <li> {@link JobLauncher} |
66 | * <li> {@link JobExplorer} |
67 | * <li> {@link JobRepository} |
68 | * <li> {@link JobRegistry} |
69 | * </ul> |
70 | * |
71 | * @author Dave Syer |
72 | * @author Lucas Ward |
73 | * @since 2.0 |
74 | */ |
75 | public class SimpleJobOperator implements JobOperator, InitializingBean { |
76 | |
77 | /** |
78 | * |
79 | */ |
80 | private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): " |
81 | + "%s with name=%s and parameters=%s"; |
82 | |
83 | private ListableJobLocator jobRegistry; |
84 | |
85 | private JobExplorer jobExplorer; |
86 | |
87 | private JobLauncher jobLauncher; |
88 | |
89 | private JobRepository jobRepository; |
90 | |
91 | private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); |
92 | |
93 | private final Log logger = LogFactory.getLog(getClass()); |
94 | |
95 | /** |
96 | * Check mandatory properties. |
97 | * |
98 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
99 | */ |
100 | public void afterPropertiesSet() throws Exception { |
101 | Assert.notNull(jobLauncher, "JobLauncher must be provided"); |
102 | Assert.notNull(jobRegistry, "JobLocator must be provided"); |
103 | Assert.notNull(jobExplorer, "JobExplorer must be provided"); |
104 | Assert.notNull(jobRepository, "JobRepository must be provided"); |
105 | } |
106 | |
107 | /** |
108 | * Public setter for the {@link JobParametersConverter}. |
109 | * @param jobParametersConverter the {@link JobParametersConverter} to set |
110 | */ |
111 | public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { |
112 | this.jobParametersConverter = jobParametersConverter; |
113 | } |
114 | |
115 | /** |
116 | * Public setter for the {@link ListableJobLocator}. |
117 | * @param jobRegistry the {@link ListableJobLocator} to set |
118 | */ |
119 | public void setJobRegistry(ListableJobLocator jobRegistry) { |
120 | this.jobRegistry = jobRegistry; |
121 | } |
122 | |
123 | /** |
124 | * Public setter for the {@link JobExplorer}. |
125 | * @param jobExplorer the {@link JobExplorer} to set |
126 | */ |
127 | public void setJobExplorer(JobExplorer jobExplorer) { |
128 | this.jobExplorer = jobExplorer; |
129 | } |
130 | |
131 | public void setJobRepository(JobRepository jobRepository) { |
132 | this.jobRepository = jobRepository; |
133 | } |
134 | |
135 | /** |
136 | * Public setter for the {@link JobLauncher}. |
137 | * @param jobLauncher the {@link JobLauncher} to set |
138 | */ |
139 | public void setJobLauncher(JobLauncher jobLauncher) { |
140 | this.jobLauncher = jobLauncher; |
141 | } |
142 | |
143 | /* |
144 | * (non-Javadoc) |
145 | * |
146 | * @see org.springframework.batch.core.launch.JobOperator#getExecutions(java.lang.Long) |
147 | */ |
148 | public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException { |
149 | JobInstance jobInstance = jobExplorer.getJobInstance(instanceId); |
150 | if (jobInstance == null) { |
151 | throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId)); |
152 | } |
153 | List<Long> list = new ArrayList<Long>(); |
154 | for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) { |
155 | list.add(jobExecution.getId()); |
156 | } |
157 | return list; |
158 | } |
159 | |
160 | /* |
161 | * (non-Javadoc) |
162 | * |
163 | * @see org.springframework.batch.core.launch.JobOperator#getJobNames() |
164 | */ |
165 | public Set<String> getJobNames() { |
166 | return new TreeSet<String>(jobRegistry.getJobNames()); |
167 | } |
168 | |
169 | /* |
170 | * (non-Javadoc) |
171 | * |
172 | * @see JobOperator#getLastInstances(String, int, int) |
173 | */ |
174 | public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException { |
175 | List<Long> list = new ArrayList<Long>(); |
176 | for (JobInstance jobInstance : jobExplorer.getJobInstances(jobName, start, count)) { |
177 | list.add(jobInstance.getId()); |
178 | } |
179 | if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
180 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
181 | } |
182 | return list; |
183 | } |
184 | |
185 | /* |
186 | * (non-Javadoc) |
187 | * |
188 | * @see |
189 | * org.springframework.batch.core.launch.JobOperator#getParameters(java. |
190 | * lang.Long) |
191 | */ |
192 | public String getParameters(long executionId) throws NoSuchJobExecutionException { |
193 | JobExecution jobExecution = findExecutionById(executionId); |
194 | |
195 | return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution |
196 | .getJobInstance().getJobParameters())); |
197 | } |
198 | |
199 | /* |
200 | * (non-Javadoc) |
201 | * |
202 | * @see |
203 | * org.springframework.batch.core.launch.JobOperator#getRunningExecutions |
204 | * (java.lang.String) |
205 | */ |
206 | public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException { |
207 | Set<Long> set = new LinkedHashSet<Long>(); |
208 | for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) { |
209 | set.add(jobExecution.getId()); |
210 | } |
211 | if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
212 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
213 | } |
214 | return set; |
215 | } |
216 | |
217 | /* |
218 | * (non-Javadoc) |
219 | * |
220 | * @see |
221 | * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries |
222 | * (java.lang.Long) |
223 | */ |
224 | public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException { |
225 | JobExecution jobExecution = findExecutionById(executionId); |
226 | |
227 | Map<Long, String> map = new LinkedHashMap<Long, String>(); |
228 | for (StepExecution stepExecution : jobExecution.getStepExecutions()) { |
229 | map.put(stepExecution.getId(), stepExecution.toString()); |
230 | } |
231 | return map; |
232 | } |
233 | |
234 | /* |
235 | * (non-Javadoc) |
236 | * |
237 | * @see |
238 | * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang |
239 | * .Long) |
240 | */ |
241 | public String getSummary(long executionId) throws NoSuchJobExecutionException { |
242 | JobExecution jobExecution = findExecutionById(executionId); |
243 | return jobExecution.toString(); |
244 | } |
245 | |
246 | /* |
247 | * (non-Javadoc) |
248 | * |
249 | * @see |
250 | * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long) |
251 | */ |
252 | public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException { |
253 | |
254 | logger.info("Checking status of job execution with id=" + executionId); |
255 | |
256 | JobExecution jobExecution = findExecutionById(executionId); |
257 | |
258 | String jobName = jobExecution.getJobInstance().getJobName(); |
259 | Job job = jobRegistry.getJob(jobName); |
260 | JobParameters parameters = jobExecution.getJobInstance().getJobParameters(); |
261 | |
262 | logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters)); |
263 | try { |
264 | return jobLauncher.run(job, parameters).getId(); |
265 | } |
266 | catch (JobExecutionAlreadyRunningException e) { |
267 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
268 | jobName, parameters), e); |
269 | } |
270 | |
271 | } |
272 | |
273 | /* |
274 | * (non-Javadoc) |
275 | * |
276 | * @see |
277 | * org.springframework.batch.core.launch.JobOperator#start(java.lang.String, |
278 | * java.lang.String) |
279 | */ |
280 | public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException { |
281 | |
282 | logger.info("Checking status of job with name=" + jobName); |
283 | |
284 | JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter |
285 | .stringToProperties(parameters)); |
286 | |
287 | if (jobRepository.isJobInstanceExists(jobName, jobParameters)) { |
288 | throw new JobInstanceAlreadyExistsException(String.format( |
289 | "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName, |
290 | parameters)); |
291 | } |
292 | |
293 | Job job = jobRegistry.getJob(jobName); |
294 | |
295 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
296 | try { |
297 | return jobLauncher.run(job, jobParameters).getId(); |
298 | } |
299 | catch (JobExecutionAlreadyRunningException e) { |
300 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
301 | jobName, parameters), e); |
302 | } |
303 | catch (JobRestartException e) { |
304 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
305 | parameters), e); |
306 | } |
307 | catch (JobInstanceAlreadyCompleteException e) { |
308 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName, |
309 | parameters), e); |
310 | } |
311 | |
312 | } |
313 | |
314 | /* |
315 | * (non-Javadoc) |
316 | * |
317 | * @see JobOperator#startNextInstance(String ) |
318 | */ |
319 | public Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException, |
320 | UnexpectedJobExecutionException, JobParametersInvalidException { |
321 | |
322 | logger.info("Locating parameters for next instance of job with name=" + jobName); |
323 | |
324 | Job job = jobRegistry.getJob(jobName); |
325 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobName, 0, 1); |
326 | |
327 | JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); |
328 | if (incrementer == null) { |
329 | throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName); |
330 | } |
331 | |
332 | JobParameters parameters; |
333 | if (lastInstances.isEmpty()) { |
334 | parameters = incrementer.getNext(new JobParameters()); |
335 | if (parameters == null) { |
336 | throw new JobParametersNotFoundException("No bootstrap parameters found for job=" + jobName); |
337 | } |
338 | } |
339 | else { |
340 | parameters = incrementer.getNext(lastInstances.get(0).getJobParameters()); |
341 | } |
342 | |
343 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
344 | try { |
345 | return jobLauncher.run(job, parameters).getId(); |
346 | } |
347 | catch (JobExecutionAlreadyRunningException e) { |
348 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName, |
349 | parameters), e); |
350 | } |
351 | catch (JobRestartException e) { |
352 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
353 | parameters), e); |
354 | } |
355 | catch (JobInstanceAlreadyCompleteException e) { |
356 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete", |
357 | jobName, parameters), e); |
358 | } |
359 | |
360 | } |
361 | |
362 | /* |
363 | * (non-Javadoc) |
364 | * |
365 | * @see |
366 | * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long) |
367 | */ |
368 | @Transactional |
369 | public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { |
370 | |
371 | JobExecution jobExecution = findExecutionById(executionId); |
372 | // Indicate the execution should be stopped by setting it's status to |
373 | // 'STOPPING'. It is assumed that |
374 | // the step implementation will check this status at chunk boundaries. |
375 | BatchStatus status = jobExecution.getStatus(); |
376 | if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) { |
377 | throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution); |
378 | } |
379 | jobExecution.setStatus(BatchStatus.STOPPING); |
380 | jobRepository.update(jobExecution); |
381 | |
382 | return true; |
383 | } |
384 | |
385 | private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException { |
386 | JobExecution jobExecution = jobExplorer.getJobExecution(executionId); |
387 | |
388 | if (jobExecution == null) { |
389 | throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]"); |
390 | } |
391 | return jobExecution; |
392 | |
393 | } |
394 | |
395 | } |