1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.item.database; |
17 | |
18 | import java.util.Iterator; |
19 | import java.util.List; |
20 | |
21 | import org.springframework.batch.item.ExecutionContext; |
22 | import org.springframework.batch.item.ItemReader; |
23 | import org.springframework.batch.item.ItemStream; |
24 | import org.springframework.batch.item.NoWorkFoundException; |
25 | import org.springframework.beans.factory.InitializingBean; |
26 | import org.springframework.util.Assert; |
27 | |
28 | /** |
29 | * <p> |
30 | * Convenience class for driving query item readers. Item readers of this type |
31 | * use a 'driving query' to return back a list of keys. A key can be defined as |
32 | * anything that can uniquely identify a record so that a more detailed record |
33 | * can be retrieved for each object. This allows a much smaller footprint to be |
34 | * stored in memory for processing. The following 'Customer' example table will |
35 | * help illustrate this: |
36 | * |
37 | * <pre> |
38 | * CREATE TABLE CUSTOMER ( |
39 | * ID BIGINT IDENTITY PRIMARY KEY, |
40 | * NAME VARCHAR(45), |
41 | * CREDIT FLOAT |
42 | * ); |
43 | * </pre> |
44 | * |
45 | * <p> |
46 | * A cursor based solution would simply open up a cursor over ID, NAME, and |
47 | * CREDIT, and move it from one to the next. This can cause issues on databases |
48 | * with pessimistic locking strategies. A 'driving query' approach would be to |
49 | * return only the ID of the customer, then use a separate DAO to retrieve the |
50 | * name and credit for each ID. This means that there will be a call to a |
51 | * separate DAO for each call to {@link ItemReader#read()}. |
52 | * </p> |
53 | * |
54 | * <p> |
55 | * Mutability: Because this base class cannot guarantee that the keys returned |
56 | * by subclasses are immutable, care should be taken to not modify a key value. |
57 | * Doing so would cause issues if a rollback occurs. For example, if a call to |
58 | * read() is made, and the returned key is modified, a rollback will cause the |
59 | * next call to read() to return the same object that was originally returned, |
60 | * since there is no way to create a defensive copy, and re-querying the |
61 | * database for all the keys would be too resource intensive. |
62 | * </p> |
63 | * |
64 | * |
65 | * @author Lucas Ward |
66 | */ |
67 | public class DrivingQueryItemReader implements ItemReader, InitializingBean, ItemStream { |
68 | |
69 | private boolean initialized = false; |
70 | |
71 | private List keys; |
72 | |
73 | private Iterator keysIterator; |
74 | |
75 | private int currentIndex = 0; |
76 | |
77 | private int lastCommitIndex = 0; |
78 | |
79 | private KeyCollector keyGenerator; |
80 | |
81 | private boolean saveState = false; |
82 | |
83 | public DrivingQueryItemReader() { |
84 | |
85 | } |
86 | |
87 | /** |
88 | * Initialize the input source with the provided keys list. |
89 | * |
90 | * @param keys |
91 | */ |
92 | public DrivingQueryItemReader(List keys) { |
93 | this.keys = keys; |
94 | this.keysIterator = keys.iterator(); |
95 | } |
96 | |
97 | /** |
98 | * Return the next key in the List. |
99 | * |
100 | * @return next key in the list if not index is not at the last element, |
101 | * null otherwise. |
102 | */ |
103 | public Object read() { |
104 | |
105 | if (keysIterator.hasNext()) { |
106 | currentIndex++; |
107 | return keysIterator.next(); |
108 | } |
109 | |
110 | return null; |
111 | } |
112 | |
113 | /** |
114 | * Get the current key. This method will return the same object returned by |
115 | * the last read() method. If no items have been read yet the ItemReader |
116 | * yet, then null will be returned. |
117 | * |
118 | * @return the current key. |
119 | */ |
120 | protected Object getCurrentKey() { |
121 | if (initialized && currentIndex > 0) { |
122 | return keys.get(currentIndex - 1); |
123 | } |
124 | |
125 | return null; |
126 | } |
127 | |
128 | /** |
129 | * Close the resource by setting the list of keys to null, allowing them to |
130 | * be garbage collected. |
131 | */ |
132 | public void close(ExecutionContext executionContext) { |
133 | initialized = false; |
134 | currentIndex = 0; |
135 | lastCommitIndex = 0; |
136 | keys = null; |
137 | keysIterator = null; |
138 | } |
139 | |
140 | /** |
141 | * Initialize the item reader by delegating to the subclass in order to |
142 | * retrieve the keys. |
143 | * |
144 | * @throws IllegalStateException if the keys list is null or initialized is |
145 | * true. |
146 | */ |
147 | public void open(ExecutionContext executionContext) { |
148 | |
149 | Assert.state(keys == null && !initialized, "Cannot open an already opened input source" |
150 | + ", call close() first."); |
151 | keys = keyGenerator.retrieveKeys(executionContext); |
152 | if (keys == null || keys.size() == 0) { |
153 | throw new NoWorkFoundException("KeyGenerator must return at least 1 key"); |
154 | } |
155 | keysIterator = keys.listIterator(); |
156 | initialized = true; |
157 | } |
158 | |
159 | public void update(ExecutionContext executionContext) { |
160 | if (saveState) { |
161 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
162 | if (getCurrentKey() != null) { |
163 | keyGenerator.updateContext(getCurrentKey(), executionContext); |
164 | } |
165 | } |
166 | } |
167 | |
168 | public void afterPropertiesSet() throws Exception { |
169 | Assert.notNull(keyGenerator, "The KeyGenerator must not be null."); |
170 | } |
171 | |
172 | /** |
173 | * Set the key generation strategy to use for this input source. |
174 | * |
175 | * @param keyGenerator |
176 | */ |
177 | public void setKeyCollector(KeyCollector keyGenerator) { |
178 | this.keyGenerator = keyGenerator; |
179 | } |
180 | |
181 | /** |
182 | * Mark is supported as long as this {@link ItemStream} is used in a |
183 | * single-threaded environment. The state backing the mark is a single |
184 | * counter, keeping track of the current position, so multiple threads |
185 | * cannot be accommodated. |
186 | * |
187 | * @see org.springframework.batch.item.ItemReader#mark() |
188 | */ |
189 | public void mark() { |
190 | lastCommitIndex = currentIndex; |
191 | } |
192 | |
193 | /* |
194 | * (non-Javadoc) |
195 | * |
196 | * @see org.springframework.batch.io.support.AbstractTransactionalIoSource#reset(org.springframework.batch.item.ExecutionContext) |
197 | */ |
198 | public void reset() { |
199 | keysIterator = keys.listIterator(lastCommitIndex); |
200 | currentIndex = lastCommitIndex; |
201 | } |
202 | |
203 | public void setSaveState(boolean saveState) { |
204 | this.saveState = saveState; |
205 | } |
206 | } |