1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.apache.commons.logging.Log; |
22 | import org.apache.commons.logging.LogFactory; |
23 | import org.springframework.batch.core.StepContribution; |
24 | import org.springframework.batch.core.StepListener; |
25 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
26 | import org.springframework.batch.item.ItemReader; |
27 | import org.springframework.batch.repeat.RepeatCallback; |
28 | import org.springframework.batch.repeat.RepeatContext; |
29 | import org.springframework.batch.repeat.RepeatOperations; |
30 | import org.springframework.batch.repeat.RepeatStatus; |
31 | |
32 | /** |
33 | * Simple implementation of the ChunkProvider interface that does basic chunk |
34 | * providing from an {@link ItemReader}. |
35 | * |
36 | * @author Dave Syer |
37 | * @author Michael Minella |
38 | * @see ChunkOrientedTasklet |
39 | */ |
40 | public class SimpleChunkProvider<I> implements ChunkProvider<I> { |
41 | |
42 | protected final Log logger = LogFactory.getLog(getClass()); |
43 | |
44 | protected final ItemReader<? extends I> itemReader; |
45 | |
46 | private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>(); |
47 | |
48 | private final RepeatOperations repeatOperations; |
49 | |
50 | public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) { |
51 | this.itemReader = itemReader; |
52 | this.repeatOperations = repeatOperations; |
53 | } |
54 | |
55 | /** |
56 | * Register some {@link StepListener}s with the handler. Each will get the |
57 | * callbacks in the order specified at the correct stage. |
58 | * |
59 | * @param listeners |
60 | */ |
61 | public void setListeners(List<? extends StepListener> listeners) { |
62 | for (StepListener listener : listeners) { |
63 | registerListener(listener); |
64 | } |
65 | } |
66 | |
67 | /** |
68 | * Register a listener for callbacks at the appropriate stages in a process. |
69 | * |
70 | * @param listener a {@link StepListener} |
71 | */ |
72 | public void registerListener(StepListener listener) { |
73 | this.listener.register(listener); |
74 | } |
75 | |
76 | /** |
77 | * @return the listener |
78 | */ |
79 | protected MulticasterBatchListener<I, ?> getListener() { |
80 | return listener; |
81 | } |
82 | |
83 | /** |
84 | * Surrounds the read call with listener callbacks. |
85 | * @return item |
86 | * @throws Exception |
87 | */ |
88 | protected final I doRead() throws Exception { |
89 | try { |
90 | listener.beforeRead(); |
91 | I item = itemReader.read(); |
92 | if(item != null) { |
93 | listener.afterRead(item); |
94 | } |
95 | return item; |
96 | } |
97 | catch (Exception e) { |
98 | logger.debug(e.getMessage() + " : " + e.getClass().getName()); |
99 | listener.onReadError(e); |
100 | throw e; |
101 | } |
102 | } |
103 | |
104 | @Override |
105 | public Chunk<I> provide(final StepContribution contribution) throws Exception { |
106 | |
107 | final Chunk<I> inputs = new Chunk<I>(); |
108 | repeatOperations.iterate(new RepeatCallback() { |
109 | |
110 | @Override |
111 | public RepeatStatus doInIteration(final RepeatContext context) throws Exception { |
112 | I item = null; |
113 | try { |
114 | item = read(contribution, inputs); |
115 | } |
116 | catch (SkipOverflowException e) { |
117 | // read() tells us about an excess of skips by throwing an |
118 | // exception |
119 | return RepeatStatus.FINISHED; |
120 | } |
121 | if (item == null) { |
122 | inputs.setEnd(); |
123 | return RepeatStatus.FINISHED; |
124 | } |
125 | inputs.add(item); |
126 | contribution.incrementReadCount(); |
127 | return RepeatStatus.CONTINUABLE; |
128 | } |
129 | |
130 | }); |
131 | |
132 | return inputs; |
133 | |
134 | } |
135 | |
136 | @Override |
137 | public void postProcess(StepContribution contribution, Chunk<I> chunk) { |
138 | // do nothing |
139 | } |
140 | |
141 | /** |
142 | * Delegates to {@link #doRead()}. Subclasses can add additional behaviour |
143 | * (e.g. exception handling). |
144 | * |
145 | * @param contribution the current step execution contribution |
146 | * @param chunk the current chunk |
147 | * @return a new item for processing |
148 | * |
149 | * @throws SkipOverflowException if specifically the chunk is accumulating |
150 | * too much data (e.g. skips) and it wants to force a commit. |
151 | * |
152 | * @throws Exception if there is a generic issue |
153 | */ |
154 | protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception { |
155 | return doRead(); |
156 | } |
157 | |
158 | } |