View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.sample.domain.trade.internal;
18  
19  import java.math.BigDecimal;
20  import java.util.ArrayList;
21  import java.util.List;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.springframework.batch.core.annotation.AfterWrite;
26  import org.springframework.batch.item.ExecutionContext;
27  import org.springframework.batch.item.ItemStreamException;
28  import org.springframework.batch.item.ItemStreamSupport;
29  import org.springframework.batch.item.ItemWriter;
30  import org.springframework.batch.item.WriteFailedException;
31  import org.springframework.batch.sample.domain.trade.Trade;
32  import org.springframework.batch.sample.domain.trade.TradeDao;
33  import org.springframework.util.Assert;
34  
35  /**
36   * Delegates the actual writing to custom DAO delegate. Allows configurable
37   * exception raising for testing skip and restart.
38   */
39  public class TradeWriter extends ItemStreamSupport implements ItemWriter<Trade> {
40  
41  	private static Log log = LogFactory.getLog(TradeWriter.class);
42  
43  	public static final String TOTAL_AMOUNT_KEY = "TOTAL_AMOUNT";
44  
45  	private TradeDao dao;
46  
47  	private List<String> failingCustomers = new ArrayList<String>();
48  
49  	private BigDecimal totalPrice = BigDecimal.ZERO;
50  
51  	public void write(List<? extends Trade> trades) {
52  
53  		for (Trade trade : trades) {
54  
55  			log.debug(trade);
56  
57  			dao.writeTrade(trade);
58  
59  			Assert.notNull(trade.getPrice()); // There must be a price to total
60  
61  			if (this.failingCustomers.contains(trade.getCustomer())) {
62  				throw new WriteFailedException("Something unexpected happened!");
63  			}
64  		}
65  
66  	}
67  
68  	@AfterWrite
69  	public void updateTotalPrice(List<Trade> trades) {
70  		for (Trade trade : trades) {
71  			this.totalPrice = this.totalPrice.add(trade.getPrice());
72  		}
73  	}
74  
75  	@Override
76  	public void open(ExecutionContext executionContext) throws ItemStreamException {
77  		if (executionContext.containsKey(TOTAL_AMOUNT_KEY)) {
78  			this.totalPrice = (BigDecimal) executionContext.get(TOTAL_AMOUNT_KEY);
79  		}
80  		else {
81  			//
82  			// Fresh run. Disregard old state.
83  			//
84  			this.totalPrice = BigDecimal.ZERO;
85  		}
86  	}
87  
88  	@Override
89  	public void update(ExecutionContext executionContext) {
90  		executionContext.put(TOTAL_AMOUNT_KEY, this.totalPrice);
91  	}
92  
93  	public BigDecimal getTotalPrice() {
94  		return totalPrice;
95  	}
96  
97  	public void setDao(TradeDao dao) {
98  		this.dao = dao;
99  	}
100 
101 	/**
102 	 * Public setter for the the customers on which failure should occur.
103 	 * 
104 	 * @param failingCustomers The customers to fail on
105 	 */
106 	public void setFailingCustomers(List<String> failingCustomers) {
107 		this.failingCustomers = failingCustomers;
108 	}
109 }