EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.partition.support]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleStepExecutionSplitter.java]

nameclass, %method, %block, %line, %
SimpleStepExecutionSplitter.java100% (1/1)93%  (13/14)71%  (262/371)76%  (63.3/83)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleStepExecutionSplitter100% (1/1)93%  (13/14)71%  (262/371)76%  (63.3/83)
SimpleStepExecutionSplitter (JobRepository, Step, Partitioner): void 0%   (0/1)0%   (0/20)0%   (0/7)
shouldStart (boolean, StepExecution, StepExecution): boolean 100% (1/1)38%  (30/80)62%  (10/16)
isSameJobExecution (StepExecution, StepExecution): boolean 100% (1/1)56%  (9/16)67%  (2/3)
getContexts (StepExecution, int): Map 100% (1/1)63%  (49/78)62%  (10/16)
afterPropertiesSet (): void 100% (1/1)88%  (22/25)93%  (3.7/4)
SimpleStepExecutionSplitter (): void 100% (1/1)100% (6/6)100% (3/3)
SimpleStepExecutionSplitter (JobRepository, boolean, String, Partitioner): void 100% (1/1)100% (18/18)100% (7/7)
getStartable (StepExecution, ExecutionContext): boolean 100% (1/1)100% (46/46)100% (8/8)
getStepName (): String 100% (1/1)100% (3/3)100% (1/1)
setAllowStartIfComplete (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setJobRepository (JobRepository): void 100% (1/1)100% (4/4)100% (2/2)
setPartitioner (Partitioner): void 100% (1/1)100% (4/4)100% (2/2)
setStepName (String): void 100% (1/1)100% (4/4)100% (2/2)
split (StepExecution, int): Set 100% (1/1)100% (63/63)100% (12/12)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.partition.support;
18 
19import java.util.Collection;
20import java.util.HashMap;
21import java.util.HashSet;
22import java.util.Map;
23import java.util.Map.Entry;
24import java.util.Set;
25 
26import org.springframework.batch.core.BatchStatus;
27import org.springframework.batch.core.JobExecution;
28import org.springframework.batch.core.JobExecutionException;
29import org.springframework.batch.core.JobInstance;
30import org.springframework.batch.core.Step;
31import org.springframework.batch.core.StepExecution;
32import org.springframework.batch.core.partition.StepExecutionSplitter;
33import org.springframework.batch.core.repository.JobRepository;
34import org.springframework.batch.item.ExecutionContext;
35import org.springframework.beans.factory.InitializingBean;
36import org.springframework.util.Assert;
37 
38/**
39 * Generic implementation of {@link StepExecutionSplitter} that delegates to a
40 * {@link Partitioner} to generate {@link ExecutionContext} instances. Takes
41 * care of restartability and identifying the step executions from previous runs
42 * of the same job. The generated {@link StepExecution} instances have names
43 * that identify them uniquely in the partition. The name is constructed from a
44 * base (name of the target step) plus a suffix taken from the
45 * {@link Partitioner} identifiers, separated by a colon, e.g.
46 * <code>{step1:partition0, step1:partition1, ...}</code>.
47 *
48 * @author Dave Syer
49 * @since 2.0
50 */
51public class SimpleStepExecutionSplitter implements StepExecutionSplitter, InitializingBean {
52 
53        private static final String STEP_NAME_SEPARATOR = ":";
54 
55        private String stepName;
56 
57        private Partitioner partitioner;
58 
59        private boolean allowStartIfComplete = false;
60 
61        private JobRepository jobRepository;
62 
63        /**
64         * Default constructor for convenience in configuration.
65         */
66        public SimpleStepExecutionSplitter() {
67        }
68 
69        /**
70         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
71         * properties.
72         *
73         * @param jobRepository the {@link JobRepository}
74         * @param allowStartIfComplete flag specifying preferences on restart
75         * @param stepName the target step name
76         * @param partitioner a {@link Partitioner} to use for generating input
77         * parameters
78         */
79        public SimpleStepExecutionSplitter(JobRepository jobRepository, boolean allowStartIfComplete, String stepName, Partitioner partitioner) {
80                this.jobRepository = jobRepository;
81                this.allowStartIfComplete = allowStartIfComplete;
82                this.partitioner = partitioner;
83                this.stepName = stepName;
84        }
85 
86        /**
87         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
88         * properties.
89         *
90         * @param jobRepository the {@link JobRepository}
91         * @param step the target step (a local version of it), used to extract the
92         * name and allowStartIfComplete flags
93         * @param partitioner a {@link Partitioner} to use for generating input
94         * parameters
95         *
96         * @deprecated use {@link #SimpleStepExecutionSplitter(JobRepository, boolean, String, Partitioner)} instead
97         */
98        @Deprecated
99        public SimpleStepExecutionSplitter(JobRepository jobRepository, Step step, Partitioner partitioner) {
100                this.jobRepository = jobRepository;
101                this.allowStartIfComplete = step.isAllowStartIfComplete();
102                this.partitioner = partitioner;
103                this.stepName = step.getName();
104        }
105 
106        /**
107         * Check mandatory properties (step name, job repository and partitioner).
108         *
109         * @see InitializingBean#afterPropertiesSet()
110         */
111        @Override
112        public void afterPropertiesSet() throws Exception {
113                Assert.state(jobRepository != null, "A JobRepository is required");
114                Assert.state(stepName != null, "A step name is required");
115                Assert.state(partitioner != null, "A Partitioner is required");
116        }
117 
118        /**
119         * Flag to indicate that the partition target step is allowed to start if an
120         * execution is complete. Defaults to the same value as the underlying step.
121         * Set this manually to override the underlying step properties.
122         *
123         * @see Step#isAllowStartIfComplete()
124         *
125         * @param allowStartIfComplete the value to set
126         */
127        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
128                this.allowStartIfComplete = allowStartIfComplete;
129        }
130 
131        /**
132         * The job repository that will be used to manage the persistence of the
133         * delegate step executions.
134         *
135         * @param jobRepository the JobRepository to set
136         */
137        public void setJobRepository(JobRepository jobRepository) {
138                this.jobRepository = jobRepository;
139        }
140 
141        /**
142         * The {@link Partitioner} that will be used to generate step execution meta
143         * data for the target step.
144         *
145         * @param partitioner the partitioner to set
146         */
147        public void setPartitioner(Partitioner partitioner) {
148                this.partitioner = partitioner;
149        }
150 
151        /**
152         * The name of the target step that will be executed across the partitions.
153         * Mandatory with no default.
154         *
155         * @param stepName the step name to set
156         */
157        public void setStepName(String stepName) {
158                this.stepName = stepName;
159        }
160 
161        /**
162         * @see StepExecutionSplitter#getStepName()
163         */
164        @Override
165        public String getStepName() {
166                return this.stepName;
167        }
168 
169        /**
170         * @see StepExecutionSplitter#split(StepExecution, int)
171         */
172        @Override
173        public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException {
174 
175                JobExecution jobExecution = stepExecution.getJobExecution();
176 
177                Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
178                Set<StepExecution> set = new HashSet<StepExecution>(contexts.size());
179 
180                for (Entry<String, ExecutionContext> context : contexts.entrySet()) {
181 
182                        // Make the step execution name unique and repeatable
183                        String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();
184 
185                        StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
186 
187                        boolean startable = getStartable(currentStepExecution, context.getValue());
188 
189                        if (startable) {
190                                set.add(currentStepExecution);
191                        }
192                }
193                
194                jobRepository.addAll(set);
195 
196                return set;
197 
198        }
199 
200        private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {
201 
202                ExecutionContext context = stepExecution.getExecutionContext();
203                String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";
204 
205                // If this is a restart we must retain the same grid size, ignoring the
206                // one passed in...
207                int splitSize = (int) context.getLong(key, gridSize);
208                context.putLong(key, splitSize);
209 
210                Map<String, ExecutionContext> result;
211                if (context.isDirty()) {
212                        // The context changed so we didn't already know the partitions
213                        jobRepository.updateExecutionContext(stepExecution);
214                        result = partitioner.partition(splitSize);
215                }
216                else {
217                        if (partitioner instanceof PartitionNameProvider) {
218                                result = new HashMap<String, ExecutionContext>();
219                                Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
220                                for (String name : names) {
221                                        /*
222                                         * We need to return the same keys as the original (failed)
223                                         * execution, but the execution contexts will be discarded
224                                         * so they can be empty.
225                                         */
226                                        result.put(name, new ExecutionContext());
227                                }
228                        }
229                        else {
230                                // If no names are provided, grab the partition again.
231                                result = partitioner.partition(splitSize);
232                        }
233                }
234 
235                return result;
236        }
237 
238        private boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
239 
240                JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
241                String stepName = stepExecution.getStepName();
242                StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);
243 
244                boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);
245 
246                if (isRestart) {
247                        stepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
248                }
249                else {
250                        stepExecution.setExecutionContext(context);
251                }
252 
253                return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;
254 
255        }
256 
257        private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution, StepExecution lastStepExecution)
258                        throws JobExecutionException {
259 
260                if (lastStepExecution == null) {
261                        return true;
262                }
263 
264                BatchStatus stepStatus = lastStepExecution.getStatus();
265 
266                if (stepStatus == BatchStatus.UNKNOWN) {
267                        throw new JobExecutionException("Cannot restart step from UNKNOWN status.  "
268                                        + "The last execution ended with a failure that could not be rolled back, "
269                                        + "so it may be dangerous to proceed.  " + "Manual intervention is probably necessary.");
270                }
271 
272                if (stepStatus == BatchStatus.COMPLETED) {
273                        if (!allowStartIfComplete) {
274                                if (isSameJobExecution(stepExecution, lastStepExecution)) {
275                                        // it's always OK to start again in the same JobExecution
276                                        return true;
277                                }
278                                // step is complete, false should be returned, indicating that
279                                // the step should not be started
280                                return false;
281                        }
282                        else {
283                                return true;
284                        }
285                }
286 
287                if (stepStatus == BatchStatus.STOPPED || stepStatus == BatchStatus.FAILED) {
288                        return true;
289                }
290 
291                if (stepStatus == BatchStatus.STARTED || stepStatus == BatchStatus.STARTING
292                                || stepStatus == BatchStatus.STOPPING) {
293                        throw new JobExecutionException(
294                                        "Cannot restart step from "
295                                                        + stepStatus
296                                                        + " status.  "
297                                                        + "The old execution may still be executing, so you may need to verify manually that this is the case.");
298                }
299 
300                throw new JobExecutionException("Cannot restart step from " + stepStatus + " status.  "
301                                + "We believe the old execution was abandoned and therefore has been marked as un-restartable.");
302 
303        }
304 
305        private boolean isSameJobExecution(StepExecution stepExecution, StepExecution lastStepExecution) {
306                if (stepExecution.getJobExecutionId()==null) {
307                        return lastStepExecution.getJobExecutionId()==null;
308                }
309                return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
310        }
311 
312}

[all classes][org.springframework.batch.core.partition.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov