| 1 | /* |
| 2 | * Copyright 2006-2013 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.springframework.batch.core.job.flow; |
| 18 | |
| 19 | import org.springframework.batch.core.BatchStatus; |
| 20 | import org.springframework.batch.core.ExitStatus; |
| 21 | import org.springframework.batch.core.JobExecution; |
| 22 | import org.springframework.batch.core.JobInterruptedException; |
| 23 | import org.springframework.batch.core.StartLimitExceededException; |
| 24 | import org.springframework.batch.core.Step; |
| 25 | import org.springframework.batch.core.StepExecution; |
| 26 | import org.springframework.batch.core.job.StepHandler; |
| 27 | import org.springframework.batch.core.repository.JobRepository; |
| 28 | import org.springframework.batch.core.repository.JobRestartException; |
| 29 | |
| 30 | /** |
| 31 | * Implementation of {@link FlowExecutor} for use in components that need to |
| 32 | * execute a flow related to a {@link JobExecution}. |
| 33 | * |
| 34 | * @author Dave Syer |
| 35 | * @author Michael Minella |
| 36 | * |
| 37 | */ |
| 38 | public class JobFlowExecutor implements FlowExecutor { |
| 39 | |
| 40 | private final ThreadLocal<StepExecution> stepExecutionHolder = new ThreadLocal<StepExecution>(); |
| 41 | |
| 42 | private final JobExecution execution; |
| 43 | |
| 44 | private ExitStatus exitStatus = ExitStatus.EXECUTING; |
| 45 | |
| 46 | private final StepHandler stepHandler; |
| 47 | |
| 48 | private final JobRepository jobRepository; |
| 49 | |
| 50 | /** |
| 51 | * @param execution |
| 52 | */ |
| 53 | public JobFlowExecutor(JobRepository jobRepository, StepHandler stepHandler, JobExecution execution) { |
| 54 | this.jobRepository = jobRepository; |
| 55 | this.stepHandler = stepHandler; |
| 56 | this.execution = execution; |
| 57 | stepExecutionHolder.set(null); |
| 58 | } |
| 59 | |
| 60 | @Override |
| 61 | public String executeStep(Step step) throws JobInterruptedException, JobRestartException, |
| 62 | StartLimitExceededException { |
| 63 | boolean isRerun = isStepRestart(step); |
| 64 | StepExecution stepExecution = stepHandler.handleStep(step, execution); |
| 65 | stepExecutionHolder.set(stepExecution); |
| 66 | |
| 67 | if (stepExecution == null) { |
| 68 | return ExitStatus.COMPLETED.getExitCode(); |
| 69 | } |
| 70 | if (stepExecution.isTerminateOnly()) { |
| 71 | throw new JobInterruptedException("Step requested termination: "+stepExecution, stepExecution.getStatus()); |
| 72 | } |
| 73 | |
| 74 | if(isRerun) { |
| 75 | stepExecution.getExecutionContext().put("batch.restart", true); |
| 76 | } |
| 77 | |
| 78 | return stepExecution.getExitStatus().getExitCode(); |
| 79 | } |
| 80 | |
| 81 | private boolean isStepRestart(Step step) { |
| 82 | int count = jobRepository.getStepExecutionCount(execution.getJobInstance(), step.getName()); |
| 83 | |
| 84 | return count > 0; |
| 85 | } |
| 86 | |
| 87 | @Override |
| 88 | public void abandonStepExecution() { |
| 89 | StepExecution lastStepExecution = stepExecutionHolder.get(); |
| 90 | if (lastStepExecution != null && lastStepExecution.getStatus().isGreaterThan(BatchStatus.STOPPING)) { |
| 91 | lastStepExecution.upgradeStatus(BatchStatus.ABANDONED); |
| 92 | jobRepository.update(lastStepExecution); |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | @Override |
| 97 | public void updateJobExecutionStatus(FlowExecutionStatus status) { |
| 98 | execution.setStatus(findBatchStatus(status)); |
| 99 | exitStatus = exitStatus.and(new ExitStatus(status.getName())); |
| 100 | execution.setExitStatus(exitStatus); |
| 101 | } |
| 102 | |
| 103 | @Override |
| 104 | public JobExecution getJobExecution() { |
| 105 | return execution; |
| 106 | } |
| 107 | |
| 108 | @Override |
| 109 | public StepExecution getStepExecution() { |
| 110 | return stepExecutionHolder.get(); |
| 111 | } |
| 112 | |
| 113 | @Override |
| 114 | public void close(FlowExecution result) { |
| 115 | stepExecutionHolder.set(null); |
| 116 | } |
| 117 | |
| 118 | @Override |
| 119 | public boolean isRestart() { |
| 120 | if (getStepExecution() != null && getStepExecution().getStatus() == BatchStatus.ABANDONED) { |
| 121 | /* |
| 122 | * This is assumed to be the last step execution and it was marked |
| 123 | * abandoned, so we are in a restart of a stopped step. |
| 124 | */ |
| 125 | // TODO: mark the step execution in some more definitive way? |
| 126 | return true; |
| 127 | } |
| 128 | return execution.getStepExecutions().isEmpty(); |
| 129 | } |
| 130 | |
| 131 | @Override |
| 132 | public void addExitStatus(String code) { |
| 133 | exitStatus = exitStatus.and(new ExitStatus(code)); |
| 134 | } |
| 135 | |
| 136 | /** |
| 137 | * @param status |
| 138 | * @return |
| 139 | */ |
| 140 | private BatchStatus findBatchStatus(FlowExecutionStatus status) { |
| 141 | for (BatchStatus batchStatus : BatchStatus.values()) { |
| 142 | if (status.getName().startsWith(batchStatus.toString())) { |
| 143 | return batchStatus; |
| 144 | } |
| 145 | } |
| 146 | return BatchStatus.UNKNOWN; |
| 147 | } |
| 148 | |
| 149 | } |