| 1 | /* |
| 2 | * Copyright 2012 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.springframework.batch.item.data; |
| 18 | |
| 19 | import java.util.ArrayList; |
| 20 | import java.util.Iterator; |
| 21 | import java.util.Map; |
| 22 | |
| 23 | import org.apache.commons.logging.Log; |
| 24 | import org.apache.commons.logging.LogFactory; |
| 25 | import org.springframework.batch.item.ItemReader; |
| 26 | import org.springframework.beans.factory.InitializingBean; |
| 27 | import org.springframework.data.neo4j.conversion.DefaultConverter; |
| 28 | import org.springframework.data.neo4j.conversion.Result; |
| 29 | import org.springframework.data.neo4j.conversion.ResultConverter; |
| 30 | import org.springframework.data.neo4j.template.Neo4jOperations; |
| 31 | import org.springframework.util.Assert; |
| 32 | import org.springframework.util.ClassUtils; |
| 33 | import org.springframework.util.StringUtils; |
| 34 | |
| 35 | /** |
| 36 | * <p> |
| 37 | * Restartable {@link ItemReader} that reads objects from the graph database Neo4j |
| 38 | * via a paging technique. |
| 39 | * </p> |
| 40 | * |
| 41 | * <p> |
| 42 | * It executes cypher queries built from the statement fragments provided to |
| 43 | * retrieve the requested data. The query is executed using paged requests of |
| 44 | * a size specified in {@link #setPageSize(int)}. Additional pages are requested |
| 45 | * as needed when the {@link #read()} method is called. On restart, the reader |
| 46 | * will begin again at the same number item it left off at. |
| 47 | * </p> |
| 48 | * |
| 49 | * <p> |
| 50 | * Performance is dependent on your Neo4J configuration (embedded or remote) as |
| 51 | * well as page size. Setting a fairly large page size and using a commit |
| 52 | * interval that matches the page size should provide better performance. |
| 53 | * </p> |
| 54 | * |
| 55 | * <p> |
| 56 | * This implementation is thread-safe between calls to |
| 57 | * {@link #open(org.springframework.batch.item.ExecutionContext)}, however you |
| 58 | * should set <code>saveState=false</code> if used in a multi-threaded |
| 59 | * environment (no restart available). |
| 60 | * </p> |
| 61 | * |
| 62 | * @author Michael Minella |
| 63 | * |
| 64 | */ |
| 65 | public class Neo4jItemReader<T> extends AbstractPaginatedDataItemReader<T> implements |
| 66 | InitializingBean { |
| 67 | |
| 68 | protected Log logger = LogFactory.getLog(getClass()); |
| 69 | |
| 70 | private Neo4jOperations template; |
| 71 | |
| 72 | private String startStatement; |
| 73 | private String returnStatement; |
| 74 | private String matchStatement; |
| 75 | private String whereStatement; |
| 76 | private String orderByStatement; |
| 77 | |
| 78 | private Class targetType; |
| 79 | |
| 80 | private Map<String, Object> parameterValues; |
| 81 | |
| 82 | private ResultConverter resultConverter; |
| 83 | |
| 84 | public Neo4jItemReader() { |
| 85 | setName(ClassUtils.getShortName(Neo4jItemReader.class)); |
| 86 | } |
| 87 | |
| 88 | /** |
| 89 | * The start segment of the cypher query. START is prepended |
| 90 | * to the statement provided and should <em>not</em> be |
| 91 | * included. |
| 92 | * |
| 93 | * @param startStatement the start fragment of the cypher query. |
| 94 | */ |
| 95 | public void setStartStatement(String startStatement) { |
| 96 | this.startStatement = startStatement; |
| 97 | } |
| 98 | |
| 99 | /** |
| 100 | * The return statement of the cypher query. RETURN is prepended |
| 101 | * to the statement provided and should <em>not</em> be |
| 102 | * included |
| 103 | * |
| 104 | * @param returnStatement the return fragment of the cypher query. |
| 105 | */ |
| 106 | public void setReturnStatement(String returnStatement) { |
| 107 | this.returnStatement = returnStatement; |
| 108 | } |
| 109 | |
| 110 | /** |
| 111 | * An optional match fragment of the cypher query. MATCH is |
| 112 | * prepended to the statement provided and should <em>not</em> |
| 113 | * be included. |
| 114 | * |
| 115 | * @param matchStatement the match fragment of the cypher query |
| 116 | */ |
| 117 | public void setMatchStatement(String matchStatement) { |
| 118 | this.matchStatement = matchStatement; |
| 119 | } |
| 120 | |
| 121 | /** |
| 122 | * An optional where fragement of the cypher query. WHERE is |
| 123 | * prepended to the statement provided and should <em>not</em> |
| 124 | * be included. |
| 125 | * |
| 126 | * @param whereStatement where fragment of the cypher query |
| 127 | */ |
| 128 | public void setWhereStatement(String whereStatement) { |
| 129 | this.whereStatement = whereStatement; |
| 130 | } |
| 131 | |
| 132 | /** |
| 133 | * A list of properties to order the results by. This is |
| 134 | * required so that subsequent page requests pull back the |
| 135 | * segment of results correctly. ORDER BY is prepended to |
| 136 | * the statement provided and should <em>not</em> be included. |
| 137 | * |
| 138 | * @param orderByStatement order by fragment of the cypher query. |
| 139 | */ |
| 140 | public void setOrderByStatement(String orderByStatement) { |
| 141 | this.orderByStatement = orderByStatement; |
| 142 | } |
| 143 | |
| 144 | /** |
| 145 | * Used to perform operations against the Neo4J database. |
| 146 | * |
| 147 | * @param template the Neo4jOperations instance to use |
| 148 | * @see Neo4jOperations |
| 149 | */ |
| 150 | public void setTemplate(Neo4jOperations template) { |
| 151 | this.template = template; |
| 152 | } |
| 153 | |
| 154 | /** |
| 155 | * The object type to be returned from each call to {@link #read()} |
| 156 | * |
| 157 | * @param targetType the type of object to return. |
| 158 | */ |
| 159 | public void setTargetType(Class targetType) { |
| 160 | this.targetType = targetType; |
| 161 | } |
| 162 | |
| 163 | /** |
| 164 | * Set the converter used to convert node to the targetType. By |
| 165 | * default, {@link DefaultConverter} is used. |
| 166 | * |
| 167 | * @param resultConverter the converter to use. |
| 168 | */ |
| 169 | public void setResultConverter(ResultConverter resultConverter) { |
| 170 | this.resultConverter = resultConverter; |
| 171 | } |
| 172 | |
| 173 | @Override |
| 174 | @SuppressWarnings({"unchecked", "rawtypes"}) |
| 175 | protected Iterator<T> doPageRead() { |
| 176 | Result<Map<String, Object>> queryResults = template.query( |
| 177 | generateLimitCypherQuery(), parameterValues); |
| 178 | |
| 179 | if(queryResults != null) { |
| 180 | if (resultConverter != null) { |
| 181 | return queryResults.to(targetType, resultConverter).iterator(); |
| 182 | } |
| 183 | else { |
| 184 | return queryResults.to(targetType).iterator(); |
| 185 | } |
| 186 | } |
| 187 | else { |
| 188 | return new ArrayList().iterator(); |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | private String generateLimitCypherQuery() { |
| 193 | StringBuilder query = new StringBuilder(); |
| 194 | |
| 195 | query.append("START ").append(startStatement); |
| 196 | query.append(matchStatement != null ? " MATCH " + matchStatement : ""); |
| 197 | query.append(whereStatement != null ? " WHERE " + whereStatement : ""); |
| 198 | query.append(" RETURN ").append(returnStatement); |
| 199 | query.append(" ORDER BY ").append(orderByStatement); |
| 200 | query.append(" SKIP " + (pageSize * page)); |
| 201 | query.append(" LIMIT " + pageSize); |
| 202 | |
| 203 | String resultingQuery = query.toString(); |
| 204 | |
| 205 | if (logger.isDebugEnabled()) { |
| 206 | logger.debug(resultingQuery); |
| 207 | } |
| 208 | |
| 209 | return resultingQuery; |
| 210 | } |
| 211 | |
| 212 | /** |
| 213 | * Checks mandatory properties |
| 214 | * |
| 215 | * @see InitializingBean#afterPropertiesSet() |
| 216 | */ |
| 217 | @Override |
| 218 | public void afterPropertiesSet() throws Exception { |
| 219 | Assert.state(template != null, "A Neo4JOperations implementation is required"); |
| 220 | Assert.state(targetType != null, "The type to be returned is required"); |
| 221 | Assert.state(StringUtils.hasText(startStatement), "A START statement is required"); |
| 222 | Assert.state(StringUtils.hasText(returnStatement), "A RETURN statement is required"); |
| 223 | Assert.state(StringUtils.hasText(orderByStatement), "A ORDER BY statement is required"); |
| 224 | } |
| 225 | } |