1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.io.File; |
20 | import java.io.IOException; |
21 | import java.util.List; |
22 | import org.springframework.batch.item.ExecutionContext; |
23 | import org.springframework.batch.item.ItemStreamException; |
24 | import org.springframework.batch.item.support.AbstractItemStreamItemWriter; |
25 | import org.springframework.core.io.FileSystemResource; |
26 | import org.springframework.core.io.Resource; |
27 | import org.springframework.util.Assert; |
28 | import org.springframework.util.ClassUtils; |
29 | |
30 | /** |
31 | * Wraps a {@link ResourceAwareItemWriterItemStream} and creates a new output |
32 | * resource when the count of items written in current resource exceeds |
33 | * {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized |
34 | * with {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. |
35 | * |
36 | * Note that new resources are created only at chunk boundaries i.e. the number |
37 | * of items written into one resource is between the limit set by |
38 | * {@link #setItemCountLimitPerResource(int)} and (limit + chunk size). |
39 | * |
40 | * @param <T> item type |
41 | * |
42 | * @author Robert Kasanicky |
43 | */ |
44 | public class MultiResourceItemWriter<T> extends AbstractItemStreamItemWriter<T> { |
45 | |
46 | final static private String RESOURCE_INDEX_KEY = "resource.index"; |
47 | |
48 | final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count"; |
49 | |
50 | private Resource resource; |
51 | |
52 | private ResourceAwareItemWriterItemStream<? super T> delegate; |
53 | |
54 | private int itemCountLimitPerResource = Integer.MAX_VALUE; |
55 | |
56 | private int currentResourceItemCount = 0; |
57 | |
58 | private int resourceIndex = 1; |
59 | |
60 | private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator(); |
61 | |
62 | private boolean saveState = true; |
63 | |
64 | private boolean opened = false; |
65 | |
66 | public MultiResourceItemWriter() { |
67 | this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemWriter.class)); |
68 | } |
69 | |
70 | @Override |
71 | public void write(List<? extends T> items) throws Exception { |
72 | if (!opened) { |
73 | File file = setResourceToDelegate(); |
74 | // create only if write is called |
75 | file.createNewFile(); |
76 | Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable"); |
77 | delegate.open(new ExecutionContext()); |
78 | opened = true; |
79 | } |
80 | delegate.write(items); |
81 | currentResourceItemCount += items.size(); |
82 | if (currentResourceItemCount >= itemCountLimitPerResource) { |
83 | delegate.close(); |
84 | resourceIndex++; |
85 | currentResourceItemCount = 0; |
86 | setResourceToDelegate(); |
87 | opened = false; |
88 | } |
89 | } |
90 | |
91 | /** |
92 | * Allows customization of the suffix of the created resources based on the |
93 | * index. |
94 | */ |
95 | public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) { |
96 | this.suffixCreator = suffixCreator; |
97 | } |
98 | |
99 | /** |
100 | * After this limit is exceeded the next chunk will be written into newly |
101 | * created resource. |
102 | */ |
103 | public void setItemCountLimitPerResource(int itemCountLimitPerResource) { |
104 | this.itemCountLimitPerResource = itemCountLimitPerResource; |
105 | } |
106 | |
107 | /** |
108 | * Delegate used for actual writing of the output. |
109 | */ |
110 | public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) { |
111 | this.delegate = delegate; |
112 | } |
113 | |
114 | /** |
115 | * Prototype for output resources. Actual output files will be created in |
116 | * the same directory and use the same name as this prototype with appended |
117 | * suffix (according to |
118 | * {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. |
119 | */ |
120 | public void setResource(Resource resource) { |
121 | this.resource = resource; |
122 | } |
123 | |
124 | public void setSaveState(boolean saveState) { |
125 | this.saveState = saveState; |
126 | } |
127 | |
128 | @Override |
129 | public void close() throws ItemStreamException { |
130 | super.close(); |
131 | resourceIndex = 1; |
132 | currentResourceItemCount = 0; |
133 | if (opened) { |
134 | delegate.close(); |
135 | } |
136 | } |
137 | |
138 | @Override |
139 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
140 | super.open(executionContext); |
141 | resourceIndex = executionContext.getInt(getExecutionContextKey(RESOURCE_INDEX_KEY), 1); |
142 | currentResourceItemCount = executionContext.getInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), 0); |
143 | |
144 | try { |
145 | setResourceToDelegate(); |
146 | } |
147 | catch (IOException e) { |
148 | throw new ItemStreamException("Couldn't assign resource", e); |
149 | } |
150 | |
151 | if (executionContext.containsKey(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT))) { |
152 | // It's a restart |
153 | delegate.open(executionContext); |
154 | } |
155 | else { |
156 | opened = false; |
157 | } |
158 | } |
159 | |
160 | @Override |
161 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
162 | super.update(executionContext); |
163 | if (saveState) { |
164 | if (opened) { |
165 | delegate.update(executionContext); |
166 | } |
167 | executionContext.putInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount); |
168 | executionContext.putInt(getExecutionContextKey(RESOURCE_INDEX_KEY), resourceIndex); |
169 | } |
170 | } |
171 | |
172 | /** |
173 | * Create output resource (if necessary) and point the delegate to it. |
174 | */ |
175 | private File setResourceToDelegate() throws IOException { |
176 | String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex); |
177 | File file = new File(path); |
178 | delegate.setResource(new FileSystemResource(file)); |
179 | return file; |
180 | } |
181 | } |