package org.springframework.batch.core.repository.dao;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.repeat.ExitStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
import org.springframework.util.Assert;
/**
 * Jdbc implementation of {@link JobExecutionDao}. Uses sequences (via Spring's
 * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
 * before inserting a new row. Objects are checked to ensure that all mandatory
 * fields to be stored are not null; if any are null, an
 * IllegalArgumentException is thrown. This check could be left to JdbcTemplate,
 * but the resulting exception would be fairly vague and would fail to indicate
 * which field caused the problem.
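 *
 * <p>
 * As a rough sketch, a programmatic configuration might look like the
 * following (the setter inherited from the superclass, the data source and the
 * sequence name are assumptions for illustration, not a prescribed setup):
 *
 * <pre>
 * JdbcJobExecutionDao dao = new JdbcJobExecutionDao();
 * // JdbcTemplate is assumed to be configured on the superclass
 * dao.setJdbcTemplate(new JdbcTemplate(dataSource));
 * dao.setJobExecutionIncrementer(new OracleSequenceMaxValueIncrementer(dataSource,
 * 		"BATCH_JOB_EXECUTION_SEQ"));
 * dao.afterPropertiesSet();
 * </pre>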
 *
 * @author Lucas Ward
 * @author Dave Syer
 * @author Robert Kasanicky
 */
public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {

	private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class);

	private static final int EXIT_MESSAGE_LENGTH = 250;

	private static final String GET_JOB_EXECUTION_COUNT = "SELECT count(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION "
			+ "where JOB_INSTANCE_ID = ?";

	private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, "
			+ "END_TIME, STATUS, CONTINUABLE, EXIT_CODE, EXIT_MESSAGE, VERSION) values (?, ?, ?, ?, ?, ?, ?, ?, ?)";

	private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?";

	private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, "
			+ " STATUS = ?, CONTINUABLE = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ? where JOB_EXECUTION_ID = ?";

	private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, CONTINUABLE, EXIT_CODE, EXIT_MESSAGE from %PREFIX%JOB_EXECUTION"
			+ " where JOB_INSTANCE_ID = ?";

	private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, CONTINUABLE, EXIT_CODE, EXIT_MESSAGE from %PREFIX%JOB_EXECUTION"
			+ " where JOB_INSTANCE_ID = ? and START_TIME = (SELECT max(START_TIME) from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ?)";

	private DataFieldMaxValueIncrementer jobExecutionIncrementer;

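	/**
	 * @see JobExecutionDao#findJobExecutions(JobInstance)
	 * @throws IllegalArgumentException if the job or its id is null.
	 */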
	public List findJobExecutions(final JobInstance job) {

		Assert.notNull(job, "Job cannot be null.");
		Assert.notNull(job.getId(), "Job Id cannot be null.");

		return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new Object[] { job.getId() },
				new JobExecutionRowMapper(job));
	}

	/**
	 * @see JobExecutionDao#getJobExecutionCount(JobInstance)
	 * @throws IllegalArgumentException if jobId is null.
	 */
	public int getJobExecutionCount(JobInstance jobInstance) {
		Long jobId = jobInstance.getId();
		Assert.notNull(jobId, "JobId cannot be null");

		Object[] parameters = new Object[] { jobId };

		return getJdbcTemplate().queryForInt(getQuery(GET_JOB_EXECUTION_COUNT), parameters);
	}

	/**
	 * SQL implementation using sequences via the Spring incrementer
	 * abstraction. Once a new id has been obtained, the JobExecution is saved
	 * via a SQL INSERT statement.
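	 *
	 * <p>
	 * A minimal usage sketch (the surrounding <code>jobInstance</code> and the
	 * configured <code>dao</code> are assumed to exist):
	 *
	 * <pre>
	 * JobExecution execution = new JobExecution(jobInstance);
	 * execution.setStartTime(new Date());
	 * execution.setStatus(BatchStatus.STARTED);
	 * dao.saveJobExecution(execution); // assigns a new id and increments the version
	 * </pre>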
	 *
	 * @see JobExecutionDao#saveJobExecution(JobExecution)
	 * @throws IllegalArgumentException if jobExecution is null, or if any of
	 * its mandatory fields to be persisted are null.
	 */
	public void saveJobExecution(JobExecution jobExecution) {

		validateJobExecution(jobExecution);

		jobExecution.incrementVersion();

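		// Obtain the next primary key value from the configured incrementer
		// before inserting the new row.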
		jobExecution.setId(new Long(jobExecutionIncrementer.nextLongValue()));
		Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(),
				jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(),
				jobExecution.getExitStatus().isContinuable() ? "Y" : "N", jobExecution.getExitStatus().getExitCode(),
				jobExecution.getExitStatus().getExitDescription(), jobExecution.getVersion() };
		getJdbcTemplate().update(
				getQuery(SAVE_JOB_EXECUTION),
				parameters,
				new int[] { Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.CHAR,
						Types.VARCHAR, Types.VARCHAR, Types.INTEGER });
	}

	/**
	 * Validate JobExecution. At a minimum, the job id and status of the given
	 * JobExecution cannot be null.
	 *
	 * @param jobExecution the execution to validate
	 * @throws IllegalArgumentException if a mandatory field is null
	 */
	private void validateJobExecution(JobExecution jobExecution) {

		Assert.notNull(jobExecution);
		Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
		Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
	}

	/**
	 * Update the given JobExecution using a SQL UPDATE statement. The
	 * JobExecution is first checked to ensure that its mandatory fields are
	 * not null and that it has an id. The database is then queried to ensure
	 * that the id exists, i.e. that the execution has already been saved.
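	 *
	 * <p>
	 * A typical update, sketched under the assumption that the execution has
	 * already been saved via {@link #saveJobExecution(JobExecution)}:
	 *
	 * <pre>
	 * execution.setEndTime(new Date());
	 * execution.setStatus(BatchStatus.COMPLETED);
	 * dao.updateJobExecution(execution);
	 * </pre>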
	 *
	 * @see JobExecutionDao#updateJobExecution(JobExecution)
	 */
	public void updateJobExecution(JobExecution jobExecution) {

		validateJobExecution(jobExecution);

		jobExecution.incrementVersion();

		String exitDescription = jobExecution.getExitStatus().getExitDescription();
		if (exitDescription != null && exitDescription.length() > EXIT_MESSAGE_LENGTH) {
			exitDescription = exitDescription.substring(0, EXIT_MESSAGE_LENGTH);
			logger.debug("Truncating long message before update of JobExecution: " + jobExecution);
		}
		Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(),
				jobExecution.getStatus().toString(), jobExecution.getExitStatus().isContinuable() ? "Y" : "N",
				jobExecution.getExitStatus().getExitCode(), exitDescription, jobExecution.getVersion(),
				jobExecution.getId() };

		if (jobExecution.getId() == null) {
			throw new IllegalArgumentException("JobExecution ID cannot be null. JobExecution must be saved "
					+ "before it can be updated.");
		}

		// Check if the given JobExecution's id already exists: if it is not
		// found, it is invalid and an exception should be thrown.
		if (getJdbcTemplate().queryForInt(getQuery(CHECK_JOB_EXECUTION_EXISTS), new Object[] { jobExecution.getId() }) != 1) {
			throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found.");
		}

		getJdbcTemplate().update(
				getQuery(UPDATE_JOB_EXECUTION),
				parameters,
				new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.CHAR, Types.VARCHAR, Types.VARCHAR,
						Types.INTEGER, Types.INTEGER });
	}

	/**
	 * Setter for {@link DataFieldMaxValueIncrementer} to be used when
	 * generating primary keys for {@link JobExecution} instances.
	 *
	 * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer}
	 */
	public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
		this.jobExecutionIncrementer = jobExecutionIncrementer;
	}

	public void afterPropertiesSet() throws Exception {
		super.afterPropertiesSet();
		Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null.");
	}

	/**
	 * Re-usable mapper for {@link JobExecution} instances.
	 *
	 * @author Dave Syer
	 */
	private static class JobExecutionRowMapper implements RowMapper {

		private JobInstance job;

		public JobExecutionRowMapper(JobInstance job) {
			super();
			this.job = job;
		}

		public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
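			// Column indices correspond to the select list used by
			// FIND_JOB_EXECUTIONS and GET_LAST_EXECUTION: JOB_EXECUTION_ID,
			// START_TIME, END_TIME, STATUS, CONTINUABLE, EXIT_CODE, EXIT_MESSAGE.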
			JobExecution jobExecution = new JobExecution(job);
			jobExecution.setId(new Long(rs.getLong(1)));
			jobExecution.setStartTime(rs.getTimestamp(2));
			jobExecution.setEndTime(rs.getTimestamp(3));
			jobExecution.setStatus(BatchStatus.getStatus(rs.getString(4)));
			jobExecution.setExitStatus(new ExitStatus("Y".equals(rs.getString(5)), rs.getString(6), rs.getString(7)));
			return jobExecution;
		}

	}

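	/**
	 * @see JobExecutionDao#getLastJobExecution(JobInstance)
	 * @return the JobExecution with the latest start time for the given
	 * JobInstance, or <code>null</code> if there are no executions.
	 */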
	public JobExecution getLastJobExecution(JobInstance jobInstance) {

		Long id = jobInstance.getId();

		List executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION), new Object[] { id, id },
				new JobExecutionRowMapper(jobInstance));

		Assert.state(executions.size() <= 1, "There must be at most one latest job execution");

		if (executions.isEmpty()) {
			return null;
		}
		else {
			return (JobExecution) executions.get(0);
		}
	}

}