1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.jms; |
18 | |
19 | import javax.jms.Destination; |
20 | import javax.jms.JMSException; |
21 | import javax.jms.Message; |
22 | |
23 | import org.apache.commons.logging.Log; |
24 | import org.apache.commons.logging.LogFactory; |
25 | import org.springframework.batch.item.AbstractItemReader; |
26 | import org.springframework.batch.item.FailedItemIdentifier; |
27 | import org.springframework.batch.item.ItemKeyGenerator; |
28 | import org.springframework.batch.item.ItemReader; |
29 | import org.springframework.batch.item.ItemRecoverer; |
30 | import org.springframework.batch.item.UnexpectedInputException; |
31 | import org.springframework.jms.JmsException; |
32 | import org.springframework.jms.core.JmsOperations; |
33 | import org.springframework.jms.core.JmsTemplate; |
34 | import org.springframework.util.Assert; |
35 | |
36 | /** |
37 | * An {@link ItemReader} for JMS using a {@link JmsTemplate}. The template |
38 | * should have a default destination, which will be used to provide items in |
39 | * {@link #read()}. If a recovery step is needed, set the error destination and |
40 | * the item will be sent there if processing fails in an external retry. |
41 | * |
42 | * @author Dave Syer |
43 | * |
44 | */ |
45 | public class JmsItemReader extends AbstractItemReader implements ItemRecoverer, ItemKeyGenerator, FailedItemIdentifier { |
46 | |
47 | protected Log logger = LogFactory.getLog(getClass()); |
48 | |
49 | private JmsOperations jmsTemplate; |
50 | |
51 | private Class itemType; |
52 | |
53 | private String errorDestinationName; |
54 | |
55 | private Destination errorDestination; |
56 | |
57 | /** |
58 | * Set the error destination. Should not be the same as the default |
59 | * destination of the jms template. |
60 | * @param errorDestination a JMS Destination |
61 | */ |
62 | public void setErrorDestination(Destination errorDestination) { |
63 | this.errorDestination = errorDestination; |
64 | } |
65 | |
66 | /** |
67 | * Set the error destination by name. Will be resolved by the destination |
68 | * resolver in the jms template. |
69 | * |
70 | * @param errorDestinationName the name of a JMS Destination |
71 | */ |
72 | public void setErrorDestinationName(String errorDestinationName) { |
73 | this.errorDestinationName = errorDestinationName; |
74 | } |
75 | |
76 | /** |
77 | * Setter for jms template. |
78 | * |
79 | * @param jmsTemplate a {@link JmsOperations} instance |
80 | */ |
81 | public void setJmsTemplate(JmsOperations jmsTemplate) { |
82 | this.jmsTemplate = jmsTemplate; |
83 | } |
84 | |
85 | /** |
86 | * Set the expected type of incoming message payloads. Set this to |
87 | * {@link Message} to receive the raw underlying message. |
88 | * |
89 | * @param itemType the java class of the items to be delivered. |
90 | * |
91 | * @throws IllegalStateException if the message payload is of the wrong |
92 | * type. |
93 | */ |
94 | public void setItemType(Class itemType) { |
95 | this.itemType = itemType; |
96 | } |
97 | |
98 | public Object read() { |
99 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
100 | return jmsTemplate.receive(); |
101 | } |
102 | Object result = jmsTemplate.receiveAndConvert(); |
103 | if (itemType != null && result != null) { |
104 | Assert.state(itemType.isAssignableFrom(result.getClass()), |
105 | "Received message payload of wrong type: expected [" + itemType + "]"); |
106 | } |
107 | return result; |
108 | } |
109 | |
110 | /** |
111 | * Send the message back to the proovider using the specified error |
112 | * destination property of this provider. |
113 | * |
114 | * @see org.springframework.batch.item.ItemRecoverer#recover(Object, |
115 | * Throwable) |
116 | */ |
117 | public boolean recover(Object item, Throwable cause) { |
118 | try { |
119 | if (errorDestination != null) { |
120 | jmsTemplate.convertAndSend(errorDestination, item); |
121 | } |
122 | else if (errorDestinationName != null) { |
123 | jmsTemplate.convertAndSend(errorDestinationName, item); |
124 | } |
125 | else { |
126 | // do nothing - it doesn't make sense to send the message back |
127 | // to |
128 | // the destination it came from |
129 | return false; |
130 | } |
131 | return true; |
132 | } |
133 | catch (JmsException e) { |
134 | logger.error("Could not recover because of JmsException.", e); |
135 | return false; |
136 | } |
137 | } |
138 | |
139 | /** |
140 | * If the message is a {@link Message} then returns the JMS message ID. |
141 | * Otherwise just delegate to parent class. |
142 | * |
143 | * @see org.springframework.batch.item.ItemKeyGenerator#getKey(java.lang.Object) |
144 | * |
145 | * @throws UnexpectedInputException if the JMS id cannot be determined from |
146 | * a JMS Message |
147 | */ |
148 | public Object getKey(Object item) { |
149 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
150 | try { |
151 | return ((Message) item).getJMSMessageID(); |
152 | } |
153 | catch (JMSException e) { |
154 | throw new UnexpectedInputException("Could not extract message ID", e); |
155 | } |
156 | } |
157 | return item; |
158 | } |
159 | |
160 | /** |
161 | * If the item is a message, check the JMS redelivered flag, otherwise |
162 | * return true to be on the safe side. |
163 | * |
164 | * @see org.springframework.batch.item.FailedItemIdentifier#hasFailed(java.lang.Object) |
165 | */ |
166 | public boolean hasFailed(Object item) { |
167 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
168 | try { |
169 | return ((Message) item).getJMSRedelivered(); |
170 | } |
171 | catch (JMSException e) { |
172 | throw new UnexpectedInputException("Could not extract message ID", e); |
173 | } |
174 | } |
175 | return true; |
176 | } |
177 | |
178 | } |