View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.item.database;
17  
18  import java.util.Map;
19  
20  import org.hibernate.ScrollableResults;
21  import org.hibernate.Session;
22  import org.hibernate.SessionFactory;
23  import org.hibernate.StatelessSession;
24  import org.springframework.batch.item.ExecutionContext;
25  import org.springframework.batch.item.ItemReader;
26  import org.springframework.batch.item.ItemStream;
27  import org.springframework.batch.item.ItemStreamException;
28  import org.springframework.batch.item.database.orm.HibernateQueryProvider;
29  import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
30  import org.springframework.beans.factory.InitializingBean;
31  import org.springframework.util.Assert;
32  import org.springframework.util.ClassUtils;
33  
34  /**
35   * {@link ItemReader} for reading database records built on top of Hibernate. It
36   * executes the HQL query when initialized iterates over the result set as
37   * {@link #read()} method is called, returning an object corresponding to
38   * current row. The query can be set directly using
39   * {@link #setQueryString(String)}, a named query can be used by
40   * {@link #setQueryName(String)}, or a query provider strategy can be supplied
41   * via {@link #setQueryProvider(HibernateQueryProvider)}.
42   *
43   *
44   * <p>
45   * The reader can be configured to use either {@link StatelessSession}
46   * sufficient for simple mappings without the need to cascade to associated
47   * objects or standard hibernate {@link Session} for more advanced mappings or
48   * when caching is desired. When stateful session is used it will be cleared in
49   * the {@link #update(ExecutionContext)} method without being flushed (no data
50   * modifications are expected).
51   * </p>
52   *
53   * The implementation is <b>not</b> thread-safe.
54   *
55   * @author Robert Kasanicky
56   * @author Dave Syer
57   */
58  public class HibernateCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements ItemStream,
59  InitializingBean {
60  
61  	private HibernateItemReaderHelper<T> helper = new HibernateItemReaderHelper<T>();
62  
63  	public HibernateCursorItemReader() {
64  		setName(ClassUtils.getShortName(HibernateCursorItemReader.class));
65  	}
66  
67  	private ScrollableResults cursor;
68  
69  	private boolean initialized = false;
70  
71  	private int fetchSize;
72  
73  	private Map<String, Object> parameterValues;
74  
75  	@Override
76  	public void afterPropertiesSet() throws Exception {
77  		Assert.state(fetchSize >= 0, "fetchSize must not be negative");
78  		helper.afterPropertiesSet();
79  	}
80  
81  	/**
82  	 * The parameter values to apply to a query (map of name:value).
83  	 *
84  	 * @param parameterValues the parameter values to set
85  	 */
86  	public void setParameterValues(Map<String, Object> parameterValues) {
87  		this.parameterValues = parameterValues;
88  	}
89  
90  	/**
91  	 * A query name for an externalized query. Either this or the {
92  	 * {@link #setQueryString(String) query string} or the {
93  	 * {@link #setQueryProvider(HibernateQueryProvider) query provider} should
94  	 * be set.
95  	 *
96  	 * @param queryName name of a hibernate named query
97  	 */
98  	public void setQueryName(String queryName) {
99  		helper.setQueryName(queryName);
100 	}
101 
102 	/**
103 	 * Fetch size used internally by Hibernate to limit amount of data fetched
104 	 * from database per round trip.
105 	 *
106 	 * @param fetchSize the fetch size to pass down to Hibernate
107 	 */
108 	public void setFetchSize(int fetchSize) {
109 		this.fetchSize = fetchSize;
110 	}
111 
112 	/**
113 	 * A query provider. Either this or the {{@link #setQueryString(String)
114 	 * query string} or the {{@link #setQueryName(String) query name} should be
115 	 * set.
116 	 *
117 	 * @param queryProvider Hibernate query provider
118 	 */
119 	public void setQueryProvider(HibernateQueryProvider queryProvider) {
120 		helper.setQueryProvider(queryProvider);
121 	}
122 
123 	/**
124 	 * A query string in HQL. Either this or the {
125 	 * {@link #setQueryProvider(HibernateQueryProvider) query provider} or the {
126 	 * {@link #setQueryName(String) query name} should be set.
127 	 *
128 	 * @param queryString HQL query string
129 	 */
130 	public void setQueryString(String queryString) {
131 		helper.setQueryString(queryString);
132 	}
133 
134 	/**
135 	 * The Hibernate SessionFactory to use the create a session.
136 	 *
137 	 * @param sessionFactory the {@link SessionFactory} to set
138 	 */
139 	public void setSessionFactory(SessionFactory sessionFactory) {
140 		helper.setSessionFactory(sessionFactory);
141 	}
142 
143 	/**
144 	 * Can be set only in uninitialized state.
145 	 *
146 	 * @param useStatelessSession <code>true</code> to use
147 	 * {@link StatelessSession} <code>false</code> to use standard hibernate
148 	 * {@link Session}
149 	 */
150 	public void setUseStatelessSession(boolean useStatelessSession) {
151 		helper.setUseStatelessSession(useStatelessSession);
152 	}
153 
154 	@Override
155 	protected T doRead() throws Exception {
156 		if (cursor.next()) {
157 			Object[] data = cursor.get();
158 
159 			if (data.length > 1) {
160 				// If there are multiple items this must be a projection
161 				// and T is an array type.
162 				@SuppressWarnings("unchecked")
163 				T item = (T) data;
164 				return item;
165 			}
166 			else {
167 				// Assume if there is only one item that it is the data the user
168 				// wants.
169 				// If there is only one item this is going to be a nasty shock
170 				// if T is an array type but there's not much else we can do...
171 				@SuppressWarnings("unchecked")
172 				T item = (T) data[0];
173 				return item;
174 			}
175 
176 		}
177 		return null;
178 	}
179 
180 	/**
181 	 * Open hibernate session and create a forward-only cursor for the query.
182 	 */
183 	@Override
184 	protected void doOpen() throws Exception {
185 		Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
186 		cursor = helper.getForwardOnlyCursor(fetchSize, parameterValues);
187 		initialized = true;
188 	}
189 
190 	/**
191 	 * Update the context and clear the session if stateful.
192 	 *
193 	 * @param executionContext the current {@link ExecutionContext}
194 	 * @throws ItemStreamException if there is a problem
195 	 */
196 	@Override
197 	public void update(ExecutionContext executionContext) throws ItemStreamException {
198 		super.update(executionContext);
199 		helper.clear();
200 	}
201 
202 	/**
203 	 * Wind forward through the result set to the item requested. Also clears
204 	 * the session every now and then (if stateful) to avoid memory problems.
205 	 * The frequency of session clearing is the larger of the fetch size (if
206 	 * set) and 100.
207 	 *
208 	 * @param itemIndex the first item to read
209 	 * @throws Exception if there is a problem
210 	 * @see AbstractItemCountingItemStreamItemReader#jumpToItem(int)
211 	 */
212 	@Override
213 	protected void jumpToItem(int itemIndex) throws Exception {
214 		int flushSize = Math.max(fetchSize, 100);
215 		helper.jumpToItem(cursor, itemIndex, flushSize);
216 	}
217 
218 	/**
219 	 * Close the cursor and hibernate session.
220 	 */
221 	@Override
222 	protected void doClose() throws Exception {
223 
224 		initialized = false;
225 
226 		if (cursor != null) {
227 			cursor.close();
228 		}
229 
230 		helper.close();
231 
232 	}
233 }