View Javadoc

1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.item.support;
17  
18  import java.util.Map.Entry;
19  
20  import org.springframework.batch.item.ExecutionContext;
21  import org.springframework.batch.item.ItemReader;
22  import org.springframework.batch.item.ItemStream;
23  import org.springframework.batch.item.ItemStreamException;
24  import org.springframework.batch.item.ItemStreamReader;
25  import org.springframework.batch.item.ParseException;
26  import org.springframework.batch.item.PeekableItemReader;
27  import org.springframework.batch.item.UnexpectedInputException;
28  
29  /**
30   * <p>
31   * A {@link PeekableItemReader} that allows the user to peek one item ahead.
32   * Repeated calls to {@link #peek()} will return the same item, and this will be
33   * the next item returned from {@link #read()}.
34   * </p>
35   * 
36   * <p>
37   * Intentionally not thread safe: it wouldn't be possible to honour the peek in
38   * multiple threads because only one of the threads that peeked would get that
39   * item in the next call to read.
40   * </p>
41   * 
42   * @author Dave Syer
43   * 
44   */
45  public class SingleItemPeekableItemReader<T> implements ItemStreamReader<T>, PeekableItemReader<T> {
46  
47  	private ItemReader<T> delegate;
48  
49  	private T next;
50  
51  	private ExecutionContext executionContext = new ExecutionContext();
52  
53  	/**
54  	 * The item reader to use as a delegate. Items are read from the delegate
55  	 * and passed to the caller in {@link #read()}.
56  	 * 
57  	 * @param delegate the delegate to set
58  	 */
59  	public void setDelegate(ItemReader<T> delegate) {
60  		this.delegate = delegate;
61  	}
62  
63  	/**
64  	 * Get the next item from the delegate (whether or not it has already been
65  	 * peeked at).
66  	 * 
67  	 * @see ItemReader#read()
68  	 */
69      @Override
70  	public T read() throws Exception, UnexpectedInputException, ParseException {
71  		if (next != null) {
72  			T item = next;
73  			next = null;
74  			// executionContext = new ExecutionContext();
75  			return item;
76  		}
77  		return delegate.read();
78  	}
79  
80  	/**
81  	 * Peek at the next item, ensuring that if the delegate is an
82  	 * {@link ItemStream} the state is stored for the next call to
83  	 * {@link #update(ExecutionContext)}.
84  	 * 
85  	 * @return the next item (or null if there is none).
86  	 * 
87  	 * @see PeekableItemReader#peek()
88  	 */
89      @Override
90  	public T peek() throws Exception, UnexpectedInputException, ParseException {
91  		if (next == null) {
92  			updateDelegate(executionContext);
93  			next = delegate.read();
94  		}
95  		return next;
96  	}
97  
98  	/**
99  	 * If the delegate is an {@link ItemStream}, just pass the call on,
100 	 * otherwise reset the peek cache.
101 	 * 
102 	 * @throws ItemStreamException if there is a problem
103 	 * @see ItemStream#close()
104 	 */
105     @Override
106 	public void close() throws ItemStreamException {
107 		next = null;
108 		if (delegate instanceof ItemStream) {
109 			((ItemStream) delegate).close();
110 		}
111 		executionContext = new ExecutionContext();
112 	}
113 
114 	/**
115 	 * If the delegate is an {@link ItemStream}, just pass the call on,
116 	 * otherwise reset the peek cache.
117 	 * 
118 	 * @param executionContext the current context
119 	 * @throws ItemStreamException if there is a problem
120 	 * @see ItemStream#open(ExecutionContext)
121 	 */
122     @Override
123 	public void open(ExecutionContext executionContext) throws ItemStreamException {
124 		next = null;
125 		if (delegate instanceof ItemStream) {
126 			((ItemStream) delegate).open(executionContext);
127 		}
128 		executionContext = new ExecutionContext();
129 	}
130 
131 	/**
132 	 * If there is a cached peek, then retrieve the execution context state from
133 	 * that point. If there is no peek cached, then call directly to the
134 	 * delegate.
135 	 * 
136 	 * @param executionContext the current context
137 	 * @throws ItemStreamException if there is a problem
138 	 * @see ItemStream#update(ExecutionContext)
139 	 */
140     @Override
141 	public void update(ExecutionContext executionContext) throws ItemStreamException {
142 		if (next != null) {
143 			// Get the last state from the delegate instead of using
144 			// current value.
145 			for (Entry<String, Object> entry : this.executionContext.entrySet()) {
146 				executionContext.put(entry.getKey(), entry.getValue());
147 			}
148 			return;
149 		}
150 		updateDelegate(executionContext);
151 	}
152 
153 	private void updateDelegate(ExecutionContext executionContext) {
154 		if (delegate instanceof ItemStream) {
155 			((ItemStream) delegate).update(executionContext);
156 		}
157 	}
158 
159 }