View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.repository.dao;
17  
18  import java.lang.reflect.Field;
19  import java.util.ArrayList;
20  import java.util.Collections;
21  import java.util.Comparator;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.springframework.batch.core.Entity;
28  import org.springframework.batch.core.JobExecution;
29  import org.springframework.batch.core.StepExecution;
30  import org.springframework.batch.support.SerializationUtils;
31  import org.springframework.dao.OptimisticLockingFailureException;
32  import org.springframework.util.Assert;
33  import org.springframework.util.ReflectionUtils;
34  
35  /**
36   * In-memory implementation of {@link StepExecutionDao}.
37   */
38  public class MapStepExecutionDao implements StepExecutionDao {
39  
40  	private Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<Long, Map<Long,StepExecution>>();
41  
42  	private Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<Long, StepExecution>();
43  
44  	private AtomicLong currentId = new AtomicLong();
45  
46  	public void clear() {
47  		executionsByJobExecutionId.clear();
48  		executionsByStepExecutionId.clear();
49  	}
50  
51  	private static StepExecution copy(StepExecution original) {
52  		return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
53  	}
54  
55  	private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) {
56  		// Cheaper than full serialization is a reflective field copy, which is
57  		// fine for volatile storage
58  		ReflectionUtils.doWithFields(StepExecution.class, new ReflectionUtils.FieldCallback() {
59  			@Override
60  			public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
61  				field.setAccessible(true);
62  				field.set(targetExecution, field.get(sourceExecution));
63  			}
64  		});
65  	}
66  
67  	@Override
68  	public void saveStepExecution(StepExecution stepExecution) {
69  
70  		Assert.isTrue(stepExecution.getId() == null);
71  		Assert.isTrue(stepExecution.getVersion() == null);
72  		Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already.");
73  
74  		Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
75  		if (executions == null) {
76  			executions = new ConcurrentHashMap<Long, StepExecution>();
77  			executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions);
78  		}
79  
80  		stepExecution.setId(currentId.incrementAndGet());
81  		stepExecution.incrementVersion();
82  		StepExecution copy = copy(stepExecution);
83  		executions.put(stepExecution.getId(), copy);
84  		executionsByStepExecutionId.put(stepExecution.getId(), copy);
85  
86  	}
87  
88  	@Override
89  	public void updateStepExecution(StepExecution stepExecution) {
90  
91  		Assert.notNull(stepExecution.getJobExecutionId());
92  
93  		Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
94  		Assert.notNull(executions, "step executions for given job execution are expected to be already saved");
95  
96  		final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId());
97  		Assert.notNull(persistedExecution, "step execution is expected to be already saved");
98  
99  		synchronized (stepExecution) {
100 			if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) {
101 				throw new OptimisticLockingFailureException("Attempt to update step execution id="
102 						+ stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
103 						+ "), where current version is " + persistedExecution.getVersion());
104 			}
105 
106 			stepExecution.incrementVersion();
107 			StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
108 			copy(stepExecution, copy);
109 			executions.put(stepExecution.getId(), copy);
110 			executionsByStepExecutionId.put(stepExecution.getId(), copy);
111 		}
112 	}
113 
114 	@Override
115 	public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
116 		return executionsByStepExecutionId.get(stepExecutionId);
117 	}
118 
119 	@Override
120 	public void addStepExecutions(JobExecution jobExecution) {
121 		Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());
122 		if (executions == null || executions.isEmpty()) {
123 			return;
124 		}
125 		List<StepExecution> result = new ArrayList<StepExecution>(executions.values());
126 		Collections.sort(result, new Comparator<Entity>() {
127 
128 			@Override
129 			public int compare(Entity o1, Entity o2) {
130 				return Long.signum(o2.getId() - o1.getId());
131 			}
132 		});
133 
134 		List<StepExecution> copy = new ArrayList<StepExecution>(result.size());
135 		for (StepExecution exec : result) {
136 			copy.add(copy(exec));
137 		}
138 		jobExecution.addStepExecutions(copy);
139 	}
140 }