View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNotNull;
21  import static org.junit.Assert.assertNotSame;
22  import static org.junit.Assert.assertSame;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Set;
31  
32  import org.junit.Test;
33  import org.springframework.batch.item.ExecutionContext;
34  import org.springframework.batch.repeat.RepeatCallback;
35  import org.springframework.batch.repeat.RepeatContext;
36  import org.springframework.batch.repeat.RepeatStatus;
37  import org.springframework.batch.repeat.callback.NestedRepeatCallback;
38  import org.springframework.batch.repeat.exception.ExceptionHandler;
39  import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
40  import org.springframework.core.task.SimpleAsyncTaskExecutor;
41  
42  public class TaskExecutorRepeatTemplateAsynchronousTests extends AbstractTradeBatchTests {
43  
44  	RepeatTemplate template = getRepeatTemplate();
45  
46  	int count = 0;
47  
48  	// @Override
49  	public RepeatTemplate getRepeatTemplate() {
50  		TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
51  		template.setTaskExecutor(new SimpleAsyncTaskExecutor());
52  		// Set default completion above number of items in input file
53  		template.setCompletionPolicy(new SimpleCompletionPolicy(8));
54  		return template;
55  	}
56  
57  	@Test
58  	public void testEarlyCompletionWithException() throws Exception {
59  
60  		TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
61  		SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
62  		template.setCompletionPolicy(new SimpleCompletionPolicy(20));
63  		taskExecutor.setConcurrencyLimit(2);
64  		template.setTaskExecutor(taskExecutor);
65  		try {
66  			template.iterate(new RepeatCallback() {
67  				public RepeatStatus doInIteration(RepeatContext context) throws Exception {
68  					count++;
69  					throw new IllegalStateException("foo!");
70  				}
71  			});
72  			fail("Expected IllegalStateException");
73  		}
74  		catch (IllegalStateException e) {
75  			assertEquals("foo!", e.getMessage());
76  		}
77  
78  		assertTrue("Too few attempts: " + count, count >= 1);
79  		assertTrue("Too many attempts: " + count, count <= 10);
80  
81  	}
82  
83  	@Test
84  	public void testExceptionHandlerSwallowsException() throws Exception {
85  
86  		TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
87  		SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
88  		template.setCompletionPolicy(new SimpleCompletionPolicy(4));
89  		taskExecutor.setConcurrencyLimit(2);
90  		template.setTaskExecutor(taskExecutor);
91  
92  		template.setExceptionHandler(new ExceptionHandler() {
93  			public void handleException(RepeatContext context, Throwable throwable) throws Throwable {
94  				count++;
95  			}
96  		});
97  		template.iterate(new RepeatCallback() {
98  			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
99  				throw new IllegalStateException("foo!");
100 			}
101 		});
102 
103 		assertTrue("Too few attempts: " + count, count >= 1);
104 		assertTrue("Too many attempts: " + count, count <= 10);
105 
106 	}
107 
108 	@Test
109 	public void testNestedSession() throws Exception {
110 
111 		RepeatTemplate outer = getRepeatTemplate();
112 		RepeatTemplate inner = new RepeatTemplate();
113 
114 		outer.iterate(new NestedRepeatCallback(inner, new RepeatCallback() {
115 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
116 				count++;
117 				assertNotNull(context);
118 				assertNotSame("Nested batch should have new session", context, context.getParent());
119 				assertSame(context, RepeatSynchronizationManager.getContext());
120 				return RepeatStatus.FINISHED;
121 			}
122 		}) {
123 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
124 				count++;
125 				assertNotNull(context);
126 				assertSame(context, RepeatSynchronizationManager.getContext());
127 				return super.doInIteration(context);
128 			}
129 		});
130 
131 		assertTrue("Too few attempts: " + count, count >= 1);
132 		assertTrue("Too many attempts: " + count, count <= 10);
133 
134 	}
135 
136 	/**
137 	 * Run a batch with a single template that itself has an async task
138 	 * executor. The result is a batch that runs in multiple threads (up to the
139 	 * throttle limit of the template).
140 	 * 
141 	 * @throws Exception
142 	 */
143 	@Test
144 	public void testMultiThreadAsynchronousExecution() throws Exception {
145 
146 		final String threadName = Thread.currentThread().getName();
147 		final Set<String> threadNames = new HashSet<String>();
148 
149 		final RepeatCallback callback = new RepeatCallback() {
150 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
151 				assertNotSame(threadName, Thread.currentThread().getName());
152 				threadNames.add(Thread.currentThread().getName());
153 				Thread.sleep(100);
154 				Trade item = provider.read();
155 				if (item != null) {
156 					processor.write(Collections.singletonList(item));
157 				}
158 				return RepeatStatus.continueIf(item != null);
159 			}
160 		};
161 
162 		template.iterate(callback);
163 		// Shouldn't be necessary to wait:
164 		// Thread.sleep(500);
165 		assertEquals(NUMBER_OF_ITEMS, processor.count);
166 		assertTrue(threadNames.size() > 1);
167 	}
168 
169 	@Test
170 	public void testThrottleLimit() throws Exception {
171 
172 		int throttleLimit = 600;
173 
174 		TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
175 		SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
176 		taskExecutor.setConcurrencyLimit(300);
177 		template.setTaskExecutor(taskExecutor);
178 		template.setThrottleLimit(throttleLimit);
179 
180 		final String threadName = Thread.currentThread().getName();
181 		final Set<String> threadNames = new HashSet<String>();
182 		final List<String> items = new ArrayList<String>();
183 
184 		final RepeatCallback callback = new RepeatCallback() {
185 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
186 				assertNotSame(threadName, Thread.currentThread().getName());
187 				Trade item = provider.read();
188 				threadNames.add(Thread.currentThread().getName() + " : " + item);
189 				items.add("" + item);
190 				if (item != null) {
191 					processor.write(Collections.singletonList(item));
192 					// Do some more I/O
193 					for (int i = 0; i < 10; i++) {
194 						TradeItemReader provider = new TradeItemReader(resource);
195 						provider.open(new ExecutionContext());
196 						while (provider.read() != null)
197 							continue;
198 						provider.close();
199 					}
200 				}
201 				return RepeatStatus.continueIf(item != null);
202 			}
203 		};
204 
205 		template.iterate(callback);
206 		// Shouldn't be necessary to wait:
207 		// Thread.sleep(500);
208 		assertEquals(NUMBER_OF_ITEMS, processor.count);
209 		assertTrue(threadNames.size() > 1);
210 		int frequency = Collections.frequency(items, "null");
211 		// System.err.println("Frequency: "+frequency);
212 		assertTrue(frequency <= throttleLimit);
213 	}
214 
215 	/**
216 	 * Wrap an otherwise synchronous batch in a callback to an asynchronous
217 	 * template.
218 	 * 
219 	 * @throws Exception
220 	 */
221 	@Test
222 	public void testSingleThreadAsynchronousExecution() throws Exception {
223 		TaskExecutorRepeatTemplate jobTemplate = new TaskExecutorRepeatTemplate();
224 		final RepeatTemplate stepTemplate = new RepeatTemplate();
225 		SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
226 		taskExecutor.setConcurrencyLimit(2);
227 		jobTemplate.setTaskExecutor(taskExecutor);
228 
229 		final String threadName = Thread.currentThread().getName();
230 		final Set<String> threadNames = new HashSet<String>();
231 
232 		final RepeatCallback stepCallback = new ItemReaderRepeatCallback<Trade>(provider, processor) {
233 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
234 				assertNotSame(threadName, Thread.currentThread().getName());
235 				threadNames.add(Thread.currentThread().getName());
236 				Thread.sleep(100);
237 				TradeItemReader provider = new TradeItemReader(resource);
238 				provider.open(new ExecutionContext());
239 				while (provider.read() != null)
240 					;
241 				return super.doInIteration(context);
242 			}
243 		};
244 		RepeatCallback jobCallback = new RepeatCallback() {
245 			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
246 				stepTemplate.iterate(stepCallback);
247 				return RepeatStatus.FINISHED;
248 			}
249 		};
250 
251 		jobTemplate.iterate(jobCallback);
252 		// Shouldn't be necessary to wait:
253 		// Thread.sleep(500);
254 		assertEquals(NUMBER_OF_ITEMS, processor.count);
255 		// Because of the throttling and queueing internally to a TaskExecutor,
256 		// more than one thread will be used - the number used is the
257 		// concurrency limit in the task executor, plus 1.
258 		// System.err.println(threadNames);
259 		assertTrue(threadNames.size() >= 1);
260 	}
261 
262 }