EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.partition.support]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleStepExecutionSplitter.java]

nameclass, %method, %block, %line, %
SimpleStepExecutionSplitter.java100% (1/1)86%  (12/14)70%  (258/371)75%  (61.3/82)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleStepExecutionSplitter100% (1/1)86%  (12/14)70%  (258/371)75%  (61.3/82)
SimpleStepExecutionSplitter (JobRepository, Step, Partitioner): void 0%   (0/1)0%   (0/20)0%   (0/7)
setAllowStartIfComplete (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
shouldStart (boolean, StepExecution, StepExecution): boolean 100% (1/1)38%  (30/80)62%  (10/16)
isSameJobExecution (StepExecution, StepExecution): boolean 100% (1/1)56%  (9/16)67%  (2/3)
getContexts (StepExecution, int): Map 100% (1/1)63%  (49/78)67%  (10/15)
afterPropertiesSet (): void 100% (1/1)88%  (22/25)93%  (3.7/4)
SimpleStepExecutionSplitter (): void 100% (1/1)100% (6/6)100% (3/3)
SimpleStepExecutionSplitter (JobRepository, boolean, String, Partitioner): void 100% (1/1)100% (18/18)100% (7/7)
getStartable (StepExecution, ExecutionContext): boolean 100% (1/1)100% (46/46)100% (8/8)
getStepName (): String 100% (1/1)100% (3/3)100% (1/1)
setJobRepository (JobRepository): void 100% (1/1)100% (4/4)100% (2/2)
setPartitioner (Partitioner): void 100% (1/1)100% (4/4)100% (2/2)
setStepName (String): void 100% (1/1)100% (4/4)100% (2/2)
split (StepExecution, int): Set 100% (1/1)100% (63/63)100% (12/12)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.partition.support;
18 
19import java.util.Collection;
20import java.util.HashMap;
21import java.util.HashSet;
22import java.util.Map;
23import java.util.Set;
24import java.util.Map.Entry;
25 
26import org.springframework.batch.core.BatchStatus;
27import org.springframework.batch.core.JobExecution;
28import org.springframework.batch.core.JobExecutionException;
29import org.springframework.batch.core.JobInstance;
30import org.springframework.batch.core.Step;
31import org.springframework.batch.core.StepExecution;
32import org.springframework.batch.core.partition.StepExecutionSplitter;
33import org.springframework.batch.core.repository.JobRepository;
34import org.springframework.batch.item.ExecutionContext;
35import org.springframework.beans.factory.InitializingBean;
36import org.springframework.util.Assert;
37 
38/**
39 * Generic implementation of {@link StepExecutionSplitter} that delegates to a
40 * {@link Partitioner} to generate {@link ExecutionContext} instances. Takes
41 * care of restartability and identifying the step executions from previous runs
42 * of the same job. The generated {@link StepExecution} instances have names
43 * that identify them uniquely in the partition. The name is constructed from a
44 * base (name of the target step) plus a suffix taken from the
45 * {@link Partitioner} identifiers, separated by a colon, e.g.
46 * <code>{step1:partition0, step1:partition1, ...}</code>.
47 * 
48 * @author Dave Syer
49 * @since 2.0
50 */
51public class SimpleStepExecutionSplitter implements StepExecutionSplitter, InitializingBean {
52 
53        private static final String STEP_NAME_SEPARATOR = ":";
54 
55        private String stepName;
56 
57        private Partitioner partitioner;
58 
59        private boolean allowStartIfComplete = false;
60 
61        private JobRepository jobRepository;
62 
63        /**
64         * Default constructor for convenience in configuration.
65         */
66        public SimpleStepExecutionSplitter() {
67        }
68 
69        /**
70         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
71         * properties.
72         * 
73         * @param jobRepository the {@link JobRepository}
74         * @param allowStartIfComplete flag specifying preferences on restart
75         * @param stepName the target step name
76         * @param partitioner a {@link Partitioner} to use for generating input
77         * parameters
78         */
79        public SimpleStepExecutionSplitter(JobRepository jobRepository, boolean allowStartIfComplete, String stepName, Partitioner partitioner) {
80                this.jobRepository = jobRepository;
81                this.allowStartIfComplete = allowStartIfComplete;
82                this.partitioner = partitioner;
83                this.stepName = stepName;
84        }
85 
86        /**
87         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
88         * properties.
89         * 
90         * @param jobRepository the {@link JobRepository}
91         * @param step the target step (a local version of it), used to extract the
92         * name and allowStartIfComplete flags
93         * @param partitioner a {@link Partitioner} to use for generating input
94         * parameters
95         * 
96         * @deprecated use {@link #SimpleStepExecutionSplitter(JobRepository, boolean, String, Partitioner)} instead
97         */
98        @Deprecated
99        public SimpleStepExecutionSplitter(JobRepository jobRepository, Step step, Partitioner partitioner) {
100                this.jobRepository = jobRepository;
101                this.allowStartIfComplete = step.isAllowStartIfComplete();
102                this.partitioner = partitioner;
103                this.stepName = step.getName();
104        }
105 
106        /**
107         * Check mandatory properties (step name, job repository and partitioner).
108         * 
109         * @see InitializingBean#afterPropertiesSet()
110         */
111        public void afterPropertiesSet() throws Exception {
112                Assert.state(jobRepository != null, "A JobRepository is required");
113                Assert.state(stepName != null, "A step name is required");
114                Assert.state(partitioner != null, "A Partitioner is required");
115        }
116 
117        /**
118         * Flag to indicate that the partition target step is allowed to start if an
119         * execution is complete. Defaults to the same value as the underlying step.
120         * Set this manually to override the underlying step properties.
121         * 
122         * @see Step#isAllowStartIfComplete()
123         * 
124         * @param allowStartIfComplete the value to set
125         */
126        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
127                this.allowStartIfComplete = allowStartIfComplete;
128        }
129 
130        /**
131         * The job repository that will be used to manage the persistence of the
132         * delegate step executions.
133         * 
134         * @param jobRepository the JobRepository to set
135         */
136        public void setJobRepository(JobRepository jobRepository) {
137                this.jobRepository = jobRepository;
138        }
139 
140        /**
141         * The {@link Partitioner} that will be used to generate step execution meta
142         * data for the target step.
143         * 
144         * @param partitioner the partitioner to set
145         */
146        public void setPartitioner(Partitioner partitioner) {
147                this.partitioner = partitioner;
148        }
149 
150        /**
151         * The name of the target step that will be executed across the partitions.
152         * Mandatory with no default.
153         * 
154         * @param stepName the step name to set
155         */
156        public void setStepName(String stepName) {
157                this.stepName = stepName;
158        }
159 
160        /**
161         * @see StepExecutionSplitter#getStepName()
162         */
163        public String getStepName() {
164                return this.stepName;
165        }
166 
167        /**
168         * @see StepExecutionSplitter#split(StepExecution, int)
169         */
170        public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException {
171 
172                JobExecution jobExecution = stepExecution.getJobExecution();
173 
174                Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
175                Set<StepExecution> set = new HashSet<StepExecution>(contexts.size());
176 
177                for (Entry<String, ExecutionContext> context : contexts.entrySet()) {
178 
179                        // Make the step execution name unique and repeatable
180                        String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();
181 
182                        StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
183 
184                        boolean startable = getStartable(currentStepExecution, context.getValue());
185 
186                        if (startable) {
187                                jobRepository.add(currentStepExecution);
188                                set.add(currentStepExecution);
189                        }
190 
191                }
192 
193                return set;
194 
195        }
196 
197        private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {
198 
199                ExecutionContext context = stepExecution.getExecutionContext();
200                String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";
201 
202                // If this is a restart we must retain the same grid size, ignoring the
203                // one passed in...
204                int splitSize = (int) context.getLong(key, gridSize);
205                context.putLong(key, splitSize);
206 
207                Map<String, ExecutionContext> result;
208                if (context.isDirty()) {
209                        // The context changed so we didn't already know the partitions
210                        jobRepository.updateExecutionContext(stepExecution);
211                        result = partitioner.partition(splitSize);
212                }
213                else {
214                        if (partitioner instanceof PartitionNameProvider) {
215                                result = new HashMap<String, ExecutionContext>();
216                                Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
217                                for (String name : names) {
218                                        /*
219                                         * We need to return the same keys as the original (failed)
220                                         * execution, but the execution contexts will be discarded
221                                         * so they can be empty.
222                                         */
223                                        result.put(name, new ExecutionContext());
224                                }
225                        }
226                        else {
227                                // If no names are provided, grab the partition again.
228                                result = partitioner.partition(splitSize);
229                        }
230                }
231 
232                return result;
233        }
234 
235        private boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
236 
237                JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
238                String stepName = stepExecution.getStepName();
239                StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);
240 
241                boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);
242 
243                if (isRestart) {
244                        stepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
245                }
246                else {
247                        stepExecution.setExecutionContext(context);
248                }
249 
250                return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;
251 
252        }
253 
254        private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution, StepExecution lastStepExecution)
255                        throws JobExecutionException {
256 
257                if (lastStepExecution == null) {
258                        return true;
259                }
260 
261                BatchStatus stepStatus = lastStepExecution.getStatus();
262 
263                if (stepStatus == BatchStatus.UNKNOWN) {
264                        throw new JobExecutionException("Cannot restart step from UNKNOWN status.  "
265                                        + "The last execution ended with a failure that could not be rolled back, "
266                                        + "so it may be dangerous to proceed.  " + "Manual intervention is probably necessary.");
267                }
268 
269                if (stepStatus == BatchStatus.COMPLETED) {
270                        if (!allowStartIfComplete) {
271                                if (isSameJobExecution(stepExecution, lastStepExecution)) {
272                                        // it's always OK to start again in the same JobExecution
273                                        return true;
274                                }
275                                // step is complete, false should be returned, indicating that
276                                // the step should not be started
277                                return false;
278                        }
279                        else {
280                                return true;
281                        }
282                }
283 
284                if (stepStatus == BatchStatus.STOPPED || stepStatus == BatchStatus.FAILED) {
285                        return true;
286                }
287 
288                if (stepStatus == BatchStatus.STARTED || stepStatus == BatchStatus.STARTING
289                                || stepStatus == BatchStatus.STOPPING) {
290                        throw new JobExecutionException(
291                                        "Cannot restart step from "
292                                                        + stepStatus
293                                                        + " status.  "
294                                                        + "The old execution may still be executing, so you may need to verify manually that this is the case.");
295                }
296 
297                throw new JobExecutionException("Cannot restart step from " + stepStatus + " status.  "
298                                + "We believe the old execution was abandoned and therefore has been marked as un-restartable.");
299 
300        }
301 
302        private boolean isSameJobExecution(StepExecution stepExecution, StepExecution lastStepExecution) {
303                if (stepExecution.getJobExecutionId()==null) {
304                        return lastStepExecution.getJobExecutionId()==null;
305                }
306                return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
307        }
308 
309}

[all classes][org.springframework.batch.core.partition.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov