1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.support.transaction; |
17 | |
18 | import java.io.IOException; |
19 | import java.io.Writer; |
20 | |
21 | import org.springframework.transaction.support.TransactionSynchronizationAdapter; |
22 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
23 | |
24 | /** |
25 | * Wrapper for a {@link Writer} that delays actually writing to or closing the |
26 | * buffer if a transaction is active. If a transaction is detected on the call |
27 | * to {@link #write(String)} the parameter is buffered and passed on to the |
28 | * underlying writer only when the transaction is committed. |
29 | * |
30 | * @author Dave Syer |
31 | * |
32 | */ |
33 | public class TransactionAwareBufferedWriter extends Writer { |
34 | |
35 | private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY"; |
36 | |
37 | private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY"; |
38 | |
39 | private final String bufferKey; |
40 | |
41 | private final String closeKey; |
42 | |
43 | private Writer writer; |
44 | |
45 | private final Runnable closeCallback; |
46 | |
47 | /** |
48 | * Create a new instance with the underlying writer provided, and a callback |
49 | * to execute on close. The callback should clean up related resources like |
50 | * output streams or channels. |
51 | * |
52 | * @param writer actually writes to output |
53 | * @param closeCallback callback to execute on close |
54 | */ |
55 | public TransactionAwareBufferedWriter(Writer writer, Runnable closeCallback) { |
56 | super(); |
57 | this.writer = writer; |
58 | this.closeCallback = closeCallback; |
59 | this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode(); |
60 | this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode(); |
61 | } |
62 | |
63 | /** |
64 | * @return |
65 | */ |
66 | private StringBuffer getCurrentBuffer() { |
67 | |
68 | if (!TransactionSynchronizationManager.hasResource(bufferKey)) { |
69 | |
70 | TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer()); |
71 | |
72 | TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { |
73 | @Override |
74 | public void afterCompletion(int status) { |
75 | try { |
76 | if (status == STATUS_COMMITTED) { |
77 | complete(); |
78 | } |
79 | } |
80 | catch (IOException e) { |
81 | throw new FlushFailedException("Could not write to output buffer", e); |
82 | } |
83 | finally { |
84 | clear(); |
85 | } |
86 | } |
87 | |
88 | private void complete() throws IOException { |
89 | StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey); |
90 | if (buffer != null) { |
91 | writer.write(buffer.toString()); |
92 | writer.flush(); |
93 | if (TransactionSynchronizationManager.hasResource(closeKey)) { |
94 | writer.close(); |
95 | closeCallback.run(); |
96 | } |
97 | } |
98 | } |
99 | |
100 | private void clear() { |
101 | if (TransactionSynchronizationManager.hasResource(bufferKey)) { |
102 | TransactionSynchronizationManager.unbindResource(bufferKey); |
103 | } |
104 | if (TransactionSynchronizationManager.hasResource(closeKey)) { |
105 | TransactionSynchronizationManager.unbindResource(closeKey); |
106 | } |
107 | } |
108 | |
109 | }); |
110 | |
111 | } |
112 | |
113 | return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey); |
114 | |
115 | } |
116 | |
117 | /** |
118 | * Convenience method for clients to determine if there is any unflushed |
119 | * data. |
120 | * |
121 | * @return the current size of unflushed buffered data |
122 | */ |
123 | public long getBufferSize() { |
124 | if (!transactionActive()) { |
125 | return 0L; |
126 | } |
127 | return getCurrentBuffer().length(); |
128 | } |
129 | |
130 | /** |
131 | * @return |
132 | */ |
133 | private boolean transactionActive() { |
134 | return TransactionSynchronizationManager.isActualTransactionActive(); |
135 | } |
136 | |
137 | /* |
138 | * (non-Javadoc) |
139 | * |
140 | * @see java.io.Writer#close() |
141 | */ |
142 | @Override |
143 | public void close() throws IOException { |
144 | if (transactionActive()) { |
145 | if (getCurrentBuffer().length() > 0) { |
146 | TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE); |
147 | } |
148 | return; |
149 | } |
150 | writer.close(); |
151 | closeCallback.run(); |
152 | } |
153 | |
154 | /* |
155 | * (non-Javadoc) |
156 | * |
157 | * @see java.io.Writer#flush() |
158 | */ |
159 | @Override |
160 | public void flush() throws IOException { |
161 | if (!transactionActive()) { |
162 | writer.flush(); |
163 | } |
164 | } |
165 | |
166 | /* |
167 | * (non-Javadoc) |
168 | * |
169 | * @see java.io.Writer#write(char[], int, int) |
170 | */ |
171 | @Override |
172 | public void write(char[] cbuf, int off, int len) throws IOException { |
173 | |
174 | if (!transactionActive()) { |
175 | writer.write(cbuf, off, len); |
176 | return; |
177 | } |
178 | |
179 | StringBuffer buffer = getCurrentBuffer(); |
180 | buffer.append(cbuf, off, len); |
181 | |
182 | } |
183 | |
184 | } |