View Javadoc

1   /*
2    * Copyright 2005-2012 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.ws.transport.jms;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.OutputStream;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.Iterator;
25  import javax.jms.BytesMessage;
26  import javax.jms.Connection;
27  import javax.jms.ConnectionFactory;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageProducer;
33  import javax.jms.Session;
34  import javax.jms.TemporaryQueue;
35  import javax.jms.TextMessage;
36  
37  import org.springframework.jms.connection.ConnectionFactoryUtils;
38  import org.springframework.jms.core.MessagePostProcessor;
39  import org.springframework.jms.support.JmsUtils;
40  import org.springframework.util.Assert;
41  import org.springframework.ws.WebServiceMessage;
42  import org.springframework.ws.transport.AbstractSenderConnection;
43  import org.springframework.ws.transport.WebServiceConnection;
44  import org.springframework.ws.transport.jms.support.JmsTransportUtils;
45  
46  /**
47   * Implementation of {@link WebServiceConnection} that is used for client-side JMS access. Exposes a {@link
48   * BytesMessage} request and response message.
49   *
50   * @author Arjen Poutsma
51   * @since 1.5.0
52   */
53  public class JmsSenderConnection extends AbstractSenderConnection {
54  
55      private final ConnectionFactory connectionFactory;
56  
57      private final Connection connection;
58  
59      private final Session session;
60  
61      private final Destination requestDestination;
62  
63      private Message requestMessage;
64  
65      private Destination responseDestination;
66  
67      private Message responseMessage;
68  
69      private long receiveTimeout;
70  
71      private int deliveryMode;
72  
73      private long timeToLive;
74  
75      private int priority;
76  
77      private String textMessageEncoding;
78  
79      private MessagePostProcessor postProcessor;
80  
81      private boolean sessionTransacted = false;
82  
83      private boolean temporaryResponseQueueCreated = false;
84  
85      /** Constructs a new JMS connection with the given parameters. */
86      protected JmsSenderConnection(ConnectionFactory connectionFactory,
87                                    Connection connection,
88                                    Session session,
89                                    Destination requestDestination,
90                                    Message requestMessage) throws JMSException {
91          Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
92          Assert.notNull(connection, "'connection' must not be null");
93          Assert.notNull(session, "'session' must not be null");
94          Assert.notNull(requestDestination, "'requestDestination' must not be null");
95          Assert.notNull(requestMessage, "'requestMessage' must not be null");
96          this.connectionFactory = connectionFactory;
97          this.connection = connection;
98          this.session = session;
99          this.requestDestination = requestDestination;
100         this.requestMessage = requestMessage;
101     }
102 
103     /** Returns the request message for this connection. Returns either a {@link BytesMessage} or a {@link TextMessage}. */
104     public Message getRequestMessage() {
105         return requestMessage;
106     }
107 
108     /**
109      * Returns the response message, if any, for this connection. Returns either a {@link BytesMessage} or a {@link
110      * TextMessage}.
111      */
112     public Message getResponseMessage() {
113         return responseMessage;
114     }
115 
116     /*
117      * Package-friendly setters
118      */
119 
120     void setResponseDestination(Destination responseDestination) {
121         this.responseDestination = responseDestination;
122     }
123 
124     void setTimeToLive(long timeToLive) {
125         this.timeToLive = timeToLive;
126     }
127 
128     void setDeliveryMode(int deliveryMode) {
129         this.deliveryMode = deliveryMode;
130     }
131 
132     void setPriority(int priority) {
133         this.priority = priority;
134     }
135 
136     void setReceiveTimeout(long receiveTimeout) {
137         this.receiveTimeout = receiveTimeout;
138     }
139 
140     void setTextMessageEncoding(String textMessageEncoding) {
141         this.textMessageEncoding = textMessageEncoding;
142     }
143 
144     void setPostProcessor(MessagePostProcessor postProcessor) {
145         this.postProcessor = postProcessor;
146     }
147 
148     void setSessionTransacted(boolean sessionTransacted) {
149         this.sessionTransacted = sessionTransacted;
150     }
151 
152     /*
153      * URI
154      */
155 
156     public URI getUri() throws URISyntaxException {
157         try {
158             return JmsTransportUtils.toUri(requestDestination);
159         }
160         catch (JMSException ex) {
161             throw new URISyntaxException("", ex.getMessage());
162         }
163     }
164 
165     /*
166      * Errors
167      */
168 
169     public boolean hasError() throws IOException {
170         return false;
171     }
172 
173     public String getErrorMessage() throws IOException {
174         return null;
175     }
176 
177     /*
178      * Sending
179      */
180 
181     @Override
182     protected void addRequestHeader(String name, String value) throws IOException {
183         try {
184             JmsTransportUtils.addHeader(requestMessage, name, value);
185         }
186         catch (JMSException ex) {
187             throw new JmsTransportException("Could not set property", ex);
188         }
189     }
190 
191     @Override
192     protected OutputStream getRequestOutputStream() throws IOException {
193         if (requestMessage instanceof BytesMessage) {
194             return new BytesMessageOutputStream((BytesMessage) requestMessage);
195         }
196         else if (requestMessage instanceof TextMessage) {
197             return new TextMessageOutputStream((TextMessage) requestMessage, textMessageEncoding);
198         }
199         else {
200             throw new IllegalStateException("Unknown request message type [" + requestMessage + "]");
201         }
202 
203     }
204 
205     @Override
206     protected void onSendAfterWrite(WebServiceMessage message) throws IOException {
207         MessageProducer messageProducer = null;
208         try {
209             messageProducer = session.createProducer(requestDestination);
210             messageProducer.setDeliveryMode(deliveryMode);
211             messageProducer.setTimeToLive(timeToLive);
212             messageProducer.setPriority(priority);
213             if (responseDestination == null) {
214                 responseDestination = session.createTemporaryQueue();
215                 temporaryResponseQueueCreated = true;
216             }
217             requestMessage.setJMSReplyTo(responseDestination);
218             if (postProcessor != null) {
219                 requestMessage = postProcessor.postProcessMessage(requestMessage);
220             }
221             connection.start();
222             messageProducer.send(requestMessage);
223             if (session.getTransacted() && isSessionLocallyTransacted(session)) {
224                 JmsUtils.commitIfNecessary(session);
225             }
226         }
227         catch (JMSException ex) {
228             throw new JmsTransportException(ex);
229         }
230         finally {
231             JmsUtils.closeMessageProducer(messageProducer);
232         }
233     }
234 
235     /** @see org.springframework.jms.core.JmsTemplate#isSessionLocallyTransacted(Session) */
236     private boolean isSessionLocallyTransacted(Session session) {
237         return sessionTransacted && !ConnectionFactoryUtils.isSessionTransactional(session, connectionFactory);
238     }
239 
240     /*
241     * Receiving
242     */
243 
244     @Override
245     protected void onReceiveBeforeRead() throws IOException {
246         MessageConsumer messageConsumer = null;
247         try {
248             if (temporaryResponseQueueCreated) {
249                 messageConsumer = session.createConsumer(responseDestination);
250             }
251             else {
252                 String messageId = requestMessage.getJMSMessageID().replaceAll("'", "''");
253                 String messageSelector = "JMSCorrelationID = '" + messageId + "'";
254                 messageConsumer = session.createConsumer(responseDestination, messageSelector);
255             }
256             Message message = receiveTimeout >= 0 ? messageConsumer.receive(receiveTimeout) : messageConsumer.receive();
257             if (message instanceof BytesMessage || message instanceof TextMessage) {
258                 responseMessage = message;
259             }
260             else if (message != null) {
261                 throw new IllegalArgumentException(
262                         "Wrong message type: [" + message.getClass() + "]. " +
263                                 "Only BytesMessages or TextMessages can be handled.");
264             }
265         }
266         catch (JMSException ex) {
267             throw new JmsTransportException(ex);
268         }
269         finally {
270             JmsUtils.closeMessageConsumer(messageConsumer);
271             if (temporaryResponseQueueCreated) {
272                 try {
273                     ((TemporaryQueue) responseDestination).delete();
274                 }
275                 catch (JMSException ex) {
276                     // ignore
277                 }
278             }
279         }
280     }
281 
282     @Override
283     protected boolean hasResponse() throws IOException {
284         return responseMessage != null;
285     }
286 
287     @Override
288     protected Iterator<String> getResponseHeaderNames() throws IOException {
289         try {
290             return JmsTransportUtils.getHeaderNames(responseMessage);
291         }
292         catch (JMSException ex) {
293             throw new JmsTransportException("Could not get property names", ex);
294         }
295     }
296 
297     @Override
298     protected Iterator<String> getResponseHeaders(String name) throws IOException {
299         try {
300             return JmsTransportUtils.getHeaders(responseMessage, name);
301         }
302         catch (JMSException ex) {
303             throw new JmsTransportException("Could not get property value", ex);
304         }
305     }
306 
307     @Override
308     protected InputStream getResponseInputStream() throws IOException {
309         if (responseMessage instanceof BytesMessage) {
310             return new BytesMessageInputStream((BytesMessage) responseMessage);
311         }
312         else if (responseMessage instanceof TextMessage) {
313             return new TextMessageInputStream((TextMessage) responseMessage, textMessageEncoding);
314         }
315         else {
316             throw new IllegalStateException("Unknown response message type [" + responseMessage + "]");
317         }
318 
319 
320     }
321 
322     @Override
323     protected void onClose() throws IOException {
324         JmsUtils.closeSession(session);
325         ConnectionFactoryUtils.releaseConnection(connection, connectionFactory, true);
326     }
327 }