1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.ws.transport.jms;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.Iterator;
25 import javax.jms.BytesMessage;
26 import javax.jms.Connection;
27 import javax.jms.ConnectionFactory;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.TemporaryQueue;
35 import javax.jms.TemporaryTopic;
36 import javax.jms.TextMessage;
37
38 import org.springframework.jms.connection.ConnectionFactoryUtils;
39 import org.springframework.jms.core.MessagePostProcessor;
40 import org.springframework.jms.support.JmsUtils;
41 import org.springframework.util.Assert;
42 import org.springframework.ws.WebServiceMessage;
43 import org.springframework.ws.transport.AbstractSenderConnection;
44 import org.springframework.ws.transport.WebServiceConnection;
45 import org.springframework.ws.transport.jms.support.JmsTransportUtils;
46
47
48
49
50
51
52
53
54 public class JmsSenderConnection extends AbstractSenderConnection {
55
56 private final ConnectionFactory connectionFactory;
57
58 private final Connection connection;
59
60 private final Session session;
61
62 private final Destination requestDestination;
63
64 private Message requestMessage;
65
66 private Destination responseDestination;
67
68 private Message responseMessage;
69
70 private long receiveTimeout;
71
72 private int deliveryMode;
73
74 private long timeToLive;
75
76 private int priority;
77
78 private String textMessageEncoding;
79
80 private MessagePostProcessor postProcessor;
81
82 private boolean sessionTransacted = false;
83
84
85 protected JmsSenderConnection(ConnectionFactory connectionFactory,
86 Connection connection,
87 Session session,
88 Destination requestDestination,
89 Message requestMessage) throws JMSException {
90 Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
91 Assert.notNull(connection, "'connection' must not be null");
92 Assert.notNull(session, "'session' must not be null");
93 Assert.notNull(requestDestination, "'requestDestination' must not be null");
94 Assert.notNull(requestMessage, "'requestMessage' must not be null");
95 this.connectionFactory = connectionFactory;
96 this.connection = connection;
97 this.session = session;
98 this.requestDestination = requestDestination;
99 this.requestMessage = requestMessage;
100 }
101
102
103 public Message getRequestMessage() {
104 return requestMessage;
105 }
106
107
108
109
110
111 public Message getResponseMessage() {
112 return responseMessage;
113 }
114
115
116
117
118
119 void setResponseDestination(Destination responseDestination) {
120 this.responseDestination = responseDestination;
121 }
122
123 void setTimeToLive(long timeToLive) {
124 this.timeToLive = timeToLive;
125 }
126
127 void setDeliveryMode(int deliveryMode) {
128 this.deliveryMode = deliveryMode;
129 }
130
131 void setPriority(int priority) {
132 this.priority = priority;
133 }
134
135 void setReceiveTimeout(long receiveTimeout) {
136 this.receiveTimeout = receiveTimeout;
137 }
138
139 void setTextMessageEncoding(String textMessageEncoding) {
140 this.textMessageEncoding = textMessageEncoding;
141 }
142
143 void setPostProcessor(MessagePostProcessor postProcessor) {
144 this.postProcessor = postProcessor;
145 }
146
147 void setSessionTransacted(boolean sessionTransacted) {
148 this.sessionTransacted = sessionTransacted;
149 }
150
151
152
153
154
155 public URI getUri() throws URISyntaxException {
156 try {
157 return JmsTransportUtils.toUri(requestDestination);
158 }
159 catch (JMSException ex) {
160 throw new URISyntaxException("", ex.getMessage());
161 }
162 }
163
164
165
166
167
168 public boolean hasError() throws IOException {
169 return false;
170 }
171
172 public String getErrorMessage() throws IOException {
173 return null;
174 }
175
176
177
178
179
180 protected void addRequestHeader(String name, String value) throws IOException {
181 try {
182 JmsTransportUtils.addHeader(requestMessage, name, value);
183 }
184 catch (JMSException ex) {
185 throw new JmsTransportException("Could not set property", ex);
186 }
187 }
188
189 protected OutputStream getRequestOutputStream() throws IOException {
190 if (requestMessage instanceof BytesMessage) {
191 return new BytesMessageOutputStream((BytesMessage) requestMessage);
192 }
193 else if (requestMessage instanceof TextMessage) {
194 return new TextMessageOutputStream((TextMessage) requestMessage, textMessageEncoding);
195 }
196 else {
197 throw new IllegalStateException("Unknown request message type [" + requestMessage + "]");
198 }
199
200 }
201
202 protected void onSendAfterWrite(WebServiceMessage message) throws IOException {
203 MessageProducer messageProducer = null;
204 try {
205 messageProducer = session.createProducer(requestDestination);
206 messageProducer.setDeliveryMode(deliveryMode);
207 messageProducer.setTimeToLive(timeToLive);
208 messageProducer.setPriority(priority);
209 if (responseDestination == null) {
210 responseDestination = session.createTemporaryQueue();
211 }
212 requestMessage.setJMSReplyTo(responseDestination);
213 if (postProcessor != null) {
214 requestMessage = postProcessor.postProcessMessage(requestMessage);
215 }
216 connection.start();
217 messageProducer.send(requestMessage);
218 if (session.getTransacted() && isSessionLocallyTransacted(session)) {
219 JmsUtils.commitIfNecessary(session);
220 }
221 }
222 catch (JMSException ex) {
223 throw new JmsTransportException(ex);
224 }
225 finally {
226 JmsUtils.closeMessageProducer(messageProducer);
227 }
228 }
229
230
231 private boolean isSessionLocallyTransacted(Session session) {
232 return sessionTransacted && !ConnectionFactoryUtils.isSessionTransactional(session, connectionFactory);
233 }
234
235
236
237
238
239 protected void onReceiveBeforeRead() throws IOException {
240 MessageConsumer messageConsumer = null;
241 try {
242 if (responseDestination instanceof TemporaryQueue || responseDestination instanceof TemporaryTopic) {
243 messageConsumer = session.createConsumer(responseDestination);
244 }
245 else {
246 String messageId = requestMessage.getJMSMessageID().replaceAll("'", "''");
247 String messageSelector = "JMSCorrelationID = '" + messageId + "'";
248 messageConsumer = session.createConsumer(responseDestination, messageSelector);
249 }
250 Message message = receiveTimeout >= 0 ? messageConsumer.receive(receiveTimeout) : messageConsumer.receive();
251 if (message instanceof BytesMessage || message instanceof TextMessage) {
252 responseMessage = message;
253 }
254 else if (message != null) {
255 throw new IllegalArgumentException(
256 "Wrong message type: [" + message.getClass() + "]. Only BytesMessages can be handled.");
257 }
258 }
259 catch (JMSException ex) {
260 throw new JmsTransportException(ex);
261 }
262 finally {
263 JmsUtils.closeMessageConsumer(messageConsumer);
264 if (responseDestination instanceof TemporaryQueue) {
265 try {
266 ((TemporaryQueue) responseDestination).delete();
267 }
268 catch (JMSException e) {
269
270 }
271 }
272 else if (responseDestination instanceof TemporaryTopic) {
273 try {
274 ((TemporaryTopic) responseDestination).delete();
275 }
276 catch (JMSException e) {
277
278 }
279 }
280 }
281 }
282
283 protected boolean hasResponse() throws IOException {
284 return responseMessage != null;
285 }
286
287 protected Iterator getResponseHeaderNames() throws IOException {
288 try {
289 return JmsTransportUtils.getHeaderNames(responseMessage);
290 }
291 catch (JMSException ex) {
292 throw new JmsTransportException("Could not get property names", ex);
293 }
294 }
295
296 protected Iterator getResponseHeaders(String name) throws IOException {
297 try {
298 return JmsTransportUtils.getHeaders(responseMessage, name);
299 }
300 catch (JMSException ex) {
301 throw new JmsTransportException("Could not get property value", ex);
302 }
303 }
304
305 protected InputStream getResponseInputStream() throws IOException {
306 if (responseMessage instanceof BytesMessage) {
307 return new BytesMessageInputStream((BytesMessage) responseMessage);
308 }
309 else if (responseMessage instanceof TextMessage) {
310 return new TextMessageInputStream((TextMessage) responseMessage, textMessageEncoding);
311 }
312 else {
313 throw new IllegalStateException("Unknown response message type [" + responseMessage + "]");
314 }
315
316
317 }
318
319 protected void onClose() throws IOException {
320 JmsUtils.closeSession(session);
321 ConnectionFactoryUtils.releaseConnection(connection, connectionFactory, true);
322 }
323 }