View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.file;
18  
19  import java.io.File;
20  import java.io.IOException;
21  import java.util.List;
22  
23  import org.springframework.batch.item.ExecutionContext;
24  import org.springframework.batch.item.ItemStream;
25  import org.springframework.batch.item.ItemStreamException;
26  import org.springframework.batch.item.ItemWriter;
27  import org.springframework.batch.item.util.ExecutionContextUserSupport;
28  import org.springframework.core.io.FileSystemResource;
29  import org.springframework.core.io.Resource;
30  import org.springframework.util.Assert;
31  import org.springframework.util.ClassUtils;
32  
33  /**
34   * Wraps a {@link ResourceAwareItemWriterItemStream} and creates a new output
35   * resource when the count of items written in current resource exceeds
36   * {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized
37   * with {@link #setResourceSuffixCreator(ResourceSuffixCreator)}.
38   * 
39   * Note that new resources are created only at chunk boundaries i.e. the number
40   * of items written into one resource is between the limit set by
41   * {@link #setItemCountLimitPerResource(int)} and (limit + chunk size).
42   * 
43   * @param <T> item type
44   * 
45   * @author Robert Kasanicky
46   */
47  public class MultiResourceItemWriter<T> extends ExecutionContextUserSupport implements ItemWriter<T>, ItemStream {
48  
49  	final static private String RESOURCE_INDEX_KEY = "resource.index";
50  
51  	final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count";
52  
53  	private Resource resource;
54  
55  	private ResourceAwareItemWriterItemStream<? super T> delegate;
56  
57  	private int itemCountLimitPerResource = Integer.MAX_VALUE;
58  
59  	private int currentResourceItemCount = 0;
60  
61  	private int resourceIndex = 1;
62  
63  	private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator();
64  
65  	private boolean saveState = true;
66  
67  	private boolean opened = false;
68  
69  	public MultiResourceItemWriter() {
70  		setName(ClassUtils.getShortName(MultiResourceItemWriter.class));
71  	}
72  
73      @Override
74  	public void write(List<? extends T> items) throws Exception {
75  		if (!opened) {
76  			File file = setResourceToDelegate();
77  			// create only if write is called
78  			file.createNewFile();
79  			Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable");
80  			delegate.open(new ExecutionContext());
81  			opened = true;
82  		}
83  		delegate.write(items);
84  		currentResourceItemCount += items.size();
85  		if (currentResourceItemCount >= itemCountLimitPerResource) {
86  			delegate.close();
87  			resourceIndex++;
88  			currentResourceItemCount = 0;
89  			setResourceToDelegate();
90  			opened = false;
91  		}
92  	}
93  
94  	/**
95  	 * Allows customization of the suffix of the created resources based on the
96  	 * index.
97  	 */
98  	public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) {
99  		this.suffixCreator = suffixCreator;
100 	}
101 
102 	/**
103 	 * After this limit is exceeded the next chunk will be written into newly
104 	 * created resource.
105 	 */
106 	public void setItemCountLimitPerResource(int itemCountLimitPerResource) {
107 		this.itemCountLimitPerResource = itemCountLimitPerResource;
108 	}
109 
110 	/**
111 	 * Delegate used for actual writing of the output.
112 	 */
113 	public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) {
114 		this.delegate = delegate;
115 	}
116 
117 	/**
118 	 * Prototype for output resources. Actual output files will be created in
119 	 * the same directory and use the same name as this prototype with appended
120 	 * suffix (according to
121 	 * {@link #setResourceSuffixCreator(ResourceSuffixCreator)}.
122 	 */
123 	public void setResource(Resource resource) {
124 		this.resource = resource;
125 	}
126 
127 	public void setSaveState(boolean saveState) {
128 		this.saveState = saveState;
129 	}
130 
131     @Override
132 	public void close() throws ItemStreamException {
133 		resourceIndex = 1;
134 		currentResourceItemCount = 0;
135 		if (opened) {
136 			delegate.close();
137 		}
138 	}
139 
140     @Override
141 	public void open(ExecutionContext executionContext) throws ItemStreamException {
142 		resourceIndex = executionContext.getInt(getKey(RESOURCE_INDEX_KEY), 1);
143 		currentResourceItemCount = executionContext.getInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), 0);
144 
145 		try {
146 			setResourceToDelegate();
147 		}
148 		catch (IOException e) {
149 			throw new ItemStreamException("Couldn't assign resource", e);
150 		}
151 
152 		if (executionContext.containsKey(getKey(CURRENT_RESOURCE_ITEM_COUNT))) {
153 			// It's a restart
154 			delegate.open(executionContext);
155 		}
156 		else {
157 			opened = false;
158 		}
159 	}
160 
161     @Override
162 	public void update(ExecutionContext executionContext) throws ItemStreamException {
163 		if (saveState) {
164 			if (opened) {
165 				delegate.update(executionContext);
166 			}
167 			executionContext.putInt(getKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount);
168 			executionContext.putInt(getKey(RESOURCE_INDEX_KEY), resourceIndex);
169 		}
170 	}
171 
172 	/**
173 	 * Create output resource (if necessary) and point the delegate to it.
174 	 */
175 	private File setResourceToDelegate() throws IOException {
176 		String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex);
177 		File file = new File(path);
178 		delegate.setResource(new FileSystemResource(file));
179 		return file;
180 	}
181 }