View Javadoc
1   /*
2    * Copyright 2006-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.admin.service;
17  
18  import java.sql.ResultSet;
19  import java.sql.SQLException;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  
26  import javax.sql.DataSource;
27  
28  import org.springframework.batch.core.BatchStatus;
29  import org.springframework.batch.core.ExitStatus;
30  import org.springframework.batch.core.JobExecution;
31  import org.springframework.batch.core.JobInstance;
32  import org.springframework.batch.core.JobParameters;
33  import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao;
34  import org.springframework.batch.item.database.Order;
35  import org.springframework.batch.item.database.PagingQueryProvider;
36  import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
37  import org.springframework.dao.IncorrectResultSizeDataAccessException;
38  import org.springframework.jdbc.core.JdbcTemplate;
39  import org.springframework.jdbc.core.RowMapper;
40  import org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer;
41  import org.springframework.util.Assert;
42  
43  /**
44   * @author Dave Syer
45   * @author Michael Minella
46   * 
47   */
48  public class JdbcSearchableJobExecutionDao extends JdbcJobExecutionDao implements SearchableJobExecutionDao {
49  
50  	private static final String GET_COUNT = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION";
51  
52  	private static final String GET_COUNT_BY_JOB_NAME = "SELECT COUNT(1) from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
53  			+ "where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=?";
54  
55  	private static final String FIELDS = "E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, "
56  			+ "E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, I.JOB_INSTANCE_ID, I.JOB_NAME";
57  
58  	private static final String GET_RUNNING_EXECUTIONS = "SELECT " + FIELDS
59  			+ " from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
60  			+ "where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and E.END_TIME is NULL";
61  
62  	private PagingQueryProvider allExecutionsPagingQueryProvider;
63  
64  	private PagingQueryProvider byJobNamePagingQueryProvider;
65  
66  	private DataSource dataSource;
67  
68  	/**
69  	 * @param dataSource the dataSource to set
70  	 */
71  	public void setDataSource(DataSource dataSource) {
72  		this.dataSource = dataSource;
73  	}
74  
75  	/**
76  	 * @see JdbcJobExecutionDao#afterPropertiesSet()
77  	 */
78  	@Override
79  	public void afterPropertiesSet() throws Exception {
80  
81  		Assert.state(dataSource != null, "DataSource must be provided");
82  
83  		if (getJdbcTemplate() == null) {
84  			setJdbcTemplate(new JdbcTemplate(dataSource));
85  		}
86  		setJobExecutionIncrementer(new AbstractDataFieldMaxValueIncrementer() {
87  			@Override
88  			protected long getNextKey() {
89  				return 0;
90  			}
91  		});
92  
93  		allExecutionsPagingQueryProvider = getPagingQueryProvider();
94  		byJobNamePagingQueryProvider = getPagingQueryProvider("I.JOB_NAME=?");
95  
96  		super.afterPropertiesSet();
97  
98  	}
99  
100 	/**
101 	 * @return a {@link PagingQueryProvider} for all job executions
102 	 * @throws Exception
103 	 */
104 	private PagingQueryProvider getPagingQueryProvider() throws Exception {
105 		return getPagingQueryProvider(null);
106 	}
107 
108 	/**
109 	 * @return a {@link PagingQueryProvider} for all job executions with the
110 	 * provided where clause
111 	 * @throws Exception
112 	 */
113 	private PagingQueryProvider getPagingQueryProvider(String whereClause) throws Exception {
114 		return getPagingQueryProvider(null, whereClause);
115 	}
116 
117 	/**
118 	 * @return a {@link PagingQueryProvider} with a where clause to narrow the
119 	 * query
120 	 * @throws Exception
121 	 */
122 	private PagingQueryProvider getPagingQueryProvider(String fromClause, String whereClause) throws Exception {
123 		SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
124 		factory.setDataSource(dataSource);
125 		fromClause = "%PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I" + (fromClause == null ? "" : ", " + fromClause);
126 		factory.setFromClause(getQuery(fromClause));
127 		factory.setSelectClause(FIELDS);
128 		Map<String, Order> sortKeys = new HashMap<String, Order>();
129 		sortKeys.put("JOB_EXECUTION_ID", Order.DESCENDING);
130 		factory.setSortKeys(sortKeys);
131 		whereClause = "E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID" + (whereClause == null ? "" : " and " + whereClause);
132 		factory.setWhereClause(whereClause);
133 
134 		return (PagingQueryProvider) factory.getObject();
135 	}
136 
137 	/**
138 	 * @see SearchableJobExecutionDao#countJobExecutions()
139 	 */
140 	@Override
141 	public int countJobExecutions() {
142 		return getJdbcTemplate().queryForObject(getQuery(GET_COUNT), Integer.class);
143 	}
144 
145 	/**
146 	 * @see SearchableJobExecutionDao#countJobExecutions(String)
147 	 */
148 	@Override
149 	public int countJobExecutions(String jobName) {
150 		return getJdbcTemplate().queryForObject(getQuery(GET_COUNT_BY_JOB_NAME), Integer.class, jobName);
151 	}
152 
153 	/**
154 	 * @see SearchableJobExecutionDao#getRunningJobExecutions()
155 	 */
156 	@Override
157 	public Collection<JobExecution> getRunningJobExecutions() {
158 		return getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new JobExecutionRowMapper());
159 	}
160 
161 	/**
162 	 * @see SearchableJobExecutionDao#getJobExecutions(String, int, int)
163 	 */
164 	@Override
165 	public List<JobExecution> getJobExecutions(String jobName, int start, int count) {
166 		if (start <= 0) {
167 			return getJdbcTemplate().query(byJobNamePagingQueryProvider.generateFirstPageQuery(count),
168 					new JobExecutionRowMapper(), jobName);
169 		}
170 		try {
171 			Long startAfterValue = getJdbcTemplate().queryForObject(
172 					byJobNamePagingQueryProvider.generateJumpToItemQuery(start, count), Long.class, jobName);
173 			return getJdbcTemplate().query(byJobNamePagingQueryProvider.generateRemainingPagesQuery(count),
174 					new JobExecutionRowMapper(), jobName, startAfterValue);
175 		}
176 		catch (IncorrectResultSizeDataAccessException e) {
177 			return Collections.emptyList();
178 		}
179 	}
180 
181 	/**
182 	 * @see SearchableJobExecutionDao#getJobExecutions(int, int)
183 	 */
184 	@Override
185 	public List<JobExecution> getJobExecutions(int start, int count) {
186 		if (start <= 0) {
187 			return getJdbcTemplate().query(allExecutionsPagingQueryProvider.generateFirstPageQuery(count),
188 					new JobExecutionRowMapper());
189 		}
190 		try {
191 			Long startAfterValue = getJdbcTemplate().queryForObject(
192 					allExecutionsPagingQueryProvider.generateJumpToItemQuery(start, count), Long.class);
193 			return getJdbcTemplate().query(allExecutionsPagingQueryProvider.generateRemainingPagesQuery(count),
194 					new JobExecutionRowMapper(), startAfterValue);
195 		}
196 		catch (IncorrectResultSizeDataAccessException e) {
197 			return Collections.emptyList();
198 		}
199 	}
200 
201 	@Override
202 	public void saveJobExecution(JobExecution jobExecution) {
203 		throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
204 	}
205 
206 	@Override
207 	public void synchronizeStatus(JobExecution jobExecution) {
208 		throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
209 	}
210 
211 	@Override
212 	public void updateJobExecution(JobExecution jobExecution) {
213 		throw new UnsupportedOperationException("SearchableJobExecutionDao is read only");
214 	}
215 
216 	/**
217 	 * Re-usable mapper for {@link JobExecution} instances.
218 	 * 
219 	 * @author Dave Syer
220 	 * 
221 	 */
222 	protected class JobExecutionRowMapper implements RowMapper<JobExecution> {
223 
224 		public JobExecutionRowMapper() {
225 		}
226 
227 		@Override
228 		public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
229 			Long id = rs.getLong(1);
230 			JobExecution jobExecution;
231 			
232 			JobParameters jobParameters = getJobParameters(id);
233 
234 			JobInstance jobInstance = new JobInstance(rs.getLong(10), rs.getString(11));
235 			jobExecution = new JobExecution(jobInstance, jobParameters);
236 			jobExecution.setId(id);
237 
238 			jobExecution.setStartTime(rs.getTimestamp(2));
239 			jobExecution.setEndTime(rs.getTimestamp(3));
240 			jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4)));
241 			jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6)));
242 			jobExecution.setCreateTime(rs.getTimestamp(7));
243 			jobExecution.setLastUpdated(rs.getTimestamp(8));
244 			jobExecution.setVersion(rs.getInt(9));
245 			return jobExecution;
246 		}
247 
248 	}
249 }