1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.launch.support; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.LinkedHashMap; |
20 | import java.util.LinkedHashSet; |
21 | import java.util.List; |
22 | import java.util.Map; |
23 | import java.util.Set; |
24 | import java.util.TreeSet; |
25 | |
26 | import org.apache.commons.logging.Log; |
27 | import org.apache.commons.logging.LogFactory; |
28 | import org.springframework.batch.core.BatchStatus; |
29 | import org.springframework.batch.core.Job; |
30 | import org.springframework.batch.core.JobExecution; |
31 | import org.springframework.batch.core.JobInstance; |
32 | import org.springframework.batch.core.JobParameters; |
33 | import org.springframework.batch.core.JobParametersIncrementer; |
34 | import org.springframework.batch.core.StepExecution; |
35 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
36 | import org.springframework.batch.core.configuration.JobRegistry; |
37 | import org.springframework.batch.core.configuration.ListableJobRegistry; |
38 | import org.springframework.batch.core.converter.DefaultJobParametersConverter; |
39 | import org.springframework.batch.core.converter.JobParametersConverter; |
40 | import org.springframework.batch.core.explore.JobExplorer; |
41 | import org.springframework.batch.core.launch.JobExecutionNotRunningException; |
42 | import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; |
43 | import org.springframework.batch.core.launch.JobLauncher; |
44 | import org.springframework.batch.core.launch.JobOperator; |
45 | import org.springframework.batch.core.launch.JobParametersNotFoundException; |
46 | import org.springframework.batch.core.launch.NoSuchJobException; |
47 | import org.springframework.batch.core.launch.NoSuchJobExecutionException; |
48 | import org.springframework.batch.core.launch.NoSuchJobInstanceException; |
49 | import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; |
50 | import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; |
51 | import org.springframework.batch.core.repository.JobRepository; |
52 | import org.springframework.batch.core.repository.JobRestartException; |
53 | import org.springframework.batch.support.PropertiesConverter; |
54 | import org.springframework.beans.factory.InitializingBean; |
55 | import org.springframework.transaction.annotation.Transactional; |
56 | import org.springframework.util.Assert; |
57 | |
58 | /** |
59 | * Simple implementation of the JobOperator interface. Due to the amount of |
60 | * functionality the implementation is combining, the following dependencies |
61 | * are required: |
62 | * |
63 | * <ul> |
64 | * <li> {@link JobLauncher} |
65 | * <li> {@link JobExplorer} |
66 | * <li> {@link JobRepository} |
67 | * <li> {@link JobRegistry} |
68 | * </ul> |
69 | * |
70 | * @author Dave Syer |
71 | * @author Lucas Ward |
72 | * @since 2.0 |
73 | */ |
74 | public class SimpleJobOperator implements JobOperator, InitializingBean { |
75 | |
76 | /** |
77 | * |
78 | */ |
79 | private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): " |
80 | + "%s with name=%s and parameters=%s"; |
81 | |
82 | private ListableJobRegistry jobRegistry; |
83 | |
84 | private JobExplorer jobExplorer; |
85 | |
86 | private JobLauncher jobLauncher; |
87 | |
88 | private JobRepository jobRepository; |
89 | |
90 | private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); |
91 | |
92 | private final Log logger = LogFactory.getLog(getClass()); |
93 | |
94 | /** |
95 | * Check mandatory properties. |
96 | * |
97 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
98 | */ |
99 | public void afterPropertiesSet() throws Exception { |
100 | Assert.notNull(jobLauncher, "JobLauncher must be provided"); |
101 | Assert.notNull(jobRegistry, "JobLocator must be provided"); |
102 | Assert.notNull(jobExplorer, "JobExplorer must be provided"); |
103 | Assert.notNull(jobRepository, "JobRepository must be provided"); |
104 | } |
105 | |
106 | /** |
107 | * Public setter for the {@link JobParametersConverter}. |
108 | * @param jobParametersConverter the {@link JobParametersConverter} to set |
109 | */ |
110 | public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { |
111 | this.jobParametersConverter = jobParametersConverter; |
112 | } |
113 | |
114 | /** |
115 | * Public setter for the {@link ListableJobRegistry}. |
116 | * @param jobRegistry the {@link ListableJobRegistry} to set |
117 | */ |
118 | public void setJobRegistry(ListableJobRegistry jobRegistry) { |
119 | this.jobRegistry = jobRegistry; |
120 | } |
121 | |
122 | /** |
123 | * Public setter for the {@link JobExplorer}. |
124 | * @param jobExplorer the {@link JobExplorer} to set |
125 | */ |
126 | public void setJobExplorer(JobExplorer jobExplorer) { |
127 | this.jobExplorer = jobExplorer; |
128 | } |
129 | |
130 | public void setJobRepository(JobRepository jobRepository) { |
131 | this.jobRepository = jobRepository; |
132 | } |
133 | |
134 | /** |
135 | * Public setter for the {@link JobLauncher}. |
136 | * @param jobLauncher the {@link JobLauncher} to set |
137 | */ |
138 | public void setJobLauncher(JobLauncher jobLauncher) { |
139 | this.jobLauncher = jobLauncher; |
140 | } |
141 | |
142 | /* |
143 | * (non-Javadoc) |
144 | * |
145 | * @see |
146 | * org.springframework.batch.core.launch.JobOperator#getExecutions(java. |
147 | * lang.Long) |
148 | */ |
149 | public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException { |
150 | JobInstance jobInstance = jobExplorer.getJobInstance(instanceId); |
151 | if (jobInstance == null) { |
152 | throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId)); |
153 | } |
154 | List<Long> list = new ArrayList<Long>(); |
155 | for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) { |
156 | list.add(jobExecution.getId()); |
157 | } |
158 | return list; |
159 | } |
160 | |
161 | /* |
162 | * (non-Javadoc) |
163 | * |
164 | * @see org.springframework.batch.core.launch.JobOperator#getJobNames() |
165 | */ |
166 | public Set<String> getJobNames() { |
167 | return new TreeSet<String>(jobRegistry.getJobNames()); |
168 | } |
169 | |
170 | /* |
171 | * (non-Javadoc) |
172 | * |
173 | * @see JobOperator#getLastInstances(String, int, int) |
174 | */ |
175 | public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException { |
176 | List<Long> list = new ArrayList<Long>(); |
177 | for (JobInstance jobInstance : jobExplorer.getJobInstances(jobName, start, count)) { |
178 | list.add(jobInstance.getId()); |
179 | } |
180 | if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
181 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
182 | } |
183 | return list; |
184 | } |
185 | |
186 | /* |
187 | * (non-Javadoc) |
188 | * |
189 | * @see |
190 | * org.springframework.batch.core.launch.JobOperator#getParameters(java. |
191 | * lang.Long) |
192 | */ |
193 | public String getParameters(long executionId) throws NoSuchJobExecutionException { |
194 | JobExecution jobExecution = findExecutionById(executionId); |
195 | |
196 | return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution |
197 | .getJobInstance().getJobParameters())); |
198 | } |
199 | |
200 | /* |
201 | * (non-Javadoc) |
202 | * |
203 | * @see |
204 | * org.springframework.batch.core.launch.JobOperator#getRunningExecutions |
205 | * (java.lang.String) |
206 | */ |
207 | public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException { |
208 | Set<Long> set = new LinkedHashSet<Long>(); |
209 | for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) { |
210 | set.add(jobExecution.getId()); |
211 | } |
212 | if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { |
213 | throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); |
214 | } |
215 | return set; |
216 | } |
217 | |
218 | /* |
219 | * (non-Javadoc) |
220 | * |
221 | * @see |
222 | * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries |
223 | * (java.lang.Long) |
224 | */ |
225 | public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException { |
226 | JobExecution jobExecution = findExecutionById(executionId); |
227 | |
228 | Map<Long, String> map = new LinkedHashMap<Long, String>(); |
229 | for (StepExecution stepExecution : jobExecution.getStepExecutions()) { |
230 | map.put(stepExecution.getId(), stepExecution.toString()); |
231 | } |
232 | return map; |
233 | } |
234 | |
235 | /* |
236 | * (non-Javadoc) |
237 | * |
238 | * @see |
239 | * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang |
240 | * .Long) |
241 | */ |
242 | public String getSummary(long executionId) throws NoSuchJobExecutionException { |
243 | JobExecution jobExecution = findExecutionById(executionId); |
244 | return jobExecution.toString(); |
245 | } |
246 | |
247 | /* |
248 | * (non-Javadoc) |
249 | * |
250 | * @see |
251 | * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long) |
252 | */ |
253 | public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, |
254 | NoSuchJobException, JobRestartException { |
255 | |
256 | logger.info("Checking status of job execution with id=" + executionId); |
257 | |
258 | JobExecution jobExecution = findExecutionById(executionId); |
259 | |
260 | String jobName = jobExecution.getJobInstance().getJobName(); |
261 | Job job = jobRegistry.getJob(jobName); |
262 | JobParameters parameters = jobExecution.getJobInstance().getJobParameters(); |
263 | |
264 | logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters)); |
265 | try { |
266 | return jobLauncher.run(job, parameters).getId(); |
267 | } |
268 | catch (JobExecutionAlreadyRunningException e) { |
269 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
270 | jobName, parameters), e); |
271 | } |
272 | |
273 | } |
274 | |
275 | /* |
276 | * (non-Javadoc) |
277 | * |
278 | * @see |
279 | * org.springframework.batch.core.launch.JobOperator#start(java.lang.String, |
280 | * java.lang.String) |
281 | */ |
282 | public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException { |
283 | |
284 | logger.info("Checking status of job with name=" + jobName); |
285 | |
286 | JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter |
287 | .stringToProperties(parameters)); |
288 | |
289 | if (jobRepository.isJobInstanceExists(jobName, jobParameters)) { |
290 | throw new JobInstanceAlreadyExistsException(String.format( |
291 | "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName, |
292 | parameters)); |
293 | } |
294 | |
295 | Job job = jobRegistry.getJob(jobName); |
296 | |
297 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
298 | try { |
299 | return jobLauncher.run(job, jobParameters).getId(); |
300 | } |
301 | catch (JobExecutionAlreadyRunningException e) { |
302 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", |
303 | jobName, parameters), e); |
304 | } |
305 | catch (JobRestartException e) { |
306 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
307 | parameters), e); |
308 | } |
309 | catch (JobInstanceAlreadyCompleteException e) { |
310 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName, |
311 | parameters), e); |
312 | } |
313 | |
314 | } |
315 | |
316 | /* |
317 | * (non-Javadoc) |
318 | * |
319 | * @see JobOperator#startNextInstance(String ) |
320 | */ |
321 | public Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException, |
322 | UnexpectedJobExecutionException { |
323 | |
324 | logger.info("Locating parameters for next instance of job with name=" + jobName); |
325 | |
326 | Job job = jobRegistry.getJob(jobName); |
327 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobName, 0, 1); |
328 | |
329 | JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); |
330 | if (incrementer == null) { |
331 | throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName); |
332 | } |
333 | |
334 | JobParameters parameters; |
335 | if (lastInstances.isEmpty()) { |
336 | parameters = incrementer.getNext(new JobParameters()); |
337 | if (parameters == null) { |
338 | throw new JobParametersNotFoundException("No bootstrap parameters found for job=" + jobName); |
339 | } |
340 | } |
341 | else { |
342 | parameters = incrementer.getNext(lastInstances.get(0).getJobParameters()); |
343 | } |
344 | |
345 | logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); |
346 | try { |
347 | return jobLauncher.run(job, parameters).getId(); |
348 | } |
349 | catch (JobExecutionAlreadyRunningException e) { |
350 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName, |
351 | parameters), e); |
352 | } |
353 | catch (JobRestartException e) { |
354 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, |
355 | parameters), e); |
356 | } |
357 | catch (JobInstanceAlreadyCompleteException e) { |
358 | throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete", |
359 | jobName, parameters), e); |
360 | } |
361 | |
362 | } |
363 | |
364 | /* |
365 | * (non-Javadoc) |
366 | * |
367 | * @see |
368 | * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long) |
369 | */ |
370 | @Transactional |
371 | public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { |
372 | |
373 | JobExecution jobExecution = findExecutionById(executionId); |
374 | // Indicate the execution should be stopped by setting it's status to |
375 | // 'STOPPING'. It is assumed that |
376 | // the step implementation will check this status at chunk boundaries. |
377 | BatchStatus status = jobExecution.getStatus(); |
378 | if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) { |
379 | throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution); |
380 | } |
381 | jobExecution.setStatus(BatchStatus.STOPPING); |
382 | jobRepository.update(jobExecution); |
383 | |
384 | return true; |
385 | } |
386 | |
387 | private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException { |
388 | JobExecution jobExecution = jobExplorer.getJobExecution(executionId); |
389 | |
390 | if (jobExecution == null) { |
391 | throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]"); |
392 | } |
393 | return jobExecution; |
394 | |
395 | } |
396 | |
397 | } |