1 | /* |
2 | * Copyright 2012 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.data; |
18 | |
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
30 | |
31 | /** |
32 | * <p> |
33 | * A {@link ItemWriter} implementation that writes to a MongoDB store using an implementation of Spring Data's |
34 | * {@link MongoOperations}. Since MongoDB is not a transactional store, a best effort is made to persist |
35 | * written data at the last moment, yet still honor job status contracts. No attempt to roll back is made |
36 | * if an error occurs during writing. |
37 | * </p> |
38 | * |
39 | * <p> |
40 | * This writer is thread safe once all properties are set (normal singleton behavior) so it can be used in multiple |
41 | * concurrent transactions. |
42 | * </p> |
43 | * |
44 | * @author Michael Minella |
45 | * |
46 | */ |
47 | public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean { |
48 | |
49 | private static final String BUFFER_KEY_PREFIX = MongoItemWriter.class.getName() + ".BUFFER_KEY"; |
50 | private MongoOperations template; |
51 | private final String bufferKey; |
52 | private String collection; |
53 | private boolean delete = false; |
54 | |
55 | public MongoItemWriter() { |
56 | super(); |
57 | this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode(); |
58 | } |
59 | |
60 | /** |
61 | * Indicates if the items being passed to the writer are to be saved or |
62 | * removed from the data store. If set to false (default), the items will |
63 | * be saved. If set to true, the items will be removed. |
64 | * |
65 | * @param delete removal indicator |
66 | */ |
67 | public void setDelete(boolean delete) { |
68 | this.delete = delete; |
69 | } |
70 | |
71 | /** |
72 | * Set the {@link MongoOperations} to be used to save items to be written. |
73 | * |
74 | * @param template the template implementation to be used. |
75 | */ |
76 | public void setTemplate(MongoOperations template) { |
77 | this.template = template; |
78 | } |
79 | |
80 | /** |
81 | * Set the name of the Mongo collection to be written to. |
82 | * |
83 | * @param collection the name of the collection. |
84 | */ |
85 | public void setCollection(String collection) { |
86 | this.collection = collection; |
87 | } |
88 | |
89 | /** |
90 | * If a transaction is active, buffer items to be written just before commit. |
91 | * Otherwise write items using the provided template. |
92 | * |
93 | * @see org.springframework.batch.item.ItemWriter#write(List) |
94 | */ |
95 | public void write(List<? extends T> items) throws Exception { |
96 | if(!transactionActive()) { |
97 | doWrite(items); |
98 | return; |
99 | } |
100 | |
101 | List bufferedItems = getCurrentBuffer(); |
102 | bufferedItems.addAll(items); |
103 | } |
104 | |
105 | /** |
106 | * Performs the actual write to the store via the template. |
107 | * This can be overridden by a subclass if necessary. |
108 | * |
109 | * @param items the list of items to be persisted. |
110 | */ |
111 | protected void doWrite(List<? extends T> items) { |
112 | if(! CollectionUtils.isEmpty(items)) { |
113 | if(delete) { |
114 | if(StringUtils.hasText(collection)) { |
115 | for (Object object : items) { |
116 | template.remove(object, collection); |
117 | } |
118 | } |
119 | else { |
120 | for (Object object : items) { |
121 | template.remove(object); |
122 | } |
123 | } |
124 | } |
125 | else { |
126 | if(StringUtils.hasText(collection)) { |
127 | for (Object object : items) { |
128 | template.save(object, collection); |
129 | } |
130 | } |
131 | else { |
132 | for (Object object : items) { |
133 | template.save(object); |
134 | } |
135 | } |
136 | } |
137 | } |
138 | } |
139 | |
140 | private boolean transactionActive() { |
141 | return TransactionSynchronizationManager.isActualTransactionActive(); |
142 | } |
143 | |
144 | private List<? extends T> getCurrentBuffer() { |
145 | if(!TransactionSynchronizationManager.hasResource(bufferKey)) { |
146 | TransactionSynchronizationManager.bindResource(bufferKey, new ArrayList()); |
147 | |
148 | TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { |
149 | @Override |
150 | public void beforeCommit(boolean readOnly) { |
151 | List items = (List) TransactionSynchronizationManager.getResource(bufferKey); |
152 | |
153 | if(!CollectionUtils.isEmpty(items)) { |
154 | if(!readOnly) { |
155 | doWrite(items); |
156 | } |
157 | } |
158 | } |
159 | |
160 | @Override |
161 | public void afterCompletion(int status) { |
162 | if(TransactionSynchronizationManager.hasResource(bufferKey)) { |
163 | TransactionSynchronizationManager.unbindResource(bufferKey); |
164 | } |
165 | } |
166 | }); |
167 | } |
168 | |
169 | return (List) TransactionSynchronizationManager.getResource(bufferKey); |
170 | } |
171 | |
172 | public void afterPropertiesSet() throws Exception { |
173 | Assert.state(template != null, "A MongoOperations implementation is required."); |
174 | } |
175 | } |