View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.repository.dao;
18  
19  import java.sql.ResultSet;
20  import java.sql.SQLException;
21  import java.sql.Types;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.springframework.batch.core.BatchStatus;
29  import org.springframework.batch.core.ExitStatus;
30  import org.springframework.batch.core.JobExecution;
31  import org.springframework.batch.core.JobInstance;
32  import org.springframework.beans.factory.InitializingBean;
33  import org.springframework.dao.EmptyResultDataAccessException;
34  import org.springframework.dao.OptimisticLockingFailureException;
35  import org.springframework.jdbc.core.RowCallbackHandler;
36  import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
37  import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
38  import org.springframework.util.Assert;
39  
40  /**
41   * JDBC implementation of {@link JobExecutionDao}. Uses sequences (via Spring's
42   * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
43   * before inserting a new row. Objects are checked to ensure all mandatory
44   * fields to be stored are not null. If any are found to be null, an
45   * IllegalArgumentException will be thrown. This could be left to JdbcTemplate,
46   * however, the exception will be fairly vague, and fails to highlight which
47   * field caused the exception.
48   *
49   * @author Lucas Ward
50   * @author Dave Syer
51   * @author Robert Kasanicky
52   * @author Michael Minella
53   */
54  public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
55  
56  	private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class);
57  
58  	private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, "
59  			+ "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
60  
61  	private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?";
62  
63  	private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
64  
65  	private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, "
66  			+ " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?";
67  
68  	private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
69  			+ " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc";
70  
71  	private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION "
72  			+ "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E.JOB_INSTANCE_ID = E2.JOB_INSTANCE_ID)";
73  
74  	private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION"
75  			+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
76  
77  	private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
78  			+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";
79  
80  	private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?";
81  
82  	private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
83  
84  	private DataFieldMaxValueIncrementer jobExecutionIncrementer;
85  
86  	/**
87  	 * Public setter for the exit message length in database. Do not set this if
88  	 * you haven't modified the schema.
89  	 * @param exitMessageLength the exitMessageLength to set
90  	 */
91  	public void setExitMessageLength(int exitMessageLength) {
92  		this.exitMessageLength = exitMessageLength;
93  	}
94  
95  	/**
96  	 * Setter for {@link DataFieldMaxValueIncrementer} to be used when
97  	 * generating primary keys for {@link JobExecution} instances.
98  	 *
99  	 * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer}
100 	 */
101 	public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
102 		this.jobExecutionIncrementer = jobExecutionIncrementer;
103 	}
104 
105 	@Override
106 	public void afterPropertiesSet() throws Exception {
107 		super.afterPropertiesSet();
108 		Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null.");
109 	}
110 
111 	@Override
112 	public List<JobExecution> findJobExecutions(final JobInstance job) {
113 
114 		Assert.notNull(job, "Job cannot be null.");
115 		Assert.notNull(job.getId(), "Job Id cannot be null.");
116 
117 		return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId());
118 	}
119 
120 	/**
121 	 *
122 	 * SQL implementation using Sequences via the Spring incrementer
123 	 * abstraction. Once a new id has been obtained, the JobExecution is saved
124 	 * via a SQL INSERT statement.
125 	 *
126 	 * @see JobExecutionDao#saveJobExecution(JobExecution)
127 	 * @throws IllegalArgumentException if jobExecution is null, as well as any
128 	 * of it's fields to be persisted.
129 	 */
130 	@Override
131 	public void saveJobExecution(JobExecution jobExecution) {
132 
133 		validateJobExecution(jobExecution);
134 
135 		jobExecution.incrementVersion();
136 
137 		jobExecution.setId(jobExecutionIncrementer.nextLongValue());
138 		Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(),
139 				jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(),
140 				jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(),
141 				jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated() };
142 		getJdbcTemplate().update(
143 				getQuery(SAVE_JOB_EXECUTION),
144 				parameters,
145 				new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
146 					Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP });
147 	}
148 
149 	/**
150 	 * Validate JobExecution. At a minimum, JobId, StartTime, EndTime, and
151 	 * Status cannot be null.
152 	 *
153 	 * @param jobExecution
154 	 * @throws IllegalArgumentException
155 	 */
156 	private void validateJobExecution(JobExecution jobExecution) {
157 
158 		Assert.notNull(jobExecution);
159 		Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
160 		Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
161 		Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null");
162 	}
163 
164 	/**
165 	 * Update given JobExecution using a SQL UPDATE statement. The JobExecution
166 	 * is first checked to ensure all fields are not null, and that it has an
167 	 * ID. The database is then queried to ensure that the ID exists, which
168 	 * ensures that it is valid.
169 	 *
170 	 * @see JobExecutionDao#updateJobExecution(JobExecution)
171 	 */
172 	@Override
173 	public void updateJobExecution(JobExecution jobExecution) {
174 
175 		validateJobExecution(jobExecution);
176 
177 		Assert.notNull(jobExecution.getId(),
178 				"JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
179 
180 		Assert.notNull(jobExecution.getVersion(),
181 				"JobExecution version cannot be null. JobExecution must be saved before it can be updated");
182 
183 		synchronized (jobExecution) {
184 			Integer version = jobExecution.getVersion() + 1;
185 
186 			String exitDescription = jobExecution.getExitStatus().getExitDescription();
187 			if (exitDescription != null && exitDescription.length() > exitMessageLength) {
188 				exitDescription = exitDescription.substring(0, exitMessageLength);
189 				logger.debug("Truncating long message before update of JobExecution: " + jobExecution);
190 			}
191 			Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(),
192 					jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription,
193 					version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(),
194 					jobExecution.getVersion() };
195 
196 			// Check if given JobExecution's Id already exists, if none is found
197 			// it
198 			// is invalid and
199 			// an exception should be thrown.
200 			if (getJdbcTemplate().queryForInt(getQuery(CHECK_JOB_EXECUTION_EXISTS),
201 					new Object[] { jobExecution.getId() }) != 1) {
202 				throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found.");
203 			}
204 
205 			int count = getJdbcTemplate().update(
206 					getQuery(UPDATE_JOB_EXECUTION),
207 					parameters,
208 					new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
209 						Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER });
210 
211 			// Avoid concurrent modifications...
212 			if (count == 0) {
213 				int curentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION),
214 						new Object[] { jobExecution.getId() });
215 				throw new OptimisticLockingFailureException("Attempt to update job execution id="
216 						+ jobExecution.getId() + " with wrong version (" + jobExecution.getVersion()
217 						+ "), where current version is " + curentVersion);
218 			}
219 
220 			jobExecution.incrementVersion();
221 		}
222 	}
223 
224 	@Override
225 	public JobExecution getLastJobExecution(JobInstance jobInstance) {
226 
227 		Long id = jobInstance.getId();
228 
229 		List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION),
230 				new JobExecutionRowMapper(jobInstance), id);
231 
232 		Assert.state(executions.size() <= 1, "There must be at most one latest job execution");
233 
234 		if (executions.isEmpty()) {
235 			return null;
236 		}
237 		else {
238 			return executions.get(0);
239 		}
240 	}
241 
242 	/*
243 	 * (non-Javadoc)
244 	 *
245 	 * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
246 	 * getLastJobExecution(java.lang.String)
247 	 */
248 	@Override
249 	public JobExecution getJobExecution(Long executionId) {
250 		try {
251 			JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID),
252 					new JobExecutionRowMapper(), executionId);
253 			return jobExecution;
254 		}
255 		catch (EmptyResultDataAccessException e) {
256 			return null;
257 		}
258 	}
259 
260 	/*
261 	 * (non-Javadoc)
262 	 *
263 	 * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
264 	 * findRunningJobExecutions(java.lang.String)
265 	 */
266 	@Override
267 	public Set<JobExecution> findRunningJobExecutions(String jobName) {
268 
269 		final Set<JobExecution> result = new HashSet<JobExecution>();
270 		RowCallbackHandler handler = new RowCallbackHandler() {
271 			@Override
272 			public void processRow(ResultSet rs) throws SQLException {
273 				JobExecutionRowMapper mapper = new JobExecutionRowMapper();
274 				result.add(mapper.mapRow(rs, 0));
275 			}
276 		};
277 		getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler);
278 
279 		return result;
280 	}
281 
282 	@Override
283 	public void synchronizeStatus(JobExecution jobExecution) {
284 		int currentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_JOB_EXECUTION),
285 				jobExecution.getId());
286 
287 		if (currentVersion != jobExecution.getVersion().intValue()) {
288 			String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId());
289 			jobExecution.upgradeStatus(BatchStatus.valueOf(status));
290 			jobExecution.setVersion(currentVersion);
291 		}
292 	}
293 
294 	/**
295 	 * Re-usable mapper for {@link JobExecution} instances.
296 	 *
297 	 * @author Dave Syer
298 	 *
299 	 */
300 	private static class JobExecutionRowMapper implements ParameterizedRowMapper<JobExecution> {
301 
302 		private JobInstance jobInstance;
303 
304 		public JobExecutionRowMapper() {
305 		}
306 
307 		public JobExecutionRowMapper(JobInstance jobInstance) {
308 			this.jobInstance = jobInstance;
309 		}
310 
311 		@Override
312 		public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
313 			Long id = rs.getLong(1);
314 			JobExecution jobExecution;
315 
316 			if (jobInstance == null) {
317 				jobExecution = new JobExecution(id);
318 			}
319 			else {
320 				jobExecution = new JobExecution(jobInstance, id);
321 			}
322 
323 			jobExecution.setStartTime(rs.getTimestamp(2));
324 			jobExecution.setEndTime(rs.getTimestamp(3));
325 			jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4)));
326 			jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6)));
327 			jobExecution.setCreateTime(rs.getTimestamp(7));
328 			jobExecution.setLastUpdated(rs.getTimestamp(8));
329 			jobExecution.setVersion(rs.getInt(9));
330 			return jobExecution;
331 		}
332 
333 	}
334 }