View Javadoc
1   /*
2    * Copyright 2009-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.admin.util;
17  
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;

25  
26  /**
27   * <p>
28   * A {@link TaskExecutor} with a throttle limit which works by delegating to an
29   * existing task executor and limiting the number of tasks submitted.
30   * </p>
31   * <p>
32   * A throttle limit is provided to limit the number of pending requests over and
33   * above the features provided by the other task executors. The submit method
34   * blocks until there are results available to retrieve. This limit is different
35   * (and orthogonal) to any queue size imposed by the delegate
36   * {@link TaskExecutor}: such queues normally do not throttle, in the sense that
37   * they always accept more work, until they fill up, at which point they reject.
38   * The point of a throttle is to not reject any work, but to still limit the
39   * number of concurrent tasks.
40   * </p>
41   * @author Dave Syer
42   * 
43   */
44  public class ThrottledTaskExecutor implements TaskExecutor {
45  
46  	private Semaphore semaphore;
47  
48  	private volatile AtomicInteger count = new AtomicInteger(0);
49  
50  	private TaskExecutor taskExecutor = new SyncTaskExecutor();
51  
52  	/**
53  	 * Create a {@link ThrottledTaskExecutor} with infinite
54  	 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted.
55  	 */
56  	public ThrottledTaskExecutor() {
57  		this(null, Integer.MAX_VALUE);
58  	}
59  
60  	/**
61  	 * Create a {@link ThrottledTaskExecutor} with infinite
62  	 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted.
63  	 * 
64  	 * @param taskExecutor the {@link TaskExecutor} to use
65  	 */
66  	public ThrottledTaskExecutor(TaskExecutor taskExecutor) {
67  		this(taskExecutor, Integer.MAX_VALUE);
68  	}
69  
70  	/**
71  	 * Create a {@link ThrottledTaskExecutor} with finite throttle
72  	 * limit. The submit method will block when this limit is reached until one
73  	 * of the tasks has finished.
74  	 * 
75  	 * @param taskExecutor the {@link TaskExecutor} to use
76  	 * @param throttleLimit the throttle limit
77  	 */
78  	public ThrottledTaskExecutor(TaskExecutor taskExecutor, int throttleLimit) {
79  		super();
80  		if (taskExecutor != null) {
81  			this.taskExecutor = taskExecutor;
82  		}
83  		this.semaphore = new Semaphore(throttleLimit);
84  	}
85  
86  	/**
87  	 * Limits the number of concurrent executions on the enclosed task executor.
88  	 * Do not call this after initialization (for configuration purposes only).
89  	 * 
90  	 * @param throttleLimit the throttle limit to apply
91  	 */
92  	public void setThrottleLimit(int throttleLimit) {
93  		this.semaphore = new Semaphore(throttleLimit);
94  	}
95  
96  	/**
97  	 * Public setter for the {@link TaskExecutor} to be used to execute the
98  	 * tasks submitted. The default is synchronous, executing tasks on the
99  	 * calling thread. In this case the throttle limit is irrelevant as there
100 	 * will always be at most one task pending.
101 	 * 
102 	 * @param taskExecutor {@link org.springframework.core.task.TaskExecutor}
103 	 */
104 	public void setTaskExecutor(TaskExecutor taskExecutor) {
105 		this.taskExecutor = taskExecutor;
106 	}
107 
108 	/**
109 	 * Submit a task for execution by the delegate task executor, blocking if
110 	 * the throttleLimit is exceeded.
111 	 * 
112 	 * @see TaskExecutor#execute(Runnable)
113 	 */
114 	public void execute(Runnable task) {
115 		if (task == null) {
116 			throw new NullPointerException("Task is null in ThrottledTaskExecutor.");
117 		}
118 		doSubmit(task);
119 	}
120 
121 	/**
122 	 * Get an estimate of the number of pending requests.
123 	 * 
124 	 * @return the estimate
125 	 */
126 	public int size() {
127 		return count.get();
128 	}
129 
130 	private Runnable doSubmit(final Runnable task) {
131 
132 		try {
133 			semaphore.acquire();
134 			count.incrementAndGet();
135 		}
136 		catch (InterruptedException e) {
137 			Thread.currentThread().interrupt();
138 			throw new TaskRejectedException("Task could not be submitted because of a thread interruption.");
139 		}
140 
141 		try {
142 			taskExecutor.execute(new FutureTask<Object>(task, null) {
143 				@Override
144 				protected void done() {
145 					semaphore.release();
146 					count.decrementAndGet();
147 				}
148 			});
149 		}
150 		catch (TaskRejectedException e) {
151 			semaphore.release();
152 			count.decrementAndGet();
153 			throw e;
154 		}
155 
156 		return task;
157 	}
158 }