1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.repository.dao; |
17 | |
18 | import java.lang.reflect.Field; |
19 | import java.util.ArrayList; |
20 | import java.util.Collection; |
21 | import java.util.Collections; |
22 | import java.util.Comparator; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | import java.util.concurrent.ConcurrentHashMap; |
26 | import java.util.concurrent.atomic.AtomicLong; |
27 | |
28 | import org.springframework.batch.core.Entity; |
29 | import org.springframework.batch.core.JobExecution; |
30 | import org.springframework.batch.core.StepExecution; |
31 | import org.springframework.batch.support.SerializationUtils; |
32 | import org.springframework.dao.OptimisticLockingFailureException; |
33 | import org.springframework.util.Assert; |
34 | import org.springframework.util.ReflectionUtils; |
35 | |
36 | /** |
37 | * In-memory implementation of {@link StepExecutionDao}. |
38 | */ |
39 | public class MapStepExecutionDao implements StepExecutionDao { |
40 | |
41 | private Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<Long, Map<Long,StepExecution>>(); |
42 | |
43 | private Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<Long, StepExecution>(); |
44 | |
45 | private AtomicLong currentId = new AtomicLong(); |
46 | |
47 | public void clear() { |
48 | executionsByJobExecutionId.clear(); |
49 | executionsByStepExecutionId.clear(); |
50 | } |
51 | |
52 | private static StepExecution copy(StepExecution original) { |
53 | return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original)); |
54 | } |
55 | |
56 | private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) { |
57 | // Cheaper than full serialization is a reflective field copy, which is |
58 | // fine for volatile storage |
59 | ReflectionUtils.doWithFields(StepExecution.class, new ReflectionUtils.FieldCallback() { |
60 | @Override |
61 | public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { |
62 | field.setAccessible(true); |
63 | field.set(targetExecution, field.get(sourceExecution)); |
64 | } |
65 | }); |
66 | } |
67 | |
68 | @Override |
69 | public void saveStepExecution(StepExecution stepExecution) { |
70 | |
71 | Assert.isTrue(stepExecution.getId() == null); |
72 | Assert.isTrue(stepExecution.getVersion() == null); |
73 | Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already."); |
74 | |
75 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); |
76 | if (executions == null) { |
77 | executions = new ConcurrentHashMap<Long, StepExecution>(); |
78 | executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions); |
79 | } |
80 | |
81 | stepExecution.setId(currentId.incrementAndGet()); |
82 | stepExecution.incrementVersion(); |
83 | StepExecution copy = copy(stepExecution); |
84 | executions.put(stepExecution.getId(), copy); |
85 | executionsByStepExecutionId.put(stepExecution.getId(), copy); |
86 | |
87 | } |
88 | |
89 | @Override |
90 | public void updateStepExecution(StepExecution stepExecution) { |
91 | |
92 | Assert.notNull(stepExecution.getJobExecutionId()); |
93 | |
94 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); |
95 | Assert.notNull(executions, "step executions for given job execution are expected to be already saved"); |
96 | |
97 | final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId()); |
98 | Assert.notNull(persistedExecution, "step execution is expected to be already saved"); |
99 | |
100 | synchronized (stepExecution) { |
101 | if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) { |
102 | throw new OptimisticLockingFailureException("Attempt to update step execution id=" |
103 | + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion() |
104 | + "), where current version is " + persistedExecution.getVersion()); |
105 | } |
106 | |
107 | stepExecution.incrementVersion(); |
108 | StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); |
109 | copy(stepExecution, copy); |
110 | executions.put(stepExecution.getId(), copy); |
111 | executionsByStepExecutionId.put(stepExecution.getId(), copy); |
112 | } |
113 | } |
114 | |
115 | @Override |
116 | public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) { |
117 | return executionsByStepExecutionId.get(stepExecutionId); |
118 | } |
119 | |
120 | @Override |
121 | public void addStepExecutions(JobExecution jobExecution) { |
122 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId()); |
123 | if (executions == null || executions.isEmpty()) { |
124 | return; |
125 | } |
126 | List<StepExecution> result = new ArrayList<StepExecution>(executions.values()); |
127 | Collections.sort(result, new Comparator<Entity>() { |
128 | |
129 | @Override |
130 | public int compare(Entity o1, Entity o2) { |
131 | return Long.signum(o2.getId() - o1.getId()); |
132 | } |
133 | }); |
134 | |
135 | List<StepExecution> copy = new ArrayList<StepExecution>(result.size()); |
136 | for (StepExecution exec : result) { |
137 | copy.add(copy(exec)); |
138 | } |
139 | jobExecution.addStepExecutions(copy); |
140 | } |
141 | |
142 | @Override |
143 | public void saveStepExecutions(Collection<StepExecution> stepExecutions) { |
144 | Assert.notNull(stepExecutions,"Attempt to save an null collect of step executions"); |
145 | for (StepExecution stepExecution: stepExecutions) { |
146 | saveStepExecution(stepExecution); |
147 | } |
148 | } |
149 | } |