1 | /* |
2 | * Copyright 2006-2012 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.support.transaction; |
17 | |
18 | import java.io.IOException; |
19 | import java.io.UnsupportedEncodingException; |
20 | import java.io.Writer; |
21 | import java.nio.ByteBuffer; |
22 | import java.nio.channels.FileChannel; |
23 | |
24 | import org.springframework.batch.item.WriteFailedException; |
25 | import org.springframework.transaction.support.TransactionSynchronizationAdapter; |
26 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
27 | |
28 | /** |
29 | * Wrapper for a {@link FileChannel} that delays actually writing to or closing the |
30 | * buffer if a transaction is active. If a transaction is detected on the call |
31 | * to {@link #write(String)} the parameter is buffered and passed on to the |
32 | * underlying writer only when the transaction is committed. |
33 | * |
34 | * @author Dave Syer |
35 | * @author Michael Minella |
36 | * |
37 | */ |
38 | public class TransactionAwareBufferedWriter extends Writer { |
39 | |
40 | private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY"; |
41 | |
42 | private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY"; |
43 | |
44 | private final String bufferKey; |
45 | |
46 | private final String closeKey; |
47 | |
48 | private FileChannel channel; |
49 | |
50 | private final Runnable closeCallback; |
51 | |
52 | // default encoding for writing to output files - set to UTF-8. |
53 | private static final String DEFAULT_CHARSET = "UTF-8"; |
54 | |
55 | private String encoding = DEFAULT_CHARSET; |
56 | |
57 | private boolean forceSync = false; |
58 | |
59 | /** |
60 | * Create a new instance with the underlying file channel provided, and a callback |
61 | * to execute on close. The callback should clean up related resources like |
62 | * output streams or channels. |
63 | * |
64 | * @param channel channel used to do the actuall file IO |
65 | * @param closeCallback callback to execute on close |
66 | */ |
67 | public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) { |
68 | super(); |
69 | this.channel = channel; |
70 | this.closeCallback = closeCallback; |
71 | this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode(); |
72 | this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode(); |
73 | } |
74 | |
75 | public void setEncoding(String encoding) { |
76 | this.encoding = encoding; |
77 | } |
78 | |
79 | /** |
80 | * Flag to indicate that changes should be force-synced to disk on flush. |
81 | * Defaults to false, which means that even with a local disk changes could |
82 | * be lost if the OS crashes in between a write and a cache flush. Setting |
83 | * to true may result in slower performance for usage patterns involving |
84 | * many frequent writes. |
85 | * |
86 | * @param forceSync the flag value to set |
87 | */ |
88 | public void setForceSync(boolean forceSync) { |
89 | this.forceSync = forceSync; |
90 | } |
91 | |
92 | /** |
93 | * @return |
94 | */ |
95 | private StringBuffer getCurrentBuffer() { |
96 | |
97 | if (!TransactionSynchronizationManager.hasResource(bufferKey)) { |
98 | |
99 | TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer()); |
100 | |
101 | TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { |
102 | @Override |
103 | public void afterCompletion(int status) { |
104 | clear(); |
105 | } |
106 | |
107 | @Override |
108 | public void beforeCommit(boolean readOnly) { |
109 | try { |
110 | if(!readOnly) { |
111 | complete(); |
112 | } |
113 | } |
114 | catch (IOException e) { |
115 | throw new FlushFailedException("Could not write to output buffer", e); |
116 | } |
117 | } |
118 | |
119 | private void complete() throws IOException { |
120 | StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey); |
121 | if (buffer != null) { |
122 | String string = buffer.toString(); |
123 | byte[] bytes = string.getBytes(encoding); |
124 | int bufferLength = bytes.length; |
125 | ByteBuffer bb = ByteBuffer.wrap(bytes); |
126 | int bytesWritten = channel.write(bb); |
127 | if(bytesWritten != bufferLength) { |
128 | throw new IOException("All bytes to be written were not successfully written"); |
129 | } |
130 | if (forceSync) { |
131 | channel.force(false); |
132 | } |
133 | if (TransactionSynchronizationManager.hasResource(closeKey)) { |
134 | closeCallback.run(); |
135 | } |
136 | } |
137 | } |
138 | |
139 | private void clear() { |
140 | if (TransactionSynchronizationManager.hasResource(bufferKey)) { |
141 | TransactionSynchronizationManager.unbindResource(bufferKey); |
142 | } |
143 | if (TransactionSynchronizationManager.hasResource(closeKey)) { |
144 | TransactionSynchronizationManager.unbindResource(closeKey); |
145 | } |
146 | } |
147 | |
148 | }); |
149 | |
150 | } |
151 | |
152 | return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey); |
153 | |
154 | } |
155 | |
156 | /** |
157 | * Convenience method for clients to determine if there is any unflushed |
158 | * data. |
159 | * |
160 | * @return the current size (in bytes) of unflushed buffered data |
161 | */ |
162 | public long getBufferSize() { |
163 | if (!transactionActive()) { |
164 | return 0L; |
165 | } |
166 | try { |
167 | return getCurrentBuffer().toString().getBytes(encoding).length; |
168 | } catch (UnsupportedEncodingException e) { |
169 | throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e); |
170 | } |
171 | } |
172 | |
173 | /** |
174 | * @return |
175 | */ |
176 | private boolean transactionActive() { |
177 | return TransactionSynchronizationManager.isActualTransactionActive(); |
178 | } |
179 | |
180 | /* |
181 | * (non-Javadoc) |
182 | * |
183 | * @see java.io.Writer#close() |
184 | */ |
185 | @Override |
186 | public void close() throws IOException { |
187 | if (transactionActive()) { |
188 | if (getCurrentBuffer().length() > 0) { |
189 | TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE); |
190 | } |
191 | return; |
192 | } |
193 | closeCallback.run(); |
194 | } |
195 | |
196 | /* |
197 | * (non-Javadoc) |
198 | * |
199 | * @see java.io.Writer#flush() |
200 | */ |
201 | @Override |
202 | public void flush() throws IOException { |
203 | if (!transactionActive() && forceSync) { |
204 | channel.force(false); |
205 | } |
206 | } |
207 | |
208 | /* |
209 | * (non-Javadoc) |
210 | * |
211 | * @see java.io.Writer#write(char[], int, int) |
212 | */ |
213 | @Override |
214 | public void write(char[] cbuf, int off, int len) throws IOException { |
215 | |
216 | if (!transactionActive()) { |
217 | char [] subArray = new char[len]; |
218 | System.arraycopy(cbuf, off, subArray, 0, len); |
219 | byte[] bytes = new String(subArray).getBytes(encoding); |
220 | int length = bytes.length; |
221 | ByteBuffer bb = ByteBuffer.wrap(bytes); |
222 | int bytesWritten = channel.write(bb); |
223 | if(bytesWritten != length) { |
224 | throw new IOException("Unable to write all data. Bytes to write: " + len + ". Bytes written: " + bytesWritten); |
225 | } |
226 | return; |
227 | } |
228 | |
229 | StringBuffer buffer = getCurrentBuffer(); |
230 | buffer.append(cbuf, off, len); |
231 | } |
232 | } |