View Javadoc

1   /*
2    * Copyright 2006-2008 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.concurrent.CopyOnWriteArrayList;
22  
23  import javax.persistence.EntityManager;
24  import javax.persistence.EntityManagerFactory;
25  import javax.persistence.EntityTransaction;
26  import javax.persistence.Query;
27  
28  import org.springframework.batch.item.ExecutionContext;
29  import org.springframework.batch.item.database.orm.JpaQueryProvider;
30  import org.springframework.dao.DataAccessResourceFailureException;
31  import org.springframework.util.Assert;
32  import org.springframework.util.ClassUtils;
33  
34  /**
35   * <p>
36   * {@link org.springframework.batch.item.ItemReader} for reading database
37   * records built on top of JPA.
38   * </p>
39   * 
40   * <p>
41   * It executes the JPQL {@link #setQueryString(String)} to retrieve requested
42   * data. The query is executed using paged requests of a size specified in
43   * {@link #setPageSize(int)}. Additional pages are requested when needed as
44   * {@link #read()} method is called, returning an object corresponding to
45   * current position.
46   * </p>
47   * 
48   * <p>
49   * The performance of the paging depends on the JPA implementation and its use
50   * of database specific features to limit the number of returned rows.
51   * </p>
52   * 
53   * <p>
54   * Setting a fairly large page size and using a commit interval that matches the
55   * page size should provide better performance.
56   * </p>
57   * 
58   * <p>
59   * In order to reduce the memory usage for large results the persistence context
60   * is flushed and cleared after each page is read. This causes any entities read
61   * to be detached. If you make changes to the entities and want the changes
62   * persisted then you must explicitly merge the entities.
63   * </p>
64   * 
65   * <p>
66   * The reader must be configured with an
67   * {@link javax.persistence.EntityManagerFactory}. All entity access is
68   * performed within a new transaction, independent of any existing Spring
69   * managed transactions.
70   * </p>
71   * 
72   * <p>
73   * The implementation is thread-safe in between calls to
74   * {@link #open(ExecutionContext)}, but remember to use
75   * <code>saveState=false</code> if used in a multi-threaded client (no restart
76   * available).
77   * </p>
78   * 
79   * 
80   * @author Thomas Risberg
81   * @author Dave Syer
82   * @since 2.0
83   */
84  public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {
85  
86  	private EntityManagerFactory entityManagerFactory;
87  
88  	private EntityManager entityManager;
89  
90  	private final Map<String, Object> jpaPropertyMap = new HashMap<String, Object>();
91  
92  	private String queryString;
93  
94  	private JpaQueryProvider queryProvider;
95  
96  	private Map<String, Object> parameterValues;
97  
98  	public JpaPagingItemReader() {
99  		setName(ClassUtils.getShortName(JpaPagingItemReader.class));
100 	}
101 
102 	/**
103 	 * Create a query using an appropriate query provider (entityManager OR
104 	 * queryProvider).
105 	 */
106 	private Query createQuery() {
107 		if (queryProvider == null) {
108 			return entityManager.createQuery(queryString);
109 		}
110 		else {
111 			return queryProvider.createQuery();
112 		}
113 	}
114 
115 	public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
116 		this.entityManagerFactory = entityManagerFactory;
117 	}
118 
119 	/**
120 	 * The parameter values to be used for the query execution.
121 	 * 
122 	 * @param parameterValues the values keyed by the parameter named used in
123 	 * the query string.
124 	 */
125 	public void setParameterValues(Map<String, Object> parameterValues) {
126 		this.parameterValues = parameterValues;
127 	}
128 
129 	public void afterPropertiesSet() throws Exception {
130 		super.afterPropertiesSet();
131 
132 		if (queryProvider == null) {
133 			Assert.notNull(entityManagerFactory);
134 			Assert.hasLength(queryString);
135 		}
136 		// making sure that the appropriate (JPA) query provider is set
137 		else {
138 			Assert.isTrue(queryProvider != null, "JPA query provider must be set");
139 		}
140 	}
141 
142 	/**
143 	 * @param queryString JPQL query string
144 	 */
145 	public void setQueryString(String queryString) {
146 		this.queryString = queryString;
147 	}
148 
149 	/**
150 	 * @param queryProvider JPA query provider
151 	 */
152 	public void setQueryProvider(JpaQueryProvider queryProvider) {
153 		this.queryProvider = queryProvider;
154 	}
155 
156 	@Override
157 	protected void doOpen() throws Exception {
158 		super.doOpen();
159 
160 		entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
161 		if (entityManager == null) {
162 			throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
163 		}
164 		// set entityManager to queryProvider, so it participates
165 		// in JpaPagingItemReader's managed transaction
166 		if (queryProvider != null) {
167 			queryProvider.setEntityManager(entityManager);
168 		}
169 
170 	}
171 
172 	@Override
173 	@SuppressWarnings("unchecked")
174 	protected void doReadPage() {
175 
176 		EntityTransaction tx = entityManager.getTransaction();
177 		tx.begin();
178 
179 		entityManager.flush();
180 		entityManager.clear();
181 
182 		Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
183 
184 		if (parameterValues != null) {
185 			for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
186 				query.setParameter(me.getKey(), me.getValue());
187 			}
188 		}
189 
190 		if (results == null) {
191 			results = new CopyOnWriteArrayList<T>();
192 		}
193 		else {
194 			results.clear();
195 		}
196 		results.addAll(query.getResultList());
197 
198 		tx.commit();
199 	}
200 
201 	@Override
202 	protected void doJumpToPage(int itemIndex) {
203 	}
204 
205 	@Override
206 	protected void doClose() throws Exception {
207 		entityManager.close();
208 		super.doClose();
209 	}
210 
211 }