View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.support.transaction;
17  
18  import java.io.IOException;
19  import java.io.Writer;
20  
21  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
22  import org.springframework.transaction.support.TransactionSynchronizationManager;
23  
24  /**
25   * Wrapper for a {@link Writer} that delays actually writing to or closing the
26   * buffer if a transaction is active. If a transaction is detected on the call
27   * to {@link #write(String)} the parameter is buffered and passed on to the
28   * underlying writer only when the transaction is committed.
29   * 
30   * @author Dave Syer
31   * 
32   */
33  public class TransactionAwareBufferedWriter extends Writer {
34  
35  	private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY";
36  
37  	private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY";
38  
39  	private final String bufferKey;
40  
41  	private final String closeKey;
42  
43  	private Writer writer;
44  
45  	private final Runnable closeCallback;
46  
47  	/**
48  	 * Create a new instance with the underlying writer provided, and a callback
49  	 * to execute on close. The callback should clean up related resources like
50  	 * output streams or channels.
51  	 * 
52  	 * @param writer actually writes to output
53  	 * @param closeCallback callback to execute on close
54  	 */
55  	public TransactionAwareBufferedWriter(Writer writer, Runnable closeCallback) {
56  		super();
57  		this.writer = writer;
58  		this.closeCallback = closeCallback;
59  		this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
60  		this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode();
61  	}
62  
63  	/**
64  	 * @return
65  	 */
66  	private StringBuffer getCurrentBuffer() {
67  
68  		if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
69  
70  			TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer());
71  
72  			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
73  				@Override
74  				public void afterCompletion(int status) {
75  					try {
76  						if (status == STATUS_COMMITTED) {
77  							complete();
78  						}
79  					}
80  					catch (IOException e) {
81  						throw new FlushFailedException("Could not write to output buffer", e);
82  					}
83  					finally {
84  						clear();
85  					}
86  				}
87  
88  				private void complete() throws IOException {
89  					StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
90  					if (buffer != null) {
91  						writer.write(buffer.toString());
92  						writer.flush();
93  						if (TransactionSynchronizationManager.hasResource(closeKey)) {
94  							writer.close();
95  							closeCallback.run();
96  						}
97  					}
98  				}
99  
100 				private void clear() {
101 					if (TransactionSynchronizationManager.hasResource(bufferKey)) {
102 						TransactionSynchronizationManager.unbindResource(bufferKey);
103 					}
104 					if (TransactionSynchronizationManager.hasResource(closeKey)) {
105 						TransactionSynchronizationManager.unbindResource(closeKey);
106 					}
107 				}
108 
109 			});
110 
111 		}
112 
113 		return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
114 
115 	}
116 
117 	/**
118 	 * Convenience method for clients to determine if there is any unflushed
119 	 * data.
120 	 * 
121 	 * @return the current size of unflushed buffered data
122 	 */
123 	public long getBufferSize() {
124 		if (!transactionActive()) {
125 			return 0L;
126 		}
127 		return getCurrentBuffer().length();
128 	}
129 
130 	/**
131 	 * @return
132 	 */
133 	private boolean transactionActive() {
134 		return TransactionSynchronizationManager.isActualTransactionActive();
135 	}
136 
137 	/*
138 	 * (non-Javadoc)
139 	 * 
140 	 * @see java.io.Writer#close()
141 	 */
142 	@Override
143 	public void close() throws IOException {
144 		if (transactionActive()) {
145 			if (getCurrentBuffer().length() > 0) {
146 				TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
147 			}
148 			return;
149 		}
150 		writer.close();
151 		closeCallback.run();
152 	}
153 
154 	/*
155 	 * (non-Javadoc)
156 	 * 
157 	 * @see java.io.Writer#flush()
158 	 */
159 	@Override
160 	public void flush() throws IOException {
161 		if (!transactionActive()) {
162 			writer.flush();
163 		}
164 	}
165 
166 	/*
167 	 * (non-Javadoc)
168 	 * 
169 	 * @see java.io.Writer#write(char[], int, int)
170 	 */
171 	@Override
172 	public void write(char[] cbuf, int off, int len) throws IOException {
173 
174 		if (!transactionActive()) {
175 			writer.write(cbuf, off, len);
176 			return;
177 		}
178 
179 		StringBuffer buffer = getCurrentBuffer();
180 		buffer.append(cbuf, off, len);
181 
182 	}
183 
184 }