001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util.osgi; 018 019import static org.osgi.framework.wiring.BundleRevision.PACKAGE_NAMESPACE; 020 021import java.io.BufferedReader; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.net.URL; 026import java.util.ArrayList; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Properties; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033 034import org.apache.activemq.Service; 035import org.apache.activemq.store.PersistenceAdapter; 036import org.apache.activemq.transport.Transport; 037import org.apache.activemq.transport.discovery.DiscoveryAgent; 038import org.apache.activemq.util.FactoryFinder; 039import org.apache.activemq.util.FactoryFinder.ObjectFactory; 040import org.osgi.framework.Bundle; 041import org.osgi.framework.BundleActivator; 042import org.osgi.framework.BundleContext; 043import org.osgi.framework.BundleEvent; 044import org.osgi.framework.SynchronousBundleListener; 045import org.osgi.framework.wiring.BundleCapability; 046import org.osgi.framework.wiring.BundleWire; 047import org.osgi.framework.wiring.BundleWiring; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * An OSGi bundle activator for ActiveMQ which adapts the {@link org.apache.activemq.util.FactoryFinder} 053 * to the OSGi environment. 054 * 055 */ 056public class Activator implements BundleActivator, SynchronousBundleListener, ObjectFactory { 057 058 private static final Logger LOG = LoggerFactory.getLogger(Activator.class); 059 060 private final ConcurrentMap<String, Class<?>> serviceCache = new ConcurrentHashMap<String, Class<?>>(); 061 private final ConcurrentMap<Long, BundleWrapper> bundleWrappers = new ConcurrentHashMap<Long, BundleWrapper>(); 062 private BundleContext bundleContext; 063 private Set<BundleCapability> packageCapabilities = new HashSet<BundleCapability>(); 064 065 // ================================================================ 066 // BundleActivator interface impl 067 // ================================================================ 068 069 @Override 070 public synchronized void start(BundleContext bundleContext) throws Exception { 071 072 // This is how we replace the default FactoryFinder strategy 073 // with one that is more compatible in an OSGi env. 074 FactoryFinder.setObjectFactory(this); 075 076 debug("activating"); 077 this.bundleContext = bundleContext; 078 079 cachePackageCapabilities(Service.class, Transport.class, DiscoveryAgent.class, PersistenceAdapter.class); 080 081 debug("checking existing bundles"); 082 bundleContext.addBundleListener(this); 083 for (Bundle bundle : bundleContext.getBundles()) { 084 if (bundle.getState() == Bundle.RESOLVED || bundle.getState() == Bundle.STARTING || 085 bundle.getState() == Bundle.ACTIVE || bundle.getState() == Bundle.STOPPING) { 086 register(bundle); 087 } 088 } 089 debug("activated"); 090 } 091 092 /** 093 * Caches the package capabilities that are needed for a set of interface classes 094 * 095 * @param classes interfaces we want to track 096 */ 097 private void cachePackageCapabilities(Class<?> ... classes) { 098 BundleWiring ourWiring = bundleContext.getBundle().adapt(BundleWiring.class); 099 Set<String> packageNames = new HashSet<String>(); 100 for (Class<?> clazz: classes) { 101 packageNames.add(clazz.getPackage().getName()); 102 } 103 104 List<BundleCapability> ourExports = ourWiring.getCapabilities(PACKAGE_NAMESPACE); 105 for (BundleCapability ourExport : ourExports) { 106 String ourPkgName = (String) ourExport.getAttributes().get(PACKAGE_NAMESPACE); 107 if (packageNames.contains(ourPkgName)) { 108 packageCapabilities.add(ourExport); 109 } 110 } 111 } 112 113 114 @Override 115 public synchronized void stop(BundleContext bundleContext) throws Exception { 116 debug("deactivating"); 117 bundleContext.removeBundleListener(this); 118 while (!bundleWrappers.isEmpty()) { 119 unregister(bundleWrappers.keySet().iterator().next()); 120 } 121 debug("deactivated"); 122 this.bundleContext = null; 123 } 124 125 // ================================================================ 126 // SynchronousBundleListener interface impl 127 // ================================================================ 128 129 @Override 130 public void bundleChanged(BundleEvent event) { 131 if (event.getType() == BundleEvent.RESOLVED) { 132 register(event.getBundle()); 133 } else if (event.getType() == BundleEvent.UNRESOLVED || event.getType() == BundleEvent.UNINSTALLED) { 134 unregister(event.getBundle().getBundleId()); 135 } 136 } 137 138 protected void register(final Bundle bundle) { 139 debug("checking bundle " + bundle.getBundleId()); 140 if (isOurBundle(bundle) || isImportingUs(bundle) ) { 141 debug("Registering bundle for extension resolution: "+ bundle.getBundleId()); 142 bundleWrappers.put(bundle.getBundleId(), new BundleWrapper(bundle)); 143 } 144 } 145 146 private boolean isOurBundle(final Bundle bundle) { 147 return bundle.getBundleId() == bundleContext.getBundle().getBundleId(); 148 } 149 150 /** 151 * When bundles unload.. we remove them thier cached Class entries from the 152 * serviceCache. Future service lookups for the service will fail. 153 * 154 * TODO: consider a way to get the Broker release any references to 155 * instances of the service. 156 * 157 * @param bundleId 158 */ 159 protected void unregister(long bundleId) { 160 BundleWrapper bundle = bundleWrappers.remove(bundleId); 161 if (bundle != null) { 162 for (String path : bundle.cachedServices) { 163 debug("unregistering service for key: " +path ); 164 serviceCache.remove(path); 165 } 166 } 167 } 168 169 // ================================================================ 170 // ObjectFactory interface impl 171 // ================================================================ 172 173 @Override 174 public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException { 175 Class<?> clazz = serviceCache.get(path); 176 if (clazz == null) { 177 StringBuffer warnings = new StringBuffer(); 178 // We need to look for a bundle that has that class. 179 int wrrningCounter=1; 180 for (BundleWrapper wrapper : bundleWrappers.values()) { 181 URL resource = wrapper.bundle.getResource(path); 182 if( resource == null ) { 183 continue; 184 } 185 186 Properties properties = loadProperties(resource); 187 188 String className = properties.getProperty("class"); 189 if (className == null) { 190 warnings.append("("+(wrrningCounter++)+") Invalid service file in bundle "+wrapper+": 'class' property not defined."); 191 continue; 192 } 193 194 try { 195 clazz = wrapper.bundle.loadClass(className); 196 } catch (ClassNotFoundException e) { 197 warnings.append("("+(wrrningCounter++)+") Bundle "+wrapper+" could not load "+className+": "+e); 198 continue; 199 } 200 201 // Yay.. the class was found. Now cache it. 202 serviceCache.put(path, clazz); 203 wrapper.cachedServices.add(path); 204 break; 205 } 206 207 if( clazz == null ) { 208 // Since OSGi is such a tricky environment to work in.. lets give folks the 209 // most information we can in the error message. 210 String msg = "Service not found: '" + path + "'"; 211 if (warnings.length()!= 0) { 212 msg += ", "+warnings; 213 } 214 throw new IOException(msg); 215 } 216 } 217 return clazz.newInstance(); 218 } 219 220 // ================================================================ 221 // Internal Helper Methods 222 // ================================================================ 223 224 private void debug(Object msg) { 225 LOG.debug(msg.toString()); 226 } 227 228 private Properties loadProperties(URL resource) throws IOException { 229 InputStream in = resource.openStream(); 230 try { 231 BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8")); 232 Properties properties = new Properties(); 233 properties.load(in); 234 return properties; 235 } finally { 236 try { 237 in.close(); 238 } catch (Exception e) { 239 } 240 } 241 } 242 243 /** 244 * We consider a bundle to be a candidate for objects if it imports at least 245 * one of the packages of our interfaces 246 * 247 * @param bundle 248 * @return true if the bundle is improting. 249 */ 250 private boolean isImportingUs(Bundle bundle) { 251 BundleWiring wiring = bundle.adapt(BundleWiring.class); 252 List<BundleWire> imports = wiring.getRequiredWires(PACKAGE_NAMESPACE); 253 for (BundleWire importWire : imports) { 254 if (packageCapabilities.contains(importWire.getCapability())) { 255 return true; 256 } 257 } 258 return false; 259 } 260 261 private static class BundleWrapper { 262 private final Bundle bundle; 263 private final List<String> cachedServices = new ArrayList<String>(); 264 265 public BundleWrapper(Bundle bundle) { 266 this.bundle = bundle; 267 } 268 } 269}