001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.util.concurrent.ConcurrentLinkedQueue; 020import java.util.concurrent.locks.ReentrantReadWriteLock; 021import java.util.regex.Pattern; 022 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.BrokerFilter; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ConnectionInfo; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034public class AbstractRuntimeConfigurationBroker extends BrokerFilter { 035 036 public static final Logger LOG = LoggerFactory.getLogger(AbstractRuntimeConfigurationBroker.class); 037 protected final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock(); 038 protected final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock(); 039 protected Runnable monitorTask; 040 protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>(); 041 protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>(); 042 protected ObjectName objectName; 043 protected String infoString; 044 045 public AbstractRuntimeConfigurationBroker(Broker next) { 046 super(next); 047 } 048 049 @Override 050 public void start() throws Exception { 051 super.start(); 052 } 053 054 @Override 055 public void stop() throws Exception { 056 if (monitorTask != null) { 057 try { 058 this.getBrokerService().getScheduler().cancel(monitorTask); 059 } catch (Exception letsNotStopStop) { 060 LOG.warn("Failed to cancel config monitor task", letsNotStopStop); 061 } 062 } 063 unregisterMbean(); 064 super.stop(); 065 } 066 067 protected void registerMbean() { 068 069 } 070 071 protected void unregisterMbean() { 072 073 } 074 075 // modification to virtual destinations interceptor needs exclusive access to destination add 076 @Override 077 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { 078 Runnable work = addDestinationWork.poll(); 079 if (work != null) { 080 try { 081 addDestinationBarrier.writeLock().lockInterruptibly(); 082 do { 083 work.run(); 084 work = addDestinationWork.poll(); 085 } while (work != null); 086 return super.addDestination(context, destination, createIfTemporary); 087 } finally { 088 addDestinationBarrier.writeLock().unlock(); 089 } 090 } else { 091 try { 092 addDestinationBarrier.readLock().lockInterruptibly(); 093 return super.addDestination(context, destination, createIfTemporary); 094 } finally { 095 addDestinationBarrier.readLock().unlock(); 096 } 097 } 098 } 099 100 // modification to authentication plugin needs exclusive access to connection add 101 @Override 102 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 103 Runnable work = addConnectionWork.poll(); 104 if (work != null) { 105 try { 106 addConnectionBarrier.writeLock().lockInterruptibly(); 107 do { 108 work.run(); 109 work = addConnectionWork.poll(); 110 } while (work != null); 111 super.addConnection(context, info); 112 } finally { 113 addConnectionBarrier.writeLock().unlock(); 114 } 115 } else { 116 try { 117 addConnectionBarrier.readLock().lockInterruptibly(); 118 super.addConnection(context, info); 119 } finally { 120 addConnectionBarrier.readLock().unlock(); 121 } 122 } 123 } 124 125 /** 126 * Apply the destination work immediately instead of waiting for 127 * a connection add or destination add 128 * 129 * @throws Exception 130 */ 131 protected void applyDestinationWork() throws Exception { 132 Runnable work = addDestinationWork.poll(); 133 if (work != null) { 134 try { 135 addDestinationBarrier.writeLock().lockInterruptibly(); 136 do { 137 work.run(); 138 work = addDestinationWork.poll(); 139 } while (work != null); 140 } finally { 141 addDestinationBarrier.writeLock().unlock(); 142 } 143 } 144 } 145 146 public void debug(String s) { 147 LOG.debug(s); 148 } 149 150 public void info(String s) { 151 LOG.info(filterPasswords(s)); 152 if (infoString != null) { 153 infoString += s; 154 infoString += ";"; 155 } 156 } 157 158 public void info(String s, Throwable t) { 159 LOG.info(filterPasswords(s), t); 160 if (infoString != null) { 161 infoString += s; 162 infoString += ", " + t; 163 infoString += ";"; 164 } 165 } 166 167 Pattern matchPassword = Pattern.compile("password=.*,"); 168 protected String filterPasswords(Object toEscape) { 169 return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,"); 170 } 171 172}