View Javadoc

1   /*
2    * Copyright 2006-2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.osgi.service.importer.support.internal.aop;
18  
19  import java.util.Collections;
20  import java.util.Iterator;
21  import java.util.List;
22  
23  import org.osgi.framework.BundleContext;
24  import org.osgi.framework.Constants;
25  import org.osgi.framework.Filter;
26  import org.osgi.framework.ServiceEvent;
27  import org.osgi.framework.ServiceListener;
28  import org.osgi.framework.ServiceReference;
29  import org.springframework.beans.factory.InitializingBean;
30  import org.springframework.context.ApplicationEvent;
31  import org.springframework.context.ApplicationEventPublisher;
32  import org.springframework.context.ApplicationEventPublisherAware;
33  import org.springframework.osgi.service.ServiceUnavailableException;
34  import org.springframework.osgi.service.importer.DefaultOsgiServiceDependency;
35  import org.springframework.osgi.service.importer.OsgiServiceDependency;
36  import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
37  import org.springframework.osgi.service.importer.ServiceProxyDestroyedException;
38  import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitEndedEvent;
39  import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitTimedOutEvent;
40  import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitStartingEvent;
41  import org.springframework.osgi.service.importer.support.internal.dependency.ImporterStateListener;
42  import org.springframework.osgi.service.importer.support.internal.support.DefaultRetryCallback;
43  import org.springframework.osgi.service.importer.support.internal.support.RetryCallback;
44  import org.springframework.osgi.service.importer.support.internal.support.RetryTemplate;
45  import org.springframework.osgi.service.importer.support.internal.support.ServiceWrapper;
46  import org.springframework.osgi.service.importer.support.internal.util.OsgiServiceBindingUtils;
47  import org.springframework.osgi.util.OsgiListenerUtils;
48  import org.springframework.osgi.util.OsgiServiceReferenceUtils;
49  import org.springframework.util.Assert;
50  import org.springframework.util.ObjectUtils;
51  
52  /**
53   * Interceptor adding dynamic behaviour for unary service (..1 cardinality). It
54   * will look for a service using the given filter, retrying if the service is
55   * down or unavailable. Will dynamically rebound a new service, if one is
56   * available with a higher service ranking. <p/> <p/> In case no service is
57   * available, it will throw an exception.
58   * 
59   * <p/> <strong>Note</strong>: this is a stateful interceptor and should not be
60   * shared.
61   * 
62   * @author Costin Leau
63   */
64  public class ServiceDynamicInterceptor extends ServiceInvoker implements InitializingBean,
65  		ApplicationEventPublisherAware {
66  
67  	/**
68  	 * Override the default implementation to plug in event notification.
69  	 * 
70  	 * @author Costin Leau
71  	 * 
72  	 */
73  	private class EventSenderRetryTemplate extends RetryTemplate {
74  
75  		public EventSenderRetryTemplate(RetryTemplate retryTemplate) {
76  			super(retryTemplate);
77  		}
78  
79  		public Object execute(RetryCallback callback, Object notificationLock) {
80  			//send event
81  			publishEvent(new OsgiServiceDependencyWaitStartingEvent(eventSource, dependency, this.getWaitTime()
82  					* this.getRetryNumbers()));
83  
84  			Object result = null;
85  
86  			long start = System.currentTimeMillis();
87  			long stop;
88  
89  			try {
90  				result = super.execute(callback, notificationLock);
91  				stop = System.currentTimeMillis() - start;
92  			}
93  			catch (RuntimeException exception) {
94  				stop = System.currentTimeMillis() - start;
95  				publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
96  				throw exception;
97  			}
98  
99  			// send finalization event
100 			if (callback.isComplete(result)) {
101 				publishEvent(new OsgiServiceDependencyWaitEndedEvent(eventSource, dependency, stop));
102 			}
103 			else {
104 				publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
105 			}
106 
107 			return result;
108 		}
109 	}
110 
111 	/**
112 	 * Listener tracking the OSGi services which form the dynamic reference.
113 	 */
114 	// NOTE: while the listener here seems to share the same functionality as
115 	// the one in ServiceCollection in reality there are a big number of
116 	// differences in them - for example this one supports rebind
117 	// while the collection does not.
118 	//
119 	// the only common part is the TCCL handling before calling the listeners.
120 	private class Listener implements ServiceListener {
121 
122 		public void serviceChanged(ServiceEvent event) {
123 			ClassLoader tccl = Thread.currentThread().getContextClassLoader();
124 			try {
125 				Thread.currentThread().setContextClassLoader(classLoader);
126 				ServiceReference ref = event.getServiceReference();
127 
128 				// service id
129 				long serviceId = ((Long) ref.getProperty(Constants.SERVICE_ID)).longValue();
130 				// service ranking
131 				Integer rank = (Integer) ref.getProperty(Constants.SERVICE_RANKING);
132 				int ranking = (rank == null ? 0 : rank.intValue());
133 
134 				boolean debug = log.isDebugEnabled();
135 
136 				switch (event.getType()) {
137 
138 					case (ServiceEvent.REGISTERED):
139 						// same as ServiceEvent.REGISTERED
140 					case (ServiceEvent.MODIFIED): {
141 						// flag indicating if the service is bound or rebound
142 						boolean servicePresent = false;
143 
144 						synchronized (lock) {
145 							servicePresent = (wrapper != null && wrapper.isServiceAlive());
146 						}
147 
148 						if (updateWrapperIfNecessary(ref, serviceId, ranking)) {
149 							// inform listeners
150 							OsgiServiceBindingUtils.callListenersBind(bundleContext, proxy, ref, listeners);
151 
152 							if (!servicePresent) {
153 								notifySatisfiedStateListeners();
154 							}
155 						}
156 
157 						break;
158 					}
159 					case (ServiceEvent.UNREGISTERING): {
160 
161 						boolean serviceRemoved = false;
162 						/**
163 						 * used if the service goes down and there is no
164 						 * replacement
165 						 */
166 						/**
167 						 * since the listeners will require a valid proxy, the
168 						 * invalidation has to happen *after* calling the
169 						 * listeners
170 						 */
171 						ServiceWrapper oldWrapper = wrapper;
172 
173 						synchronized (lock) {
174 							// remove service
175 							if (wrapper != null) {
176 								if (serviceId == wrapper.getServiceId()) {
177 									serviceRemoved = true;
178 									wrapper = null;
179 
180 								}
181 							}
182 						}
183 
184 						ServiceReference newReference = null;
185 
186 						boolean isDestroyed = false;
187 
188 						synchronized (lock) {
189 							isDestroyed = destroyed;
190 						}
191 
192 						// discover a new reference only if we are still running
193 						if (!isDestroyed) {
194 							newReference = OsgiServiceReferenceUtils.getServiceReference(bundleContext,
195 								(filter == null ? null : filter.toString()));
196 
197 							// we have a rebind (a new service was bound)
198 							// so another candidate has to be searched from the existing candidates
199 							// - as they are alive already, we have to send an event for them ourselves
200 							// MODIFIED will be used for clarity
201 							if (newReference != null) {
202 								// update the listeners (through a MODIFIED event
203 								serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, newReference));
204 							}
205 						}
206 
207 						// if no new reference was found and the service was indeed removed (it was bound to the interceptor)
208 						// then do an unbind
209 						if (newReference == null && serviceRemoved) {
210 
211 							// reuse the old service for the time being
212 							synchronized (lock) {
213 								wrapper = oldWrapper;
214 							}
215 
216 							// inform listeners
217 							OsgiServiceBindingUtils.callListenersUnbind(bundleContext, proxy, ref, listeners);
218 
219 							// clean up wrapper
220 							synchronized (lock) {
221 								wrapper = null;
222 							}
223 
224 							if (debug) {
225 								String message = "Service reference [" + ref + "] was unregistered";
226 								if (serviceRemoved) {
227 									message += " and unbound from the service proxy";
228 								}
229 								else {
230 									message += " but did not affect the service proxy";
231 								}
232 								log.debug(message);
233 							}
234 
235 							// update internal state listeners (unsatisfied event)
236 							notifyUnsatisfiedStateListeners();
237 						}
238 
239 						break;
240 					}
241 					default:
242 						throw new IllegalArgumentException("unsupported event type");
243 				}
244 			}
245 			catch (Throwable e) {
246 				// The framework will swallow these exceptions without logging,
247 				// so log them here
248 				log.fatal("Exception during service event handling", e);
249 			}
250 			finally {
251 				Thread.currentThread().setContextClassLoader(tccl);
252 			}
253 		}
254 
255 		private void notifySatisfiedStateListeners() {
256 			synchronized (stateListeners) {
257 				for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
258 					ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
259 					stateListener.importerSatisfied(eventSource, dependency);
260 				}
261 			}
262 		}
263 
264 		private void notifyUnsatisfiedStateListeners() {
265 			synchronized (stateListeners) {
266 				for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
267 					ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
268 					stateListener.importerUnsatisfied(eventSource, dependency);
269 				}
270 			}
271 		}
272 
273 		private boolean updateWrapperIfNecessary(ServiceReference ref, long serviceId, int serviceRanking) {
274 			boolean updated = false;
275 			try {
276 				synchronized (lock) {
277 					if (wrapper != null && wrapper.isServiceAlive()) {
278 						// if have a higher rank service
279 						if (serviceRanking > wrapper.getServiceRanking()) {
280 							updated = true;
281 							updateReferenceHolders(ref);
282 						}
283 						// if equality, use the service id
284 						if (serviceRanking == wrapper.getServiceRanking()) {
285 							if (serviceId < wrapper.getServiceId()) {
286 								updated = true;
287 								updateReferenceHolders(ref);
288 							}
289 						}
290 					}
291 					// we don't have any valid services bounded yet so just bind
292 					// the new
293 					// one
294 					else {
295 						updated = true;
296 						updateReferenceHolders(ref);
297 					}
298 					lock.notifyAll();
299 					return updated;
300 				}
301 			}
302 			finally {
303 				if (log.isDebugEnabled()) {
304 					String message = "Service reference [" + ref + "]";
305 					if (updated)
306 						message += " bound to proxy";
307 					else
308 						message += " not bound to proxy";
309 					log.debug(message);
310 				}
311 			}
312 		}
313 
314 		/**
315 		 * Update internal holders for the backing ServiceReference.
316 		 * 
317 		 * @param ref
318 		 */
319 		private void updateReferenceHolders(ServiceReference ref) {
320 			// no need for a lock since this method is called from a synchronized block
321 			wrapper = new ServiceWrapper(ref, bundleContext);
322 			referenceDelegate.swapDelegates(ref);
323 		}
324 	}
325 
326 
327 	private static final int hashCode = ServiceDynamicInterceptor.class.hashCode() * 13;
328 
329 	private final BundleContext bundleContext;
330 
331 	private final Filter filter;
332 
333 	/** TCCL to set when calling listeners */
334 	private final ClassLoader classLoader;
335 
336 	private final SwappingServiceReferenceProxy referenceDelegate;
337 
338 	/** event listener */
339 	private final ServiceListener listener;
340 
341 	/** mandatory flag */
342 	private boolean serviceRequiredAtStartup = true;
343 
344 	/** flag indicating whether the destruction has started or not */
345 	private boolean isDuringDestruction = false;
346 
347 	/** flag indicating whether the proxy is already destroyed or not */
348 	private boolean destroyed = false;
349 
350 	/** private lock */
351 	private final Object lock = new Object();
352 
353 	/** utility service wrapper */
354 	private ServiceWrapper wrapper;
355 
356 	/** retry template */
357 	private RetryTemplate retryTemplate;
358 
359 	/** dependable service importer */
360 	private Object eventSource;
361 
362 	/** event source (importer) name */
363 	private String sourceName;
364 
365 	/** listener that need to be informed of bind/rebind/unbind */
366 	private OsgiServiceLifecycleListener[] listeners = new OsgiServiceLifecycleListener[0];
367 
368 	/** reference to the created proxy passed to the listeners */
369 	private Object proxy;
370 
371 	/** event publisher */
372 	private ApplicationEventPublisher applicationEventPublisher;
373 
374 	/** dependency object */
375 	private OsgiServiceDependency dependency;
376 
377 	/** internal state listeners */
378 	private List stateListeners = Collections.EMPTY_LIST;
379 
380 
381 	public ServiceDynamicInterceptor(BundleContext context, Filter filter, ClassLoader classLoader) {
382 		this.bundleContext = context;
383 		this.filter = filter;
384 		this.classLoader = classLoader;
385 
386 		referenceDelegate = new SwappingServiceReferenceProxy();
387 		listener = new Listener();
388 	}
389 
390 	public Object getTarget() {
391 		Object target = lookupService();
392 
393 		// nothing found
394 		if (target == null) {
395 			throw new ServiceUnavailableException(filter);
396 		}
397 		return target;
398 	}
399 
400 	/**
401 	 * Look the service by waiting the service to appear. Note this method
402 	 * should use the same lock as the listener handling the service reference.
403 	 */
404 	private Object lookupService() {
405 		synchronized (lock) {
406 			if (destroyed && !isDuringDestruction)
407 				throw new ServiceProxyDestroyedException();
408 
409 			return (Object) retryTemplate.execute(new DefaultRetryCallback() {
410 
411 				public Object doWithRetry() {
412 					return (wrapper != null) ? wrapper.getService() : null;
413 				}
414 			}, lock);
415 		}
416 	}
417 
418 	private void publishEvent(ApplicationEvent event) {
419 		if (applicationEventPublisher != null) {
420 			if (log.isTraceEnabled())
421 				log.trace("Publishing event through publisher " + applicationEventPublisher);
422 			try {
423 				applicationEventPublisher.publishEvent(event);
424 			}
425 			catch (IllegalStateException ise) {
426 				log.debug(
427 					"Event "
428 							+ event
429 							+ " not published as the publisher is not initialized - usually this is caused by eager initialization of the importers by post processing",
430 					ise);
431 			}
432 
433 		}
434 		else if (log.isTraceEnabled())
435 			log.trace("No application event publisher set; no events will be published");
436 	}
437 
438 	public void afterPropertiesSet() {
439 		Assert.notNull(proxy);
440 		Assert.notNull(eventSource);
441 
442 		boolean debug = log.isDebugEnabled();
443 
444 		retryTemplate = new EventSenderRetryTemplate(retryTemplate);
445 
446 		dependency = new DefaultOsgiServiceDependency(sourceName, filter, serviceRequiredAtStartup);
447 
448 		if (debug)
449 			log.debug("Adding OSGi mandatoryListeners for services matching [" + filter + "]");
450 		OsgiListenerUtils.addSingleServiceListener(bundleContext, listener, filter);
451 
452 		if (serviceRequiredAtStartup) {
453 			if (debug)
454 				log.debug("1..x cardinality - looking for service [" + filter + "] at startup...");
455 			Object target = getTarget();
456 			if (debug)
457 				log.debug("Service retrieved " + target);
458 		}
459 	}
460 
461 	public void destroy() {
462 		OsgiListenerUtils.removeServiceListener(bundleContext, listener);
463 		synchronized (lock) {
464 			// set this flag first to make sure no rebind is done
465 			destroyed = true;
466 			isDuringDestruction = true;
467 			if (wrapper != null) {
468 				ServiceReference ref = wrapper.getReference();
469 				if (ref != null) {
470 					// send unregistration event to the listener
471 					listener.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, ref));
472 				}
473 			}
474 			/** destruction process has ended */
475 			isDuringDestruction = false;
476 		}
477 	}
478 
479 	/**
480 	 * {@inheritDoc}
481 	 * 
482 	 * This particular interceptor returns a delegated service reference so that
483 	 * callers can keep the reference even if the underlying target service
484 	 * reference changes in time.
485 	 */
486 	public ServiceReference getServiceReference() {
487 		return referenceDelegate;
488 	}
489 
490 	public void setRetryTemplate(RetryTemplate retryTemplate) {
491 		this.retryTemplate = retryTemplate;
492 	}
493 
494 	public RetryTemplate getRetryTemplate() {
495 		return retryTemplate;
496 	}
497 
498 	public OsgiServiceLifecycleListener[] getListeners() {
499 		return listeners;
500 	}
501 
502 	public void setListeners(OsgiServiceLifecycleListener[] listeners) {
503 		this.listeners = listeners;
504 	}
505 
506 	public void setServiceImporter(Object importer) {
507 		this.eventSource = importer;
508 	}
509 
510 	public void setServiceImporterName(String name) {
511 		this.sourceName = name;
512 	}
513 
514 	public void setRequiredAtStartup(boolean requiredAtStartup) {
515 		this.serviceRequiredAtStartup = requiredAtStartup;
516 	}
517 
518 	public void setProxy(Object proxy) {
519 		this.proxy = proxy;
520 	}
521 
522 	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
523 		this.applicationEventPublisher = applicationEventPublisher;
524 	}
525 
526 	/** Internal state listeners */
527 	public void setStateListeners(List stateListeners) {
528 		synchronized (lock) {
529 			this.stateListeners = stateListeners;
530 		}
531 	}
532 
533 	public boolean equals(Object other) {
534 		if (this == other)
535 			return true;
536 		if (other instanceof ServiceDynamicInterceptor) {
537 			ServiceDynamicInterceptor oth = (ServiceDynamicInterceptor) other;
538 			return (serviceRequiredAtStartup == oth.serviceRequiredAtStartup
539 					&& ObjectUtils.nullSafeEquals(wrapper, oth.wrapper)
540 					&& ObjectUtils.nullSafeEquals(filter, oth.filter) && ObjectUtils.nullSafeEquals(retryTemplate,
541 				oth.retryTemplate));
542 		}
543 		else
544 			return false;
545 	}
546 
547 	public int hashCode() {
548 		return hashCode;
549 	}
550 }