1 | package org.springframework.batch.core.repository.dao; |
2 | |
3 | import java.sql.ResultSet; |
4 | import java.sql.SQLException; |
5 | import java.sql.Timestamp; |
6 | import java.sql.Types; |
7 | import java.util.Iterator; |
8 | import java.util.List; |
9 | import java.util.Map; |
10 | import java.util.Map.Entry; |
11 | |
12 | import org.springframework.batch.core.Job; |
13 | import org.springframework.batch.core.JobInstance; |
14 | import org.springframework.batch.core.JobParameters; |
15 | import org.springframework.beans.factory.InitializingBean; |
16 | import org.springframework.jdbc.core.RowMapper; |
17 | import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; |
18 | import org.springframework.util.Assert; |
19 | import org.springframework.util.StringUtils; |
20 | |
21 | /** |
22 | * Jdbc implementation of {@link JobInstanceDao}. Uses sequences (via Spring's |
23 | * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys |
24 | * before inserting a new row. Objects are checked to ensure all mandatory |
25 | * fields to be stored are not null. If any are found to be null, an |
26 | * IllegalArgumentException will be thrown. This could be left to JdbcTemplate, |
27 | * however, the exception will be fairly vague, and fails to highlight which |
28 | * field caused the exception. |
29 | * |
30 | * @author Lucas Ward |
31 | * @author Dave Syer |
32 | * @author Robert Kasanicky |
33 | */ |
34 | public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements JobInstanceDao, InitializingBean { |
35 | |
36 | private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)" |
37 | + " values (?, ?, ?, ?)"; |
38 | |
39 | private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_PARAMS(JOB_INSTANCE_ID, KEY_NAME, TYPE_CD, " |
40 | + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL) values (?, ?, ?, ?, ?, ?, ?)"; |
41 | |
42 | private static final String FIND_JOBS_WITH_KEY = "SELECT JOB_INSTANCE_ID from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?"; |
43 | |
44 | private static final String FIND_JOBS_WITH_EMPTY_KEY = |
45 | "SELECT JOB_INSTANCE_ID from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)"; |
46 | |
47 | private DataFieldMaxValueIncrementer jobIncrementer; |
48 | |
49 | /** |
50 | * In this jdbc implementation a job id is obtained by asking the |
51 | * jobIncrementer (which is likely a sequence) for the nextLong, and then |
52 | * passing the Id and parameter values into an INSERT statement. |
53 | * |
54 | * @see JobInstanceDao#createJobInstance(Job, JobParameters) |
55 | * @throws IllegalArgumentException if any {@link JobParameters} fields are |
56 | * null. |
57 | */ |
58 | public JobInstance createJobInstance(Job job, JobParameters jobParameters) { |
59 | |
60 | Assert.notNull(job, "Job must not be null."); |
61 | Assert.hasLength(job.getName(), "Job must have a name"); |
62 | Assert.notNull(jobParameters, "JobParameters must not be null."); |
63 | |
64 | Assert.state(getJobInstance(job, jobParameters) == null, "JobInstance must not already exist"); |
65 | |
66 | Long jobId = new Long(jobIncrementer.nextLongValue()); |
67 | |
68 | JobInstance jobInstance = new JobInstance(jobId, jobParameters, job.getName()); |
69 | jobInstance.incrementVersion(); |
70 | |
71 | Object[] parameters = new Object[] { jobId, job.getName(), createJobKey(jobParameters), jobInstance.getVersion() }; |
72 | getJdbcTemplate().update(getQuery(CREATE_JOB_INSTANCE), parameters, |
73 | new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER }); |
74 | |
75 | insertJobParameters(jobId, jobParameters); |
76 | |
77 | return jobInstance; |
78 | } |
79 | |
80 | private String createJobKey(JobParameters jobParameters) { |
81 | |
82 | Map props = jobParameters.getParameters(); |
83 | StringBuffer stringBuffer = new StringBuffer(); |
84 | for (Iterator it = props.entrySet().iterator(); it.hasNext();) { |
85 | Entry entry = (Entry) it.next(); |
86 | stringBuffer.append(entry.toString() + ";"); |
87 | } |
88 | |
89 | return stringBuffer.toString(); |
90 | } |
91 | |
92 | /** |
93 | * Convenience method that inserts all parameters from the provided |
94 | * JobParameters. |
95 | * |
96 | */ |
97 | private void insertJobParameters(Long jobId, JobParameters jobParameters) { |
98 | |
99 | Map parameters = jobParameters.getStringParameters(); |
100 | |
101 | if (!parameters.isEmpty()) { |
102 | for (Iterator it = parameters.entrySet().iterator(); it.hasNext();) { |
103 | Entry entry = (Entry) it.next(); |
104 | insertParameter(jobId, ParameterType.STRING, entry.getKey().toString(), entry.getValue()); |
105 | } |
106 | } |
107 | |
108 | parameters = jobParameters.getLongParameters(); |
109 | |
110 | if (!parameters.isEmpty()) { |
111 | for (Iterator it = parameters.entrySet().iterator(); it.hasNext();) { |
112 | Entry entry = (Entry) it.next(); |
113 | insertParameter(jobId, ParameterType.LONG, entry.getKey().toString(), entry.getValue()); |
114 | } |
115 | } |
116 | |
117 | parameters = jobParameters.getDoubleParameters(); |
118 | |
119 | if (!parameters.isEmpty()) { |
120 | for (Iterator it = parameters.entrySet().iterator(); it.hasNext();) { |
121 | Entry entry = (Entry) it.next(); |
122 | insertParameter(jobId, ParameterType.DOUBLE, entry.getKey().toString(), entry.getValue()); |
123 | } |
124 | } |
125 | |
126 | parameters = jobParameters.getDateParameters(); |
127 | |
128 | if (!parameters.isEmpty()) { |
129 | for (Iterator it = parameters.entrySet().iterator(); it.hasNext();) { |
130 | Entry entry = (Entry) it.next(); |
131 | insertParameter(jobId, ParameterType.DATE, entry.getKey().toString(), entry.getValue()); |
132 | } |
133 | } |
134 | } |
135 | |
136 | /** |
137 | * Convenience method that inserts an individual records into the |
138 | * JobParameters table. |
139 | */ |
140 | private void insertParameter(Long jobId, ParameterType type, String key, Object value) { |
141 | |
142 | Object[] args = new Object[0]; |
143 | int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, |
144 | Types.BIGINT, Types.DOUBLE }; |
145 | |
146 | if (type == ParameterType.STRING) { |
147 | args = new Object[] { jobId, key, type, value, new Timestamp(0L), new Long(0), new Double(0) }; |
148 | } |
149 | else if (type == ParameterType.LONG) { |
150 | args = new Object[] { jobId, key, type, "", new Timestamp(0L), value, new Double(0) }; |
151 | } |
152 | else if (type == ParameterType.DOUBLE) { |
153 | args = new Object[] { jobId, key, type, "", new Timestamp(0L), new Long(0), value }; |
154 | } |
155 | else if (type == ParameterType.DATE) { |
156 | args = new Object[] { jobId, key, type, "", value, new Long(0), new Double(0) }; |
157 | } |
158 | |
159 | getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes); |
160 | } |
161 | |
162 | /** |
163 | * The job table is queried for <strong>any</strong> jobs that match the |
164 | * given identifier, adding them to a list via the RowMapper callback. |
165 | * |
166 | * @see JobInstanceDao#getJobInstance(Job, JobParameters) |
167 | * @throws IllegalArgumentException if any {@link JobParameters} fields are |
168 | * null. |
169 | */ |
170 | public JobInstance getJobInstance(final Job job, final JobParameters jobParameters) { |
171 | |
172 | Assert.notNull(job, "Job must not be null."); |
173 | Assert.hasLength(job.getName(), "Job must have a name"); |
174 | Assert.notNull(jobParameters, "JobParameters must not be null."); |
175 | |
176 | String jobKey = createJobKey(jobParameters); |
177 | |
178 | Object[] parameters = new Object[] { job.getName(), jobKey }; |
179 | |
180 | RowMapper rowMapper = new RowMapper() { |
181 | public Object mapRow(ResultSet rs, int rowNum) throws SQLException { |
182 | JobInstance jobInstance = new JobInstance(new Long(rs.getLong(1)), jobParameters, job.getName()); |
183 | return jobInstance; |
184 | } |
185 | }; |
186 | |
187 | List instances; |
188 | if (StringUtils.hasLength(jobKey)) { |
189 | instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY), parameters, rowMapper); |
190 | } |
191 | else { |
192 | instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_EMPTY_KEY), parameters, rowMapper); |
193 | } |
194 | |
195 | if (instances.isEmpty()) { |
196 | return null; |
197 | } else { |
198 | Assert.state(instances.size() == 1); |
199 | return (JobInstance) instances.get(0); |
200 | } |
201 | } |
202 | |
203 | /** |
204 | * Setter for {@link DataFieldMaxValueIncrementer} to be used when |
205 | * generating primary keys for {@link JobInstance} instances. |
206 | * |
207 | * @param jobIncrementer the {@link DataFieldMaxValueIncrementer} |
208 | */ |
209 | public void setJobIncrementer(DataFieldMaxValueIncrementer jobIncrementer) { |
210 | this.jobIncrementer = jobIncrementer; |
211 | } |
212 | |
213 | public void afterPropertiesSet() throws Exception { |
214 | super.afterPropertiesSet(); |
215 | Assert.notNull(jobIncrementer); |
216 | } |
217 | |
218 | private static class ParameterType { |
219 | |
220 | private final String type; |
221 | |
222 | private ParameterType(String type) { |
223 | this.type = type; |
224 | } |
225 | |
226 | public String toString() { |
227 | return type; |
228 | } |
229 | |
230 | public static final ParameterType STRING = new ParameterType("STRING"); |
231 | |
232 | public static final ParameterType DATE = new ParameterType("DATE"); |
233 | |
234 | public static final ParameterType LONG = new ParameterType("LONG"); |
235 | |
236 | public static final ParameterType DOUBLE = new ParameterType("DOUBLE"); |
237 | |
238 | private static final ParameterType[] VALUES = { STRING, DATE, LONG, DOUBLE }; |
239 | |
240 | public static ParameterType getType(String typeAsString) { |
241 | |
242 | for (int i = 0; i < VALUES.length; i++) { |
243 | if (VALUES[i].toString().equals(typeAsString)) { |
244 | return (ParameterType) VALUES[i]; |
245 | } |
246 | } |
247 | |
248 | return null; |
249 | } |
250 | } |
251 | } |