1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.support.transaction;
17
18 import java.io.IOException;
19 import java.io.Writer;
20
21 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
22 import org.springframework.transaction.support.TransactionSynchronizationManager;
23
24
25
26
27
28
29
30
31
32
33 public class TransactionAwareBufferedWriter extends Writer {
34
35 private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY";
36
37 private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY";
38
39 private final String bufferKey;
40
41 private final String closeKey;
42
43 private Writer writer;
44
45 private final Runnable closeCallback;
46
47
48
49
50
51
52
53
54
55 public TransactionAwareBufferedWriter(Writer writer, Runnable closeCallback) {
56 super();
57 this.writer = writer;
58 this.closeCallback = closeCallback;
59 this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
60 this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode();
61 }
62
63
64
65
66 private StringBuffer getCurrentBuffer() {
67
68 if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
69
70 TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer());
71
72 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
73 @Override
74 public void afterCompletion(int status) {
75 try {
76 if (status == STATUS_COMMITTED) {
77 complete();
78 }
79 }
80 catch (IOException e) {
81 throw new FlushFailedException("Could not write to output buffer", e);
82 }
83 finally {
84 clear();
85 }
86 }
87
88 private void complete() throws IOException {
89 StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
90 if (buffer != null) {
91 writer.write(buffer.toString());
92 writer.flush();
93 if (TransactionSynchronizationManager.hasResource(closeKey)) {
94 writer.close();
95 closeCallback.run();
96 }
97 }
98 }
99
100 private void clear() {
101 if (TransactionSynchronizationManager.hasResource(bufferKey)) {
102 TransactionSynchronizationManager.unbindResource(bufferKey);
103 }
104 if (TransactionSynchronizationManager.hasResource(closeKey)) {
105 TransactionSynchronizationManager.unbindResource(closeKey);
106 }
107 }
108
109 });
110
111 }
112
113 return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
114
115 }
116
117
118
119
120
121
122
123 public long getBufferSize() {
124 if (!transactionActive()) {
125 return 0L;
126 }
127 return getCurrentBuffer().length();
128 }
129
130
131
132
133 private boolean transactionActive() {
134 return TransactionSynchronizationManager.isActualTransactionActive();
135 }
136
137
138
139
140
141
142 @Override
143 public void close() throws IOException {
144 if (transactionActive()) {
145 if (getCurrentBuffer().length() > 0) {
146 TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
147 }
148 return;
149 }
150 writer.close();
151 closeCallback.run();
152 }
153
154
155
156
157
158
159 @Override
160 public void flush() throws IOException {
161 if (!transactionActive()) {
162 writer.flush();
163 }
164 }
165
166
167
168
169
170
171 @Override
172 public void write(char[] cbuf, int off, int len) throws IOException {
173
174 if (!transactionActive()) {
175 writer.write(cbuf, off, len);
176 return;
177 }
178
179 StringBuffer buffer = getCurrentBuffer();
180 buffer.append(cbuf, off, len);
181
182 }
183
184 }