1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.io.File; |
20 | import java.io.IOException; |
21 | import java.util.List; |
22 | |
23 | import org.springframework.batch.item.ExecutionContext; |
24 | import org.springframework.batch.item.ItemStream; |
25 | import org.springframework.batch.item.ItemStreamException; |
26 | import org.springframework.batch.item.ItemWriter; |
27 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
28 | import org.springframework.core.io.FileSystemResource; |
29 | import org.springframework.core.io.Resource; |
30 | import org.springframework.util.Assert; |
31 | import org.springframework.util.ClassUtils; |
32 | |
33 | /** |
34 | * Wraps a {@link ResourceAwareItemWriterItemStream} and creates a new output |
35 | * resource when the count of items written in current resource exceeds |
36 | * {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized |
37 | * with {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. |
38 | * |
39 | * Note that new resources are created only at chunk boundaries i.e. the number |
40 | * of items written into one resource is between the limit set by |
41 | * {@link #setItemCountLimitPerResource(int)} and (limit + chunk size). |
42 | * |
43 | * @param <T> item type |
44 | * |
45 | * @author Robert Kasanicky |
46 | */ |
47 | public class MultiResourceItemWriter<T> extends ExecutionContextUserSupport implements ItemWriter<T>, ItemStream { |
48 | |
49 | final static private String RESOURCE_INDEX_KEY = "resource.index"; |
50 | |
51 | final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count"; |
52 | |
53 | private Resource resource; |
54 | |
55 | private ResourceAwareItemWriterItemStream<? super T> delegate; |
56 | |
57 | private int itemCountLimitPerResource = Integer.MAX_VALUE; |
58 | |
59 | private int currentResourceItemCount = 0; |
60 | |
61 | private int resourceIndex = 1; |
62 | |
63 | private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator(); |
64 | |
65 | private boolean saveState = true; |
66 | |
67 | private boolean opened = false; |
68 | |
69 | public MultiResourceItemWriter() { |
70 | setName(ClassUtils.getShortName(MultiResourceItemWriter.class)); |
71 | } |
72 | |
73 | public void write(List<? extends T> items) throws Exception { |
74 | if (!opened) { |
75 | File file = setResourceToDelegate(); |
76 | // create only if write is called |
77 | file.createNewFile(); |
78 | Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable"); |
79 | delegate.open(new ExecutionContext()); |
80 | opened = true; |
81 | } |
82 | delegate.write(items); |
83 | currentResourceItemCount += items.size(); |
84 | if (currentResourceItemCount >= itemCountLimitPerResource) { |
85 | delegate.close(); |
86 | resourceIndex++; |
87 | currentResourceItemCount = 0; |
88 | setResourceToDelegate(); |
89 | opened = false; |
90 | } |
91 | } |
92 | |
93 | /** |
94 | * Allows customization of the suffix of the created resources based on the |
95 | * index. |
96 | */ |
97 | public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) { |
98 | this.suffixCreator = suffixCreator; |
99 | } |
100 | |
101 | /** |
102 | * After this limit is exceeded the next chunk will be written into newly |
103 | * created resource. |
104 | */ |
105 | public void setItemCountLimitPerResource(int itemCountLimitPerResource) { |
106 | this.itemCountLimitPerResource = itemCountLimitPerResource; |
107 | } |
108 | |
109 | /** |
110 | * Delegate used for actual writing of the output. |
111 | */ |
112 | public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) { |
113 | this.delegate = delegate; |
114 | } |
115 | |
116 | /** |
117 | * Prototype for output resources. Actual output files will be created in |
118 | * the same directory and use the same name as this prototype with appended |
119 | * suffix (according to |
120 | * {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. |
121 | */ |
122 | public void setResource(Resource resource) { |
123 | this.resource = resource; |
124 | } |
125 | |
126 | public void setSaveState(boolean saveState) { |
127 | this.saveState = saveState; |
128 | } |
129 | |
130 | public void close() throws ItemStreamException { |
131 | resourceIndex = 1; |
132 | currentResourceItemCount = 0; |
133 | if (opened) { |
134 | delegate.close(); |
135 | } |
136 | } |
137 | |
138 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
139 | resourceIndex = executionContext.getInt(getKey(RESOURCE_INDEX_KEY), 1); |
140 | currentResourceItemCount = executionContext.getInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), 0); |
141 | |
142 | try { |
143 | setResourceToDelegate(); |
144 | } |
145 | catch (IOException e) { |
146 | throw new ItemStreamException("Couldn't assign resource", e); |
147 | } |
148 | |
149 | if (executionContext.containsKey(getKey(CURRENT_RESOURCE_ITEM_COUNT))) { |
150 | // It's a restart |
151 | delegate.open(executionContext); |
152 | } |
153 | else { |
154 | opened = false; |
155 | } |
156 | } |
157 | |
158 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
159 | if (saveState) { |
160 | if (opened) { |
161 | delegate.update(executionContext); |
162 | } |
163 | executionContext.putInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount); |
164 | executionContext.putInt(getKey(RESOURCE_INDEX_KEY), resourceIndex); |
165 | } |
166 | } |
167 | |
168 | /** |
169 | * Create output resource (if necessary) and point the delegate to it. |
170 | */ |
171 | private File setResourceToDelegate() throws IOException { |
172 | String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex); |
173 | File file = new File(path); |
174 | delegate.setResource(new FileSystemResource(file)); |
175 | return file; |
176 | } |
177 | } |