1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.io.UnsupportedEncodingException; |
20 | import java.math.BigInteger; |
21 | import java.security.MessageDigest; |
22 | import java.security.NoSuchAlgorithmException; |
23 | import java.sql.ResultSet; |
24 | import java.sql.SQLException; |
25 | import java.sql.Timestamp; |
26 | import java.sql.Types; |
27 | import java.util.ArrayList; |
28 | import java.util.Collections; |
29 | import java.util.HashMap; |
30 | import java.util.List; |
31 | import java.util.Map; |
32 | import java.util.Map.Entry; |
33 | |
34 | import org.springframework.batch.core.JobExecution; |
35 | import org.springframework.batch.core.JobInstance; |
36 | import org.springframework.batch.core.JobParameter; |
37 | import org.springframework.batch.core.JobParameters; |
38 | import org.springframework.batch.core.JobParameter.ParameterType; |
39 | import org.springframework.beans.factory.InitializingBean; |
40 | import org.springframework.dao.DataAccessException; |
41 | import org.springframework.dao.EmptyResultDataAccessException; |
42 | import org.springframework.jdbc.core.ResultSetExtractor; |
43 | import org.springframework.jdbc.core.RowCallbackHandler; |
44 | import org.springframework.jdbc.core.simple.ParameterizedRowMapper; |
45 | import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; |
46 | import org.springframework.util.Assert; |
47 | import org.springframework.util.StringUtils; |
48 | |
49 | /** |
 * JDBC implementation of {@link JobInstanceDao}. Uses sequences (via Spring's
 * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
 * before inserting a new row. Objects are checked to ensure that all mandatory
 * fields to be stored are not null; if any of them are, an
 * {@link IllegalArgumentException} is thrown. This check could be left to
 * JdbcTemplate, but the resulting exception would be fairly vague and would
 * not identify which field caused the problem.
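 *
 * <p>
 * A minimal wiring sketch (the data source, sequence name and choice of
 * incrementer below are illustrative assumptions, not requirements of this
 * class):
 *
 * <pre>
 * JdbcJobInstanceDao dao = new JdbcJobInstanceDao();
 * // setter assumed to be inherited from AbstractJdbcBatchMetadataDao
 * dao.setJdbcTemplate(new SimpleJdbcTemplate(dataSource));
 * dao.setJobIncrementer(new OracleSequenceMaxValueIncrementer(dataSource, "BATCH_JOB_SEQ"));
 * dao.afterPropertiesSet();
 * </pre>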
57 | * |
58 | * @author Lucas Ward |
59 | * @author Dave Syer |
60 | * @author Robert Kasanicky |
61 | */ |
62 | public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements |
63 | JobInstanceDao, InitializingBean { |
64 | |
65 | private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)" |
66 | + " values (?, ?, ?, ?)"; |
67 | |
68 | private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_PARAMS(JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, " |
69 | + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL) values (?, ?, ?, ?, ?, ?, ?)"; |
70 | |
71 | private static final String FIND_JOBS_WITH_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ?"; |
72 | |
73 | private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME |
74 | + " and JOB_KEY = ?"; |
75 | |
76 | private static final String FIND_JOBS_WITH_EMPTY_KEY = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)"; |
77 | |
78 | private static final String GET_JOB_FROM_ID = "SELECT JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION from %PREFIX%JOB_INSTANCE where JOB_INSTANCE_ID = ?"; |
79 | |
80 | private static final String GET_JOB_FROM_EXECUTION_ID = "SELECT ji.JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, ji.VERSION from %PREFIX%JOB_INSTANCE ji, " |
81 | + "%PREFIX%JOB_EXECUTION je where JOB_EXECUTION_ID = ? and ji.JOB_INSTANCE_ID = je.JOB_INSTANCE_ID"; |
82 | |
83 | private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, " |
84 | + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL from %PREFIX%JOB_PARAMS where JOB_INSTANCE_ID = ?"; |
85 | |
86 | private static final String FIND_JOB_NAMES = "SELECT distinct JOB_NAME from %PREFIX%JOB_INSTANCE order by JOB_NAME"; |
87 | |
88 | private static final String FIND_LAST_JOBS_BY_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc"; |
89 | |
90 | private DataFieldMaxValueIncrementer jobIncrementer; |
91 | |
92 | /** |
	 * In this JDBC implementation a job id is obtained by asking the
	 * jobIncrementer (which is typically backed by a database sequence) for
	 * the next long value, and then passing that id and the parameter values
	 * to an INSERT statement.
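	 *
	 * <p>
	 * A typical call might look like this (the job name and parameter values
	 * are purely illustrative, and {@code dao} is a fully configured instance
	 * of this class):
	 *
	 * <pre>
	 * JobParameters parameters = new JobParametersBuilder()
	 * 		.addString("input.file", "data.csv").addLong("run.id", 1L)
	 * 		.toJobParameters();
	 * JobInstance instance = dao.createJobInstance("importJob", parameters);
	 * </pre>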
96 | * |
97 | * @see JobInstanceDao#createJobInstance(String, JobParameters) |
	 * @throws IllegalArgumentException
	 *             if the job name or the {@link JobParameters} are null.
100 | */ |
101 | public JobInstance createJobInstance(String jobName, |
102 | JobParameters jobParameters) { |
103 | |
104 | Assert.notNull(jobName, "Job name must not be null."); |
105 | Assert.notNull(jobParameters, "JobParameters must not be null."); |
106 | |
107 | Assert.state(getJobInstance(jobName, jobParameters) == null, |
108 | "JobInstance must not already exist"); |
109 | |
110 | Long jobId = jobIncrementer.nextLongValue(); |
111 | |
112 | JobInstance jobInstance = new JobInstance(jobId, jobParameters, jobName); |
113 | jobInstance.incrementVersion(); |
114 | |
115 | Object[] parameters = new Object[] { jobId, jobName, |
116 | createJobKey(jobParameters), jobInstance.getVersion() }; |
117 | getJdbcTemplate().getJdbcOperations().update( |
118 | getQuery(CREATE_JOB_INSTANCE), |
119 | parameters, |
120 | new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, |
121 | Types.INTEGER }); |
122 | |
123 | insertJobParameters(jobId, jobParameters); |
124 | |
125 | return jobInstance; |
126 | } |
127 | |
	/**
	 * Generates the job key used to identify a {@link JobInstance} uniquely.
	 * The parameters are sorted by name, concatenated into a string of
	 * <code>key=value;</code> pairs, and the result is reduced to a 32
	 * character hexadecimal string with an MD5 digest.
	 *
	 * @param jobParameters the parameters to digest
	 * @return a 32 character hexadecimal MD5 hash of the sorted parameters
	 */
	protected String createJobKey(JobParameters jobParameters) {
129 | |
130 | Map<String, JobParameter> props = jobParameters.getParameters(); |
131 | StringBuffer stringBuffer = new StringBuffer(); |
132 | List<String> keys = new ArrayList<String>(props.keySet()); |
133 | Collections.sort(keys); |
134 | for (String key : keys) { |
135 | JobParameter jobParameter = props.get(key); |
			String value = jobParameter.getValue() == null ? "" : jobParameter.toString();
137 | stringBuffer.append(key + "=" + value + ";"); |
138 | } |
139 | |
140 | MessageDigest digest; |
141 | try { |
142 | digest = MessageDigest.getInstance("MD5"); |
143 | } catch (NoSuchAlgorithmException e) { |
			throw new IllegalStateException(
					"MD5 algorithm not available. Fatal (should be in the JDK).", e);
146 | } |
147 | |
148 | try { |
149 | byte[] bytes = digest.digest(stringBuffer.toString().getBytes( |
150 | "UTF-8")); |
151 | return String.format("%032x", new BigInteger(1, bytes)); |
152 | } catch (UnsupportedEncodingException e) { |
			throw new IllegalStateException(
					"UTF-8 encoding not available. Fatal (should be in the JDK).", e);
155 | } |
156 | } |
157 | |
158 | /** |
159 | * Convenience method that inserts all parameters from the provided |
160 | * JobParameters. |
161 | * |
162 | */ |
163 | private void insertJobParameters(Long jobId, JobParameters jobParameters) { |
164 | |
165 | for (Entry<String, JobParameter> entry : jobParameters.getParameters() |
166 | .entrySet()) { |
167 | JobParameter jobParameter = entry.getValue(); |
168 | insertParameter(jobId, jobParameter.getType(), entry.getKey(), |
169 | jobParameter.getValue()); |
170 | } |
171 | } |
172 | |
173 | /** |
	 * Convenience method that inserts an individual record into the job
	 * parameters table.
176 | */ |
177 | private void insertParameter(Long jobId, ParameterType type, String key, |
178 | Object value) { |
179 | |
180 | Object[] args = new Object[0]; |
181 | int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR, |
182 | Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT, |
183 | Types.DOUBLE }; |
184 | |
		// Columns that do not apply to the given parameter type are padded
		// with neutral placeholder values so that the same INSERT statement
		// (and type array) can be reused for all four parameter types.
		if (type == ParameterType.STRING) {
186 | args = new Object[] { jobId, key, type, value, new Timestamp(0L), |
187 | 0L, 0D }; |
188 | } else if (type == ParameterType.LONG) { |
			args = new Object[] { jobId, key, type, "", new Timestamp(0L),
					value, 0D };
191 | } else if (type == ParameterType.DOUBLE) { |
192 | args = new Object[] { jobId, key, type, "", new Timestamp(0L), 0L, |
193 | value }; |
194 | } else if (type == ParameterType.DATE) { |
195 | args = new Object[] { jobId, key, type, "", value, 0L, 0D }; |
196 | } |
197 | |
198 | getJdbcTemplate().getJdbcOperations().update( |
199 | getQuery(CREATE_JOB_PARAMETERS), args, argTypes); |
200 | } |
201 | |
202 | /** |
	 * The job table is queried for <strong>any</strong> jobs that match the
	 * given identifier, and the matches are collected into a list via the
	 * RowMapper callback.
	 *
	 * @see JobInstanceDao#getJobInstance(String, JobParameters)
	 * @throws IllegalArgumentException
	 *             if the job name or the {@link JobParameters} are null.
209 | */ |
210 | public JobInstance getJobInstance(final String jobName, |
211 | final JobParameters jobParameters) { |
212 | |
213 | Assert.notNull(jobName, "Job name must not be null."); |
214 | Assert.notNull(jobParameters, "JobParameters must not be null."); |
215 | |
216 | String jobKey = createJobKey(jobParameters); |
217 | |
218 | ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper( |
219 | jobParameters); |
220 | |
221 | List<JobInstance> instances; |
222 | if (StringUtils.hasLength(jobKey)) { |
223 | instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY), |
224 | rowMapper, jobName, jobKey); |
225 | } else { |
226 | instances = getJdbcTemplate().query( |
227 | getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName, |
228 | jobKey); |
229 | } |
230 | |
231 | if (instances.isEmpty()) { |
232 | return null; |
233 | } else { |
			Assert.state(instances.size() == 1,
					"Expected exactly one JobInstance but found " + instances.size());
235 | return instances.get(0); |
236 | } |
237 | } |
238 | |
239 | /* |
240 | * (non-Javadoc) |
241 | * |
242 | * @see |
243 | * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance |
244 | * (java.lang.Long) |
245 | */ |
246 | public JobInstance getJobInstance(Long instanceId) { |
247 | |
248 | try { |
249 | return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_ID), |
250 | new JobInstanceRowMapper(), instanceId); |
251 | } catch (EmptyResultDataAccessException e) { |
252 | return null; |
253 | } |
254 | |
255 | } |
256 | |
257 | /** |
258 | * @param instanceId |
259 | * @return |
260 | */ |
261 | private JobParameters getJobParameters(Long instanceId) { |
262 | final Map<String, JobParameter> map = new HashMap<String, JobParameter>(); |
263 | RowCallbackHandler handler = new RowCallbackHandler() { |
264 | public void processRow(ResultSet rs) throws SQLException { |
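				// TYPE_CD (column 3) determines which of the value columns
				// holds the actual parameter value.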
265 | ParameterType type = ParameterType.valueOf(rs.getString(3)); |
266 | JobParameter value = null; |
267 | if (type == ParameterType.STRING) { |
268 | value = new JobParameter(rs.getString(4)); |
269 | } else if (type == ParameterType.LONG) { |
270 | value = new JobParameter(rs.getLong(6)); |
271 | } else if (type == ParameterType.DOUBLE) { |
272 | value = new JobParameter(rs.getDouble(7)); |
273 | } else if (type == ParameterType.DATE) { |
274 | value = new JobParameter(rs.getTimestamp(5)); |
275 | } |
				// value cannot be null here: ParameterType is an enum and
				// every constant is handled above
277 | map.put(rs.getString(2), value); |
278 | } |
279 | }; |
280 | getJdbcTemplate().getJdbcOperations().query( |
281 | getQuery(FIND_PARAMS_FROM_ID), new Object[] { instanceId }, |
282 | handler); |
283 | return new JobParameters(map); |
284 | } |
285 | |
286 | /* |
287 | * (non-Javadoc) |
288 | * |
289 | * @see |
290 | * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobNames |
291 | * () |
292 | */ |
293 | public List<String> getJobNames() { |
294 | return getJdbcTemplate().query(getQuery(FIND_JOB_NAMES), |
295 | new ParameterizedRowMapper<String>() { |
296 | public String mapRow(ResultSet rs, int rowNum) |
297 | throws SQLException { |
298 | return rs.getString(1); |
299 | } |
300 | }); |
301 | } |
302 | |
303 | /* |
304 | * (non-Javadoc) |
305 | * |
306 | * @seeorg.springframework.batch.core.repository.dao.JobInstanceDao# |
307 | * getLastJobInstances(java.lang.String, int) |
308 | */ |
309 | public List<JobInstance> getJobInstances(String jobName, final int start, |
310 | final int count) { |
311 | |
312 | ResultSetExtractor extractor = new ResultSetExtractor() { |
313 | |
314 | private List<JobInstance> list = new ArrayList<JobInstance>(); |
315 | |
316 | public Object extractData(ResultSet rs) throws SQLException, |
317 | DataAccessException { |
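				// Paging is done in memory: skip the first 'start' rows, then
				// map at most 'count' rows with the standard row mapper.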
318 | int rowNum = 0; |
319 | while (rowNum < start && rs.next()) { |
320 | rowNum++; |
321 | } |
322 | while (rowNum < start + count && rs.next()) { |
323 | ParameterizedRowMapper<JobInstance> rowMapper = new JobInstanceRowMapper(); |
324 | list.add(rowMapper.mapRow(rs, rowNum)); |
325 | rowNum++; |
326 | } |
327 | return list; |
328 | } |
329 | |
330 | }; |
331 | |
332 | @SuppressWarnings("unchecked") |
333 | List<JobInstance> result = (List<JobInstance>) getJdbcTemplate() |
334 | .getJdbcOperations().query(getQuery(FIND_LAST_JOBS_BY_NAME), |
335 | new Object[] { jobName }, extractor); |
336 | |
337 | return result; |
338 | } |
339 | |
340 | /* |
341 | * (non-Javadoc) |
342 | * |
343 | * @see |
344 | * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance |
345 | * (org.springframework.batch.core.JobExecution) |
346 | */ |
347 | public JobInstance getJobInstance(JobExecution jobExecution) { |
348 | |
349 | try { |
350 | return getJdbcTemplate().queryForObject( |
351 | getQuery(GET_JOB_FROM_EXECUTION_ID), |
352 | new JobInstanceRowMapper(), jobExecution.getId()); |
353 | } catch (EmptyResultDataAccessException e) { |
354 | return null; |
355 | } |
356 | } |
357 | |
358 | /** |
359 | * Setter for {@link DataFieldMaxValueIncrementer} to be used when |
360 | * generating primary keys for {@link JobInstance} instances. |
361 | * |
362 | * @param jobIncrementer |
363 | * the {@link DataFieldMaxValueIncrementer} |
364 | */ |
365 | public void setJobIncrementer(DataFieldMaxValueIncrementer jobIncrementer) { |
366 | this.jobIncrementer = jobIncrementer; |
367 | } |
368 | |
369 | public void afterPropertiesSet() throws Exception { |
370 | super.afterPropertiesSet(); |
		Assert.notNull(jobIncrementer, "JobIncrementer is required");
372 | } |
373 | |
374 | /** |
375 | * @author Dave Syer |
376 | * |
377 | */ |
378 | private final class JobInstanceRowMapper implements |
379 | ParameterizedRowMapper<JobInstance> { |
380 | |
381 | private JobParameters jobParameters; |
382 | |
383 | public JobInstanceRowMapper() { |
384 | } |
385 | |
386 | public JobInstanceRowMapper(JobParameters jobParameters) { |
387 | this.jobParameters = jobParameters; |
388 | } |
389 | |
390 | public JobInstance mapRow(ResultSet rs, int rowNum) throws SQLException { |
391 | Long id = rs.getLong(1); |
392 | if (jobParameters == null) { |
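				// parameters were not supplied up front, so load them from
				// the job parameters table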
393 | jobParameters = getJobParameters(id); |
394 | } |
			JobInstance jobInstance = new JobInstance(id, jobParameters,
					rs.getString(2));
			// should always be at version=0 because job instances are never
			// updated (incrementVersion() on a fresh instance sets it to 0)
398 | jobInstance.incrementVersion(); |
399 | return jobInstance; |
400 | } |
401 | } |
402 | } |