1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.repository.dao; |
17 | |
18 | import java.lang.reflect.Field; |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Comparator; |
22 | import java.util.List; |
23 | import java.util.Map; |
24 | import java.util.concurrent.ConcurrentHashMap; |
25 | import java.util.concurrent.atomic.AtomicLong; |
26 | |
27 | import org.springframework.batch.core.Entity; |
28 | import org.springframework.batch.core.JobExecution; |
29 | import org.springframework.batch.core.StepExecution; |
30 | import org.springframework.batch.support.SerializationUtils; |
31 | import org.springframework.dao.OptimisticLockingFailureException; |
32 | import org.springframework.util.Assert; |
33 | import org.springframework.util.ReflectionUtils; |
34 | |
35 | /** |
36 | * In-memory implementation of {@link StepExecutionDao}. |
37 | */ |
38 | public class MapStepExecutionDao implements StepExecutionDao { |
39 | |
40 | private Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<Long, Map<Long,StepExecution>>(); |
41 | |
42 | private Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<Long, StepExecution>(); |
43 | |
44 | private AtomicLong currentId = new AtomicLong(); |
45 | |
46 | public void clear() { |
47 | executionsByJobExecutionId.clear(); |
48 | executionsByStepExecutionId.clear(); |
49 | } |
50 | |
51 | private static StepExecution copy(StepExecution original) { |
52 | return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original)); |
53 | } |
54 | |
55 | private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) { |
56 | // Cheaper than full serialization is a reflective field copy, which is |
57 | // fine for volatile storage |
58 | ReflectionUtils.doWithFields(StepExecution.class, new ReflectionUtils.FieldCallback() { |
59 | public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { |
60 | field.setAccessible(true); |
61 | field.set(targetExecution, field.get(sourceExecution)); |
62 | } |
63 | }); |
64 | } |
65 | |
66 | public void saveStepExecution(StepExecution stepExecution) { |
67 | |
68 | Assert.isTrue(stepExecution.getId() == null); |
69 | Assert.isTrue(stepExecution.getVersion() == null); |
70 | Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already."); |
71 | |
72 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); |
73 | if (executions == null) { |
74 | executions = new ConcurrentHashMap<Long, StepExecution>(); |
75 | executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions); |
76 | } |
77 | |
78 | stepExecution.setId(currentId.incrementAndGet()); |
79 | stepExecution.incrementVersion(); |
80 | StepExecution copy = copy(stepExecution); |
81 | executions.put(stepExecution.getId(), copy); |
82 | executionsByStepExecutionId.put(stepExecution.getId(), copy); |
83 | |
84 | } |
85 | |
86 | public void updateStepExecution(StepExecution stepExecution) { |
87 | |
88 | Assert.notNull(stepExecution.getJobExecutionId()); |
89 | |
90 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); |
91 | Assert.notNull(executions, "step executions for given job execution are expected to be already saved"); |
92 | |
93 | final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId()); |
94 | Assert.notNull(persistedExecution, "step execution is expected to be already saved"); |
95 | |
96 | synchronized (stepExecution) { |
97 | if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) { |
98 | throw new OptimisticLockingFailureException("Attempt to update step execution id=" |
99 | + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion() |
100 | + "), where current version is " + persistedExecution.getVersion()); |
101 | } |
102 | |
103 | stepExecution.incrementVersion(); |
104 | StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); |
105 | copy(stepExecution, copy); |
106 | executions.put(stepExecution.getId(), copy); |
107 | executionsByStepExecutionId.put(stepExecution.getId(), copy); |
108 | } |
109 | } |
110 | |
111 | public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) { |
112 | return executionsByStepExecutionId.get(stepExecutionId); |
113 | } |
114 | |
115 | public void addStepExecutions(JobExecution jobExecution) { |
116 | Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId()); |
117 | if (executions == null || executions.isEmpty()) { |
118 | return; |
119 | } |
120 | List<StepExecution> result = new ArrayList<StepExecution>(executions.values()); |
121 | Collections.sort(result, new Comparator<Entity>() { |
122 | |
123 | public int compare(Entity o1, Entity o2) { |
124 | return Long.signum(o2.getId() - o1.getId()); |
125 | } |
126 | }); |
127 | |
128 | List<StepExecution> copy = new ArrayList<StepExecution>(result.size()); |
129 | for (StepExecution exec : result) { |
130 | copy.add(copy(exec)); |
131 | } |
132 | jobExecution.addStepExecutions(copy); |
133 | } |
134 | } |