View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.repository.dao;
18  
19  import java.sql.ResultSet;
20  import java.sql.SQLException;
21  import java.sql.Types;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.springframework.batch.core.BatchStatus;
27  import org.springframework.batch.core.ExitStatus;
28  import org.springframework.batch.core.JobExecution;
29  import org.springframework.batch.core.StepExecution;
30  import org.springframework.beans.factory.InitializingBean;
31  import org.springframework.dao.OptimisticLockingFailureException;
32  import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
33  import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
34  import org.springframework.util.Assert;
35  
36  /**
37   * JDBC implementation of {@link StepExecutionDao}.<br/>
38   *
39   * Allows customization of the tables names used by Spring Batch for step meta
40   * data via a prefix property.<br/>
41   *
42   * Uses sequences or tables (via Spring's {@link DataFieldMaxValueIncrementer}
43   * abstraction) to create all primary keys before inserting a new row. All
44   * objects are checked to ensure all fields to be stored are not null. If any
45   * are found to be null, an IllegalArgumentException will be thrown. This could
46   * be left to JdbcTemplate, however, the exception will be fairly vague, and
47   * fails to highlight which field caused the exception.<br/>
48   *
49   * @author Lucas Ward
50   * @author Dave Syer
51   * @author Robert Kasanicky
52   *
53   * @see StepExecutionDao
54   */
55  public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implements StepExecutionDao, InitializingBean {
56  
57  	private static final Log logger = LogFactory.getLog(JdbcStepExecutionDao.class);
58  
59  	private static final String SAVE_STEP_EXECUTION = "INSERT into %PREFIX%STEP_EXECUTION(STEP_EXECUTION_ID, VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, "
60  			+ "END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED) "
61  			+ "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
62  
63  	private static final String UPDATE_STEP_EXECUTION = "UPDATE %PREFIX%STEP_EXECUTION set START_TIME = ?, END_TIME = ?, "
64  			+ "STATUS = ?, COMMIT_COUNT = ?, READ_COUNT = ?, FILTER_COUNT = ?, WRITE_COUNT = ?, EXIT_CODE = ?, "
65  			+ "EXIT_MESSAGE = ?, VERSION = ?, READ_SKIP_COUNT = ?, PROCESS_SKIP_COUNT = ?, WRITE_SKIP_COUNT = ?, ROLLBACK_COUNT = ?, LAST_UPDATED = ?"
66  			+ " where STEP_EXECUTION_ID = ? and VERSION = ?";
67  
68  	private static final String GET_RAW_STEP_EXECUTIONS = "SELECT STEP_EXECUTION_ID, STEP_NAME, START_TIME, END_TIME, STATUS, COMMIT_COUNT,"
69  			+ " READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED, VERSION from %PREFIX%STEP_EXECUTION where JOB_EXECUTION_ID = ?";
70  
71  	private static final String GET_STEP_EXECUTIONS = GET_RAW_STEP_EXECUTIONS + " order by STEP_EXECUTION_ID";
72  
73  	private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?";
74  
75  	private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?";
76  
77  	private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
78  
79  	private DataFieldMaxValueIncrementer stepExecutionIncrementer;
80  
81  	/**
82  	 * Public setter for the exit message length in database. Do not set this if
83  	 * you haven't modified the schema.
84  	 * @param exitMessageLength the exitMessageLength to set
85  	 */
86  	public void setExitMessageLength(int exitMessageLength) {
87  		this.exitMessageLength = exitMessageLength;
88  	}
89  
90  	public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) {
91  		this.stepExecutionIncrementer = stepExecutionIncrementer;
92  	}
93  
94  	@Override
95  	public void afterPropertiesSet() throws Exception {
96  		super.afterPropertiesSet();
97  		Assert.notNull(stepExecutionIncrementer, "StepExecutionIncrementer cannot be null.");
98  	}
99  
100 	/**
101 	 * Save a StepExecution. A unique id will be generated by the
102 	 * stepExecutionIncrementor, and then set in the StepExecution. All values
103 	 * will then be stored via an INSERT statement.
104 	 *
105 	 * @see StepExecutionDao#saveStepExecution(StepExecution)
106 	 */
107 	@Override
108 	public void saveStepExecution(StepExecution stepExecution) {
109 
110 		Assert.isNull(stepExecution.getId(),
111 				"to-be-saved (not updated) StepExecution can't already have an id assigned");
112 		Assert.isNull(stepExecution.getVersion(),
113 				"to-be-saved (not updated) StepExecution can't already have a version assigned");
114 
115 		validateStepExecution(stepExecution);
116 
117 		String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription());
118 
119 		stepExecution.setId(stepExecutionIncrementer.nextLongValue());
120 		stepExecution.incrementVersion(); // should be 0 now
121 		Object[] parameters = new Object[] { stepExecution.getId(), stepExecution.getVersion(),
122 				stepExecution.getStepName(), stepExecution.getJobExecutionId(), stepExecution.getStartTime(),
123 				stepExecution.getEndTime(), stepExecution.getStatus().toString(), stepExecution.getCommitCount(),
124 				stepExecution.getReadCount(), stepExecution.getFilterCount(), stepExecution.getWriteCount(),
125 				stepExecution.getExitStatus().getExitCode(), exitDescription, stepExecution.getReadSkipCount(),
126 				stepExecution.getWriteSkipCount(), stepExecution.getProcessSkipCount(),
127 				stepExecution.getRollbackCount(), stepExecution.getLastUpdated() };
128 		getJdbcTemplate().update(
129 				getQuery(SAVE_STEP_EXECUTION),
130 				parameters,
131 				new int[] { Types.BIGINT, Types.INTEGER, Types.VARCHAR, Types.BIGINT, Types.TIMESTAMP,
132 					Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER,
133 					Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER,
134 					Types.TIMESTAMP });
135 	}
136 
137 	/**
138 	 * Validate StepExecution. At a minimum, JobId, StartTime, and Status cannot
139 	 * be null. EndTime can be null for an unfinished job.
140 	 *
141 	 * @throws IllegalArgumentException
142 	 */
143 	private void validateStepExecution(StepExecution stepExecution) {
144 		Assert.notNull(stepExecution);
145 		Assert.notNull(stepExecution.getStepName(), "StepExecution step name cannot be null.");
146 		Assert.notNull(stepExecution.getStartTime(), "StepExecution start time cannot be null.");
147 		Assert.notNull(stepExecution.getStatus(), "StepExecution status cannot be null.");
148 	}
149 
150 	@Override
151 	public void updateStepExecution(StepExecution stepExecution) {
152 
153 		validateStepExecution(stepExecution);
154 		Assert.notNull(stepExecution.getId(), "StepExecution Id cannot be null. StepExecution must saved"
155 				+ " before it can be updated.");
156 
157 		// Do not check for existence of step execution considering
158 		// it is saved at every commit point.
159 
160 		String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription());
161 
162 		// Attempt to prevent concurrent modification errors by blocking here if
163 		// someone is already trying to do it.
164 		synchronized (stepExecution) {
165 
166 			Integer version = stepExecution.getVersion() + 1;
167 			Object[] parameters = new Object[] { stepExecution.getStartTime(), stepExecution.getEndTime(),
168 					stepExecution.getStatus().toString(), stepExecution.getCommitCount(), stepExecution.getReadCount(),
169 					stepExecution.getFilterCount(), stepExecution.getWriteCount(),
170 					stepExecution.getExitStatus().getExitCode(), exitDescription, version,
171 					stepExecution.getReadSkipCount(), stepExecution.getProcessSkipCount(),
172 					stepExecution.getWriteSkipCount(), stepExecution.getRollbackCount(),
173 					stepExecution.getLastUpdated(), stepExecution.getId(), stepExecution.getVersion() };
174 			int count = getJdbcTemplate().update(
175 					getQuery(UPDATE_STEP_EXECUTION),
176 					parameters,
177 					new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER,
178 						Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER,
179 						Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP,
180 						Types.BIGINT, Types.INTEGER });
181 
182 			// Avoid concurrent modifications...
183 			if (count == 0) {
184 				int curentVersion = getJdbcTemplate().queryForInt(getQuery(CURRENT_VERSION_STEP_EXECUTION),
185 						new Object[] { stepExecution.getId() });
186 				throw new OptimisticLockingFailureException("Attempt to update step execution id="
187 						+ stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
188 						+ "), where current version is " + curentVersion);
189 			}
190 
191 			stepExecution.incrementVersion();
192 
193 		}
194 	}
195 
196 	/**
197 	 * Truncate the exit description if the length exceeds
198 	 * {@link #DEFAULT_EXIT_MESSAGE_LENGTH}.
199 	 * @param description the string to truncate
200 	 * @return truncated description
201 	 */
202 	private String truncateExitDescription(String description) {
203 		if (description != null && description.length() > exitMessageLength) {
204 			logger.debug("Truncating long message before update of StepExecution, original message is: " + description);
205 			return description.substring(0, exitMessageLength);
206 		}
207 		else {
208 			return description;
209 		}
210 	}
211 
212 	@Override
213 	public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
214 		List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_STEP_EXECUTION),
215 				new StepExecutionRowMapper(jobExecution), jobExecution.getId(), stepExecutionId);
216 
217 		Assert.state(executions.size() <= 1,
218 				"There can be at most one step execution with given name for single job execution");
219 		if (executions.isEmpty()) {
220 			return null;
221 		}
222 		else {
223 			return executions.get(0);
224 		}
225 	}
226 
227 	@Override
228 	public void addStepExecutions(JobExecution jobExecution) {
229 		getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),
230 				jobExecution.getId());
231 	}
232 
233 	private static class StepExecutionRowMapper implements ParameterizedRowMapper<StepExecution> {
234 
235 		private final JobExecution jobExecution;
236 
237 		public StepExecutionRowMapper(JobExecution jobExecution) {
238 			this.jobExecution = jobExecution;
239 		}
240 
241 		@Override
242 		public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
243 			StepExecution stepExecution = new StepExecution(rs.getString(2), jobExecution, rs.getLong(1));
244 			stepExecution.setStartTime(rs.getTimestamp(3));
245 			stepExecution.setEndTime(rs.getTimestamp(4));
246 			stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5)));
247 			stepExecution.setCommitCount(rs.getInt(6));
248 			stepExecution.setReadCount(rs.getInt(7));
249 			stepExecution.setFilterCount(rs.getInt(8));
250 			stepExecution.setWriteCount(rs.getInt(9));
251 			stepExecution.setExitStatus(new ExitStatus(rs.getString(10), rs.getString(11)));
252 			stepExecution.setReadSkipCount(rs.getInt(12));
253 			stepExecution.setWriteSkipCount(rs.getInt(13));
254 			stepExecution.setProcessSkipCount(rs.getInt(14));
255 			stepExecution.setRollbackCount(rs.getInt(15));
256 			stepExecution.setLastUpdated(rs.getTimestamp(16));
257 			stepExecution.setVersion(rs.getInt(17));
258 			return stepExecution;
259 		}
260 
261 	}
262 
263 }