1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.admin.service;
17
18 import java.sql.ResultSet;
19 import java.sql.SQLException;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25
26 import javax.sql.DataSource;
27
28 import org.springframework.batch.core.BatchStatus;
29 import org.springframework.batch.core.ExitStatus;
30 import org.springframework.batch.core.JobExecution;
31 import org.springframework.batch.core.JobInstance;
32 import org.springframework.batch.core.JobParameters;
33 import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao;
34 import org.springframework.batch.item.database.Order;
35 import org.springframework.batch.item.database.PagingQueryProvider;
36 import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
37 import org.springframework.dao.IncorrectResultSizeDataAccessException;
38 import org.springframework.jdbc.core.JdbcTemplate;
39 import org.springframework.jdbc.core.RowMapper;
40 import org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer;
41 import org.springframework.util.Assert;
42
43
44
45
46
47
48 public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implements SearchableJobExecutionDao {
49
50 private static final String GET_COUNT = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION";
51
52 private static final String GET_COUNT_BY_JOB_NAME = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
53 + "where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=?";
54
55 private static final String FIELDS = "E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, "
56 + "E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, I.JOB_INSTANCE_ID, I.JOB_NAME";
57
58 private static final String GET_RUNNING_EXECUTIONS = "SELECT " + FIELDS
59 + " from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
60 + "where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and E.END_TIME is NULL";
61
62 private PagingQueryProvider allExecutionsPagingQueryProvider;
63
64 private PagingQueryProvider byJobNamePagingQueryProvider;
65
66 private DataSource dataSource;
67
68
69
70
71 public void setDataSource(DataSource dataSource) {
72 this.dataSource = dataSource;
73 }
74
75
76
77
78 @Override
79 public void afterPropertiesSet() throws Exception {
80
81 Assert.state(dataSource != null, "DataSource must be provided");
82
83 if (getJdbcTemplate() == null) {
84 setJdbcTemplate(new JdbcTemplate(dataSource));
85 }
86 setJobExecutionIncrementer(new AbstractDataFieldMaxValueIncrementer() {
87 @Override
88 protected long getNextKey() {
89 return 0;
90 }
91 });
92
93 allExecutionsPagingQueryProvider = getPagingQueryProvider();
94 byJobNamePagingQueryProvider = getPagingQueryProvider("I.JOB_NAME=?");
95
96 super.afterPropertiesSet();
97
98 }
99
100
101
102
103
104 private PagingQueryProvider getPagingQueryProvider() throws Exception {
105 return getPagingQueryProvider(null);
106 }
107
108
109
110
111
112
113 private PagingQueryProvider getPagingQueryProvider(String whereClause) throws Exception {
114 return getPagingQueryProvider(null, whereClause);
115 }
116
117
118
119
120
121
122 private PagingQueryProvider getPagingQueryProvider(String fromClause, String whereClause) throws Exception {
123 SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
124 factory.setDataSource(dataSource);
125 fromClause = "%PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I" + (fromClause == null ? "" : ", " + fromClause);
126 factory.setFromClause(getQuery(fromClause));
127 factory.setSelectClause(FIELDS);
128 Map<String, Order> sortKeys = new HashMap<String, Order>();
129 sortKeys.put("JOB_EXECUTION_ID", Order.DESCENDING);
130 factory.setSortKeys(sortKeys);
131 whereClause = "E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID" + (whereClause == null ? "" : " and " + whereClause);
132 factory.setWhereClause(whereClause);
133
134 return (PagingQueryProvider) factory.getObject();
135 }
136
137
138
139
140 @Override
141 public int countJobExecutions() {
142 return getJdbcTemplate().queryForObject(getQuery(GET_COUNT), Integer.class);
143 }
144
145
146
147
148 @Override
149 public int countJobExecutions(String jobName) {
150 return getJdbcTemplate().queryForObject(getQuery(GET_COUNT_BY_JOB_NAME), Integer.class, jobName);
151 }
152
153
154
155
156 @Override
157 public Collection<JobExecution> getRunningJobExecutions() {
158 return getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new JobExecutionRowMapper());
159 }
160
161
162
163
164 @Override
165 public List<JobExecution> getJobExecutions(String jobName, int start, int count) {
166 if (start <= 0) {
167 return getJdbcTemplate().query(byJobNamePagingQueryProvider.generateFirstPageQuery(count),
168 new JobExecutionRowMapper(), jobName);
169 }
170 try {
171 Long startAfterValue = getJdbcTemplate().queryForObject(
172 byJobNamePagingQueryProvider.generateJumpToItemQuery(start, count), Long.class, jobName);
173 return getJdbcTemplate().query(byJobNamePagingQueryProvider.generateRemainingPagesQuery(count),
174 new JobExecutionRowMapper(), jobName, startAfterValue);
175 }
176 catch (IncorrectResultSizeDataAccessException e) {
177 return Collections.emptyList();
178 }
179 }
180
181
182
183
184 @Override
185 public List<JobExecution> getJobExecutions(int start, int count) {
186 if (start <= 0) {
187 return getJdbcTemplate().query(allExecutionsPagingQueryProvider.generateFirstPageQuery(count),
188 new JobExecutionRowMapper());
189 }
190 try {
191 Long startAfterValue = getJdbcTemplate().queryForObject(
192 allExecutionsPagingQueryProvider.generateJumpToItemQuery(start, count), Long.class);
193 return getJdbcTemplate().query(allExecutionsPagingQueryProvider.generateRemainingPagesQuery(count),
194 new JobExecutionRowMapper(), startAfterValue);
195 }
196 catch (IncorrectResultSizeDataAccessException e) {
197 return Collections.emptyList();
198 }
199 }
200
201 @Override
202 public void saveJobExecution(JobExecution jobExecution) {
203 throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
204 }
205
206 @Override
207 public void synchronizeStatus(JobExecution jobExecution) {
208 throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
209 }
210
211 @Override
212 public void updateJobExecution(JobExecution jobExecution) {
213 throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
214 }
215
216
217
218
219
220
221
222 protected class JobExecutionRowMapper implements RowMapper<JobExecution> {
223
224 public JobExecutionRowMapper() {
225 }
226
227 @Override
228 public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
229 Long id = rs.getLong(1);
230 JobExecution jobExecution;
231
232 JobParameters jobParameters = getJobParameters(id);
233
234 JobInstance jobInstance = new JobInstance(rs.getLong(10), rs.getString(11));
235 jobExecution = new JobExecution(jobInstance, jobParameters);
236 jobExecution.setId(id);
237
238 jobExecution.setStartTime(rs.getTimestamp(2));
239 jobExecution.setEndTime(rs.getTimestamp(3));
240 jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4)));
241 jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6)));
242 jobExecution.setCreateTime(rs.getTimestamp(7));
243 jobExecution.setLastUpdated(rs.getTimestamp(8));
244 jobExecution.setVersion(rs.getInt(9));
245 return jobExecution;
246 }
247
248 }
249 }