1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.ws.transport.jms;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.Iterator;
25 import javax.jms.BytesMessage;
26 import javax.jms.Connection;
27 import javax.jms.ConnectionFactory;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.jms.TemporaryQueue;
35 import javax.jms.TextMessage;
36
37 import org.springframework.jms.connection.ConnectionFactoryUtils;
38 import org.springframework.jms.core.MessagePostProcessor;
39 import org.springframework.jms.support.JmsUtils;
40 import org.springframework.util.Assert;
41 import org.springframework.ws.WebServiceMessage;
42 import org.springframework.ws.transport.AbstractSenderConnection;
43 import org.springframework.ws.transport.WebServiceConnection;
44 import org.springframework.ws.transport.jms.support.JmsTransportUtils;
45
46
47
48
49
50
51
52
53 public class JmsSenderConnection extends AbstractSenderConnection {
54
55 private final ConnectionFactory connectionFactory;
56
57 private final Connection connection;
58
59 private final Session session;
60
61 private final Destination requestDestination;
62
63 private Message requestMessage;
64
65 private Destination responseDestination;
66
67 private Message responseMessage;
68
69 private long receiveTimeout;
70
71 private int deliveryMode;
72
73 private long timeToLive;
74
75 private int priority;
76
77 private String textMessageEncoding;
78
79 private MessagePostProcessor postProcessor;
80
81 private boolean sessionTransacted = false;
82
83 private boolean temporaryResponseQueueCreated = false;
84
85
86 protected JmsSenderConnection(ConnectionFactory connectionFactory,
87 Connection connection,
88 Session session,
89 Destination requestDestination,
90 Message requestMessage) throws JMSException {
91 Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
92 Assert.notNull(connection, "'connection' must not be null");
93 Assert.notNull(session, "'session' must not be null");
94 Assert.notNull(requestDestination, "'requestDestination' must not be null");
95 Assert.notNull(requestMessage, "'requestMessage' must not be null");
96 this.connectionFactory = connectionFactory;
97 this.connection = connection;
98 this.session = session;
99 this.requestDestination = requestDestination;
100 this.requestMessage = requestMessage;
101 }
102
103
104 public Message getRequestMessage() {
105 return requestMessage;
106 }
107
108
109
110
111
112 public Message getResponseMessage() {
113 return responseMessage;
114 }
115
116
117
118
119
120 void setResponseDestination(Destination responseDestination) {
121 this.responseDestination = responseDestination;
122 }
123
124 void setTimeToLive(long timeToLive) {
125 this.timeToLive = timeToLive;
126 }
127
128 void setDeliveryMode(int deliveryMode) {
129 this.deliveryMode = deliveryMode;
130 }
131
132 void setPriority(int priority) {
133 this.priority = priority;
134 }
135
136 void setReceiveTimeout(long receiveTimeout) {
137 this.receiveTimeout = receiveTimeout;
138 }
139
140 void setTextMessageEncoding(String textMessageEncoding) {
141 this.textMessageEncoding = textMessageEncoding;
142 }
143
144 void setPostProcessor(MessagePostProcessor postProcessor) {
145 this.postProcessor = postProcessor;
146 }
147
148 void setSessionTransacted(boolean sessionTransacted) {
149 this.sessionTransacted = sessionTransacted;
150 }
151
152
153
154
155
156 public URI getUri() throws URISyntaxException {
157 try {
158 return JmsTransportUtils.toUri(requestDestination);
159 }
160 catch (JMSException ex) {
161 throw new URISyntaxException("", ex.getMessage());
162 }
163 }
164
165
166
167
168
169 public boolean hasError() throws IOException {
170 return false;
171 }
172
173 public String getErrorMessage() throws IOException {
174 return null;
175 }
176
177
178
179
180
181 @Override
182 protected void addRequestHeader(String name, String value) throws IOException {
183 try {
184 JmsTransportUtils.addHeader(requestMessage, name, value);
185 }
186 catch (JMSException ex) {
187 throw new JmsTransportException("Could not set property", ex);
188 }
189 }
190
191 @Override
192 protected OutputStream getRequestOutputStream() throws IOException {
193 if (requestMessage instanceof BytesMessage) {
194 return new BytesMessageOutputStream((BytesMessage) requestMessage);
195 }
196 else if (requestMessage instanceof TextMessage) {
197 return new TextMessageOutputStream((TextMessage) requestMessage, textMessageEncoding);
198 }
199 else {
200 throw new IllegalStateException("Unknown request message type [" + requestMessage + "]");
201 }
202
203 }
204
205 @Override
206 protected void onSendAfterWrite(WebServiceMessage message) throws IOException {
207 MessageProducer messageProducer = null;
208 try {
209 messageProducer = session.createProducer(requestDestination);
210 messageProducer.setDeliveryMode(deliveryMode);
211 messageProducer.setTimeToLive(timeToLive);
212 messageProducer.setPriority(priority);
213 if (responseDestination == null) {
214 responseDestination = session.createTemporaryQueue();
215 temporaryResponseQueueCreated = true;
216 }
217 requestMessage.setJMSReplyTo(responseDestination);
218 if (postProcessor != null) {
219 requestMessage = postProcessor.postProcessMessage(requestMessage);
220 }
221 connection.start();
222 messageProducer.send(requestMessage);
223 if (session.getTransacted() && isSessionLocallyTransacted(session)) {
224 JmsUtils.commitIfNecessary(session);
225 }
226 }
227 catch (JMSException ex) {
228 throw new JmsTransportException(ex);
229 }
230 finally {
231 JmsUtils.closeMessageProducer(messageProducer);
232 }
233 }
234
235
236 private boolean isSessionLocallyTransacted(Session session) {
237 return sessionTransacted && !ConnectionFactoryUtils.isSessionTransactional(session, connectionFactory);
238 }
239
240
241
242
243
244 @Override
245 protected void onReceiveBeforeRead() throws IOException {
246 MessageConsumer messageConsumer = null;
247 try {
248 if (temporaryResponseQueueCreated) {
249 messageConsumer = session.createConsumer(responseDestination);
250 }
251 else {
252 String messageId = requestMessage.getJMSMessageID().replaceAll("'", "''");
253 String messageSelector = "JMSCorrelationID = '" + messageId + "'";
254 messageConsumer = session.createConsumer(responseDestination, messageSelector);
255 }
256 Message message = receiveTimeout >= 0 ? messageConsumer.receive(receiveTimeout) : messageConsumer.receive();
257 if (message instanceof BytesMessage || message instanceof TextMessage) {
258 responseMessage = message;
259 }
260 else if (message != null) {
261 throw new IllegalArgumentException(
262 "Wrong message type: [" + message.getClass() + "]. " +
263 "Only BytesMessages or TextMessages can be handled.");
264 }
265 }
266 catch (JMSException ex) {
267 throw new JmsTransportException(ex);
268 }
269 finally {
270 JmsUtils.closeMessageConsumer(messageConsumer);
271 if (temporaryResponseQueueCreated) {
272 try {
273 ((TemporaryQueue) responseDestination).delete();
274 }
275 catch (JMSException ex) {
276
277 }
278 }
279 }
280 }
281
282 @Override
283 protected boolean hasResponse() throws IOException {
284 return responseMessage != null;
285 }
286
287 @Override
288 protected Iterator<String> getResponseHeaderNames() throws IOException {
289 try {
290 return JmsTransportUtils.getHeaderNames(responseMessage);
291 }
292 catch (JMSException ex) {
293 throw new JmsTransportException("Could not get property names", ex);
294 }
295 }
296
297 @Override
298 protected Iterator<String> getResponseHeaders(String name) throws IOException {
299 try {
300 return JmsTransportUtils.getHeaders(responseMessage, name);
301 }
302 catch (JMSException ex) {
303 throw new JmsTransportException("Could not get property value", ex);
304 }
305 }
306
307 @Override
308 protected InputStream getResponseInputStream() throws IOException {
309 if (responseMessage instanceof BytesMessage) {
310 return new BytesMessageInputStream((BytesMessage) responseMessage);
311 }
312 else if (responseMessage instanceof TextMessage) {
313 return new TextMessageInputStream((TextMessage) responseMessage, textMessageEncoding);
314 }
315 else {
316 throw new IllegalStateException("Unknown response message type [" + responseMessage + "]");
317 }
318
319
320 }
321
322 @Override
323 protected void onClose() throws IOException {
324 JmsUtils.closeSession(session);
325 ConnectionFactoryUtils.releaseConnection(connection, connectionFactory, true);
326 }
327 }