1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.sql.ResultSet; |
20 | import java.sql.SQLException; |
21 | import java.sql.Types; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Set; |
25 | |
26 | import org.apache.commons.logging.Log; |
27 | import org.apache.commons.logging.LogFactory; |
28 | import org.springframework.batch.core.BatchStatus; |
29 | import org.springframework.batch.core.ExitStatus; |
30 | import org.springframework.batch.core.JobExecution; |
31 | import org.springframework.batch.core.JobInstance; |
32 | import org.springframework.beans.factory.InitializingBean; |
33 | import org.springframework.dao.EmptyResultDataAccessException; |
34 | import org.springframework.dao.OptimisticLockingFailureException; |
35 | import org.springframework.jdbc.core.RowCallbackHandler; |
36 | import org.springframework.jdbc.core.simple.ParameterizedRowMapper; |
37 | import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; |
38 | import org.springframework.util.Assert; |
39 | |
40 | /** |
41 | * Jdbc implementation of {@link JobExecutionDao}. Uses sequences (via Spring's |
42 | * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys |
43 | * before inserting a new row. Objects are checked to ensure all mandatory |
44 | * fields to be stored are not null. If any are found to be null, an |
45 | * IllegalArgumentException will be thrown. This could be left to JdbcTemplate, |
46 | * however, the exception will be fairly vague, and fails to highlight which |
47 | * field caused the exception. |
48 | * |
49 | * @author Lucas Ward |
50 | * @author Dave Syer |
51 | * @author Robert Kasanicky |
52 | */ |
53 | public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean { |
54 | |
55 | private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class); |
56 | |
57 | private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, " |
58 | + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; |
59 | |
60 | private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?"; |
61 | |
62 | private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; |
63 | |
64 | private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, " |
65 | + " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?"; |
66 | |
67 | private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" |
68 | + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc"; |
69 | |
70 | private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION " |
71 | + "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID = (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E.JOB_INSTANCE_ID = E2.JOB_INSTANCE_ID)"; |
72 | |
73 | private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" |
74 | + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; |
75 | |
76 | private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " |
77 | + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; |
78 | |
79 | private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?"; |
80 | |
81 | private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH; |
82 | |
83 | private DataFieldMaxValueIncrementer jobExecutionIncrementer; |
84 | |
85 | /** |
86 | * Public setter for the exit message length in database. Do not set this if |
87 | * you haven't modified the schema. |
88 | * @param exitMessageLength the exitMessageLength to set |
89 | */ |
90 | public void setExitMessageLength(int exitMessageLength) { |
91 | this.exitMessageLength = exitMessageLength; |
92 | } |
93 | |
94 | /** |
95 | * Setter for {@link DataFieldMaxValueIncrementer} to be used when |
96 | * generating primary keys for {@link JobExecution} instances. |
97 | * |
98 | * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer} |
99 | */ |
100 | public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { |
101 | this.jobExecutionIncrementer = jobExecutionIncrementer; |
102 | } |
103 | |
104 | public void afterPropertiesSet() throws Exception { |
105 | super.afterPropertiesSet(); |
106 | Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null."); |
107 | } |
108 | |
109 | public List<JobExecution> findJobExecutions(final JobInstance job) { |
110 | |
111 | Assert.notNull(job, "Job cannot be null."); |
112 | Assert.notNull(job.getId(), "Job Id cannot be null."); |
113 | |
114 | return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId()); |
115 | } |
116 | |
117 | /** |
118 | * |
119 | * SQL implementation using Sequences via the Spring incrementer |
120 | * abstraction. Once a new id has been obtained, the JobExecution is saved |
121 | * via a SQL INSERT statement. |
122 | * |
123 | * @see JobExecutionDao#saveJobExecution(JobExecution) |
124 | * @throws IllegalArgumentException if jobExecution is null, as well as any |
125 | * of it's fields to be persisted. |
126 | */ |
127 | public void saveJobExecution(JobExecution jobExecution) { |
128 | |
129 | validateJobExecution(jobExecution); |
130 | |
131 | jobExecution.incrementVersion(); |
132 | |
133 | jobExecution.setId(jobExecutionIncrementer.nextLongValue()); |
134 | Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(), |
135 | jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(), |
136 | jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(), |
137 | jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated() }; |
138 | getJdbcTemplate().getJdbcOperations().update( |
139 | getQuery(SAVE_JOB_EXECUTION), |
140 | parameters, |
141 | new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, |
142 | Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP }); |
143 | } |
144 | |
145 | /** |
146 | * Validate JobExecution. At a minimum, JobId, StartTime, EndTime, and |
147 | * Status cannot be null. |
148 | * |
149 | * @param jobExecution |
150 | * @throws IllegalArgumentException |
151 | */ |
152 | private void validateJobExecution(JobExecution jobExecution) { |
153 | |
154 | Assert.notNull(jobExecution); |
155 | Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null."); |
156 | Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null."); |
157 | Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null"); |
158 | } |
159 | |
160 | /** |
161 | * Update given JobExecution using a SQL UPDATE statement. The JobExecution |
162 | * is first checked to ensure all fields are not null, and that it has an |
163 | * ID. The database is then queried to ensure that the ID exists, which |
164 | * ensures that it is valid. |
165 | * |
166 | * @see JobExecutionDao#updateJobExecution(JobExecution) |
167 | */ |
168 | public void updateJobExecution(JobExecution jobExecution) { |
169 | |
170 | validateJobExecution(jobExecution); |
171 | |
172 | Assert.notNull(jobExecution.getId(), |
173 | "JobExecution ID cannot be null. JobExecution must be saved before it can be updated"); |
174 | |
175 | Assert.notNull(jobExecution.getVersion(), |
176 | "JobExecution version cannot be null. JobExecution must be saved before it can be updated"); |
177 | |
178 | synchronized (jobExecution) { |
179 | Integer version = jobExecution.getVersion() + 1; |
180 | |
181 | String exitDescription = jobExecution.getExitStatus().getExitDescription(); |
182 | if (exitDescription != null && exitDescription.length() > exitMessageLength) { |
183 | exitDescription = exitDescription.substring(0, exitMessageLength); |
184 | logger.debug("Truncating long message before update of JobExecution: " + jobExecution); |
185 | } |
186 | Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(), |
187 | jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription, |
188 | version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(), |
189 | jobExecution.getVersion() }; |
190 | |
191 | // Check if given JobExecution's Id already exists, if none is found |
192 | // it |
193 | // is invalid and |
194 | // an exception should be thrown. |
195 | if (getJdbcTemplate().queryForInt(getQuery(CHECK_JOB_EXECUTION_EXISTS), |
196 | new Object[] { jobExecution.getId() }) != 1) { |
197 | throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found."); |
198 | } |
199 | |
200 | int count = getJdbcTemplate().getJdbcOperations().update( |
201 | getQuery(UPDATE_JOB_EXECUTION), |
202 | parameters, |
203 | new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, |
204 | Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER }); |
205 | |
206 | // Avoid concurrent modifications... |
207 | if (count == 0) { |
208 | int curentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION), |
209 | new Object[] { jobExecution.getId() }); |
210 | throw new OptimisticLockingFailureException("Attempt to update job execution id=" |
211 | + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion() |
212 | + "), where current version is " + curentVersion); |
213 | } |
214 | |
215 | jobExecution.incrementVersion(); |
216 | } |
217 | } |
218 | |
219 | public JobExecution getLastJobExecution(JobInstance jobInstance) { |
220 | |
221 | Long id = jobInstance.getId(); |
222 | |
223 | List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION), |
224 | new JobExecutionRowMapper(jobInstance), id); |
225 | |
226 | Assert.state(executions.size() <= 1, "There must be at most one latest job execution"); |
227 | |
228 | if (executions.isEmpty()) { |
229 | return null; |
230 | } |
231 | else { |
232 | return executions.get(0); |
233 | } |
234 | } |
235 | |
236 | /* |
237 | * (non-Javadoc) |
238 | * |
239 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
240 | * getLastJobExecution(java.lang.String) |
241 | */ |
242 | public JobExecution getJobExecution(Long executionId) { |
243 | try { |
244 | JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID), |
245 | new JobExecutionRowMapper(), executionId); |
246 | return jobExecution; |
247 | } |
248 | catch (EmptyResultDataAccessException e) { |
249 | return null; |
250 | } |
251 | } |
252 | |
253 | /* |
254 | * (non-Javadoc) |
255 | * |
256 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
257 | * findRunningJobExecutions(java.lang.String) |
258 | */ |
259 | public Set<JobExecution> findRunningJobExecutions(String jobName) { |
260 | |
261 | final Set<JobExecution> result = new HashSet<JobExecution>(); |
262 | RowCallbackHandler handler = new RowCallbackHandler() { |
263 | public void processRow(ResultSet rs) throws SQLException { |
264 | JobExecutionRowMapper mapper = new JobExecutionRowMapper(); |
265 | result.add(mapper.mapRow(rs, 0)); |
266 | } |
267 | }; |
268 | getJdbcTemplate().getJdbcOperations() |
269 | .query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler); |
270 | |
271 | return result; |
272 | } |
273 | |
274 | public void synchronizeStatus(JobExecution jobExecution) { |
275 | int currentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION), |
276 | jobExecution.getId()); |
277 | |
278 | if (currentVersion != jobExecution.getVersion().intValue()) { |
279 | String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId()); |
280 | jobExecution.upgradeStatus(BatchStatus.valueOf(status)); |
281 | jobExecution.setVersion(currentVersion); |
282 | } |
283 | } |
284 | |
285 | /** |
286 | * Re-usable mapper for {@link JobExecution} instances. |
287 | * |
288 | * @author Dave Syer |
289 | * |
290 | */ |
291 | private static class JobExecutionRowMapper implements ParameterizedRowMapper<JobExecution> { |
292 | |
293 | private JobInstance jobInstance; |
294 | |
295 | public JobExecutionRowMapper() { |
296 | } |
297 | |
298 | public JobExecutionRowMapper(JobInstance jobInstance) { |
299 | this.jobInstance = jobInstance; |
300 | } |
301 | |
302 | public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException { |
303 | Long id = rs.getLong(1); |
304 | JobExecution jobExecution; |
305 | |
306 | if (jobInstance == null) { |
307 | jobExecution = new JobExecution(id); |
308 | } |
309 | else { |
310 | jobExecution = new JobExecution(jobInstance, id); |
311 | } |
312 | |
313 | jobExecution.setStartTime(rs.getTimestamp(2)); |
314 | jobExecution.setEndTime(rs.getTimestamp(3)); |
315 | jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4))); |
316 | jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6))); |
317 | jobExecution.setCreateTime(rs.getTimestamp(7)); |
318 | jobExecution.setLastUpdated(rs.getTimestamp(8)); |
319 | jobExecution.setVersion(rs.getInt(9)); |
320 | return jobExecution; |
321 | } |
322 | |
323 | } |
324 | } |