View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.item;
18  
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Iterator;
22  import java.util.List;
23  
24  import org.springframework.classify.Classifier;
25  import org.springframework.retry.ExhaustedRetryException;
26  import org.springframework.retry.RecoveryCallback;
27  import org.springframework.retry.RetryCallback;
28  import org.springframework.retry.RetryContext;
29  import org.springframework.retry.RetryListener;
30  import org.springframework.retry.RetryOperations;
31  import org.springframework.retry.RetryPolicy;
32  import org.springframework.retry.RetryState;
33  import org.springframework.retry.backoff.BackOffPolicy;
34  import org.springframework.retry.context.RetryContextSupport;
35  import org.springframework.retry.policy.RetryContextCache;
36  import org.springframework.retry.support.DefaultRetryState;
37  import org.springframework.retry.support.RetrySynchronizationManager;
38  import org.springframework.retry.support.RetryTemplate;
39  
40  /**
41   * A special purpose retry template that deals specifically with multi-valued
42   * stateful retry. This is useful in the case where the operation to be retried
43   * operates on multiple items, and when it fails there is no way to decide which
44   * (if any) of the items was responsible. The {@link RetryState} used in the
45   * execute methods is composite, and when a failure occurs, all of the keys in
46   * the composite are "tarred with the same brush". Subsequent attempts to
47   * execute with any of the keys that have failed previously results in a new
48   * attempt and the previous state is used to check the {@link RetryPolicy}. If
49   * one of the failed items eventually succeeds then the others in the current
50   * composite for that attempt will be cleared from the context cache (as
51   * normal), but there may still be entries in the cache for the original failed
52   * items. This might mean that an item that did not cause a failure is never
53   * retried because other items in the same batch fail fatally first.
54   *
55   * @author Dave Syer
56   *
57   */
58  public class BatchRetryTemplate implements RetryOperations {
59  
60  	private class BatchRetryState extends DefaultRetryState {
61  
62  		private final Collection<RetryState> keys;
63  
64  		public BatchRetryState(Collection<RetryState> keys) {
65  			super(keys);
66  			this.keys = new ArrayList<RetryState>(keys);
67  		}
68  
69  	}
70  
71  	@SuppressWarnings("serial")
72  	private static class BatchRetryContext extends RetryContextSupport {
73  
74  		private final Collection<RetryContext> contexts;
75  
76  		public BatchRetryContext(RetryContext parent, Collection<RetryContext> contexts) {
77  
78  			super(parent);
79  
80  			this.contexts = contexts;
81  			int count = 0;
82  
83  			for (RetryContext context : contexts) {
84  				int retryCount = context.getRetryCount();
85  				if (retryCount > count) {
86  					count = retryCount;
87  					registerThrowable(context.getLastThrowable());
88  				}
89  			}
90  
91  		}
92  
93  	}
94  
95  	private static class InnerRetryTemplate extends RetryTemplate {
96  
97  		@Override
98  		protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
99  
100 			BatchRetryContext batchContext = (BatchRetryContext) context;
101 
102 			for (RetryContext nextContext : batchContext.contexts) {
103 				if (!super.canRetry(retryPolicy, nextContext)) {
104 					return false;
105 				}
106 			}
107 
108 			return true;
109 
110 		}
111 
112 		@Override
113 		protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
114 
115 			BatchRetryState batchState = (BatchRetryState) state;
116 
117 			Collection<RetryContext> contexts = new ArrayList<RetryContext>();
118 			for (RetryState retryState : batchState.keys) {
119 				contexts.add(super.open(retryPolicy, retryState));
120 			}
121 
122 			return new BatchRetryContext(RetrySynchronizationManager.getContext(), contexts);
123 
124 		}
125 
126 		@Override
127 		protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, RetryContext context, Throwable e) {
128 
129 			BatchRetryState batchState = (BatchRetryState) state;
130 			BatchRetryContext batchContext = (BatchRetryContext) context;
131 
132 			Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
133 			for (RetryState retryState : batchState.keys) {
134 				RetryContext nextContext = contextIterator.next();
135 				super.registerThrowable(retryPolicy, retryState, nextContext, e);
136 			}
137 
138 		}
139 
140 		@Override
141 		protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state, boolean succeeded) {
142 
143 			BatchRetryState batchState = (BatchRetryState) state;
144 			BatchRetryContext batchContext = (BatchRetryContext) context;
145 
146 			Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
147 			for (RetryState retryState : batchState.keys) {
148 				RetryContext nextContext = contextIterator.next();
149 				super.close(retryPolicy, nextContext, retryState, succeeded);
150 			}
151 
152 		}
153 
154 		@Override
155 		protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context,
156 				RetryState state) throws Exception {
157 
158 			BatchRetryState batchState = (BatchRetryState) state;
159 			BatchRetryContext batchContext = (BatchRetryContext) context;
160 
161 			// Accumulate exceptions to be thrown so all the keys get a crack
162 			Exception rethrowable = null;
163 			ExhaustedRetryException exhausted = null;
164 
165 			Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
166 			for (RetryState retryState : batchState.keys) {
167 
168 				RetryContext nextContext = contextIterator.next();
169 
170 				try {
171 					super.handleRetryExhausted(null, nextContext, retryState);
172 				}
173 				catch (ExhaustedRetryException e) {
174 					exhausted = e;
175 				}
176 				catch (Exception e) {
177 					rethrowable = e;
178 				}
179 
180 			}
181 
182 			if (recoveryCallback != null) {
183 				return recoveryCallback.recover(context);
184 			}
185 
186 			if (exhausted != null) {
187 				throw exhausted;
188 			}
189 
190 			throw rethrowable;
191 
192 		}
193 
194 	}
195 
196 	private final InnerRetryTemplate delegate = new InnerRetryTemplate();
197 
198 	private final RetryTemplate regular = new RetryTemplate();
199 
200 	private RetryPolicy retryPolicy;
201 
202 	public <T> T execute(RetryCallback<T> retryCallback, Collection<RetryState> states) throws ExhaustedRetryException,
203 	Exception {
204 		RetryState batchState = new BatchRetryState(states);
205 		return delegate.execute(retryCallback, batchState);
206 	}
207 
208 	public <T> T execute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback,
209 			Collection<RetryState> states) throws ExhaustedRetryException, Exception {
210 		RetryState batchState = new BatchRetryState(states);
211 		return delegate.execute(retryCallback, recoveryCallback, batchState);
212 	}
213 
214 	@Override
215 	public final <T> T execute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback,
216 			RetryState retryState) throws Exception, ExhaustedRetryException {
217 		return regular.execute(retryCallback, recoveryCallback, retryState);
218 	}
219 
220 	@Override
221 	public final <T> T execute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback) throws Exception {
222 		return regular.execute(retryCallback, recoveryCallback);
223 	}
224 
225 	@Override
226 	public final <T> T execute(RetryCallback<T> retryCallback, RetryState retryState) throws Exception,
227 	ExhaustedRetryException {
228 		return regular.execute(retryCallback, retryState);
229 	}
230 
231 	@Override
232 	public final <T> T execute(RetryCallback<T> retryCallback) throws Exception {
233 		return regular.execute(retryCallback);
234 	}
235 
236 	public static List<RetryState> createState(List<?> keys) {
237 		List<RetryState> states = new ArrayList<RetryState>();
238 		for (Object key : keys) {
239 			states.add(new DefaultRetryState(key));
240 		}
241 		return states;
242 	}
243 
244 	public static List<RetryState> createState(List<?> keys, Classifier<? super Throwable, Boolean> classifier) {
245 		List<RetryState> states = new ArrayList<RetryState>();
246 		for (Object key : keys) {
247 			states.add(new DefaultRetryState(key, classifier));
248 		}
249 		return states;
250 	}
251 
252 	public void registerListener(RetryListener listener) {
253 		delegate.registerListener(listener);
254 		regular.registerListener(listener);
255 	}
256 
257 	public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
258 		delegate.setBackOffPolicy(backOffPolicy);
259 		regular.setBackOffPolicy(backOffPolicy);
260 	}
261 
262 	public void setListeners(RetryListener[] listeners) {
263 		delegate.setListeners(listeners);
264 		regular.setListeners(listeners);
265 	}
266 
267 	public void setRetryContextCache(RetryContextCache retryContextCache) {
268 		delegate.setRetryContextCache(retryContextCache);
269 		regular.setRetryContextCache(retryContextCache);
270 	}
271 
272 	public void setRetryPolicy(RetryPolicy retryPolicy) {
273 		this.retryPolicy = retryPolicy;
274 		delegate.setRetryPolicy(retryPolicy);
275 		regular.setRetryPolicy(retryPolicy);
276 	}
277 
278 	public boolean canRetry(RetryContext context) {
279 		return context==null ? true : retryPolicy.canRetry(context);
280 	}
281 
282 }