View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.concurrent.CopyOnWriteArrayList;
22  
23  import javax.persistence.EntityManager;
24  import javax.persistence.EntityManagerFactory;
25  import javax.persistence.EntityTransaction;
26  import javax.persistence.Query;
27  
28  import org.springframework.batch.item.ExecutionContext;
29  import org.springframework.batch.item.database.orm.JpaQueryProvider;
30  import org.springframework.dao.DataAccessResourceFailureException;
31  import org.springframework.util.Assert;
32  import org.springframework.util.ClassUtils;
33  
34  /**
35   * <p>
36   * {@link org.springframework.batch.item.ItemReader} for reading database
37   * records built on top of JPA.
38   * </p>
39   *
40   * <p>
41   * It executes the JPQL {@link #setQueryString(String)} to retrieve requested
42   * data. The query is executed using paged requests of a size specified in
43   * {@link #setPageSize(int)}. Additional pages are requested when needed as
44   * {@link #read()} method is called, returning an object corresponding to
45   * current position.
46   * </p>
47   *
48   * <p>
49   * The performance of the paging depends on the JPA implementation and its use
50   * of database specific features to limit the number of returned rows.
51   * </p>
52   *
53   * <p>
54   * Setting a fairly large page size and using a commit interval that matches the
55   * page size should provide better performance.
56   * </p>
57   *
58   * <p>
59   * In order to reduce the memory usage for large results the persistence context
60   * is flushed and cleared after each page is read. This causes any entities read
61   * to be detached. If you make changes to the entities and want the changes
62   * persisted then you must explicitly merge the entities.
63   * </p>
64   *
65   * <p>
66   * The reader must be configured with an
67   * {@link javax.persistence.EntityManagerFactory}. All entity access is
68   * performed within a new transaction, independent of any existing Spring
69   * managed transactions.
70   * </p>
71   *
72   * <p>
73   * The implementation is thread-safe in between calls to
74   * {@link #open(ExecutionContext)}, but remember to use
75   * <code>saveState=false</code> if used in a multi-threaded client (no restart
76   * available).
77   * </p>
78   *
79   *
80   * @author Thomas Risberg
81   * @author Dave Syer
82   * @since 2.0
83   */
84  public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {
85  
86  	private EntityManagerFactory entityManagerFactory;
87  
88  	private EntityManager entityManager;
89  
90  	private final Map<String, Object> jpaPropertyMap = new HashMap<String, Object>();
91  
92  	private String queryString;
93  
94  	private JpaQueryProvider queryProvider;
95  
96  	private Map<String, Object> parameterValues;
97  
98  	public JpaPagingItemReader() {
99  		setName(ClassUtils.getShortName(JpaPagingItemReader.class));
100 	}
101 
102 	/**
103 	 * Create a query using an appropriate query provider (entityManager OR
104 	 * queryProvider).
105 	 */
106 	private Query createQuery() {
107 		if (queryProvider == null) {
108 			return entityManager.createQuery(queryString);
109 		}
110 		else {
111 			return queryProvider.createQuery();
112 		}
113 	}
114 
115 	public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
116 		this.entityManagerFactory = entityManagerFactory;
117 	}
118 
119 	/**
120 	 * The parameter values to be used for the query execution.
121 	 *
122 	 * @param parameterValues the values keyed by the parameter named used in
123 	 * the query string.
124 	 */
125 	public void setParameterValues(Map<String, Object> parameterValues) {
126 		this.parameterValues = parameterValues;
127 	}
128 
129 	@Override
130 	public void afterPropertiesSet() throws Exception {
131 		super.afterPropertiesSet();
132 
133 		if (queryProvider == null) {
134 			Assert.notNull(entityManagerFactory);
135 			Assert.hasLength(queryString);
136 		}
137 		// making sure that the appropriate (JPA) query provider is set
138 		else {
139 			Assert.isTrue(queryProvider != null, "JPA query provider must be set");
140 		}
141 	}
142 
143 	/**
144 	 * @param queryString JPQL query string
145 	 */
146 	public void setQueryString(String queryString) {
147 		this.queryString = queryString;
148 	}
149 
150 	/**
151 	 * @param queryProvider JPA query provider
152 	 */
153 	public void setQueryProvider(JpaQueryProvider queryProvider) {
154 		this.queryProvider = queryProvider;
155 	}
156 
157 	@Override
158 	protected void doOpen() throws Exception {
159 		super.doOpen();
160 
161 		entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
162 		if (entityManager == null) {
163 			throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
164 		}
165 		// set entityManager to queryProvider, so it participates
166 		// in JpaPagingItemReader's managed transaction
167 		if (queryProvider != null) {
168 			queryProvider.setEntityManager(entityManager);
169 		}
170 
171 	}
172 
173 	@Override
174 	@SuppressWarnings("unchecked")
175 	protected void doReadPage() {
176 
177 		EntityTransaction tx = entityManager.getTransaction();
178 		tx.begin();
179 
180 		entityManager.flush();
181 		entityManager.clear();
182 
183 		Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
184 
185 		if (parameterValues != null) {
186 			for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
187 				query.setParameter(me.getKey(), me.getValue());
188 			}
189 		}
190 
191 		if (results == null) {
192 			results = new CopyOnWriteArrayList<T>();
193 		}
194 		else {
195 			results.clear();
196 		}
197 		results.addAll(query.getResultList());
198 
199 		tx.commit();
200 	}
201 
202 	@Override
203 	protected void doJumpToPage(int itemIndex) {
204 	}
205 
206 	@Override
207 	protected void doClose() throws Exception {
208 		entityManager.close();
209 		super.doClose();
210 	}
211 
212 }