View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.item.database;
17  
18  import java.util.List;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
23  import org.springframework.beans.factory.InitializingBean;
24  import org.springframework.util.Assert;
25  import org.springframework.util.ClassUtils;
26  
27  /**
28   * Abstract {@link org.springframework.batch.item.ItemReader} for to extend when
29   * reading database records in a paging fashion.
30   *
31   * <p>
32   * Implementations should execute queries using paged requests of a size
33   * specified in {@link #setPageSize(int)}. Additional pages are requested when
34   * needed as {@link #read()} method is called, returning an object corresponding
35   * to current position.
36   * </p>
37   *
38   * @author Thomas Risberg
39   * @author Dave Syer
40   * @since 2.0
41   */
42  public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
43  InitializingBean {
44  
45  	protected Log logger = LogFactory.getLog(getClass());
46  
47  	private volatile boolean initialized = false;
48  
49  	private int pageSize = 10;
50  
51  	private volatile int current = 0;
52  
53  	private volatile int page = 0;
54  
55  	protected volatile List<T> results;
56  
57  	private Object lock = new Object();
58  
59  	public AbstractPagingItemReader() {
60  		setName(ClassUtils.getShortName(AbstractPagingItemReader.class));
61  	}
62  
63  	/**
64  	 * The current page number.
65  	 * @return the current page
66  	 */
67  	public int getPage() {
68  		return page;
69  	}
70  
71  	/**
72  	 * The page size configured for this reader.
73  	 * @return the page size
74  	 */
75  	public int getPageSize() {
76  		return pageSize;
77  	}
78  
79  	/**
80  	 * The number of rows to retrieve at a time.
81  	 *
82  	 * @param pageSize the number of rows to fetch per page
83  	 */
84  	public void setPageSize(int pageSize) {
85  		this.pageSize = pageSize;
86  	}
87  
88  	/**
89  	 * Check mandatory properties.
90  	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
91  	 */
92  	@Override
93  	public void afterPropertiesSet() throws Exception {
94  		Assert.isTrue(pageSize > 0, "pageSize must be greater than zero");
95  	}
96  
97  	@Override
98  	protected T doRead() throws Exception {
99  
100 		synchronized (lock) {
101 
102 			if (results == null || current >= pageSize) {
103 
104 				if (logger.isDebugEnabled()) {
105 					logger.debug("Reading page " + getPage());
106 				}
107 
108 				doReadPage();
109 				page++;
110 				if (current >= pageSize) {
111 					current = 0;
112 				}
113 
114 			}
115 
116 			int next = current++;
117 			if (next < results.size()) {
118 				return results.get(next);
119 			}
120 			else {
121 				return null;
122 			}
123 
124 		}
125 
126 	}
127 
128 	abstract protected void doReadPage();
129 
130 	@Override
131 	protected void doOpen() throws Exception {
132 
133 		Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
134 		initialized = true;
135 
136 	}
137 
138 	@Override
139 	protected void doClose() throws Exception {
140 
141 		synchronized (lock) {
142 			initialized = false;
143 			current = 0;
144 			page = 0;
145 			results = null;
146 		}
147 
148 	}
149 
150 	@Override
151 	protected void jumpToItem(int itemIndex) throws Exception {
152 
153 		synchronized (lock) {
154 			page = itemIndex / pageSize;
155 			current = itemIndex % pageSize;
156 		}
157 
158 		doJumpToPage(itemIndex);
159 
160 		if (logger.isDebugEnabled()) {
161 			logger.debug("Jumping to page " + getPage() + " and index " + current);
162 		}
163 
164 	}
165 
166 	abstract protected void doJumpToPage(int itemIndex);
167 
168 }