1 | /* |
2 | * Copyright 2012 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.data; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Iterator; |
21 | import java.util.Map; |
22 | |
23 | import org.apache.commons.logging.Log; |
24 | import org.apache.commons.logging.LogFactory; |
25 | import org.springframework.batch.item.ItemReader; |
26 | import org.springframework.beans.factory.InitializingBean; |
27 | import org.springframework.data.neo4j.conversion.DefaultConverter; |
28 | import org.springframework.data.neo4j.conversion.Result; |
29 | import org.springframework.data.neo4j.conversion.ResultConverter; |
30 | import org.springframework.data.neo4j.template.Neo4jOperations; |
31 | import org.springframework.util.Assert; |
32 | import org.springframework.util.ClassUtils; |
33 | import org.springframework.util.StringUtils; |
34 | |
35 | /** |
36 | * <p> |
37 | * Restartable {@link ItemReader} that reads objects from the graph database Neo4j |
38 | * via a paging technique. |
39 | * </p> |
40 | * |
41 | * <p> |
42 | * It executes cypher queries built from the statement fragments provided to |
43 | * retrieve the requested data. The query is executed using paged requests of |
44 | * a size specified in {@link #setPageSize(int)}. Additional pages are requested |
45 | * as needed when the {@link #read()} method is called. On restart, the reader |
46 | * will begin again at the same number item it left off at. |
47 | * </p> |
48 | * |
49 | * <p> |
50 | * Performance is dependent on your Neo4J configuration (embedded or remote) as |
51 | * well as page size. Setting a fairly large page size and using a commit |
52 | * interval that matches the page size should provide better performance. |
53 | * </p> |
54 | * |
55 | * <p> |
56 | * This implementation is thread-safe between calls to |
57 | * {@link #open(org.springframework.batch.item.ExecutionContext)}, however you |
58 | * should set <code>saveState=false</code> if used in a multi-threaded |
59 | * environment (no restart available). |
60 | * </p> |
61 | * |
62 | * @author Michael Minella |
63 | * |
64 | */ |
65 | public class Neo4jItemReader<T> extends AbstractPaginatedDataItemReader<T> implements |
66 | InitializingBean { |
67 | |
68 | protected Log logger = LogFactory.getLog(getClass()); |
69 | |
70 | private Neo4jOperations template; |
71 | |
72 | private String startStatement; |
73 | private String returnStatement; |
74 | private String matchStatement; |
75 | private String whereStatement; |
76 | private String orderByStatement; |
77 | |
78 | private Class targetType; |
79 | |
80 | private Map<String, Object> parameterValues; |
81 | |
82 | private ResultConverter resultConverter; |
83 | |
84 | public Neo4jItemReader() { |
85 | setName(ClassUtils.getShortName(Neo4jItemReader.class)); |
86 | } |
87 | |
88 | /** |
89 | * The start segment of the cypher query. START is prepended |
90 | * to the statement provided and should <em>not</em> be |
91 | * included. |
92 | * |
93 | * @param startStatement the start fragment of the cypher query. |
94 | */ |
95 | public void setStartStatement(String startStatement) { |
96 | this.startStatement = startStatement; |
97 | } |
98 | |
99 | /** |
100 | * The return statement of the cypher query. RETURN is prepended |
101 | * to the statement provided and should <em>not</em> be |
102 | * included |
103 | * |
104 | * @param returnStatement the return fragment of the cypher query. |
105 | */ |
106 | public void setReturnStatement(String returnStatement) { |
107 | this.returnStatement = returnStatement; |
108 | } |
109 | |
110 | /** |
111 | * An optional match fragment of the cypher query. MATCH is |
112 | * prepended to the statement provided and should <em>not</em> |
113 | * be included. |
114 | * |
115 | * @param matchStatement the match fragment of the cypher query |
116 | */ |
117 | public void setMatchStatement(String matchStatement) { |
118 | this.matchStatement = matchStatement; |
119 | } |
120 | |
121 | /** |
122 | * An optional where fragement of the cypher query. WHERE is |
123 | * prepended to the statement provided and should <em>not</em> |
124 | * be included. |
125 | * |
126 | * @param whereStatement where fragment of the cypher query |
127 | */ |
128 | public void setWhereStatement(String whereStatement) { |
129 | this.whereStatement = whereStatement; |
130 | } |
131 | |
132 | /** |
133 | * A list of properties to order the results by. This is |
134 | * required so that subsequent page requests pull back the |
135 | * segment of results correctly. ORDER BY is prepended to |
136 | * the statement provided and should <em>not</em> be included. |
137 | * |
138 | * @param orderByStatement order by fragment of the cypher query. |
139 | */ |
140 | public void setOrderByStatement(String orderByStatement) { |
141 | this.orderByStatement = orderByStatement; |
142 | } |
143 | |
144 | /** |
145 | * Used to perform operations against the Neo4J database. |
146 | * |
147 | * @param template the Neo4jOperations instance to use |
148 | * @see Neo4jOperations |
149 | */ |
150 | public void setTemplate(Neo4jOperations template) { |
151 | this.template = template; |
152 | } |
153 | |
154 | /** |
155 | * The object type to be returned from each call to {@link #read()} |
156 | * |
157 | * @param targetType the type of object to return. |
158 | */ |
159 | public void setTargetType(Class targetType) { |
160 | this.targetType = targetType; |
161 | } |
162 | |
163 | /** |
164 | * Set the converter used to convert node to the targetType. By |
165 | * default, {@link DefaultConverter} is used. |
166 | * |
167 | * @param resultConverter the converter to use. |
168 | */ |
169 | public void setResultConverter(ResultConverter resultConverter) { |
170 | this.resultConverter = resultConverter; |
171 | } |
172 | |
173 | @Override |
174 | @SuppressWarnings({"unchecked", "rawtypes"}) |
175 | protected Iterator<T> doPageRead() { |
176 | Result<Map<String, Object>> queryResults = template.query( |
177 | generateLimitCypherQuery(), parameterValues); |
178 | |
179 | if(queryResults != null) { |
180 | if (resultConverter != null) { |
181 | return queryResults.to(targetType, resultConverter).iterator(); |
182 | } |
183 | else { |
184 | return queryResults.to(targetType).iterator(); |
185 | } |
186 | } |
187 | else { |
188 | return new ArrayList().iterator(); |
189 | } |
190 | } |
191 | |
192 | private String generateLimitCypherQuery() { |
193 | StringBuilder query = new StringBuilder(); |
194 | |
195 | query.append("START ").append(startStatement); |
196 | query.append(matchStatement != null ? " MATCH " + matchStatement : ""); |
197 | query.append(whereStatement != null ? " WHERE " + whereStatement : ""); |
198 | query.append(" RETURN ").append(returnStatement); |
199 | query.append(" ORDER BY ").append(orderByStatement); |
200 | query.append(" SKIP " + (pageSize * page)); |
201 | query.append(" LIMIT " + pageSize); |
202 | |
203 | String resultingQuery = query.toString(); |
204 | |
205 | if (logger.isDebugEnabled()) { |
206 | logger.debug(resultingQuery); |
207 | } |
208 | |
209 | return resultingQuery; |
210 | } |
211 | |
212 | /** |
213 | * Checks mandatory properties |
214 | * |
215 | * @see InitializingBean#afterPropertiesSet() |
216 | */ |
217 | @Override |
218 | public void afterPropertiesSet() throws Exception { |
219 | Assert.state(template != null, "A Neo4JOperations implementation is required"); |
220 | Assert.state(targetType != null, "The type to be returned is required"); |
221 | Assert.state(StringUtils.hasText(startStatement), "A START statement is required"); |
222 | Assert.state(StringUtils.hasText(returnStatement), "A RETURN statement is required"); |
223 | Assert.state(StringUtils.hasText(orderByStatement), "A ORDER BY statement is required"); |
224 | } |
225 | } |