1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.item.database; |
17 | |
18 | import org.hibernate.ScrollableResults; |
19 | import org.hibernate.Session; |
20 | import org.hibernate.SessionFactory; |
21 | import org.hibernate.StatelessSession; |
22 | import org.springframework.batch.item.ExecutionContext; |
23 | import org.springframework.batch.item.ExecutionContextUserSupport; |
24 | import org.springframework.batch.item.ItemReader; |
25 | import org.springframework.batch.item.ItemStream; |
26 | import org.springframework.beans.factory.InitializingBean; |
27 | import org.springframework.util.Assert; |
28 | import org.springframework.util.ClassUtils; |
29 | |
30 | /** |
31 | * {@link ItemReader} for reading database records built on top of Hibernate. |
32 | * |
33 | * It executes the HQL {@link #setQueryString(String)} when initialized and |
34 | * iterates over the result set as {@link #read()} method is called, returning |
35 | * an object corresponding to current row. |
36 | * |
37 | * Input source can be configured to use either {@link StatelessSession} |
38 | * sufficient for simple mappings without the need to cascade to associated |
39 | * objects or standard hibernate {@link Session} for more advanced mappings or |
40 | * when caching is desired. |
41 | * |
42 | * When stateful session is used it will be cleared after successful commit |
43 | * without being flushed (no inserts or updates are expected). |
44 | * |
45 | * @author Robert Kasanicky |
46 | * @author Dave Syer |
47 | */ |
48 | public class HibernateCursorItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream, |
49 | InitializingBean { |
50 | |
51 | private static final String RESTART_DATA_ROW_NUMBER_KEY = "row.number"; |
52 | |
53 | private SessionFactory sessionFactory; |
54 | |
55 | private StatelessSession statelessSession; |
56 | |
57 | private Session statefulSession; |
58 | |
59 | private ScrollableResults cursor; |
60 | |
61 | private String queryString; |
62 | |
63 | private boolean useStatelessSession = true; |
64 | |
65 | private int lastCommitRowNumber = 0; |
66 | |
67 | /* Current count of processed records. */ |
68 | private int currentProcessedRow = 0; |
69 | |
70 | private boolean initialized = false; |
71 | |
72 | private boolean saveState = false; |
73 | |
74 | public HibernateCursorItemReader() { |
75 | setName(ClassUtils.getShortName(HibernateCursorItemReader.class)); |
76 | } |
77 | |
78 | public Object read() { |
79 | |
80 | if (cursor.next()) { |
81 | currentProcessedRow++; |
82 | Object[] data = cursor.get(); |
83 | if (data.length > 1) { |
84 | return data; |
85 | } |
86 | return data[0]; |
87 | } |
88 | return null; |
89 | } |
90 | |
91 | /** |
92 | * Closes the result set cursor and hibernate session. |
93 | */ |
94 | public void close(ExecutionContext executionContext) { |
95 | initialized = false; |
96 | if (cursor != null) { |
97 | cursor.close(); |
98 | } |
99 | currentProcessedRow = 0; |
100 | if (useStatelessSession) { |
101 | if (statelessSession != null) { |
102 | statelessSession.close(); |
103 | } |
104 | } |
105 | else { |
106 | if (statefulSession != null) { |
107 | statefulSession.close(); |
108 | } |
109 | } |
110 | } |
111 | |
112 | /** |
113 | * Creates cursor for the query. |
114 | */ |
115 | public void open(ExecutionContext executionContext) { |
116 | Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); |
117 | |
118 | if (useStatelessSession) { |
119 | statelessSession = sessionFactory.openStatelessSession(); |
120 | cursor = statelessSession.createQuery(queryString).scroll(); |
121 | } |
122 | else { |
123 | statefulSession = sessionFactory.openSession(); |
124 | cursor = statefulSession.createQuery(queryString).scroll(); |
125 | } |
126 | initialized = true; |
127 | |
128 | if (executionContext.containsKey(getKey(RESTART_DATA_ROW_NUMBER_KEY))) { |
129 | currentProcessedRow = Integer.parseInt(executionContext.getString(getKey(RESTART_DATA_ROW_NUMBER_KEY))); |
130 | cursor.setRowNumber(currentProcessedRow - 1); |
131 | } |
132 | |
133 | } |
134 | |
135 | /** |
136 | * @param sessionFactory hibernate session factory |
137 | */ |
138 | public void setSessionFactory(SessionFactory sessionFactory) { |
139 | this.sessionFactory = sessionFactory; |
140 | } |
141 | |
142 | public void afterPropertiesSet() throws Exception { |
143 | Assert.notNull(sessionFactory); |
144 | Assert.hasLength(queryString); |
145 | } |
146 | |
147 | /** |
148 | * @param queryString HQL query string |
149 | */ |
150 | public void setQueryString(String queryString) { |
151 | this.queryString = queryString; |
152 | } |
153 | |
154 | /** |
155 | * Can be set only in uninitialized state. |
156 | * |
157 | * @param useStatelessSession <code>true</code> to use |
158 | * {@link StatelessSession} <code>false</code> to use standard hibernate |
159 | * {@link Session} |
160 | */ |
161 | public void setUseStatelessSession(boolean useStatelessSession) { |
162 | Assert.state(!initialized); |
163 | this.useStatelessSession = useStatelessSession; |
164 | } |
165 | |
166 | /** |
167 | */ |
168 | public void update(ExecutionContext executionContext) { |
169 | if (saveState) { |
170 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
171 | executionContext.putString(getKey(RESTART_DATA_ROW_NUMBER_KEY), "" + currentProcessedRow); |
172 | } |
173 | } |
174 | |
175 | /** |
176 | * Mark is supported as long as this {@link ItemStream} is used in a |
177 | * single-threaded environment. The state backing the mark is a single |
178 | * counter, keeping track of the current position, so multiple threads |
179 | * cannot be accommodated. |
180 | * |
181 | * @see org.springframework.batch.item.ItemReader#mark() |
182 | */ |
183 | public void mark() { |
184 | lastCommitRowNumber = currentProcessedRow; |
185 | if (!useStatelessSession) { |
186 | statefulSession.clear(); |
187 | } |
188 | } |
189 | |
190 | /* |
191 | * (non-Javadoc) |
192 | * |
193 | * @see org.springframework.batch.item.stream.ItemStreamAdapter#reset(org.springframework.batch.item.ExecutionContext) |
194 | */ |
195 | public void reset() { |
196 | currentProcessedRow = lastCommitRowNumber; |
197 | if (lastCommitRowNumber == 0) { |
198 | cursor.beforeFirst(); |
199 | } |
200 | else { |
201 | // Set the cursor so that next time it is advanced it will |
202 | // come back to the committed row. |
203 | cursor.setRowNumber(lastCommitRowNumber - 1); |
204 | } |
205 | } |
206 | |
207 | public void setSaveState(boolean saveState) { |
208 | this.saveState = saveState; |
209 | } |
210 | } |