View Javadoc

1   /*
2    * Copyright 2006-2012 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.support.transaction;
17  
18  import java.io.IOException;
19  import java.io.Writer;
20  import java.nio.ByteBuffer;
21  import java.nio.channels.FileChannel;
22  
23  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
24  import org.springframework.transaction.support.TransactionSynchronizationManager;
25  
26  /**
27   * Wrapper for a {@link FileChannel} that delays actually writing to or closing the
28   * buffer if a transaction is active. If a transaction is detected on the call
29   * to {@link #write(String)} the parameter is buffered and passed on to the
30   * underlying writer only when the transaction is committed.
31   * 
32   * @author Dave Syer
33   * @author Michael Minella
34   * 
35   */
36  public class TransactionAwareBufferedWriter extends Writer {
37  
38  	private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY";
39  
40  	private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY";
41  
42  	private final String bufferKey;
43  
44  	private final String closeKey;
45  
46  	private FileChannel channel;
47  
48  	private final Runnable closeCallback;
49  	
50  	// default encoding for writing to output files - set to UTF-8.
51  	private static final String DEFAULT_CHARSET = "UTF-8";
52  	
53  	private String encoding = DEFAULT_CHARSET;
54  	
55  	/**
56  	 * Create a new instance with the underlying file channel provided, and a callback
57  	 * to execute on close. The callback should clean up related resources like
58  	 * output streams or channels.
59  	 * 
60  	 * @param channel channel used to do the actuall file IO
61  	 * @param closeCallback callback to execute on close
62  	 */
63  	public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) {
64  		super();
65  		this.channel = channel;
66  		this.closeCallback = closeCallback;
67  		this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
68  		this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode();
69  	}
70  
71  	public void setEncoding(String encoding) {
72  		this.encoding = encoding;
73  	}
74  
75  	/**
76  	 * @return
77  	 */
78  	private StringBuffer getCurrentBuffer() {
79  
80  		if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
81  
82  			TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer());
83  
84  			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
85  				@Override
86  				public void afterCompletion(int status) {
87  					clear();
88  				}
89  				
90  				@Override
91  				public void beforeCommit(boolean readOnly) {
92  					try {
93  						if(!readOnly) {
94  							complete();
95  						}
96  					}
97  					catch (IOException e) {
98  						throw new FlushFailedException("Could not write to output buffer", e);
99  					}
100 				}
101 
102 				private void complete() throws IOException {
103 					StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
104 					if (buffer != null) {
105 						String string = buffer.toString();
106 						byte[] bytes = string.getBytes(encoding);
107 						int bufferLength = bytes.length;
108 						ByteBuffer bb = ByteBuffer.wrap(bytes);
109 						int bytesWritten = channel.write(bb);
110 						if(bytesWritten != bufferLength) {
111 							throw new IOException("All bytes to be written were not successfully written");
112 						}
113 						if (TransactionSynchronizationManager.hasResource(closeKey)) {
114 							closeCallback.run();
115 						}
116 					}
117 				}
118 
119 				private void clear() {
120 					if (TransactionSynchronizationManager.hasResource(bufferKey)) {
121 						TransactionSynchronizationManager.unbindResource(bufferKey);
122 					}
123 					if (TransactionSynchronizationManager.hasResource(closeKey)) {
124 						TransactionSynchronizationManager.unbindResource(closeKey);
125 					}
126 				}
127 
128 			});
129 
130 		}
131 
132 		return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
133 
134 	}
135 
136 	/**
137 	 * Convenience method for clients to determine if there is any unflushed
138 	 * data.
139 	 * 
140 	 * @return the current size of unflushed buffered data
141 	 */
142 	public long getBufferSize() {
143 		if (!transactionActive()) {
144 			return 0L;
145 		}
146 		return getCurrentBuffer().length();
147 	}
148 
149 	/**
150 	 * @return
151 	 */
152 	private boolean transactionActive() {
153 		return TransactionSynchronizationManager.isActualTransactionActive();
154 	}
155 
156 	/*
157 	 * (non-Javadoc)
158 	 * 
159 	 * @see java.io.Writer#close()
160 	 */
161 	@Override
162 	public void close() throws IOException {
163 		if (transactionActive()) {
164 			if (getCurrentBuffer().length() > 0) {
165 				TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
166 			}
167 			return;
168 		}
169 		closeCallback.run();
170 	}
171 
172 	/*
173 	 * (non-Javadoc)
174 	 * 
175 	 * @see java.io.Writer#flush()
176 	 */
177 	@Override
178 	public void flush() throws IOException {
179 		if (!transactionActive()) {
180 			channel.force(false);
181 		}
182 	}
183 
184 	/*
185 	 * (non-Javadoc)
186 	 * 
187 	 * @see java.io.Writer#write(char[], int, int)
188 	 */
189 	@Override
190 	public void write(char[] cbuf, int off, int len) throws IOException {
191 
192 		if (!transactionActive()) {
193 			char [] subArray = new char[len];
194 			System.arraycopy(cbuf, off, subArray, 0, len);
195 			byte[] bytes = new String(subArray).getBytes(encoding);
196 			int length = bytes.length;
197 			ByteBuffer bb = ByteBuffer.wrap(bytes);
198 			int bytesWritten = channel.write(bb);
199 			if(bytesWritten != length) {
200 				throw new IOException("Unable to write all data.  Bytes to write: " + len + ".  Bytes written: " + bytesWritten);
201 			}
202 			return;
203 		}
204 
205 		StringBuffer buffer = getCurrentBuffer();
206 		buffer.append(cbuf, off, len);
207 	}
208 }