1 | /* |
2 | * Copyright 2006-2008 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.item.database; |
17 | |
18 | import java.util.List; |
19 | |
20 | import org.apache.commons.logging.Log; |
21 | import org.apache.commons.logging.LogFactory; |
22 | import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; |
23 | import org.springframework.beans.factory.InitializingBean; |
24 | import org.springframework.util.Assert; |
25 | import org.springframework.util.ClassUtils; |
26 | |
27 | /** |
28 | * Abstract {@link org.springframework.batch.item.ItemReader} for to extend when |
29 | * reading database records in a paging fashion. |
30 | * |
31 | * <p> |
32 | * Implementations should execute queries using paged requests of a size |
33 | * specified in {@link #setPageSize(int)}. Additional pages are requested when |
34 | * needed as {@link #read()} method is called, returning an object corresponding |
35 | * to current position. |
36 | * </p> |
37 | * |
38 | * @author Thomas Risberg |
39 | * @author Dave Syer |
40 | * @since 2.0 |
41 | */ |
42 | public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements |
43 | InitializingBean { |
44 | |
45 | protected Log logger = LogFactory.getLog(getClass()); |
46 | |
47 | private volatile boolean initialized = false; |
48 | |
49 | private int pageSize = 10; |
50 | |
51 | private volatile int current = 0; |
52 | |
53 | private volatile int page = 0; |
54 | |
55 | protected volatile List<T> results; |
56 | |
57 | private Object lock = new Object(); |
58 | |
59 | public AbstractPagingItemReader() { |
60 | setName(ClassUtils.getShortName(AbstractPagingItemReader.class)); |
61 | } |
62 | |
63 | /** |
64 | * The current page number. |
65 | * @return the current page |
66 | */ |
67 | public int getPage() { |
68 | return page; |
69 | } |
70 | |
71 | /** |
72 | * The page size configured for this reader. |
73 | * @return the page size |
74 | */ |
75 | public int getPageSize() { |
76 | return pageSize; |
77 | } |
78 | |
79 | /** |
80 | * The number of rows to retrieve at a time. |
81 | * |
82 | * @param pageSize the number of rows to fetch per page |
83 | */ |
84 | public void setPageSize(int pageSize) { |
85 | this.pageSize = pageSize; |
86 | } |
87 | |
88 | /** |
89 | * Check mandatory properties. |
90 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
91 | */ |
92 | public void afterPropertiesSet() throws Exception { |
93 | Assert.isTrue(pageSize > 0, "pageSize must be greater than zero"); |
94 | } |
95 | |
96 | @Override |
97 | protected T doRead() throws Exception { |
98 | |
99 | synchronized (lock) { |
100 | |
101 | if (results == null || current >= pageSize) { |
102 | |
103 | if (logger.isDebugEnabled()) { |
104 | logger.debug("Reading page " + getPage()); |
105 | } |
106 | |
107 | doReadPage(); |
108 | page++; |
109 | if (current >= pageSize) { |
110 | current = 0; |
111 | } |
112 | |
113 | } |
114 | |
115 | int next = current++; |
116 | if (next < results.size()) { |
117 | return results.get(next); |
118 | } |
119 | else { |
120 | return null; |
121 | } |
122 | |
123 | } |
124 | |
125 | } |
126 | |
127 | abstract protected void doReadPage(); |
128 | |
129 | @Override |
130 | protected void doOpen() throws Exception { |
131 | |
132 | Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); |
133 | initialized = true; |
134 | |
135 | } |
136 | |
137 | @Override |
138 | protected void doClose() throws Exception { |
139 | |
140 | synchronized (lock) { |
141 | initialized = false; |
142 | current = 0; |
143 | page = 0; |
144 | results = null; |
145 | } |
146 | |
147 | } |
148 | |
149 | @Override |
150 | protected void jumpToItem(int itemIndex) throws Exception { |
151 | |
152 | synchronized (lock) { |
153 | page = itemIndex / pageSize; |
154 | current = itemIndex % pageSize; |
155 | } |
156 | |
157 | doJumpToPage(itemIndex); |
158 | |
159 | if (logger.isDebugEnabled()) { |
160 | logger.debug("Jumping to page " + getPage() + " and index " + current); |
161 | } |
162 | |
163 | } |
164 | |
165 | abstract protected void doJumpToPage(int itemIndex); |
166 | |
167 | } |