1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.apache.commons.logging.Log; |
22 | import org.apache.commons.logging.LogFactory; |
23 | import org.springframework.batch.core.StepContribution; |
24 | import org.springframework.batch.core.StepListener; |
25 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
26 | import org.springframework.batch.item.ItemReader; |
27 | import org.springframework.batch.repeat.RepeatCallback; |
28 | import org.springframework.batch.repeat.RepeatContext; |
29 | import org.springframework.batch.repeat.RepeatOperations; |
30 | import org.springframework.batch.repeat.RepeatStatus; |
31 | |
32 | /** |
33 | * Simple implementation of the ChunkProvider interface that does basic |
34 | * chunk providing from an {@link ItemReader}. |
35 | * |
36 | * @author Dave Syer |
37 | * @see ChunkOrientedTasklet |
38 | */ |
39 | public class SimpleChunkProvider<I> implements ChunkProvider<I> { |
40 | |
41 | protected final Log logger = LogFactory.getLog(getClass()); |
42 | |
43 | protected final ItemReader<? extends I> itemReader; |
44 | |
45 | private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>(); |
46 | |
47 | private final RepeatOperations repeatOperations; |
48 | |
49 | public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) { |
50 | this.itemReader = itemReader; |
51 | this.repeatOperations = repeatOperations; |
52 | } |
53 | |
54 | /** |
55 | * Register some {@link StepListener}s with the handler. Each will get the |
56 | * callbacks in the order specified at the correct stage. |
57 | * |
58 | * @param listeners |
59 | */ |
60 | public void setListeners(List<? extends StepListener> listeners) { |
61 | for (StepListener listener : listeners) { |
62 | registerListener(listener); |
63 | } |
64 | } |
65 | |
66 | /** |
67 | * Register a listener for callbacks at the appropriate stages in a process. |
68 | * |
69 | * @param listener a {@link StepListener} |
70 | */ |
71 | public void registerListener(StepListener listener) { |
72 | this.listener.register(listener); |
73 | } |
74 | |
75 | /** |
76 | * @return the listener |
77 | */ |
78 | protected MulticasterBatchListener<I, ?> getListener() { |
79 | return listener; |
80 | } |
81 | |
82 | /** |
83 | * Surrounds the read call with listener callbacks. |
84 | * @return item |
85 | * @throws Exception |
86 | */ |
87 | protected final I doRead() throws Exception { |
88 | try { |
89 | listener.beforeRead(); |
90 | I item = itemReader.read(); |
91 | listener.afterRead(item); |
92 | return item; |
93 | } |
94 | catch (Exception e) { |
95 | listener.onReadError(e); |
96 | throw e; |
97 | } |
98 | } |
99 | |
100 | public Chunk<I> provide(final StepContribution contribution) throws Exception { |
101 | |
102 | final Chunk<I> inputs = new Chunk<I>(); |
103 | repeatOperations.iterate(new RepeatCallback() { |
104 | |
105 | public RepeatStatus doInIteration(final RepeatContext context) throws Exception { |
106 | I item = read(contribution, inputs); |
107 | if (item == null) { |
108 | inputs.setEnd(); |
109 | return RepeatStatus.FINISHED; |
110 | } |
111 | inputs.add(item); |
112 | contribution.incrementReadCount(); |
113 | return RepeatStatus.CONTINUABLE; |
114 | } |
115 | |
116 | }); |
117 | |
118 | return inputs; |
119 | |
120 | } |
121 | |
122 | public void postProcess(StepContribution contribution, Chunk<I> chunk) { |
123 | // do nothing |
124 | } |
125 | |
126 | protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception { |
127 | return doRead(); |
128 | } |
129 | |
130 | } |