1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Comparator; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | |
27 | import java.util.concurrent.ConcurrentHashMap; |
28 | import java.util.concurrent.ConcurrentMap; |
29 | |
30 | import java.util.concurrent.atomic.AtomicLong; |
31 | |
32 | import org.springframework.batch.core.JobExecution; |
33 | import org.springframework.batch.core.JobInstance; |
34 | import org.springframework.batch.support.SerializationUtils; |
35 | import org.springframework.dao.OptimisticLockingFailureException; |
36 | import org.springframework.util.Assert; |
37 | |
38 | /** |
39 | * In-memory implementation of {@link JobExecutionDao}. |
40 | */ |
41 | public class MapJobExecutionDao implements JobExecutionDao { |
42 | |
43 | // JDK6 Make this into a ConcurrentSkipListMap: adds and removes tend to be very near the front or back |
44 | private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>(); |
45 | |
46 | private final AtomicLong currentId = new AtomicLong(0L); |
47 | |
48 | public void clear() { |
49 | executionsById.clear(); |
50 | } |
51 | |
52 | private static JobExecution copy(JobExecution original) { |
53 | JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original)); |
54 | return copy; |
55 | } |
56 | |
57 | public void saveJobExecution(JobExecution jobExecution) { |
58 | Assert.isTrue(jobExecution.getId() == null); |
59 | Long newId = currentId.getAndIncrement(); |
60 | jobExecution.setId(newId); |
61 | jobExecution.incrementVersion(); |
62 | executionsById.put(newId, copy(jobExecution)); |
63 | } |
64 | |
65 | public List<JobExecution> findJobExecutions(JobInstance jobInstance) { |
66 | List<JobExecution> executions = new ArrayList<JobExecution>(); |
67 | for (JobExecution exec : executionsById.values()) { |
68 | if (exec.getJobInstance().equals(jobInstance)) { |
69 | executions.add(copy(exec)); |
70 | } |
71 | } |
72 | Collections.sort(executions, new Comparator<JobExecution>() { |
73 | |
74 | public int compare(JobExecution e1, JobExecution e2) { |
75 | long result = (e1.getId() - e2.getId()); |
76 | if (result > 0) { |
77 | return -1; |
78 | } |
79 | else if (result < 0) { |
80 | return 1; |
81 | } |
82 | else { |
83 | return 0; |
84 | } |
85 | } |
86 | }); |
87 | return executions; |
88 | } |
89 | |
90 | public void updateJobExecution(JobExecution jobExecution) { |
91 | Long id = jobExecution.getId(); |
92 | Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)"); |
93 | JobExecution persistedExecution = executionsById.get(id); |
94 | Assert.notNull(persistedExecution, "JobExecution must already be saved"); |
95 | |
96 | synchronized (jobExecution) { |
97 | if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) { |
98 | throw new OptimisticLockingFailureException("Attempt to update step execution id=" + id |
99 | + " with wrong version (" + jobExecution.getVersion() + "), where current version is " |
100 | + persistedExecution.getVersion()); |
101 | } |
102 | jobExecution.incrementVersion(); |
103 | executionsById.put(id, copy(jobExecution)); |
104 | } |
105 | } |
106 | |
107 | public JobExecution getLastJobExecution(JobInstance jobInstance) { |
108 | JobExecution lastExec = null; |
109 | for (JobExecution exec : executionsById.values()) { |
110 | if (!exec.getJobInstance().equals(jobInstance)) { |
111 | continue; |
112 | } |
113 | if (lastExec == null) { |
114 | lastExec = exec; |
115 | } |
116 | if (lastExec.getCreateTime().before(exec.getCreateTime())) { |
117 | lastExec = exec; |
118 | } |
119 | } |
120 | return copy(lastExec); |
121 | } |
122 | |
123 | /* |
124 | * (non-Javadoc) |
125 | * |
126 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
127 | * findRunningJobExecutions(java.lang.String) |
128 | */ |
129 | public Set<JobExecution> findRunningJobExecutions(String jobName) { |
130 | Set<JobExecution> result = new HashSet<JobExecution>(); |
131 | for (JobExecution exec : executionsById.values()) { |
132 | if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) { |
133 | continue; |
134 | } |
135 | result.add(copy(exec)); |
136 | } |
137 | return result; |
138 | } |
139 | |
140 | /* |
141 | * (non-Javadoc) |
142 | * |
143 | * @see |
144 | * org.springframework.batch.core.repository.dao.JobExecutionDao#getJobExecution |
145 | * (java.lang.Long) |
146 | */ |
147 | public JobExecution getJobExecution(Long executionId) { |
148 | return copy(executionsById.get(executionId)); |
149 | } |
150 | |
151 | public void synchronizeStatus(JobExecution jobExecution) { |
152 | JobExecution saved = getJobExecution(jobExecution.getId()); |
153 | if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) { |
154 | jobExecution.upgradeStatus(saved.getStatus()); |
155 | jobExecution.setVersion(saved.getVersion()); |
156 | } |
157 | } |
158 | } |