1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.jms; |
18 | |
19 | import javax.jms.Destination; |
20 | import javax.jms.JMSException; |
21 | import javax.jms.Message; |
22 | |
23 | import org.apache.commons.logging.Log; |
24 | import org.apache.commons.logging.LogFactory; |
25 | import org.springframework.batch.item.NewItemIdentifier; |
26 | import org.springframework.batch.item.ItemKeyGenerator; |
27 | import org.springframework.batch.item.ItemReader; |
28 | import org.springframework.batch.item.ItemRecoverer; |
29 | import org.springframework.batch.item.UnexpectedInputException; |
30 | import org.springframework.batch.item.support.AbstractItemReader; |
31 | import org.springframework.jms.JmsException; |
32 | import org.springframework.jms.core.JmsOperations; |
33 | import org.springframework.jms.core.JmsTemplate; |
34 | import org.springframework.util.Assert; |
35 | |
36 | /** |
37 | * An {@link ItemReader} for JMS using a {@link JmsTemplate}. The template |
38 | * should have a default destination, which will be used to provide items in |
39 | * {@link #read()}. If a recovery step is needed, set the error destination and |
40 | * the item will be sent there if processing fails in a stateful retry. |
41 | * |
42 | * The implementation is thread safe after its properties are set (normal |
43 | * singleton behaviour). |
44 | * |
45 | * @author Dave Syer |
46 | * |
47 | */ |
48 | public class JmsItemReader extends AbstractItemReader implements ItemRecoverer, ItemKeyGenerator, NewItemIdentifier { |
49 | |
50 | protected Log logger = LogFactory.getLog(getClass()); |
51 | |
52 | private JmsOperations jmsTemplate; |
53 | |
54 | private Class itemType; |
55 | |
56 | private String errorDestinationName; |
57 | |
58 | private Destination errorDestination; |
59 | |
60 | /** |
61 | * Set the error destination. Should not be the same as the default |
62 | * destination of the jms template. |
63 | * @param errorDestination a JMS Destination |
64 | */ |
65 | public void setErrorDestination(Destination errorDestination) { |
66 | this.errorDestination = errorDestination; |
67 | } |
68 | |
69 | /** |
70 | * Set the error destination by name. Will be resolved by the destination |
71 | * resolver in the jms template. |
72 | * |
73 | * @param errorDestinationName the name of a JMS Destination |
74 | */ |
75 | public void setErrorDestinationName(String errorDestinationName) { |
76 | this.errorDestinationName = errorDestinationName; |
77 | } |
78 | |
79 | /** |
80 | * Setter for jms template. |
81 | * |
82 | * @param jmsTemplate a {@link JmsOperations} instance |
83 | */ |
84 | public void setJmsTemplate(JmsOperations jmsTemplate) { |
85 | this.jmsTemplate = jmsTemplate; |
86 | } |
87 | |
88 | /** |
89 | * Set the expected type of incoming message payloads. Set this to |
90 | * {@link Message} to receive the raw underlying message. |
91 | * |
92 | * @param itemType the java class of the items to be delivered. |
93 | * |
94 | * @throws IllegalStateException if the message payload is of the wrong |
95 | * type. |
96 | */ |
97 | public void setItemType(Class itemType) { |
98 | this.itemType = itemType; |
99 | } |
100 | |
101 | public Object read() { |
102 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
103 | return jmsTemplate.receive(); |
104 | } |
105 | Object result = jmsTemplate.receiveAndConvert(); |
106 | if (itemType != null && result != null) { |
107 | Assert.state(itemType.isAssignableFrom(result.getClass()), |
108 | "Received message payload of wrong type: expected [" + itemType + "]"); |
109 | } |
110 | return result; |
111 | } |
112 | |
113 | /** |
114 | * Send the message back to the provider using the specified error |
115 | * destination property of this reader. If the recovery is successful the |
116 | * item itself is returned, otherwise null. |
117 | * |
118 | * @see org.springframework.batch.item.ItemRecoverer#recover(Object, |
119 | * Throwable) |
120 | */ |
121 | public Object recover(Object item, Throwable cause) { |
122 | try { |
123 | if (errorDestination != null) { |
124 | jmsTemplate.convertAndSend(errorDestination, item); |
125 | } |
126 | else if (errorDestinationName != null) { |
127 | jmsTemplate.convertAndSend(errorDestinationName, item); |
128 | } |
129 | else { |
130 | // do nothing - it doesn't make sense to send the message back |
131 | // to the destination it came from |
132 | return null; |
133 | } |
134 | return item; |
135 | } |
136 | catch (JmsException e) { |
137 | logger.error("Could not recover because of JmsException.", e); |
138 | throw e; |
139 | } |
140 | } |
141 | |
142 | /** |
143 | * If the message is a {@link Message} then returns the JMS message ID. |
144 | * Otherwise just delegate to parent class. |
145 | * |
146 | * @see org.springframework.batch.item.ItemKeyGenerator#getKey(java.lang.Object) |
147 | * |
148 | * @throws UnexpectedInputException if the JMS id cannot be determined from |
149 | * a JMS Message |
150 | */ |
151 | public Object getKey(Object item) { |
152 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
153 | try { |
154 | return ((Message) item).getJMSMessageID(); |
155 | } |
156 | catch (JMSException e) { |
157 | throw new UnexpectedInputException("Could not extract message ID", e); |
158 | } |
159 | } |
160 | return item; |
161 | } |
162 | |
163 | /** |
164 | * If the item is a message, check the JMS re-delivered flag, otherwise |
165 | * return false to be on the safe side. |
166 | * |
167 | * @see org.springframework.batch.item.NewItemIdentifier#isNew(java.lang.Object) |
168 | */ |
169 | public boolean isNew(Object item) { |
170 | if (itemType != null && itemType.isAssignableFrom(Message.class)) { |
171 | try { |
172 | return !((Message) item).getJMSRedelivered(); |
173 | } |
174 | catch (JMSException e) { |
175 | throw new UnexpectedInputException("Could not extract message ID", e); |
176 | } |
177 | } |
178 | return false; |
179 | } |
180 | |
181 | } |