1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.admin.launch;
17
18 import java.util.Date;
19 import java.util.HashSet;
20 import java.util.Set;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.aspectj.lang.annotation.AfterReturning;
25 import org.aspectj.lang.annotation.Aspect;
26 import org.aspectj.lang.annotation.Before;
27 import org.springframework.batch.core.BatchStatus;
28 import org.springframework.batch.core.ExitStatus;
29 import org.springframework.batch.core.Job;
30 import org.springframework.batch.core.JobExecution;
31 import org.springframework.batch.core.explore.JobExplorer;
32 import org.springframework.batch.core.launch.JobLauncher;
33 import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
34 import org.springframework.batch.core.repository.JobRepository;
35 import org.springframework.beans.factory.InitializingBean;
36 import org.springframework.jmx.export.annotation.ManagedAttribute;
37 import org.springframework.jmx.export.annotation.ManagedOperation;
38 import org.springframework.jmx.export.annotation.ManagedResource;
39 import org.springframework.util.Assert;
40
41
42
43
44
45
46
47
48 @Aspect
49 @ManagedResource
50 public class JobLauncherSynchronizer implements InitializingBean {
51
52 private static final Log logger = LogFactory.getLog(JobLauncherSynchronizer.class);
53
54 private JobExplorer jobExplorer;
55
56 private JobRepository jobRepository;
57
58 private Set<String> jobNames = new HashSet<String>();
59
60
61
62
63
64
65 public void setJobExplorer(JobExplorer jobExplorer) {
66 this.jobExplorer = jobExplorer;
67 }
68
69
70
71
72
73
74 public void setJobRepository(JobRepository jobRepository) {
75 this.jobRepository = jobRepository;
76 }
77
78
79
80
81
82
83 public void setJobNames(Set<String> jobNames) {
84 this.jobNames = jobNames;
85 }
86
87
88
89
90
91
92 @ManagedOperation
93 public void addJobName(String jobName) {
94 this.jobNames.add(jobName);
95 }
96
97
98
99
100
101
102 @ManagedOperation
103 public void removeJobName(String jobName) {
104 this.jobNames.remove(jobName);
105 }
106
107
108
109
110 @ManagedAttribute
111 public Set<String> getJobNames() {
112 return jobNames;
113 }
114
115 public void afterPropertiesSet() throws Exception {
116 Assert.notNull(jobExplorer, "A JobExplorer must be provided");
117 Assert.notNull(jobRepository, "A JobRepository must be provided");
118 }
119
120 @Before("execution(* org.springframework.batch..JobLauncher+.*(..)) && args(job,..)")
121 public void checkJobBeforeLaunch(Job job) throws JobExecutionAlreadyRunningException {
122 String jobName = job.getName();
123 logger.debug("Checking for synchronization on Job: " + jobName);
124 if (!jobNames.contains(jobName)) {
125 logger.debug("Not synchronizing Job: " + jobName);
126 return;
127 }
128 Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
129 if (!running.isEmpty()) {
130 throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
131 }
132 logger.debug("Job checked and no duplicates detected: " + jobName);
133 }
134
135 @AfterReturning(value = "execution(* org.springframework.batch..JobRepository+.createJobExecution(..)) && args(jobName,..)", returning = "jobExecution")
136 public void checkJobDuringLaunch(String jobName, JobExecution jobExecution)
137 throws JobExecutionAlreadyRunningException {
138 logger.debug("Re-checking for synchronization on JobExecution: " + jobExecution);
139 if (!jobNames.contains(jobName)) {
140 logger.debug("Not re-checking for synchronization of Job: " + jobName);
141 return;
142 }
143 Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName);
144 if (running.size() > 1) {
145 jobExecution.setEndTime(new Date());
146 jobExecution.upgradeStatus(BatchStatus.ABANDONED);
147 jobExecution.setExitStatus(jobExecution.getExitStatus().and(ExitStatus.NOOP).addExitDescription(
148 "Not executed because another execution was detected for the same Job."));
149 jobRepository.update(jobExecution);
150 throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName);
151 }
152 }
153 }