EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.support.transaction]

COVERAGE SUMMARY FOR SOURCE FILE [TransactionAwareBufferedWriter.java]

nameclass, %method, %block, %line, %
TransactionAwareBufferedWriter.java100% (2/2)100% (21/21)90%  (295/326)96%  (71/74)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TransactionAwareBufferedWriter100% (1/1)100% (16/16)86%  (196/227)94%  (44/47)
getBufferSize (): long 100% (1/1)48%  (14/29)60%  (3/5)
write (char [], int, int): void 100% (1/1)74%  (45/61)92%  (12/13)
<static initializer> 100% (1/1)100% (21/21)100% (2/2)
TransactionAwareBufferedWriter (FileChannel, Runnable): void 100% (1/1)100% (41/41)100% (8/8)
access$000 (TransactionAwareBufferedWriter): String 100% (1/1)100% (3/3)100% (1/1)
access$100 (TransactionAwareBufferedWriter): String 100% (1/1)100% (3/3)100% (1/1)
access$200 (TransactionAwareBufferedWriter): FileChannel 100% (1/1)100% (3/3)100% (1/1)
access$300 (TransactionAwareBufferedWriter): boolean 100% (1/1)100% (3/3)100% (1/1)
access$400 (TransactionAwareBufferedWriter): String 100% (1/1)100% (3/3)100% (1/1)
access$500 (TransactionAwareBufferedWriter): Runnable 100% (1/1)100% (3/3)100% (1/1)
close (): void 100% (1/1)100% (16/16)100% (6/6)
flush (): void 100% (1/1)100% (11/11)100% (3/3)
getCurrentBuffer (): StringBuffer 100% (1/1)100% (20/20)100% (4/4)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
setForceSync (boolean): void 100% (1/1)100% (4/4)100% (2/2)
transactionActive (): boolean 100% (1/1)100% (2/2)100% (1/1)
     
class TransactionAwareBufferedWriter$1100% (1/1)100% (5/5)100% (99/99)100% (28/28)
TransactionAwareBufferedWriter$1 (TransactionAwareBufferedWriter): void 100% (1/1)100% (6/6)100% (1/1)
afterCompletion (int): void 100% (1/1)100% (3/3)100% (2/2)
beforeCommit (boolean): void 100% (1/1)100% (13/13)100% (6/6)
clear (): void 100% (1/1)100% (21/21)100% (5/5)
complete (): void 100% (1/1)100% (56/56)100% (14/14)

1/*
2 * Copyright 2006-2012 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.support.transaction;
17 
18import java.io.IOException;
19import java.io.UnsupportedEncodingException;
20import java.io.Writer;
21import java.nio.ByteBuffer;
22import java.nio.channels.FileChannel;
23 
24import org.springframework.batch.item.WriteFailedException;
25import org.springframework.transaction.support.TransactionSynchronizationAdapter;
26import org.springframework.transaction.support.TransactionSynchronizationManager;
27 
28/**
29 * Wrapper for a {@link FileChannel} that delays actually writing to or closing the
30 * buffer if a transaction is active. If a transaction is detected on the call
31 * to {@link #write(String)} the parameter is buffered and passed on to the
32 * underlying writer only when the transaction is committed.
33 * 
34 * @author Dave Syer
35 * @author Michael Minella
36 * 
37 */
38public class TransactionAwareBufferedWriter extends Writer {
39 
40        private static final String BUFFER_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".BUFFER_KEY";
41 
42        private static final String CLOSE_KEY_PREFIX = TransactionAwareBufferedWriter.class.getName() + ".CLOSE_KEY";
43 
44        private final String bufferKey;
45 
46        private final String closeKey;
47 
48        private FileChannel channel;
49 
50        private final Runnable closeCallback;
51 
52        // default encoding for writing to output files - set to UTF-8.
53        private static final String DEFAULT_CHARSET = "UTF-8";
54 
55        private String encoding = DEFAULT_CHARSET;
56 
57        private boolean forceSync = false;
58 
59        /**
60         * Create a new instance with the underlying file channel provided, and a callback
61         * to execute on close. The callback should clean up related resources like
62         * output streams or channels.
63         * 
64         * @param channel channel used to do the actuall file IO
65         * @param closeCallback callback to execute on close
66         */
67        public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) {
68                super();
69                this.channel = channel;
70                this.closeCallback = closeCallback;
71                this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
72                this.closeKey = CLOSE_KEY_PREFIX + "." + hashCode();
73        }
74 
75        public void setEncoding(String encoding) {
76                this.encoding = encoding;
77        }
78 
79        /**
80         * Flag to indicate that changes should be force-synced to disk on flush.
81         * Defaults to false, which means that even with a local disk changes could
82         * be lost if the OS crashes in between a write and a cache flush. Setting
83         * to true may result in slower performance for usage patterns involving
84         * many frequent writes.
85         * 
86         * @param forceSync the flag value to set
87         */
88        public void setForceSync(boolean forceSync) {
89                this.forceSync = forceSync;
90        }
91 
92        /**
93         * @return
94         */
95        private StringBuffer getCurrentBuffer() {
96 
97                if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
98 
99                        TransactionSynchronizationManager.bindResource(bufferKey, new StringBuffer());
100 
101                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
102                                @Override
103                                public void afterCompletion(int status) {
104                                        clear();
105                                }
106 
107                                @Override
108                                public void beforeCommit(boolean readOnly) {
109                                        try {
110                                                if(!readOnly) {
111                                                        complete();
112                                                }
113                                        }
114                                        catch (IOException e) {
115                                                throw new FlushFailedException("Could not write to output buffer", e);
116                                        }
117                                }
118 
119                                private void complete() throws IOException {
120                                        StringBuffer buffer = (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
121                                        if (buffer != null) {
122                                                String string = buffer.toString();
123                                                byte[] bytes = string.getBytes(encoding);
124                                                int bufferLength = bytes.length;
125                                                ByteBuffer bb = ByteBuffer.wrap(bytes);
126                                                int bytesWritten = channel.write(bb);
127                                                if(bytesWritten != bufferLength) {
128                                                        throw new IOException("All bytes to be written were not successfully written");
129                                                }
130                                                if (forceSync) {
131                                                        channel.force(false);
132                                                }
133                                                if (TransactionSynchronizationManager.hasResource(closeKey)) {
134                                                        closeCallback.run();
135                                                }
136                                        }
137                                }
138 
139                                private void clear() {
140                                        if (TransactionSynchronizationManager.hasResource(bufferKey)) {
141                                                TransactionSynchronizationManager.unbindResource(bufferKey);
142                                        }
143                                        if (TransactionSynchronizationManager.hasResource(closeKey)) {
144                                                TransactionSynchronizationManager.unbindResource(closeKey);
145                                        }
146                                }
147 
148                        });
149 
150                }
151 
152                return (StringBuffer) TransactionSynchronizationManager.getResource(bufferKey);
153 
154        }
155 
156        /**
157         * Convenience method for clients to determine if there is any unflushed
158         * data.
159         * 
160         * @return the current size (in bytes) of unflushed buffered data
161         */
162        public long getBufferSize() {
163                if (!transactionActive()) {
164                        return 0L;
165                }
166                try {
167                        return getCurrentBuffer().toString().getBytes(encoding).length;
168                } catch (UnsupportedEncodingException e) {
169                        throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e);
170                }
171        }
172 
173        /**
174         * @return
175         */
176        private boolean transactionActive() {
177                return TransactionSynchronizationManager.isActualTransactionActive();
178        }
179 
180        /*
181         * (non-Javadoc)
182         * 
183         * @see java.io.Writer#close()
184         */
185        @Override
186        public void close() throws IOException {
187                if (transactionActive()) {
188                        if (getCurrentBuffer().length() > 0) {
189                                TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
190                        }
191                        return;
192                }
193                closeCallback.run();
194        }
195 
196        /*
197         * (non-Javadoc)
198         * 
199         * @see java.io.Writer#flush()
200         */
201        @Override
202        public void flush() throws IOException {
203                if (!transactionActive() && forceSync) {
204                        channel.force(false);
205                }
206        }
207 
208        /*
209         * (non-Javadoc)
210         * 
211         * @see java.io.Writer#write(char[], int, int)
212         */
213        @Override
214        public void write(char[] cbuf, int off, int len) throws IOException {
215 
216                if (!transactionActive()) {
217                        char [] subArray = new char[len];
218                        System.arraycopy(cbuf, off, subArray, 0, len);
219                        byte[] bytes = new String(subArray).getBytes(encoding);
220                        int length = bytes.length;
221                        ByteBuffer bb = ByteBuffer.wrap(bytes);
222                        int bytesWritten = channel.write(bb);
223                        if(bytesWritten != length) {
224                                throw new IOException("Unable to write all data.  Bytes to write: " + len + ".  Bytes written: " + bytesWritten);
225                        }
226                        return;
227                }
228 
229                StringBuffer buffer = getCurrentBuffer();
230                buffer.append(cbuf, off, len);
231        }
232}

[all classes][org.springframework.batch.support.transaction]
EMMA 2.0.5312 (C) Vladimir Roubtsov