1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.item.support; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.Arrays; |
20 | import java.util.Iterator; |
21 | import java.util.List; |
22 | |
23 | import org.springframework.batch.item.ExecutionContext; |
24 | import org.springframework.batch.item.ItemStream; |
25 | import org.springframework.batch.item.ItemStreamException; |
26 | |
27 | /** |
28 | * Simple {@link ItemStream} that delegates to a list of other streams. |
29 | * |
30 | * @author Dave Syer |
31 | * |
32 | */ |
33 | public class CompositeItemStream implements ItemStream { |
34 | |
35 | private List streams = new ArrayList(); |
36 | |
37 | /** |
38 | * Public setter for the listeners. |
39 | * |
40 | * @param listeners |
41 | */ |
42 | public void setStreams(ItemStream[] listeners) { |
43 | this.streams = Arrays.asList(listeners); |
44 | } |
45 | |
46 | /** |
47 | * |
48 | */ |
49 | public CompositeItemStream() { |
50 | super(); |
51 | } |
52 | |
53 | |
54 | /** |
55 | * Simple aggregate {@link ExecutionContext} provider for the contributions |
56 | * registered under the given key. |
57 | * |
58 | * @see org.springframework.batch.item.ItemStream#update(ExecutionContext) |
59 | */ |
60 | public void update(ExecutionContext executionContext) { |
61 | synchronized (streams) { |
62 | for (Iterator it = streams.iterator(); it.hasNext();) { |
63 | ItemStream itemStream = (ItemStream) it.next(); |
64 | itemStream.update(executionContext); |
65 | } |
66 | } |
67 | } |
68 | |
69 | /** |
70 | * Register a {@link ItemStream} as one of the interesting providers under |
71 | * the provided key. |
72 | * |
73 | */ |
74 | public void register(ItemStream stream) { |
75 | synchronized (streams) { |
76 | if (!streams.contains(stream)) { |
77 | streams.add(stream); |
78 | } |
79 | } |
80 | } |
81 | |
82 | /** |
83 | * Broadcast the call to close. |
84 | * @throws ItemStreamException |
85 | */ |
86 | public void close(ExecutionContext executionContext) throws ItemStreamException { |
87 | synchronized (streams) { |
88 | for (Iterator it = streams.iterator(); it.hasNext();) { |
89 | ItemStream itemStream = (ItemStream) it.next(); |
90 | itemStream.close(executionContext); |
91 | } |
92 | } |
93 | } |
94 | |
95 | /** |
96 | * Broadcast the call to open. |
97 | * @throws ItemStreamException |
98 | */ |
99 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
100 | synchronized (streams) { |
101 | for (Iterator it = streams.iterator(); it.hasNext();) { |
102 | ItemStream itemStream = (ItemStream) it.next(); |
103 | itemStream.open(executionContext); |
104 | } |
105 | } |
106 | } |
107 | |
108 | } |