1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.sql.ResultSet; |
20 | import java.sql.SQLException; |
21 | import java.sql.Timestamp; |
22 | import java.sql.Types; |
23 | import java.util.HashMap; |
24 | import java.util.HashSet; |
25 | import java.util.List; |
26 | import java.util.Map; |
27 | import java.util.Map.Entry; |
28 | import java.util.Set; |
29 | |
30 | import org.apache.commons.logging.Log; |
31 | import org.apache.commons.logging.LogFactory; |
32 | import org.springframework.batch.core.BatchStatus; |
33 | import org.springframework.batch.core.ExitStatus; |
34 | import org.springframework.batch.core.JobExecution; |
35 | import org.springframework.batch.core.JobInstance; |
36 | import org.springframework.batch.core.JobParameter; |
37 | import org.springframework.batch.core.JobParameter.ParameterType; |
38 | import org.springframework.batch.core.JobParameters; |
39 | import org.springframework.beans.factory.InitializingBean; |
40 | import org.springframework.dao.EmptyResultDataAccessException; |
41 | import org.springframework.dao.OptimisticLockingFailureException; |
42 | import org.springframework.jdbc.core.RowCallbackHandler; |
43 | import org.springframework.jdbc.core.simple.ParameterizedRowMapper; |
44 | import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; |
45 | import org.springframework.util.Assert; |
46 | |
47 | /** |
48 | * JDBC implementation of {@link JobExecutionDao}. Uses sequences (via Spring's |
49 | * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys |
50 | * before inserting a new row. Objects are checked to ensure all mandatory |
51 | * fields to be stored are not null. If any are found to be null, an |
52 | * IllegalArgumentException will be thrown. This could be left to JdbcTemplate, |
53 | * however, the exception will be fairly vague, and fails to highlight which |
54 | * field caused the exception. |
55 | * |
56 | * @author Lucas Ward |
57 | * @author Dave Syer |
58 | * @author Robert Kasanicky |
59 | * @author Michael Minella |
60 | */ |
61 | public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean { |
62 | |
63 | private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class); |
64 | |
65 | private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, " |
66 | + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; |
67 | |
68 | private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?"; |
69 | |
70 | private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; |
71 | |
72 | private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, " |
73 | + " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?"; |
74 | |
75 | private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" |
76 | + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc"; |
77 | |
78 | private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION " |
79 | + "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)"; |
80 | |
81 | private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION" |
82 | + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; |
83 | |
84 | private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " |
85 | + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; |
86 | |
87 | private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?"; |
88 | |
89 | private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, " |
90 | + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING from %PREFIX%JOB_EXECUTION_PARAMS where JOB_EXECUTION_ID = ?"; |
91 | |
92 | private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_EXECUTION_PARAMS(JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, " |
93 | + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING) values (?, ?, ?, ?, ?, ?, ?, ?)"; |
94 | |
95 | private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH; |
96 | |
97 | private DataFieldMaxValueIncrementer jobExecutionIncrementer; |
98 | |
99 | /** |
100 | * Public setter for the exit message length in database. Do not set this if |
101 | * you haven't modified the schema. |
102 | * @param exitMessageLength the exitMessageLength to set |
103 | */ |
104 | public void setExitMessageLength(int exitMessageLength) { |
105 | this.exitMessageLength = exitMessageLength; |
106 | } |
107 | |
108 | /** |
109 | * Setter for {@link DataFieldMaxValueIncrementer} to be used when |
110 | * generating primary keys for {@link JobExecution} instances. |
111 | * |
112 | * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer} |
113 | */ |
114 | public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { |
115 | this.jobExecutionIncrementer = jobExecutionIncrementer; |
116 | } |
117 | |
118 | @Override |
119 | public void afterPropertiesSet() throws Exception { |
120 | super.afterPropertiesSet(); |
121 | Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null."); |
122 | } |
123 | |
124 | @Override |
125 | public List<JobExecution> findJobExecutions(final JobInstance job) { |
126 | |
127 | Assert.notNull(job, "Job cannot be null."); |
128 | Assert.notNull(job.getId(), "Job Id cannot be null."); |
129 | |
130 | return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId()); |
131 | } |
132 | |
133 | /** |
134 | * |
135 | * SQL implementation using Sequences via the Spring incrementer |
136 | * abstraction. Once a new id has been obtained, the JobExecution is saved |
137 | * via a SQL INSERT statement. |
138 | * |
139 | * @see JobExecutionDao#saveJobExecution(JobExecution) |
140 | * @throws IllegalArgumentException if jobExecution is null, as well as any |
141 | * of it's fields to be persisted. |
142 | */ |
143 | @Override |
144 | public void saveJobExecution(JobExecution jobExecution) { |
145 | |
146 | validateJobExecution(jobExecution); |
147 | |
148 | jobExecution.incrementVersion(); |
149 | |
150 | jobExecution.setId(jobExecutionIncrementer.nextLongValue()); |
151 | Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(), |
152 | jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(), |
153 | jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(), |
154 | jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated() }; |
155 | getJdbcTemplate().update( |
156 | getQuery(SAVE_JOB_EXECUTION), |
157 | parameters, |
158 | new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, |
159 | Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP }); |
160 | |
161 | insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters()); |
162 | } |
163 | |
164 | /** |
165 | * Validate JobExecution. At a minimum, JobId, StartTime, EndTime, and |
166 | * Status cannot be null. |
167 | * |
168 | * @param jobExecution |
169 | * @throws IllegalArgumentException |
170 | */ |
171 | private void validateJobExecution(JobExecution jobExecution) { |
172 | |
173 | Assert.notNull(jobExecution); |
174 | Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null."); |
175 | Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null."); |
176 | Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null"); |
177 | } |
178 | |
179 | /** |
180 | * Update given JobExecution using a SQL UPDATE statement. The JobExecution |
181 | * is first checked to ensure all fields are not null, and that it has an |
182 | * ID. The database is then queried to ensure that the ID exists, which |
183 | * ensures that it is valid. |
184 | * |
185 | * @see JobExecutionDao#updateJobExecution(JobExecution) |
186 | */ |
187 | @Override |
188 | public void updateJobExecution(JobExecution jobExecution) { |
189 | |
190 | validateJobExecution(jobExecution); |
191 | |
192 | Assert.notNull(jobExecution.getId(), |
193 | "JobExecution ID cannot be null. JobExecution must be saved before it can be updated"); |
194 | |
195 | Assert.notNull(jobExecution.getVersion(), |
196 | "JobExecution version cannot be null. JobExecution must be saved before it can be updated"); |
197 | |
198 | synchronized (jobExecution) { |
199 | Integer version = jobExecution.getVersion() + 1; |
200 | |
201 | String exitDescription = jobExecution.getExitStatus().getExitDescription(); |
202 | if (exitDescription != null && exitDescription.length() > exitMessageLength) { |
203 | exitDescription = exitDescription.substring(0, exitMessageLength); |
204 | logger.debug("Truncating long message before update of JobExecution: " + jobExecution); |
205 | } |
206 | Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(), |
207 | jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription, |
208 | version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(), |
209 | jobExecution.getVersion() }; |
210 | |
211 | // Check if given JobExecution's Id already exists, if none is found |
212 | // it |
213 | // is invalid and |
214 | // an exception should be thrown. |
215 | if (getJdbcTemplate().queryForInt(getQuery(CHECK_JOB_EXECUTION_EXISTS), |
216 | new Object[] { jobExecution.getId() }) != 1) { |
217 | throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found."); |
218 | } |
219 | |
220 | int count = getJdbcTemplate().update( |
221 | getQuery(UPDATE_JOB_EXECUTION), |
222 | parameters, |
223 | new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, |
224 | Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER }); |
225 | |
226 | // Avoid concurrent modifications... |
227 | if (count == 0) { |
228 | int curentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION), |
229 | new Object[] { jobExecution.getId() }); |
230 | throw new OptimisticLockingFailureException("Attempt to update job execution id=" |
231 | + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion() |
232 | + "), where current version is " + curentVersion); |
233 | } |
234 | |
235 | jobExecution.incrementVersion(); |
236 | } |
237 | } |
238 | |
239 | @Override |
240 | public JobExecution getLastJobExecution(JobInstance jobInstance) { |
241 | |
242 | Long id = jobInstance.getId(); |
243 | |
244 | List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION), |
245 | new JobExecutionRowMapper(jobInstance), id, id); |
246 | |
247 | Assert.state(executions.size() <= 1, "There must be at most one latest job execution"); |
248 | |
249 | if (executions.isEmpty()) { |
250 | return null; |
251 | } |
252 | else { |
253 | return executions.get(0); |
254 | } |
255 | } |
256 | |
257 | /* |
258 | * (non-Javadoc) |
259 | * |
260 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
261 | * getLastJobExecution(java.lang.String) |
262 | */ |
263 | @Override |
264 | public JobExecution getJobExecution(Long executionId) { |
265 | try { |
266 | JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID), |
267 | new JobExecutionRowMapper(), executionId); |
268 | return jobExecution; |
269 | } |
270 | catch (EmptyResultDataAccessException e) { |
271 | return null; |
272 | } |
273 | } |
274 | |
275 | /* |
276 | * (non-Javadoc) |
277 | * |
278 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
279 | * findRunningJobExecutions(java.lang.String) |
280 | */ |
281 | @Override |
282 | public Set<JobExecution> findRunningJobExecutions(String jobName) { |
283 | |
284 | final Set<JobExecution> result = new HashSet<JobExecution>(); |
285 | RowCallbackHandler handler = new RowCallbackHandler() { |
286 | @Override |
287 | public void processRow(ResultSet rs) throws SQLException { |
288 | JobExecutionRowMapper mapper = new JobExecutionRowMapper(); |
289 | result.add(mapper.mapRow(rs, 0)); |
290 | } |
291 | }; |
292 | getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler); |
293 | |
294 | return result; |
295 | } |
296 | |
297 | @Override |
298 | public void synchronizeStatus(JobExecution jobExecution) { |
299 | int currentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION), |
300 | jobExecution.getId()); |
301 | |
302 | if (currentVersion != jobExecution.getVersion().intValue()) { |
303 | String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId()); |
304 | jobExecution.upgradeStatus(BatchStatus.valueOf(status)); |
305 | jobExecution.setVersion(currentVersion); |
306 | } |
307 | } |
308 | |
309 | /** |
310 | * Convenience method that inserts all parameters from the provided |
311 | * JobParameters. |
312 | * |
313 | */ |
314 | private void insertJobParameters(Long executionId, JobParameters jobParameters) { |
315 | |
316 | for (Entry<String, JobParameter> entry : jobParameters.getParameters() |
317 | .entrySet()) { |
318 | JobParameter jobParameter = entry.getValue(); |
319 | insertParameter(executionId, jobParameter.getType(), entry.getKey(), |
320 | jobParameter.getValue(), jobParameter.isIdentifying()); |
321 | } |
322 | } |
323 | |
324 | /** |
325 | * Convenience method that inserts an individual records into the |
326 | * JobParameters table. |
327 | */ |
328 | private void insertParameter(Long executionId, ParameterType type, String key, |
329 | Object value, boolean identifying) { |
330 | |
331 | Object[] args = new Object[0]; |
332 | int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR, |
333 | Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT, |
334 | Types.DOUBLE, Types.CHAR }; |
335 | |
336 | String identifyingFlag = identifying? "Y":"N"; |
337 | |
338 | if (type == ParameterType.STRING) { |
339 | args = new Object[] { executionId, key, type, value, new Timestamp(0L), |
340 | 0L, 0D, identifyingFlag}; |
341 | } else if (type == ParameterType.LONG) { |
342 | args = new Object[] { executionId, key, type, "", new Timestamp(0L), |
343 | value, new Double(0), identifyingFlag}; |
344 | } else if (type == ParameterType.DOUBLE) { |
345 | args = new Object[] { executionId, key, type, "", new Timestamp(0L), 0L, |
346 | value, identifyingFlag}; |
347 | } else if (type == ParameterType.DATE) { |
348 | args = new Object[] { executionId, key, type, "", value, 0L, 0D, identifyingFlag}; |
349 | } |
350 | |
351 | getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes); |
352 | } |
353 | |
354 | /** |
355 | * @param executionId |
356 | * @return |
357 | */ |
358 | protected JobParameters getJobParameters(Long executionId) { |
359 | final Map<String, JobParameter> map = new HashMap<String, JobParameter>(); |
360 | RowCallbackHandler handler = new RowCallbackHandler() { |
361 | @Override |
362 | public void processRow(ResultSet rs) throws SQLException { |
363 | ParameterType type = ParameterType.valueOf(rs.getString(3)); |
364 | JobParameter value = null; |
365 | |
366 | if (type == ParameterType.STRING) { |
367 | value = new JobParameter(rs.getString(4), rs.getString(8).equalsIgnoreCase("Y")); |
368 | } else if (type == ParameterType.LONG) { |
369 | value = new JobParameter(rs.getLong(6), rs.getString(8).equalsIgnoreCase("Y")); |
370 | } else if (type == ParameterType.DOUBLE) { |
371 | value = new JobParameter(rs.getDouble(7), rs.getString(8).equalsIgnoreCase("Y")); |
372 | } else if (type == ParameterType.DATE) { |
373 | value = new JobParameter(rs.getTimestamp(5), rs.getString(8).equalsIgnoreCase("Y")); |
374 | } |
375 | |
376 | // No need to assert that value is not null because it's an enum |
377 | map.put(rs.getString(2), value); |
378 | } |
379 | }; |
380 | |
381 | getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), new Object[] { executionId }, handler); |
382 | |
383 | return new JobParameters(map); |
384 | } |
385 | |
386 | /** |
387 | * Re-usable mapper for {@link JobExecution} instances. |
388 | * |
389 | * @author Dave Syer |
390 | * |
391 | */ |
392 | private final class JobExecutionRowMapper implements ParameterizedRowMapper<JobExecution> { |
393 | |
394 | private JobInstance jobInstance; |
395 | |
396 | private JobParameters jobParameters; |
397 | |
398 | public JobExecutionRowMapper() { |
399 | } |
400 | |
401 | public JobExecutionRowMapper(JobInstance jobInstance) { |
402 | this.jobInstance = jobInstance; |
403 | } |
404 | |
405 | @Override |
406 | public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException { |
407 | Long id = rs.getLong(1); |
408 | JobExecution jobExecution; |
409 | if (jobParameters == null) { |
410 | jobParameters = getJobParameters(id); |
411 | } |
412 | |
413 | if (jobInstance == null) { |
414 | jobExecution = new JobExecution(id, jobParameters); |
415 | } |
416 | else { |
417 | jobExecution = new JobExecution(jobInstance, id, jobParameters); |
418 | } |
419 | |
420 | jobExecution.setStartTime(rs.getTimestamp(2)); |
421 | jobExecution.setEndTime(rs.getTimestamp(3)); |
422 | jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4))); |
423 | jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6))); |
424 | jobExecution.setCreateTime(rs.getTimestamp(7)); |
425 | jobExecution.setLastUpdated(rs.getTimestamp(8)); |
426 | jobExecution.setVersion(rs.getInt(9)); |
427 | return jobExecution; |
428 | } |
429 | |
430 | } |
431 | } |