View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.repository.dao;
18  
19  import java.io.UnsupportedEncodingException;
20  import java.math.BigInteger;
21  import java.security.MessageDigest;
22  import java.security.NoSuchAlgorithmException;
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.sql.Timestamp;
26  import java.sql.Types;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  
34  import org.springframework.batch.core.JobExecution;
35  import org.springframework.batch.core.JobInstance;
36  import org.springframework.batch.core.JobParameter;
37  import org.springframework.batch.core.JobParameter.ParameterType;
38  import org.springframework.batch.core.JobParameters;
39  import org.springframework.beans.factory.InitializingBean;
40  import org.springframework.dao.DataAccessException;
41  import org.springframework.dao.EmptyResultDataAccessException;
42  import org.springframework.jdbc.core.ResultSetExtractor;
43  import org.springframework.jdbc.core.RowCallbackHandler;
44  import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
45  import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
46  import org.springframework.util.Assert;
47  import org.springframework.util.StringUtils;
48  
49  /**
50   * JDBC implementation of {@link JobInstanceDao}. Uses sequences (via Spring's
51   * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
52   * before inserting a new row. Objects are checked to ensure all mandatory
53   * fields to be stored are not null. If any are found to be null, an
54   * IllegalArgumentException will be thrown. This could be left to JdbcTemplate,
55   * however, the exception will be fairly vague, and fails to highlight which
56   * field caused the exception.
57   *
58   * @author Lucas Ward
59   * @author Dave Syer
60   * @author Robert Kasanicky
61   */
62  public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements
63  JobInstanceDao, InitializingBean {
64  
65  	private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)"
66  			+ " values (?, ?, ?, ?)";
67  
68  	private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_PARAMS(JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, "
69  			+ "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL) values (?, ?, ?, ?, ?, ?, ?)";
70  
71  	private static final String FIND_JOBS_WITH_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ?";
72  
73  	private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME
74  			+ " and JOB_KEY = ?";
75  
76  	private static final String FIND_JOBS_WITH_EMPTY_KEY = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)";
77  
78  	private static final String GET_JOB_FROM_ID = "SELECT JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION from %PREFIX%JOB_INSTANCE where JOB_INSTANCE_ID = ?";
79  
80  	private static final String GET_JOB_FROM_EXECUTION_ID = "SELECT ji.JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, ji.VERSION from %PREFIX%JOB_INSTANCE ji, "
81  			+ "%PREFIX%JOB_EXECUTION je where JOB_EXECUTION_ID = ? and ji.JOB_INSTANCE_ID = je.JOB_INSTANCE_ID";
82  
83  	private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, "
84  			+ "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL from %PREFIX%JOB_PARAMS where JOB_INSTANCE_ID = ?";
85  
86  	private static final String FIND_JOB_NAMES = "SELECT distinct JOB_NAME from %PREFIX%JOB_INSTANCE order by JOB_NAME";
87  
88  	private static final String FIND_LAST_JOBS_BY_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc";
89  
90  	private DataFieldMaxValueIncrementer jobIncrementer;
91  
92  	/**
93  	 * In this jdbc implementation a job id is obtained by asking the
94  	 * jobIncrementer (which is likely a sequence) for the next long value, and
95  	 * then passing the Id and parameter values into an INSERT statement.
96  	 *
97  	 * @see JobInstanceDao#createJobInstance(String, JobParameters)
98  	 * @throws IllegalArgumentException
99  	 *             if any {@link JobParameters} fields are null.
100 	 */
101 	@Override
102 	public JobInstance createJobInstance(String jobName,
103 			JobParameters jobParameters) {
104 
105 		Assert.notNull(jobName, "Job name must not be null.");
106 		Assert.notNull(jobParameters, "JobParameters must not be null.");
107 
108 		Assert.state(getJobInstance(jobName, jobParameters) == null,
109 				"JobInstance must not already exist");
110 
111 		Long jobId = jobIncrementer.nextLongValue();
112 
113 		JobInstance jobInstance = new JobInstance(jobId, jobParameters, jobName);
114 		jobInstance.incrementVersion();
115 
116 		Object[] parameters = new Object[] { jobId, jobName,
117 				createJobKey(jobParameters), jobInstance.getVersion() };
118 		getJdbcTemplate().update(
119 				getQuery(CREATE_JOB_INSTANCE),
120 				parameters,
121 				new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR,
122 					Types.INTEGER });
123 
124 		insertJobParameters(jobId, jobParameters);
125 
126 		return jobInstance;
127 	}
128 
129 	protected String createJobKey(JobParameters jobParameters) {
130 
131 		Map<String, JobParameter> props = jobParameters.getParameters();
132 		StringBuffer stringBuffer = new StringBuffer();
133 		List<String> keys = new ArrayList<String>(props.keySet());
134 		Collections.sort(keys);
135 		for (String key : keys) {
136 			JobParameter jobParameter = props.get(key);
137 			String value = jobParameter.getValue()==null ? "" : jobParameter.toString();
138 			stringBuffer.append(key + "=" + value + ";");
139 		}
140 
141 		MessageDigest digest;
142 		try {
143 			digest = MessageDigest.getInstance("MD5");
144 		} catch (NoSuchAlgorithmException e) {
145 			throw new IllegalStateException(
146 					"MD5 algorithm not available.  Fatal (should be in the JDK).");
147 		}
148 
149 		try {
150 			byte[] bytes = digest.digest(stringBuffer.toString().getBytes(
151 					"UTF-8"));
152 			return String.format("%032x", new BigInteger(1, bytes));
153 		} catch (UnsupportedEncodingException e) {
154 			throw new IllegalStateException(
155 					"UTF-8 encoding not available.  Fatal (should be in the JDK).");
156 		}
157 	}
158 
159 	/**
160 	 * Convenience method that inserts all parameters from the provided
161 	 * JobParameters.
162 	 *
163 	 */
164 	private void insertJobParameters(Long jobId, JobParameters jobParameters) {
165 
166 		for (Entry<String, JobParameter> entry : jobParameters.getParameters()
167 				.entrySet()) {
168 			JobParameter jobParameter = entry.getValue();
169 			insertParameter(jobId, jobParameter.getType(), entry.getKey(),
170 					jobParameter.getValue());
171 		}
172 	}
173 
174 	/**
175 	 * Convenience method that inserts an individual records into the
176 	 * JobParameters table.
177 	 */
178 	private void insertParameter(Long jobId, ParameterType type, String key,
179 			Object value) {
180 
181 		Object[] args = new Object[0];
182 		int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR,
183 				Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT,
184 				Types.DOUBLE };
185 
186 		if (type == ParameterType.STRING) {
187 			args = new Object[] { jobId, key, type, value, new Timestamp(0L),
188 					0L, 0D };
189 		} else if (type == ParameterType.LONG) {
190 			args = new Object[] { jobId, key, type, "", new Timestamp(0L),
191 					value, new Double(0) };
192 		} else if (type == ParameterType.DOUBLE) {
193 			args = new Object[] { jobId, key, type, "", new Timestamp(0L), 0L,
194 					value };
195 		} else if (type == ParameterType.DATE) {
196 			args = new Object[] { jobId, key, type, "", value, 0L, 0D };
197 		}
198 
199 		getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes);
200 	}
201 
202 	/**
203 	 * The job table is queried for <strong>any</strong> jobs that match the
204 	 * given identifier, adding them to a list via the RowMapper callback.
205 	 *
206 	 * @see JobInstanceDao#getJobInstance(String, JobParameters)
207 	 * @throws IllegalArgumentException
208 	 *             if any {@link JobParameters} fields are null.
209 	 */
210 	@Override
211 	public JobInstance getJobInstance(final String jobName,
212 			final JobParameters jobParameters) {
213 
214 		Assert.notNull(jobName, "Job name must not be null.");
215 		Assert.notNull(jobParameters, "JobParameters must not be null.");
216 
217 		String jobKey = createJobKey(jobParameters);
218 
219 		ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper(
220 				jobParameters);
221 
222 		List<JobInstance> instances;
223 		if (StringUtils.hasLength(jobKey)) {
224 			instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY),
225 					rowMapper, jobName, jobKey);
226 		} else {
227 			instances = getJdbcTemplate().query(
228 					getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName,
229 					jobKey);
230 		}
231 
232 		if (instances.isEmpty()) {
233 			return null;
234 		} else {
235 			Assert.state(instances.size() == 1);
236 			return instances.get(0);
237 		}
238 	}
239 
240 	/*
241 	 * (non-Javadoc)
242 	 *
243 	 * @see
244 	 * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance
245 	 * (java.lang.Long)
246 	 */
247 	@Override
248 	public JobInstance getJobInstance(Long instanceId) {
249 
250 		try {
251 			return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_ID),
252 					new JobInstanceRowMapper(), instanceId);
253 		} catch (EmptyResultDataAccessException e) {
254 			return null;
255 		}
256 
257 	}
258 
259 	/**
260 	 * @param instanceId
261 	 * @return
262 	 */
263 	private JobParameters getJobParameters(Long instanceId) {
264 		final Map<String, JobParameter> map = new HashMap<String, JobParameter>();
265 		RowCallbackHandler handler = new RowCallbackHandler() {
266 			@Override
267 			public void processRow(ResultSet rs) throws SQLException {
268 				ParameterType type = ParameterType.valueOf(rs.getString(3));
269 				JobParameter value = null;
270 				if (type == ParameterType.STRING) {
271 					value = new JobParameter(rs.getString(4));
272 				} else if (type == ParameterType.LONG) {
273 					value = new JobParameter(rs.getLong(6));
274 				} else if (type == ParameterType.DOUBLE) {
275 					value = new JobParameter(rs.getDouble(7));
276 				} else if (type == ParameterType.DATE) {
277 					value = new JobParameter(rs.getTimestamp(5));
278 				}
279 				// No need to assert that value is not null because it's an enum
280 				map.put(rs.getString(2), value);
281 			}
282 		};
283 		getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), new Object[] { instanceId }, handler);
284 		return new JobParameters(map);
285 	}
286 
287 	/*
288 	 * (non-Javadoc)
289 	 *
290 	 * @see
291 	 * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobNames
292 	 * ()
293 	 */
294 	@Override
295 	public List<String> getJobNames() {
296 		return getJdbcTemplate().query(getQuery(FIND_JOB_NAMES),
297 				new ParameterizedRowMapper<String>() {
298 			@Override
299 			public String mapRow(ResultSet rs, int rowNum)
300 					throws SQLException {
301 				return rs.getString(1);
302 			}
303 		});
304 	}
305 
306 	/*
307 	 * (non-Javadoc)
308 	 *
309 	 * @seeorg.springframework.batch.core.repository.dao.JobInstanceDao#
310 	 * getLastJobInstances(java.lang.String, int)
311 	 */
312 	@Override
313 	@SuppressWarnings("rawtypes")
314 	public List<JobInstance> getJobInstances(String jobName, final int start,
315 			final int count) {
316 
317 		ResultSetExtractor extractor = new ResultSetExtractor() {
318 
319 			private List<JobInstance> list = new ArrayList<JobInstance>();
320 
321 			@Override
322 			public Object extractData(ResultSet rs) throws SQLException,
323 			DataAccessException {
324 				int rowNum = 0;
325 				while (rowNum < start && rs.next()) {
326 					rowNum++;
327 				}
328 				while (rowNum < start + count && rs.next()) {
329 					ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
330 					list.add(rowMapper.mapRow(rs, rowNum));
331 					rowNum++;
332 				}
333 				return list;
334 			}
335 
336 		};
337 
338 		@SuppressWarnings("unchecked")
339 		List<JobInstance> result = (List<JobInstance>) getJdbcTemplate().query(getQuery(FIND_LAST_JOBS_BY_NAME),
340 				new Object[] { jobName }, extractor);
341 
342 		return result;
343 	}
344 
345 	/*
346 	 * (non-Javadoc)
347 	 *
348 	 * @see
349 	 * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance
350 	 * (org.springframework.batch.core.JobExecution)
351 	 */
352 	@Override
353 	public JobInstance getJobInstance(JobExecution jobExecution) {
354 
355 		try {
356 			return getJdbcTemplate().queryForObject(
357 					getQuery(GET_JOB_FROM_EXECUTION_ID),
358 					new JobInstanceRowMapper(), jobExecution.getId());
359 		} catch (EmptyResultDataAccessException e) {
360 			return null;
361 		}
362 	}
363 
364 	/**
365 	 * Setter for {@link DataFieldMaxValueIncrementer} to be used when
366 	 * generating primary keys for {@link JobInstance} instances.
367 	 *
368 	 * @param jobIncrementer
369 	 *            the {@link DataFieldMaxValueIncrementer}
370 	 */
371 	public void setJobIncrementer(DataFieldMaxValueIncrementer jobIncrementer) {
372 		this.jobIncrementer = jobIncrementer;
373 	}
374 
375 	@Override
376 	public void afterPropertiesSet() throws Exception {
377 		super.afterPropertiesSet();
378 		Assert.notNull(jobIncrementer);
379 	}
380 
381 	/**
382 	 * @author Dave Syer
383 	 *
384 	 */
385 	private final class JobInstanceRowMapper implements
386 	ParameterizedRowMapper<JobInstance> {
387 
388 		private JobParameters jobParameters;
389 
390 		public JobInstanceRowMapper() {
391 		}
392 
393 		public JobInstanceRowMapper(JobParameters jobParameters) {
394 			this.jobParameters = jobParameters;
395 		}
396 
397 		@Override
398 		public JobInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
399 			Long id = rs.getLong(1);
400 			if (jobParameters == null) {
401 				jobParameters = getJobParameters(id);
402 			}
403 			JobInstance jobInstance = new JobInstance(rs.getLong(1),
404 					jobParameters, rs.getString(2));
405 			// should always be at version=0 because they never get updated
406 			jobInstance.incrementVersion();
407 			return jobInstance;
408 		}
409 	}
410 }