1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.repeat.support;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21 import static org.junit.Assert.assertNotSame;
22 import static org.junit.Assert.assertSame;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31
32 import org.junit.Test;
33 import org.springframework.batch.item.ExecutionContext;
34 import org.springframework.batch.repeat.RepeatCallback;
35 import org.springframework.batch.repeat.RepeatContext;
36 import org.springframework.batch.repeat.RepeatStatus;
37 import org.springframework.batch.repeat.callback.NestedRepeatCallback;
38 import org.springframework.batch.repeat.exception.ExceptionHandler;
39 import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
40 import org.springframework.core.task.SimpleAsyncTaskExecutor;
41
42 public class TaskExecutorRepeatTemplateAsynchronousTests extends AbstractTradeBatchTests {
43
44 RepeatTemplate template = getRepeatTemplate();
45
46 int count = 0;
47
48
49 public RepeatTemplate getRepeatTemplate() {
50 TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
51 template.setTaskExecutor(new SimpleAsyncTaskExecutor());
52
53 template.setCompletionPolicy(new SimpleCompletionPolicy(8));
54 return template;
55 }
56
57 @Test
58 public void testEarlyCompletionWithException() throws Exception {
59
60 TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
61 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
62 template.setCompletionPolicy(new SimpleCompletionPolicy(20));
63 taskExecutor.setConcurrencyLimit(2);
64 template.setTaskExecutor(taskExecutor);
65 try {
66 template.iterate(new RepeatCallback() {
67 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
68 count++;
69 throw new IllegalStateException("foo!");
70 }
71 });
72 fail("Expected IllegalStateException");
73 }
74 catch (IllegalStateException e) {
75 assertEquals("foo!", e.getMessage());
76 }
77
78 assertTrue("Too few attempts: " + count, count >= 1);
79 assertTrue("Too many attempts: " + count, count <= 10);
80
81 }
82
83 @Test
84 public void testExceptionHandlerSwallowsException() throws Exception {
85
86 TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
87 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
88 template.setCompletionPolicy(new SimpleCompletionPolicy(4));
89 taskExecutor.setConcurrencyLimit(2);
90 template.setTaskExecutor(taskExecutor);
91
92 template.setExceptionHandler(new ExceptionHandler() {
93 public void handleException(RepeatContext context, Throwable throwable) throws Throwable {
94 count++;
95 }
96 });
97 template.iterate(new RepeatCallback() {
98 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
99 throw new IllegalStateException("foo!");
100 }
101 });
102
103 assertTrue("Too few attempts: " + count, count >= 1);
104 assertTrue("Too many attempts: " + count, count <= 10);
105
106 }
107
108 @Test
109 public void testNestedSession() throws Exception {
110
111 RepeatTemplate outer = getRepeatTemplate();
112 RepeatTemplate inner = new RepeatTemplate();
113
114 outer.iterate(new NestedRepeatCallback(inner, new RepeatCallback() {
115 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
116 count++;
117 assertNotNull(context);
118 assertNotSame("Nested batch should have new session", context, context.getParent());
119 assertSame(context, RepeatSynchronizationManager.getContext());
120 return RepeatStatus.FINISHED;
121 }
122 }) {
123 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
124 count++;
125 assertNotNull(context);
126 assertSame(context, RepeatSynchronizationManager.getContext());
127 return super.doInIteration(context);
128 }
129 });
130
131 assertTrue("Too few attempts: " + count, count >= 1);
132 assertTrue("Too many attempts: " + count, count <= 10);
133
134 }
135
136
137
138
139
140
141
142
143 @Test
144 public void testMultiThreadAsynchronousExecution() throws Exception {
145
146 final String threadName = Thread.currentThread().getName();
147 final Set<String> threadNames = new HashSet<String>();
148
149 final RepeatCallback callback = new RepeatCallback() {
150 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
151 assertNotSame(threadName, Thread.currentThread().getName());
152 threadNames.add(Thread.currentThread().getName());
153 Thread.sleep(100);
154 Trade item = provider.read();
155 if (item != null) {
156 processor.write(Collections.singletonList(item));
157 }
158 return RepeatStatus.continueIf(item != null);
159 }
160 };
161
162 template.iterate(callback);
163
164
165 assertEquals(NUMBER_OF_ITEMS, processor.count);
166 assertTrue(threadNames.size() > 1);
167 }
168
169 @Test
170 public void testThrottleLimit() throws Exception {
171
172 int throttleLimit = 600;
173
174 TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
175 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
176 taskExecutor.setConcurrencyLimit(300);
177 template.setTaskExecutor(taskExecutor);
178 template.setThrottleLimit(throttleLimit);
179
180 final String threadName = Thread.currentThread().getName();
181 final Set<String> threadNames = new HashSet<String>();
182 final List<String> items = new ArrayList<String>();
183
184 final RepeatCallback callback = new RepeatCallback() {
185 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
186 assertNotSame(threadName, Thread.currentThread().getName());
187 Trade item = provider.read();
188 threadNames.add(Thread.currentThread().getName() + " : " + item);
189 items.add("" + item);
190 if (item != null) {
191 processor.write(Collections.singletonList(item));
192
193 for (int i = 0; i < 10; i++) {
194 TradeItemReader provider = new TradeItemReader(resource);
195 provider.open(new ExecutionContext());
196 while (provider.read() != null)
197 continue;
198 provider.close();
199 }
200 }
201 return RepeatStatus.continueIf(item != null);
202 }
203 };
204
205 template.iterate(callback);
206
207
208 assertEquals(NUMBER_OF_ITEMS, processor.count);
209 assertTrue(threadNames.size() > 1);
210 int frequency = Collections.frequency(items, "null");
211
212 assertTrue(frequency <= throttleLimit);
213 }
214
215
216
217
218
219
220
221 @Test
222 public void testSingleThreadAsynchronousExecution() throws Exception {
223 TaskExecutorRepeatTemplate jobTemplate = new TaskExecutorRepeatTemplate();
224 final RepeatTemplate stepTemplate = new RepeatTemplate();
225 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
226 taskExecutor.setConcurrencyLimit(2);
227 jobTemplate.setTaskExecutor(taskExecutor);
228
229 final String threadName = Thread.currentThread().getName();
230 final Set<String> threadNames = new HashSet<String>();
231
232 final RepeatCallback stepCallback = new ItemReaderRepeatCallback<Trade>(provider, processor) {
233 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
234 assertNotSame(threadName, Thread.currentThread().getName());
235 threadNames.add(Thread.currentThread().getName());
236 Thread.sleep(100);
237 TradeItemReader provider = new TradeItemReader(resource);
238 provider.open(new ExecutionContext());
239 while (provider.read() != null)
240 ;
241 return super.doInIteration(context);
242 }
243 };
244 RepeatCallback jobCallback = new RepeatCallback() {
245 public RepeatStatus doInIteration(RepeatContext context) throws Exception {
246 stepTemplate.iterate(stepCallback);
247 return RepeatStatus.FINISHED;
248 }
249 };
250
251 jobTemplate.iterate(jobCallback);
252
253
254 assertEquals(NUMBER_OF_ITEMS, processor.count);
255
256
257
258
259 assertTrue(threadNames.size() >= 1);
260 }
261
262 }