1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.item.database; |
17 | |
18 | import java.util.List; |
19 | |
20 | import org.apache.commons.logging.Log; |
21 | import org.apache.commons.logging.LogFactory; |
22 | import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; |
23 | import org.springframework.beans.factory.InitializingBean; |
24 | import org.springframework.util.Assert; |
25 | import org.springframework.util.ClassUtils; |
26 | |
27 | /** |
28 | * Abstract {@link org.springframework.batch.item.ItemStreamReader} for to extend when |
29 | * reading database records in a paging fashion. |
30 | * |
31 | * <p> |
32 | * Implementations should execute queries using paged requests of a size |
33 | * specified in {@link #setPageSize(int)}. Additional pages are requested when |
34 | * needed as {@link #read()} method is called, returning an object corresponding |
35 | * to current position. |
36 | * </p> |
37 | * |
38 | * @author Thomas Risberg |
39 | * @author Dave Syer |
40 | * @since 2.0 |
41 | */ |
42 | public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> |
43 | implements InitializingBean { |
44 | |
45 | protected Log logger = LogFactory.getLog(getClass()); |
46 | |
47 | private volatile boolean initialized = false; |
48 | |
49 | private int pageSize = 10; |
50 | |
51 | private volatile int current = 0; |
52 | |
53 | private volatile int page = 0; |
54 | |
55 | protected volatile List<T> results; |
56 | |
57 | private Object lock = new Object(); |
58 | |
59 | public AbstractPagingItemReader() { |
60 | setName(ClassUtils.getShortName(AbstractPagingItemReader.class)); |
61 | } |
62 | |
63 | /** |
64 | * The current page number. |
65 | * @return the current page |
66 | */ |
67 | public int getPage() { |
68 | return page; |
69 | } |
70 | |
71 | /** |
72 | * The page size configured for this reader. |
73 | * @return the page size |
74 | */ |
75 | public int getPageSize() { |
76 | return pageSize; |
77 | } |
78 | |
79 | /** |
80 | * The number of rows to retrieve at a time. |
81 | * |
82 | * @param pageSize the number of rows to fetch per page |
83 | */ |
84 | public void setPageSize(int pageSize) { |
85 | this.pageSize = pageSize; |
86 | } |
87 | |
88 | /** |
89 | * Check mandatory properties. |
90 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
91 | */ |
92 | @Override |
93 | public void afterPropertiesSet() throws Exception { |
94 | Assert.isTrue(pageSize > 0, "pageSize must be greater than zero"); |
95 | } |
96 | |
97 | @Override |
98 | protected T doRead() throws Exception { |
99 | |
100 | synchronized (lock) { |
101 | |
102 | if (results == null || current >= pageSize) { |
103 | |
104 | if (logger.isDebugEnabled()) { |
105 | logger.debug("Reading page " + getPage()); |
106 | } |
107 | |
108 | doReadPage(); |
109 | page++; |
110 | if (current >= pageSize) { |
111 | current = 0; |
112 | } |
113 | |
114 | } |
115 | |
116 | int next = current++; |
117 | if (next < results.size()) { |
118 | return results.get(next); |
119 | } |
120 | else { |
121 | return null; |
122 | } |
123 | |
124 | } |
125 | |
126 | } |
127 | |
128 | abstract protected void doReadPage(); |
129 | |
130 | @Override |
131 | protected void doOpen() throws Exception { |
132 | |
133 | Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); |
134 | initialized = true; |
135 | |
136 | } |
137 | |
138 | @Override |
139 | protected void doClose() throws Exception { |
140 | |
141 | synchronized (lock) { |
142 | initialized = false; |
143 | current = 0; |
144 | page = 0; |
145 | results = null; |
146 | } |
147 | |
148 | } |
149 | |
150 | @Override |
151 | protected void jumpToItem(int itemIndex) throws Exception { |
152 | |
153 | synchronized (lock) { |
154 | page = itemIndex / pageSize; |
155 | current = itemIndex % pageSize; |
156 | } |
157 | |
158 | doJumpToPage(itemIndex); |
159 | |
160 | if (logger.isDebugEnabled()) { |
161 | logger.debug("Jumping to page " + getPage() + " and index " + current); |
162 | } |
163 | |
164 | } |
165 | |
166 | abstract protected void doJumpToPage(int itemIndex); |
167 | |
168 | } |