1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.admin.service;
17
18 import java.io.File;
19 import java.io.FilenameFilter;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.Date;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.LinkedHashSet;
28 import java.util.List;
29 import java.util.Properties;
30 import java.util.Set;
31
32 import javax.batch.operations.JobOperator;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 import org.springframework.batch.core.BatchStatus;
38 import org.springframework.batch.core.Job;
39 import org.springframework.batch.core.JobExecution;
40 import org.springframework.batch.core.JobInstance;
41 import org.springframework.batch.core.JobParameters;
42 import org.springframework.batch.core.JobParametersInvalidException;
43 import org.springframework.batch.core.StepExecution;
44 import org.springframework.batch.core.configuration.ListableJobLocator;
45 import org.springframework.batch.core.launch.JobExecutionNotRunningException;
46 import org.springframework.batch.core.launch.JobLauncher;
47 import org.springframework.batch.core.launch.NoSuchJobException;
48 import org.springframework.batch.core.launch.NoSuchJobExecutionException;
49 import org.springframework.batch.core.launch.NoSuchJobInstanceException;
50 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
51 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
52 import org.springframework.batch.core.repository.JobRepository;
53 import org.springframework.batch.core.repository.JobRestartException;
54 import org.springframework.batch.core.repository.dao.ExecutionContextDao;
55 import org.springframework.batch.core.step.NoSuchStepException;
56 import org.springframework.batch.core.step.StepLocator;
57 import org.springframework.beans.factory.DisposableBean;
58 import org.springframework.core.io.ClassPathResource;
59 import org.springframework.core.io.Resource;
60 import org.springframework.scheduling.annotation.Scheduled;
61 import org.springframework.util.CollectionUtils;
62
63
64
65
66
67
68
69
70
71 public class SimpleJobService implements JobService, DisposableBean {
72
73 private static final Log logger = LogFactory.getLog(SimpleJobService.class);
74
75
76 private static final int DEFAULT_SHUTDOWN_TIMEOUT = 60 * 1000;
77
78 private final SearchableJobInstanceDao jobInstanceDao;
79
80 private final SearchableJobExecutionDao jobExecutionDao;
81
82 private final JobRepository jobRepository;
83
84 private final JobLauncher jobLauncher;
85
86 private final ListableJobLocator jobLocator;
87
88 private final SearchableStepExecutionDao stepExecutionDao;
89
90 private final ExecutionContextDao executionContextDao;
91
92 private Collection<JobExecution> activeExecutions = Collections.synchronizedList(new ArrayList<JobExecution>());
93
94 private JobOperator jsrJobOperator;
95
96 private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
97
98
99
100
101
102
103 public void setShutdownTimeout(int shutdownTimeout) {
104 this.shutdownTimeout = shutdownTimeout;
105 }
106
107 public SimpleJobService(SearchableJobInstanceDao jobInstanceDao, SearchableJobExecutionDao jobExecutionDao,
108 SearchableStepExecutionDao stepExecutionDao, JobRepository jobRepository, JobLauncher jobLauncher,
109 ListableJobLocator jobLocator, ExecutionContextDao executionContextDao) {
110 this(jobInstanceDao, jobExecutionDao, stepExecutionDao, jobRepository, jobLauncher, jobLocator, executionContextDao, null);
111 }
112
113 public SimpleJobService(SearchableJobInstanceDao jobInstanceDao, SearchableJobExecutionDao jobExecutionDao,
114 SearchableStepExecutionDao stepExecutionDao, JobRepository jobRepository, JobLauncher jobLauncher,
115 ListableJobLocator jobLocator, ExecutionContextDao executionContextDao, JobOperator jsrJobOperator) {
116 super();
117 this.jobInstanceDao = jobInstanceDao;
118 this.jobExecutionDao = jobExecutionDao;
119 this.stepExecutionDao = stepExecutionDao;
120 this.jobRepository = jobRepository;
121 this.jobLauncher = jobLauncher;
122 this.jobLocator = jobLocator;
123 this.executionContextDao = executionContextDao;
124
125 if(jsrJobOperator == null) {
126 logger.warn("No JobOperator compatible with JSR-352 was provided.");
127 }
128 else {
129 this.jsrJobOperator = jsrJobOperator;
130 }
131 }
132
133 @Override
134 public Collection<StepExecution> getStepExecutions(Long jobExecutionId) throws NoSuchJobExecutionException {
135
136 JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
137 if (jobExecution == null) {
138 throw new NoSuchJobExecutionException("No JobExecution with id=" + jobExecutionId);
139 }
140
141 stepExecutionDao.addStepExecutions(jobExecution);
142
143 String jobName = jobExecution.getJobInstance() == null ? jobInstanceDao.getJobInstance(jobExecution).getJobName() : jobExecution.getJobInstance().getJobName();
144 Collection<String> missingStepNames = new LinkedHashSet<String>();
145
146 if (jobName != null) {
147 missingStepNames.addAll(stepExecutionDao.findStepNamesForJobExecution(jobName, "*:partition*"));
148 logger.debug("Found step executions in repository: " + missingStepNames);
149 }
150
151 Job job = null;
152 try {
153 job = jobLocator.getJob(jobName);
154 }
155 catch (NoSuchJobException e) {
156
157 }
158 if (job instanceof StepLocator) {
159 Collection<String> stepNames = ((StepLocator) job).getStepNames();
160 missingStepNames.addAll(stepNames);
161 logger.debug("Added step executions from job: " + missingStepNames);
162 }
163
164 for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
165 String stepName = stepExecution.getStepName();
166 if (missingStepNames.contains(stepName)) {
167 missingStepNames.remove(stepName);
168 }
169 logger.debug("Removed step executions from job execution: " + missingStepNames);
170 }
171
172 for (String stepName : missingStepNames) {
173 StepExecution stepExecution = jobExecution.createStepExecution(stepName);
174 stepExecution.setStatus(BatchStatus.UNKNOWN);
175 }
176
177 return jobExecution.getStepExecutions();
178
179 }
180
181 @Override
182 public boolean isLaunchable(String jobName) {
183 return jobLocator.getJobNames().contains(jobName) || getJsrJobNames().contains(jobName);
184 }
185
186 @Override
187 public boolean isIncrementable(String jobName) {
188 try {
189 return jobLocator.getJobNames().contains(jobName)
190 && jobLocator.getJob(jobName).getJobParametersIncrementer() != null;
191 }
192 catch (NoSuchJobException e) {
193
194 throw new IllegalStateException("Unexpected non-existent job: " + jobName);
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210 @Override
211 public JobExecution restart(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersInvalidException {
212 return restart(jobExecutionId, null);
213 }
214
215 @Override
216 public JobExecution restart(Long jobExecutionId, JobParameters params) throws NoSuchJobExecutionException,
217 JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
218 NoSuchJobException, JobParametersInvalidException {
219
220 JobExecution jobExecution = null;
221
222 JobExecution target = getJobExecution(jobExecutionId);
223 JobInstance lastInstance = target.getJobInstance();
224
225 if(jobLocator.getJobNames().contains(lastInstance.getJobName())) {
226 Job job = jobLocator.getJob(lastInstance.getJobName());
227
228 jobExecution = jobLauncher.run(job, target.getJobParameters());
229
230 if (jobExecution.isRunning()) {
231 activeExecutions.add(jobExecution);
232 }
233 }
234 else {
235 if(jsrJobOperator != null) {
236 if(params != null) {
237 jobExecution = new JobExecution(jsrJobOperator.restart(jobExecutionId, params.toProperties()));
238 }
239 else {
240 jobExecution = new JobExecution(jsrJobOperator.restart(jobExecutionId, new Properties()));
241 }
242 }
243 else {
244 throw new NoSuchJobException(String.format("Can't find job associated with job execution id %s to restart",
245 String.valueOf(jobExecutionId)));
246 }
247 }
248
249 return jobExecution;
250 }
251
252 @Override
253 public JobExecution launch(String jobName, JobParameters jobParameters) throws NoSuchJobException,
254 JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
255 JobParametersInvalidException {
256
257 JobExecution jobExecution = null;
258
259 if(jobLocator.getJobNames().contains(jobName)) {
260 Job job = jobLocator.getJob(jobName);
261
262 JobExecution lastJobExecution = jobRepository.getLastJobExecution(jobName, jobParameters);
263 boolean restart = false;
264 if (lastJobExecution != null) {
265 BatchStatus status = lastJobExecution.getStatus();
266 if (status.isUnsuccessful() && status!=BatchStatus.ABANDONED) {
267 restart = true;
268 }
269 }
270
271 if (job.getJobParametersIncrementer() != null && !restart) {
272 jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
273 }
274
275 jobExecution = jobLauncher.run(job, jobParameters);
276
277 if (jobExecution.isRunning()) {
278 activeExecutions.add(jobExecution);
279 }
280 }
281 else {
282 if(jsrJobOperator != null) {
283 jobExecution = new JobExecution(jsrJobOperator.start(jobName, jobParameters.toProperties()));
284 }
285 else {
286 throw new NoSuchJobException(String.format("Unable to find job %s to launch",
287 String.valueOf(jobName)));
288 }
289 }
290
291 return jobExecution;
292 }
293
294 @Override
295 public JobParameters getLastJobParameters(String jobName) throws NoSuchJobException {
296
297 Collection<JobExecution> executions = jobExecutionDao.getJobExecutions(jobName, 0, 1);
298
299 JobExecution lastExecution = null;
300 if (!CollectionUtils.isEmpty(executions)) {
301 lastExecution = executions.iterator().next();
302 }
303
304 JobParameters oldParameters = new JobParameters();
305 if (lastExecution != null) {
306 oldParameters = lastExecution.getJobParameters();
307 }
308
309 return oldParameters;
310
311 }
312
313 @Override
314 public Collection<JobExecution> listJobExecutions(int start, int count) {
315 return jobExecutionDao.getJobExecutions(start, count);
316 }
317
318 @Override
319 public int countJobExecutions() {
320 return jobExecutionDao.countJobExecutions();
321 }
322
323 @Override
324 public Collection<String> listJobs(int start, int count) {
325 Collection<String> jobNames = new LinkedHashSet<String>(jobLocator.getJobNames());
326 jobNames.addAll(getJsrJobNames());
327 if (start + count > jobNames.size()) {
328 jobNames.addAll(jobInstanceDao.getJobNames());
329 }
330 if (start >= jobNames.size()) {
331 start = jobNames.size();
332 }
333 if (start + count >= jobNames.size()) {
334 count = jobNames.size() - start;
335 }
336 return new ArrayList<String>(jobNames).subList(start, start + count);
337 }
338
339 private Collection<String> getJsrJobNames() {
340 Resource jsrJobsDirectory = new ClassPathResource("/META-INF/batch-jobs");
341 Set<String> jsr352JobNames = new HashSet<String>();
342
343 if(jsrJobsDirectory.exists()) {
344 try {
345 File [] jobXmlFiles = jsrJobsDirectory.getFile().listFiles(new FilenameFilter() {
346 @Override
347 public boolean accept(File dir, String name) {
348 return name.endsWith(".xml");
349 }
350 });
351
352 for (File jobXmlFile : jobXmlFiles) {
353 jsr352JobNames.add(jobXmlFile.getName().substring(0, jobXmlFile.getName().length() - 4));
354 }
355 }
356 catch (IOException e) {
357 logger.debug("Unable to list JSR-352 batch jobs", e);
358 }
359 }
360
361 return jsr352JobNames;
362 }
363
364 @Override
365 public int countJobs() {
366 Collection<String> names = new HashSet<String>(jobLocator.getJobNames());
367 names.addAll(jobInstanceDao.getJobNames());
368 return names.size();
369 }
370
371 @Override
372 public int stopAll() {
373 Collection<JobExecution> result = jobExecutionDao.getRunningJobExecutions();
374 Collection<String> jsrJobNames = getJsrJobNames();
375
376 for (JobExecution jobExecution : result) {
377 if(jsrJobOperator != null && jsrJobNames.contains(jobExecution.getJobInstance().getJobName())) {
378 jsrJobOperator.stop(jobExecution.getId());
379 }
380 else {
381 jobExecution.stop();
382 jobRepository.update(jobExecution);
383 }
384 }
385
386 return result.size();
387 }
388
389 @Override
390 public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
391
392 JobExecution jobExecution = getJobExecution(jobExecutionId);
393 if (!jobExecution.isRunning()) {
394 throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped");
395 }
396
397 logger.info("Stopping job execution: " + jobExecution);
398
399 Collection<String> jsrJobNames = getJsrJobNames();
400
401 if(jsrJobOperator != null && jsrJobNames.contains(jobExecution.getJobInstance().getJobName())) {
402 jsrJobOperator.stop(jobExecutionId);
403 jobExecution = getJobExecution(jobExecutionId);
404 }
405 else {
406 jobExecution.stop();
407 jobRepository.update(jobExecution);
408 }
409 return jobExecution;
410
411 }
412
413 @Override
414 public JobExecution abandon(Long jobExecutionId) throws NoSuchJobExecutionException,
415 JobExecutionAlreadyRunningException {
416
417 JobExecution jobExecution = getJobExecution(jobExecutionId);
418 if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) {
419 throw new JobExecutionAlreadyRunningException(
420 "JobExecution is running or complete and therefore cannot be aborted");
421 }
422
423 logger.info("Aborting job execution: " + jobExecution);
424
425 Collection<String> jsrJobNames = getJsrJobNames();
426
427 JobInstance jobInstance = jobExecution.getJobInstance();
428 if(jsrJobOperator != null && jsrJobNames.contains(jobInstance.getJobName())) {
429 jsrJobOperator.abandon(jobExecutionId);
430 jobExecution = getJobExecution(jobExecutionId);
431 }
432 else {
433 jobExecution.upgradeStatus(BatchStatus.ABANDONED);
434 jobExecution.setEndTime(new Date());
435 jobRepository.update(jobExecution);
436 }
437
438 return jobExecution;
439
440 }
441
442 @Override
443 public int countJobExecutionsForJob(String name) throws NoSuchJobException {
444 checkJobExists(name);
445 return jobExecutionDao.countJobExecutions(name);
446 }
447
448 @Override
449 public int countJobInstances(String name) throws NoSuchJobException {
450 return jobInstanceDao.countJobInstances(name);
451 }
452
453 @Override
454 public JobExecution getJobExecution(Long jobExecutionId) throws NoSuchJobExecutionException {
455 JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
456 if (jobExecution == null) {
457 throw new NoSuchJobExecutionException("There is no JobExecution with id=" + jobExecutionId);
458 }
459 jobExecution.setJobInstance(jobInstanceDao.getJobInstance(jobExecution));
460 try {
461 jobExecution.setExecutionContext(executionContextDao.getExecutionContext(jobExecution));
462 }
463 catch (Exception e) {
464 logger.info("Cannot load execution context for job execution: " + jobExecution);
465 }
466 stepExecutionDao.addStepExecutions(jobExecution);
467 return jobExecution;
468 }
469
470 @Override
471 public Collection<JobExecution> getJobExecutionsForJobInstance(String name, Long jobInstanceId)
472 throws NoSuchJobException {
473 checkJobExists(name);
474 List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstanceDao
475 .getJobInstance(jobInstanceId));
476 for (JobExecution jobExecution : jobExecutions) {
477 stepExecutionDao.addStepExecutions(jobExecution);
478 }
479 return jobExecutions;
480 }
481
482 @Override
483 public StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId)
484 throws NoSuchJobExecutionException, NoSuchStepExecutionException {
485 JobExecution jobExecution = getJobExecution(jobExecutionId);
486 StepExecution stepExecution = stepExecutionDao.getStepExecution(jobExecution, stepExecutionId);
487 if (stepExecution == null) {
488 throw new NoSuchStepExecutionException("There is no StepExecution with jobExecutionId=" + jobExecutionId
489 + " and id=" + stepExecutionId);
490 }
491 try {
492 stepExecution.setExecutionContext(executionContextDao.getExecutionContext(stepExecution));
493 }
494 catch (Exception e) {
495 logger.info("Cannot load execution context for step execution: " + stepExecution);
496 }
497 return stepExecution;
498 }
499
500 @Override
501 public Collection<JobExecution> listJobExecutionsForJob(String jobName, int start, int count)
502 throws NoSuchJobException {
503 checkJobExists(jobName);
504 List<JobExecution> jobExecutions = jobExecutionDao.getJobExecutions(jobName, start, count);
505 for (JobExecution jobExecution : jobExecutions) {
506 stepExecutionDao.addStepExecutions(jobExecution);
507 }
508 return jobExecutions;
509 }
510
511 @Override
512 public Collection<StepExecution> listStepExecutionsForStep(String jobName, String stepName, int start, int count)
513 throws NoSuchStepException {
514 if (stepExecutionDao.countStepExecutions(jobName, stepName) == 0) {
515 throw new NoSuchStepException("No step executions exist with this step name: " + stepName);
516 }
517 return stepExecutionDao.findStepExecutions(jobName, stepName, start, count);
518 }
519
520 @Override
521 public int countStepExecutionsForStep(String jobName, String stepName) throws NoSuchStepException {
522 return stepExecutionDao.countStepExecutions(jobName, stepName);
523 }
524
525 @Override
526 public JobInstance getJobInstance(long jobInstanceId) throws NoSuchJobInstanceException {
527 JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
528 if (jobInstance == null) {
529 throw new NoSuchJobInstanceException("JobInstance with id=" + jobInstanceId + " does not exist");
530 }
531 return jobInstance;
532 }
533
534 @Override
535 public Collection<JobInstance> listJobInstances(String jobName, int start, int count) throws NoSuchJobException {
536 checkJobExists(jobName);
537 return jobInstanceDao.getJobInstances(jobName, start, count);
538 }
539
540 @Override
541 public Collection<String> getStepNamesForJob(String jobName) throws NoSuchJobException {
542 try {
543 Job job = jobLocator.getJob(jobName);
544 if (job instanceof StepLocator) {
545 return ((StepLocator) job).getStepNames();
546 }
547 }
548 catch (NoSuchJobException e) {
549
550 }
551 Collection<String> stepNames = new LinkedHashSet<String>();
552 for (JobExecution jobExecution : listJobExecutionsForJob(jobName, 0, 100)) {
553 for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
554 stepNames.add(stepExecution.getStepName());
555 }
556 }
557 return Collections.unmodifiableList(new ArrayList<String>(stepNames));
558 }
559
560 private void checkJobExists(String jobName) throws NoSuchJobException {
561 if(getJsrJobNames().contains(jobName)) {
562 return;
563 }
564 if (jobLocator.getJobNames().contains(jobName)) {
565 return;
566 }
567 if (jobInstanceDao.countJobInstances(jobName) > 0) {
568 return;
569 }
570 throw new NoSuchJobException("No Job with that name either current or historic: [" + jobName + "]");
571 }
572
573
574
575
576
577 @Override
578 public void destroy() throws Exception {
579
580 Exception firstException = null;
581
582 for (JobExecution jobExecution : activeExecutions) {
583 try {
584 if (jobExecution.isRunning()) {
585 stop(jobExecution.getId());
586 }
587 }
588 catch (JobExecutionNotRunningException e) {
589 logger.info("JobExecution is not running so it cannot be stopped");
590 }
591 catch (Exception e) {
592 logger.error("Unexpected exception stopping JobExecution", e);
593 if (firstException == null) {
594 firstException = e;
595 }
596 }
597 }
598
599 int count = 0;
600 int maxCount = (shutdownTimeout + 1000) / 1000;
601 while (!activeExecutions.isEmpty() && ++count < maxCount) {
602 logger.error("Waiting for " + activeExecutions.size() + " active executions to complete");
603 removeInactiveExecutions();
604 Thread.sleep(1000L);
605 }
606
607 if (firstException != null) {
608 throw firstException;
609 }
610
611 }
612
613
614
615
616
617 @Scheduled(fixedDelay = 60000)
618 public void removeInactiveExecutions() {
619
620 for (Iterator<JobExecution> iterator = activeExecutions.iterator(); iterator.hasNext();) {
621 JobExecution jobExecution = iterator.next();
622 try {
623 jobExecution = getJobExecution(jobExecution.getId());
624 }
625 catch (NoSuchJobExecutionException e) {
626 logger.error("Unexpected exception loading JobExecution", e);
627 }
628 if (!jobExecution.isRunning()) {
629 iterator.remove();
630 }
631 }
632
633 }
634
635 }