1 /* 2 * Copyright 2009-2010 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.springframework.batch.admin.util; 17 18 import org.springframework.core.task.SyncTaskExecutor; 19 import org.springframework.core.task.TaskExecutor; 20 import org.springframework.core.task.TaskRejectedException; 21 22 import java.util.concurrent.FutureTask; 23 import java.util.concurrent.Semaphore; 24 import java.util.concurrent.atomic.AtomicInteger; 25 26 /** 27 * <p> 28 * A {@link TaskExecutor} with a throttle limit which works by delegating to an 29 * existing task executor and limiting the number of tasks submitted. 30 * </p> 31 * <p> 32 * A throttle limit is provided to limit the number of pending requests over and 33 * above the features provided by the other task executors. The submit method 34 * blocks until there are results available to retrieve. This limit is different 35 * (and orthogonal) to any queue size imposed by the delegate 36 * {@link TaskExecutor}: such queues normally do not throttle, in the sense that 37 * they always accept more work, until they fill up, at which point they reject. 38 * The point of a throttle is to not reject any work, but to still limit the 39 * number of concurrent tasks. 40 * </p> 41 * @author Dave Syer 42 * 43 */ 44 public class ThrottledTaskExecutor implements TaskExecutor { 45 46 private Semaphore semaphore; 47 48 private volatile AtomicInteger count = new AtomicInteger(0); 49 50 private TaskExecutor taskExecutor = new SyncTaskExecutor(); 51 52 /** 53 * Create a {@link ThrottledTaskExecutor} with infinite 54 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted. 55 */ 56 public ThrottledTaskExecutor() { 57 this(null, Integer.MAX_VALUE); 58 } 59 60 /** 61 * Create a {@link ThrottledTaskExecutor} with infinite 62 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted. 63 * 64 * @param taskExecutor the {@link TaskExecutor} to use 65 */ 66 public ThrottledTaskExecutor(TaskExecutor taskExecutor) { 67 this(taskExecutor, Integer.MAX_VALUE); 68 } 69 70 /** 71 * Create a {@link ThrottledTaskExecutor} with finite throttle 72 * limit. The submit method will block when this limit is reached until one 73 * of the tasks has finished. 74 * 75 * @param taskExecutor the {@link TaskExecutor} to use 76 * @param throttleLimit the throttle limit 77 */ 78 public ThrottledTaskExecutor(TaskExecutor taskExecutor, int throttleLimit) { 79 super(); 80 if (taskExecutor != null) { 81 this.taskExecutor = taskExecutor; 82 } 83 this.semaphore = new Semaphore(throttleLimit); 84 } 85 86 /** 87 * Limits the number of concurrent executions on the enclosed task executor. 88 * Do not call this after initialization (for configuration purposes only). 89 * 90 * @param throttleLimit the throttle limit to apply 91 */ 92 public void setThrottleLimit(int throttleLimit) { 93 this.semaphore = new Semaphore(throttleLimit); 94 } 95 96 /** 97 * Public setter for the {@link TaskExecutor} to be used to execute the 98 * tasks submitted. The default is synchronous, executing tasks on the 99 * calling thread. In this case the throttle limit is irrelevant as there 100 * will always be at most one task pending. 101 * 102 * @param taskExecutor {@link org.springframework.core.task.TaskExecutor} 103 */ 104 public void setTaskExecutor(TaskExecutor taskExecutor) { 105 this.taskExecutor = taskExecutor; 106 } 107 108 /** 109 * Submit a task for execution by the delegate task executor, blocking if 110 * the throttleLimit is exceeded. 111 * 112 * @see TaskExecutor#execute(Runnable) 113 */ 114 public void execute(Runnable task) { 115 if (task == null) { 116 throw new NullPointerException("Task is null in ThrottledTaskExecutor."); 117 } 118 doSubmit(task); 119 } 120 121 /** 122 * Get an estimate of the number of pending requests. 123 * 124 * @return the estimate 125 */ 126 public int size() { 127 return count.get(); 128 } 129 130 private Runnable doSubmit(final Runnable task) { 131 132 try { 133 semaphore.acquire(); 134 count.incrementAndGet(); 135 } 136 catch (InterruptedException e) { 137 Thread.currentThread().interrupt(); 138 throw new TaskRejectedException("Task could not be submitted because of a thread interruption."); 139 } 140 141 try { 142 taskExecutor.execute(new FutureTask<Object>(task, null) { 143 @Override 144 protected void done() { 145 semaphore.release(); 146 count.decrementAndGet(); 147 } 148 }); 149 } 150 catch (TaskRejectedException e) { 151 semaphore.release(); 152 count.decrementAndGet(); 153 throw e; 154 } 155 156 return task; 157 } 158 }