View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.item;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.springframework.batch.item.ExecutionContext;
21  import org.springframework.batch.item.ItemReader;
22  import org.springframework.batch.item.ItemStream;
23  import org.springframework.batch.item.ItemStreamException;
24  import org.springframework.batch.item.support.CompositeItemStream;
25  
26  /**
27   * Manage the offset data between the last successful commit and updates made to
28   * an input chunk. Only works with single threaded steps because it has to use a
29   * {@link ThreadLocal} to manage the state and co-ordinate between the caller
30   * and the wrapped {@link ItemStream}.
31   *
32   * @author Dave Syer
33   * @since 2.0
34   */
35  public class ChunkMonitor implements ItemStream {
36  
37  	private Log logger = LogFactory.getLog(getClass());
38  
39  	private boolean streamsRegistered = false;
40  
41  	public static class ChunkMonitorData {
42  		public int offset;
43  
44  		public int chunkSize;
45  
46  		public ChunkMonitorData(int offset, int chunkSize) {
47  			this.offset = offset;
48  			this.chunkSize = chunkSize;
49  		}
50  	}
51  
52  	private static final String OFFSET = ChunkMonitor.class.getName() + ".OFFSET";
53  
54  	private CompositeItemStream stream = new CompositeItemStream();
55  
56  	private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
57  
58  	private ItemReader<?> reader;
59  
60  	/**
61  	 * @param stream the stream to set
62  	 */
63  	public void registerItemStream(ItemStream stream) {
64  		streamsRegistered = true;
65  		this.stream.register(stream);
66  	}
67  
68  	/**
69  	 * @param reader the reader to set
70  	 */
71  	public void setItemReader(ItemReader<?> reader) {
72  		this.reader = reader;
73  	}
74  
75  	public void incrementOffset() {
76  		ChunkMonitorData data = getData();
77  		data.offset ++;
78  		if (data.offset >= data.chunkSize) {
79  			resetOffset();
80  		}
81  	}
82  
83  	public int getOffset() {
84  		return getData().offset;
85  	}
86  
87  	public void resetOffset() {
88  		getData().offset = 0;
89  	}
90  
91  	public void setChunkSize(int chunkSize) {
92  		getData().chunkSize = chunkSize;
93  		resetOffset();
94  	}
95  
96  	@Override
97  	public void close() throws ItemStreamException {
98  		holder.set(null);
99  		if (streamsRegistered) {
100 			stream.close();
101 		}
102 	}
103 
104 	@Override
105 	public void open(ExecutionContext executionContext) throws ItemStreamException {
106 		if (streamsRegistered) {
107 			stream.open(executionContext);
108 			ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(OFFSET, 0), 0);
109 			holder.set(data);
110 			if (reader == null) {
111 				logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
112 				return;
113 			}
114 			for (int i = 0; i < data.offset; i++) {
115 				try {
116 					reader.read();
117 				}
118 				catch (Exception e) {
119 					throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
120 				}
121 			}
122 		}
123 	}
124 
125 	@Override
126 	public void update(ExecutionContext executionContext) throws ItemStreamException {
127 		if (streamsRegistered) {
128 			ChunkMonitorData data = getData();
129 			if (data.offset == 0) {
130 				// Only call the underlying update method if we are on a chunk
131 				// boundary
132 				stream.update(executionContext);
133 			}
134 			else {
135 				executionContext.putInt(OFFSET, data.offset);
136 			}
137 		}
138 	}
139 
140 	private ChunkMonitorData getData() {
141 		ChunkMonitorData data = holder.get();
142 		if (data==null) {
143 			if (streamsRegistered) {
144 				logger.warn("ItemStream was opened in a different thread.  Restart data could be compromised.");
145 			}
146 			data = new ChunkMonitorData(0,0);
147 			holder.set(data);
148 		}
149 		return data;
150 	}
151 
152 }