1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.job.flow; |
18 | |
19 | import org.springframework.batch.core.BatchStatus; |
20 | import org.springframework.batch.core.ExitStatus; |
21 | import org.springframework.batch.core.JobExecution; |
22 | import org.springframework.batch.core.JobInterruptedException; |
23 | import org.springframework.batch.core.StartLimitExceededException; |
24 | import org.springframework.batch.core.Step; |
25 | import org.springframework.batch.core.StepExecution; |
26 | import org.springframework.batch.core.job.StepHandler; |
27 | import org.springframework.batch.core.repository.JobRepository; |
28 | import org.springframework.batch.core.repository.JobRestartException; |
29 | |
30 | /** |
31 | * Implementation of {@link FlowExecutor} for use in components that need to |
32 | * execute a flow related to a {@link JobExecution}. |
33 | * |
34 | * @author Dave Syer |
35 | * @author Michael Minella |
36 | * |
37 | */ |
38 | public class JobFlowExecutor implements FlowExecutor { |
39 | |
40 | private final ThreadLocal<StepExecution> stepExecutionHolder = new ThreadLocal<StepExecution>(); |
41 | |
42 | private final JobExecution execution; |
43 | |
44 | private ExitStatus exitStatus = ExitStatus.EXECUTING; |
45 | |
46 | private final StepHandler stepHandler; |
47 | |
48 | private final JobRepository jobRepository; |
49 | |
50 | /** |
51 | * @param execution |
52 | */ |
53 | public JobFlowExecutor(JobRepository jobRepository, StepHandler stepHandler, JobExecution execution) { |
54 | this.jobRepository = jobRepository; |
55 | this.stepHandler = stepHandler; |
56 | this.execution = execution; |
57 | stepExecutionHolder.set(null); |
58 | } |
59 | |
60 | @Override |
61 | public String executeStep(Step step) throws JobInterruptedException, JobRestartException, |
62 | StartLimitExceededException { |
63 | boolean isRerun = isStepRestart(step); |
64 | StepExecution stepExecution = stepHandler.handleStep(step, execution); |
65 | stepExecutionHolder.set(stepExecution); |
66 | |
67 | if (stepExecution == null) { |
68 | return ExitStatus.COMPLETED.getExitCode(); |
69 | } |
70 | if (stepExecution.isTerminateOnly()) { |
71 | throw new JobInterruptedException("Step requested termination: "+stepExecution, stepExecution.getStatus()); |
72 | } |
73 | |
74 | if(isRerun) { |
75 | stepExecution.getExecutionContext().put("batch.restart", true); |
76 | } |
77 | |
78 | return stepExecution.getExitStatus().getExitCode(); |
79 | } |
80 | |
81 | private boolean isStepRestart(Step step) { |
82 | int count = jobRepository.getStepExecutionCount(execution.getJobInstance(), step.getName()); |
83 | |
84 | return count > 0; |
85 | } |
86 | |
87 | @Override |
88 | public void abandonStepExecution() { |
89 | StepExecution lastStepExecution = stepExecutionHolder.get(); |
90 | if (lastStepExecution != null && lastStepExecution.getStatus().isGreaterThan(BatchStatus.STOPPING)) { |
91 | lastStepExecution.upgradeStatus(BatchStatus.ABANDONED); |
92 | jobRepository.update(lastStepExecution); |
93 | } |
94 | } |
95 | |
96 | @Override |
97 | public void updateJobExecutionStatus(FlowExecutionStatus status) { |
98 | execution.setStatus(findBatchStatus(status)); |
99 | exitStatus = exitStatus.and(new ExitStatus(status.getName())); |
100 | execution.setExitStatus(exitStatus); |
101 | } |
102 | |
103 | @Override |
104 | public JobExecution getJobExecution() { |
105 | return execution; |
106 | } |
107 | |
108 | @Override |
109 | public StepExecution getStepExecution() { |
110 | return stepExecutionHolder.get(); |
111 | } |
112 | |
113 | @Override |
114 | public void close(FlowExecution result) { |
115 | stepExecutionHolder.set(null); |
116 | } |
117 | |
118 | @Override |
119 | public boolean isRestart() { |
120 | if (getStepExecution() != null && getStepExecution().getStatus() == BatchStatus.ABANDONED) { |
121 | /* |
122 | * This is assumed to be the last step execution and it was marked |
123 | * abandoned, so we are in a restart of a stopped step. |
124 | */ |
125 | // TODO: mark the step execution in some more definitive way? |
126 | return true; |
127 | } |
128 | return execution.getStepExecutions().isEmpty(); |
129 | } |
130 | |
131 | @Override |
132 | public void addExitStatus(String code) { |
133 | exitStatus = exitStatus.and(new ExitStatus(code)); |
134 | } |
135 | |
136 | /** |
137 | * @param status |
138 | * @return |
139 | */ |
140 | private BatchStatus findBatchStatus(FlowExecutionStatus status) { |
141 | for (BatchStatus batchStatus : BatchStatus.values()) { |
142 | if (status.getName().startsWith(batchStatus.toString())) { |
143 | return batchStatus; |
144 | } |
145 | } |
146 | return BatchStatus.UNKNOWN; |
147 | } |
148 | |
149 | } |