View Javadoc

1   /*
2    * Copyright 2006-2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.osgi.service.importer.support.internal.collection;
18  
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.LinkedHashMap;
23  import java.util.List;
24  import java.util.Map;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.osgi.framework.BundleContext;
29  import org.osgi.framework.Constants;
30  import org.osgi.framework.Filter;
31  import org.osgi.framework.ServiceEvent;
32  import org.osgi.framework.ServiceListener;
33  import org.osgi.framework.ServiceReference;
34  import org.springframework.beans.factory.DisposableBean;
35  import org.springframework.beans.factory.InitializingBean;
36  import org.springframework.osgi.service.ServiceUnavailableException;
37  import org.springframework.osgi.service.importer.DefaultOsgiServiceDependency;
38  import org.springframework.osgi.service.importer.ImportedOsgiServiceProxy;
39  import org.springframework.osgi.service.importer.OsgiServiceDependency;
40  import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
41  import org.springframework.osgi.service.importer.support.internal.aop.ProxyPlusCallback;
42  import org.springframework.osgi.service.importer.support.internal.aop.ServiceProxyCreator;
43  import org.springframework.osgi.service.importer.support.internal.dependency.ImporterStateListener;
44  import org.springframework.osgi.service.importer.support.internal.util.OsgiServiceBindingUtils;
45  import org.springframework.osgi.util.OsgiListenerUtils;
46  import org.springframework.util.Assert;
47  
48  /**
49   * OSGi service dynamic collection - allows iterating while the underlying
50   * storage is being shrunk/expanded. This collection is read-only - its content
51   * is being retrieved dynamically from the OSGi platform.
52   * 
53   * <p/> This collection and its iterators are thread-safe. That is, multiple
54   * threads can access the collection. However, since the collection is
55   * read-only, it cannot be modified by the client.
56   * 
57   * @see Collection
58   * @author Costin Leau
59   */
60  public class OsgiServiceCollection implements Collection, InitializingBean, CollectionProxy, DisposableBean {
61  
62  	/**
63  	 * Listener tracking the OSGi services which form the dynamic collection.
64  	 * 
65  	 * @author Costin Leau
66  	 */
67  	private class Listener implements ServiceListener {
68  
69  		public void serviceChanged(ServiceEvent event) {
70  			ClassLoader tccl = Thread.currentThread().getContextClassLoader();
71  
72  			try {
73  				Thread.currentThread().setContextClassLoader(classLoader);
74  				ServiceReference ref = event.getServiceReference();
75  				Long serviceId = (Long) ref.getProperty(Constants.SERVICE_ID);
76  				boolean collectionModified = false;
77  
78  				ProxyPlusCallback ppc = null;
79  				Object proxy = null;
80  
81  				// flag used for sending state events 
82  				boolean shouldInformStateListeners = false;
83  
84  				switch (event.getType()) {
85  
86  					case (ServiceEvent.REGISTERED):
87  					case (ServiceEvent.MODIFIED):
88  						// same as ServiceEvent.REGISTERED
89  						synchronized (serviceProxies) {
90  							if (!servicesIdMap.containsKey(serviceId)) {
91  								ppc = proxyCreator.createServiceProxy(ref);
92  								proxy = ppc.proxy;
93  								// let the dynamic collection decide if the service
94  								// is added or not (think set, sorted set)
95  								if (serviceProxies.add(proxy)) {
96  									collectionModified = true;
97  									// check if the list was empty before adding something to it
98  									shouldInformStateListeners = (serviceProxies.size() == 1);
99  									servicesIdMap.put(serviceId, ppc);
100 								}
101 							}
102 						}
103 						// inform listeners
104 						// TODO: should this be part of the lock also?
105 						if (collectionModified) {
106 							OsgiServiceBindingUtils.callListenersBind(context, proxy, ref, listeners);
107 
108 							if (serviceRequiredAtStartup && shouldInformStateListeners)
109 								notifySatisfiedStateListeners();
110 						}
111 
112 						break;
113 					case (ServiceEvent.UNREGISTERING):
114 						synchronized (serviceProxies) {
115 							// remove service id / proxy association
116 							ppc = (ProxyPlusCallback) servicesIdMap.remove(serviceId);
117 							if (ppc != null) {
118 								proxy = ppc.proxy;
119 								// before removal, allow analysis
120 								checkDeadProxies(proxy);
121 								// remove service proxy
122 								collectionModified = serviceProxies.remove(proxy);
123 								// invalidate it
124 								invalidateProxy(ppc);
125 
126 								// check if the list is empty
127 								shouldInformStateListeners = (serviceProxies.isEmpty());
128 							}
129 						}
130 						// TODO: should this be part of the lock also?
131 						if (collectionModified) {
132 							OsgiServiceBindingUtils.callListenersUnbind(context, proxy, ref, listeners);
133 
134 							if (serviceRequiredAtStartup && shouldInformStateListeners)
135 								notifyUnsatisfiedStateListeners();
136 						}
137 
138 						break;
139 
140 					default:
141 						throw new IllegalArgumentException("unsupported event type:" + event);
142 				}
143 			}
144 			// OSGi swallows these exceptions so make sure we get a chance to
145 			// see them.
146 			catch (Throwable re) {
147 				if (log.isWarnEnabled()) {
148 					log.warn("serviceChanged() processing failed", re);
149 				}
150 			}
151 			finally {
152 				Thread.currentThread().setContextClassLoader(tccl);
153 			}
154 		}
155 
156 		private void notifySatisfiedStateListeners() {
157 			synchronized (stateListeners) {
158 				for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
159 					ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
160 					stateListener.importerSatisfied(eventSource, dependency);
161 				}
162 			}
163 		}
164 
165 		private void notifyUnsatisfiedStateListeners() {
166 			synchronized (stateListeners) {
167 				for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
168 					ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
169 					stateListener.importerUnsatisfied(eventSource, dependency);
170 				}
171 			}
172 		}
173 	}
174 
175 	/**
176 	 * Read-only iterator wrapper around the dynamic collection iterator.
177 	 * 
178 	 * @author Costin Leau
179 	 * 
180 	 */
181 	protected class OsgiServiceIterator implements Iterator {
182 
183 		// dynamic thread-safe iterator
184 		private final Iterator iter = serviceProxies.iterator();
185 
186 
187 		public boolean hasNext() {
188 			mandatoryServiceCheck();
189 			return iter.hasNext();
190 		}
191 
192 		public Object next() {
193 			synchronized (serviceProxies) {
194 				mandatoryServiceCheck();
195 				Object proxy = iter.next();
196 				return (proxy == null ? tailDeadProxy : proxy);
197 			}
198 		}
199 
200 		public void remove() {
201 			// write operations disabled
202 			throw new UnsupportedOperationException();
203 		}
204 	}
205 
206 
207 	private static final Log log = LogFactory.getLog(OsgiServiceCollection.class);
208 
209 	// map of services
210 	// the service id is used as key while the service proxy is used for
211 	// values
212 	// Map<ServiceId, ImporterProxy>
213 	// 
214 	// NOTE: this collection is protected by the 'serviceProxies' lock.
215 	protected final Map servicesIdMap = new LinkedHashMap(8);
216 
217 	/**
218 	 * The dynamic collection.
219 	 */
220 	protected DynamicCollection serviceProxies;
221 
222 	/**
223 	 * Recall the last proxy for the rare case, where a service goes down
224 	 * between the #hasNext() and #next() call of an iterator.
225 	 * 
226 	 * Subclasses should implement their own strategy when it comes to assign a
227 	 * value to it through
228 	 */
229 	protected volatile Object tailDeadProxy;
230 
231 	private boolean serviceRequiredAtStartup = true;
232 
233 	private final Filter filter;
234 
235 	private final BundleContext context;
236 
237 	/** TCCL to set between calling listeners */
238 	private final ClassLoader classLoader;
239 
240 	/** Service proxy creator */
241 	private final ServiceProxyCreator proxyCreator;
242 
243 	private OsgiServiceLifecycleListener[] listeners = new OsgiServiceLifecycleListener[0];
244 
245 	private final ServiceListener listener;
246 
247 	/** state listener */
248 	private List stateListeners = Collections.EMPTY_LIST;
249 
250 	private final Object lock = new Object();
251 
252 	/** dependency object */
253 	private OsgiServiceDependency dependency;
254 
255 	/** dependable service importer */
256 	private Object eventSource;
257 
258 	/** event source (importer) name */
259 	private String sourceName;
260 
261 
262 	public OsgiServiceCollection(Filter filter, BundleContext context, ClassLoader classLoader,
263 			ServiceProxyCreator proxyCreator) {
264 		Assert.notNull(classLoader, "ClassLoader is required");
265 		Assert.notNull(context, "context is required");
266 
267 		this.filter = filter;
268 		this.context = context;
269 		this.classLoader = classLoader;
270 
271 		this.proxyCreator = proxyCreator;
272 		listener = new Listener();
273 	}
274 
275 	public void afterPropertiesSet() {
276 		// create service proxies collection
277 		this.serviceProxies = createInternalDynamicStorage();
278 
279 		boolean trace = log.isTraceEnabled();
280 
281 		dependency = new DefaultOsgiServiceDependency(sourceName, filter, serviceRequiredAtStartup);
282 
283 		if (trace)
284 			log.trace("Adding osgi listener for services matching [" + filter + "]");
285 		OsgiListenerUtils.addServiceListener(context, listener, filter);
286 
287 		if (serviceRequiredAtStartup) {
288 			if (trace)
289 				log.trace("1..x cardinality - looking for service [" + filter + "] at startup...");
290 			mandatoryServiceCheck();
291 		}
292 	}
293 
294 	public void destroy() {
295 		OsgiListenerUtils.removeServiceListener(context, listener);
296 
297 		synchronized (serviceProxies) {
298 			for (Iterator iterator = serviceProxies.iterator(); iterator.hasNext();) {
299 				ImportedOsgiServiceProxy serviceProxy = (ImportedOsgiServiceProxy) iterator.next();
300 				ServiceReference ref = serviceProxy.getServiceReference();
301 
302 				// get first the destruction callback
303 				ProxyPlusCallback ppc = (ProxyPlusCallback) servicesIdMap.get((Long) ref.getProperty(Constants.SERVICE_ID));
304 				listener.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, ref));
305 
306 				try {
307 					ppc.destructionCallback.destroy();
308 				}
309 				catch (Exception ex) {
310 					log.error("Exception occurred while destroying proxy " + ppc.proxy, ex);
311 				}
312 			}
313 
314 			serviceProxies.clear();
315 			servicesIdMap.clear();
316 		}
317 	}
318 
319 	/**
320 	 * Check to see whether at least one service is available.
321 	 */
322 	protected void mandatoryServiceCheck() {
323 		if (serviceRequiredAtStartup && serviceProxies.isEmpty())
324 			throw new ServiceUnavailableException(filter);
325 	}
326 
327 	public boolean isSatisfied() {
328 		if (serviceRequiredAtStartup)
329 			return (!serviceProxies.isEmpty());
330 		else
331 			return true;
332 	}
333 
334 	/**
335 	 * Create the dynamic storage used internally. The storage <strong>has</strong>
336 	 * to be thread-safe.
337 	 */
338 	protected DynamicCollection createInternalDynamicStorage() {
339 		return new DynamicCollection();
340 	}
341 
342 	private void invalidateProxy(ProxyPlusCallback ppc) {
343 		// don't do anything (the proxy will simply thrown an exception if still in use)
344 	}
345 
346 	/**
347 	 * Hook for tracking the last disappearing service to cope with the rare
348 	 * case, where the last service in the collection disappears between calls
349 	 * to hasNext() and next() on an iterator at the end of the collection.
350 	 * 
351 	 * @param proxy
352 	 */
353 	protected void checkDeadProxies(Object proxy, int proxyCollectionPos) {
354 		if (proxyCollectionPos == serviceProxies.size() - 1)
355 			tailDeadProxy = proxy;
356 	}
357 
358 	/**
359 	 * Private method, computing the index and share it with subclasses.
360 	 * 
361 	 * @param proxy
362 	 */
363 	private void checkDeadProxies(Object proxy) {
364 		// no need for a collection lock (already have it)
365 		int index = serviceProxies.indexOf(proxy);
366 		checkDeadProxies(proxy, index);
367 	}
368 
369 	public void setServiceImporter(Object importer) {
370 		this.eventSource = importer;
371 	}
372 
373 	public void setServiceImporterName(String name) {
374 		this.sourceName = name;
375 	}
376 
377 	public Iterator iterator() {
378 		return new OsgiServiceIterator();
379 	}
380 
381 	public int size() {
382 		mandatoryServiceCheck();
383 		return serviceProxies.size();
384 	}
385 
386 	public String toString() {
387 		mandatoryServiceCheck();
388 		synchronized (serviceProxies) {
389 			return serviceProxies.toString();
390 		}
391 	}
392 
393 	//
394 	// write operations forbidden
395 	//
396 	public boolean remove(Object o) {
397 		throw new UnsupportedOperationException();
398 	}
399 
400 	public boolean removeAll(Collection c) {
401 		throw new UnsupportedOperationException();
402 	}
403 
404 	public boolean add(Object o) {
405 		throw new UnsupportedOperationException();
406 	}
407 
408 	public boolean addAll(Collection c) {
409 		throw new UnsupportedOperationException();
410 	}
411 
412 	public void clear() {
413 		throw new UnsupportedOperationException();
414 	}
415 
416 	public boolean retainAll(Collection c) {
417 		throw new UnsupportedOperationException();
418 	}
419 
420 	public boolean contains(Object o) {
421 		mandatoryServiceCheck();
422 		return serviceProxies.contains(o);
423 	}
424 
425 	public boolean containsAll(Collection c) {
426 		mandatoryServiceCheck();
427 		return serviceProxies.containsAll(c);
428 	}
429 
430 	public boolean isEmpty() {
431 		mandatoryServiceCheck();
432 		return size() == 0;
433 	}
434 
435 	public Object[] toArray() {
436 		mandatoryServiceCheck();
437 		return serviceProxies.toArray();
438 	}
439 
440 	public Object[] toArray(Object[] array) {
441 		mandatoryServiceCheck();
442 		return serviceProxies.toArray(array);
443 	}
444 
445 	/**
446 	 * @param listeners The listeners to set.
447 	 */
448 	public void setListeners(OsgiServiceLifecycleListener[] listeners) {
449 		Assert.notNull(listeners);
450 		this.listeners = listeners;
451 	}
452 
453 	public void setRequiredAtStartup(boolean serviceRequiredAtStartup) {
454 		this.serviceRequiredAtStartup = serviceRequiredAtStartup;
455 	}
456 
457 	public void setStateListeners(List stateListeners) {
458 		synchronized (lock) {
459 			this.stateListeners = stateListeners;
460 		}
461 	}
462 }