1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.apache.commons.logging.Log; |
22 | import org.apache.commons.logging.LogFactory; |
23 | import org.springframework.batch.core.StepContribution; |
24 | import org.springframework.batch.core.StepListener; |
25 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
26 | import org.springframework.batch.item.ItemReader; |
27 | import org.springframework.batch.repeat.RepeatCallback; |
28 | import org.springframework.batch.repeat.RepeatContext; |
29 | import org.springframework.batch.repeat.RepeatOperations; |
30 | import org.springframework.batch.repeat.RepeatStatus; |
31 | |
32 | /** |
33 | * Simple implementation of the ChunkProvider interface that does basic chunk |
34 | * providing from an {@link ItemReader}. |
35 | * |
36 | * @author Dave Syer |
37 | * @see ChunkOrientedTasklet |
38 | */ |
39 | public class SimpleChunkProvider<I> implements ChunkProvider<I> { |
40 | |
41 | protected final Log logger = LogFactory.getLog(getClass()); |
42 | |
43 | protected final ItemReader<? extends I> itemReader; |
44 | |
45 | private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>(); |
46 | |
47 | private final RepeatOperations repeatOperations; |
48 | |
49 | public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) { |
50 | this.itemReader = itemReader; |
51 | this.repeatOperations = repeatOperations; |
52 | } |
53 | |
54 | /** |
55 | * Register some {@link StepListener}s with the handler. Each will get the |
56 | * callbacks in the order specified at the correct stage. |
57 | * |
58 | * @param listeners |
59 | */ |
60 | public void setListeners(List<? extends StepListener> listeners) { |
61 | for (StepListener listener : listeners) { |
62 | registerListener(listener); |
63 | } |
64 | } |
65 | |
66 | /** |
67 | * Register a listener for callbacks at the appropriate stages in a process. |
68 | * |
69 | * @param listener a {@link StepListener} |
70 | */ |
71 | public void registerListener(StepListener listener) { |
72 | this.listener.register(listener); |
73 | } |
74 | |
75 | /** |
76 | * @return the listener |
77 | */ |
78 | protected MulticasterBatchListener<I, ?> getListener() { |
79 | return listener; |
80 | } |
81 | |
82 | /** |
83 | * Surrounds the read call with listener callbacks. |
84 | * @return item |
85 | * @throws Exception |
86 | */ |
87 | protected final I doRead() throws Exception { |
88 | try { |
89 | listener.beforeRead(); |
90 | I item = itemReader.read(); |
91 | if(item != null) { |
92 | listener.afterRead(item); |
93 | } |
94 | return item; |
95 | } |
96 | catch (Exception e) { |
97 | listener.onReadError(e); |
98 | throw e; |
99 | } |
100 | } |
101 | |
102 | public Chunk<I> provide(final StepContribution contribution) throws Exception { |
103 | |
104 | final Chunk<I> inputs = new Chunk<I>(); |
105 | repeatOperations.iterate(new RepeatCallback() { |
106 | |
107 | public RepeatStatus doInIteration(final RepeatContext context) throws Exception { |
108 | I item = null; |
109 | try { |
110 | item = read(contribution, inputs); |
111 | } |
112 | catch (SkipOverflowException e) { |
113 | // read() tells us about an excess of skips by throwing an |
114 | // exception |
115 | return RepeatStatus.FINISHED; |
116 | } |
117 | if (item == null) { |
118 | inputs.setEnd(); |
119 | return RepeatStatus.FINISHED; |
120 | } |
121 | inputs.add(item); |
122 | contribution.incrementReadCount(); |
123 | return RepeatStatus.CONTINUABLE; |
124 | } |
125 | |
126 | }); |
127 | |
128 | return inputs; |
129 | |
130 | } |
131 | |
132 | public void postProcess(StepContribution contribution, Chunk<I> chunk) { |
133 | // do nothing |
134 | } |
135 | |
136 | /** |
137 | * Delegates to {@link #doRead()}. Subclasses can add additional behaviour |
138 | * (e.g. exception handling). |
139 | * |
140 | * @param contribution the current step execution contribution |
141 | * @param chunk the current chunk |
142 | * @return a new item for processing |
143 | * |
144 | * @throws SkipOverflowException if specifically the chunk is accumulating |
145 | * too much data (e.g. skips) and it wants to force a commit. |
146 | * |
147 | * @throws Exception if there is a generic issue |
148 | */ |
149 | protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception { |
150 | return doRead(); |
151 | } |
152 | |
153 | } |