View Javadoc

1   /*
2    * Copyright 2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.ws.transport.jms;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.OutputStream;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.Iterator;
25  import javax.jms.BytesMessage;
26  import javax.jms.Connection;
27  import javax.jms.ConnectionFactory;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageProducer;
33  import javax.jms.Session;
34  import javax.jms.TemporaryQueue;
35  import javax.jms.TemporaryTopic;
36  import javax.jms.TextMessage;
37  
38  import org.springframework.jms.connection.ConnectionFactoryUtils;
39  import org.springframework.jms.core.MessagePostProcessor;
40  import org.springframework.jms.support.JmsUtils;
41  import org.springframework.util.Assert;
42  import org.springframework.ws.WebServiceMessage;
43  import org.springframework.ws.transport.AbstractSenderConnection;
44  import org.springframework.ws.transport.WebServiceConnection;
45  import org.springframework.ws.transport.jms.support.JmsTransportUtils;
46  
47  /**
48   * Implementation of {@link WebServiceConnection} that is used for client-side JMS access. Exposes a {@link
49   * BytesMessage} request and response message.
50   *
51   * @author Arjen Poutsma
52   * @since 1.5.0
53   */
54  public class JmsSenderConnection extends AbstractSenderConnection {
55  
56      private final ConnectionFactory connectionFactory;
57  
58      private final Connection connection;
59  
60      private final Session session;
61  
62      private final Destination requestDestination;
63  
64      private Message requestMessage;
65  
66      private Destination responseDestination;
67  
68      private Message responseMessage;
69  
70      private long receiveTimeout;
71  
72      private int deliveryMode;
73  
74      private long timeToLive;
75  
76      private int priority;
77  
78      private String textMessageEncoding;
79  
80      private MessagePostProcessor postProcessor;
81  
82      private boolean sessionTransacted = false;
83  
84      /** Constructs a new JMS connection with the given parameters. */
85      protected JmsSenderConnection(ConnectionFactory connectionFactory,
86                                    Connection connection,
87                                    Session session,
88                                    Destination requestDestination,
89                                    Message requestMessage) throws JMSException {
90          Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
91          Assert.notNull(connection, "'connection' must not be null");
92          Assert.notNull(session, "'session' must not be null");
93          Assert.notNull(requestDestination, "'requestDestination' must not be null");
94          Assert.notNull(requestMessage, "'requestMessage' must not be null");
95          this.connectionFactory = connectionFactory;
96          this.connection = connection;
97          this.session = session;
98          this.requestDestination = requestDestination;
99          this.requestMessage = requestMessage;
100     }
101 
102     /** Returns the request message for this connection. Returns either a {@link BytesMessage} or a {@link TextMessage}. */
103     public Message getRequestMessage() {
104         return requestMessage;
105     }
106 
107     /**
108      * Returns the response message, if any, for this connection. Returns either a {@link BytesMessage} or a {@link
109      * TextMessage}.
110      */
111     public Message getResponseMessage() {
112         return responseMessage;
113     }
114 
115     /*
116      * Package-friendly setters
117      */
118 
119     void setResponseDestination(Destination responseDestination) {
120         this.responseDestination = responseDestination;
121     }
122 
123     void setTimeToLive(long timeToLive) {
124         this.timeToLive = timeToLive;
125     }
126 
127     void setDeliveryMode(int deliveryMode) {
128         this.deliveryMode = deliveryMode;
129     }
130 
131     void setPriority(int priority) {
132         this.priority = priority;
133     }
134 
135     void setReceiveTimeout(long receiveTimeout) {
136         this.receiveTimeout = receiveTimeout;
137     }
138 
139     void setTextMessageEncoding(String textMessageEncoding) {
140         this.textMessageEncoding = textMessageEncoding;
141     }
142 
143     void setPostProcessor(MessagePostProcessor postProcessor) {
144         this.postProcessor = postProcessor;
145     }
146 
147     void setSessionTransacted(boolean sessionTransacted) {
148         this.sessionTransacted = sessionTransacted;
149     }
150 
151     /*
152      * URI
153      */
154 
155     public URI getUri() throws URISyntaxException {
156         try {
157             return JmsTransportUtils.toUri(requestDestination);
158         }
159         catch (JMSException ex) {
160             throw new URISyntaxException("", ex.getMessage());
161         }
162     }
163 
164     /*
165      * Errors
166      */
167 
168     public boolean hasError() throws IOException {
169         return false;
170     }
171 
172     public String getErrorMessage() throws IOException {
173         return null;
174     }
175 
176     /*
177      * Sending
178      */
179 
180     protected void addRequestHeader(String name, String value) throws IOException {
181         try {
182             JmsTransportUtils.addHeader(requestMessage, name, value);
183         }
184         catch (JMSException ex) {
185             throw new JmsTransportException("Could not set property", ex);
186         }
187     }
188 
189     protected OutputStream getRequestOutputStream() throws IOException {
190         if (requestMessage instanceof BytesMessage) {
191             return new BytesMessageOutputStream((BytesMessage) requestMessage);
192         }
193         else if (requestMessage instanceof TextMessage) {
194             return new TextMessageOutputStream((TextMessage) requestMessage, textMessageEncoding);
195         }
196         else {
197             throw new IllegalStateException("Unknown request message type [" + requestMessage + "]");
198         }
199 
200     }
201 
202     protected void onSendAfterWrite(WebServiceMessage message) throws IOException {
203         MessageProducer messageProducer = null;
204         try {
205             messageProducer = session.createProducer(requestDestination);
206             messageProducer.setDeliveryMode(deliveryMode);
207             messageProducer.setTimeToLive(timeToLive);
208             messageProducer.setPriority(priority);
209             if (responseDestination == null) {
210                 responseDestination = session.createTemporaryQueue();
211             }
212             requestMessage.setJMSReplyTo(responseDestination);
213             if (postProcessor != null) {
214                 requestMessage = postProcessor.postProcessMessage(requestMessage);
215             }
216             connection.start();
217             messageProducer.send(requestMessage);
218             if (session.getTransacted() && isSessionLocallyTransacted(session)) {
219                 JmsUtils.commitIfNecessary(session);
220             }
221         }
222         catch (JMSException ex) {
223             throw new JmsTransportException(ex);
224         }
225         finally {
226             JmsUtils.closeMessageProducer(messageProducer);
227         }
228     }
229 
230     /** @see org.springframework.jms.core.JmsTemplate#isSessionLocallyTransacted(Session) */
231     private boolean isSessionLocallyTransacted(Session session) {
232         return sessionTransacted && !ConnectionFactoryUtils.isSessionTransactional(session, connectionFactory);
233     }
234 
235     /*
236     * Receiving
237     */
238 
239     protected void onReceiveBeforeRead() throws IOException {
240         MessageConsumer messageConsumer = null;
241         try {
242             if (responseDestination instanceof TemporaryQueue || responseDestination instanceof TemporaryTopic) {
243                 messageConsumer = session.createConsumer(responseDestination);
244             }
245             else {
246                 String messageId = requestMessage.getJMSMessageID().replaceAll("'", "''");
247                 String messageSelector = "JMSCorrelationID = '" + messageId + "'";
248                 messageConsumer = session.createConsumer(responseDestination, messageSelector);
249             }
250             Message message = receiveTimeout >= 0 ? messageConsumer.receive(receiveTimeout) : messageConsumer.receive();
251             if (message instanceof BytesMessage || message instanceof TextMessage) {
252                 responseMessage = message;
253             }
254             else if (message != null) {
255                 throw new IllegalArgumentException(
256                         "Wrong message type: [" + message.getClass() + "]. Only BytesMessages can be handled.");
257             }
258         }
259         catch (JMSException ex) {
260             throw new JmsTransportException(ex);
261         }
262         finally {
263             JmsUtils.closeMessageConsumer(messageConsumer);
264             if (responseDestination instanceof TemporaryQueue) {
265                 try {
266                     ((TemporaryQueue) responseDestination).delete();
267                 }
268                 catch (JMSException e) {
269                     // ignore
270                 }
271             }
272             else if (responseDestination instanceof TemporaryTopic) {
273                 try {
274                     ((TemporaryTopic) responseDestination).delete();
275                 }
276                 catch (JMSException e) {
277                     // ignore
278                 }
279             }
280         }
281     }
282 
283     protected boolean hasResponse() throws IOException {
284         return responseMessage != null;
285     }
286 
287     protected Iterator getResponseHeaderNames() throws IOException {
288         try {
289             return JmsTransportUtils.getHeaderNames(responseMessage);
290         }
291         catch (JMSException ex) {
292             throw new JmsTransportException("Could not get property names", ex);
293         }
294     }
295 
296     protected Iterator getResponseHeaders(String name) throws IOException {
297         try {
298             return JmsTransportUtils.getHeaders(responseMessage, name);
299         }
300         catch (JMSException ex) {
301             throw new JmsTransportException("Could not get property value", ex);
302         }
303     }
304 
305     protected InputStream getResponseInputStream() throws IOException {
306         if (responseMessage instanceof BytesMessage) {
307             return new BytesMessageInputStream((BytesMessage) responseMessage);
308         }
309         else if (responseMessage instanceof TextMessage) {
310             return new TextMessageInputStream((TextMessage) responseMessage, textMessageEncoding);
311         }
312         else {
313             throw new IllegalStateException("Unknown response message type [" + responseMessage + "]");
314         }
315 
316 
317     }
318 
319     protected void onClose() throws IOException {
320         JmsUtils.closeSession(session);
321         ConnectionFactoryUtils.releaseConnection(connection, connectionFactory, true);
322     }
323 }