001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.util.concurrent.ConcurrentLinkedQueue;
020import java.util.concurrent.locks.ReentrantReadWriteLock;
021import java.util.regex.Pattern;
022
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.Broker;
026import org.apache.activemq.broker.BrokerFilter;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.region.Destination;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ConnectionInfo;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
035
036    public static final Logger LOG = LoggerFactory.getLogger(AbstractRuntimeConfigurationBroker.class);
037    protected final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
038    protected final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock();
039    protected Runnable monitorTask;
040    protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
041    protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
042    protected ObjectName objectName;
043    protected String infoString;
044
045    public AbstractRuntimeConfigurationBroker(Broker next) {
046        super(next);
047    }
048
049    @Override
050    public void start() throws Exception {
051        super.start();
052    }
053
054    @Override
055    public void stop() throws Exception {
056        if (monitorTask != null) {
057            try {
058                this.getBrokerService().getScheduler().cancel(monitorTask);
059            } catch (Exception letsNotStopStop) {
060                LOG.warn("Failed to cancel config monitor task", letsNotStopStop);
061            }
062        }
063        unregisterMbean();
064        super.stop();
065    }
066
067    protected void registerMbean() {
068
069    }
070
071    protected void unregisterMbean() {
072
073    }
074
075    // modification to virtual destinations interceptor needs exclusive access to destination add
076    @Override
077    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
078        Runnable work = addDestinationWork.poll();
079        if (work != null) {
080            try {
081                addDestinationBarrier.writeLock().lockInterruptibly();
082                do {
083                    work.run();
084                    work = addDestinationWork.poll();
085                } while (work != null);
086                return super.addDestination(context, destination, createIfTemporary);
087            } finally {
088                addDestinationBarrier.writeLock().unlock();
089            }
090        } else {
091            try {
092                addDestinationBarrier.readLock().lockInterruptibly();
093                return super.addDestination(context, destination, createIfTemporary);
094            } finally {
095                addDestinationBarrier.readLock().unlock();
096            }
097        }
098    }
099
100    // modification to authentication plugin needs exclusive access to connection add
101    @Override
102    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
103        Runnable work = addConnectionWork.poll();
104        if (work != null) {
105            try {
106                addConnectionBarrier.writeLock().lockInterruptibly();
107                do {
108                    work.run();
109                    work = addConnectionWork.poll();
110                } while (work != null);
111                super.addConnection(context, info);
112            } finally {
113                addConnectionBarrier.writeLock().unlock();
114            }
115        } else {
116            try {
117                addConnectionBarrier.readLock().lockInterruptibly();
118                super.addConnection(context, info);
119            } finally {
120                addConnectionBarrier.readLock().unlock();
121            }
122        }
123    }
124
125    /**
126     * Apply the destination work immediately instead of waiting for
127     * a connection add or destination add
128     *
129     * @throws Exception
130     */
131    protected void applyDestinationWork() throws Exception {
132        Runnable work = addDestinationWork.poll();
133        if (work != null) {
134            try {
135                addDestinationBarrier.writeLock().lockInterruptibly();
136                do {
137                    work.run();
138                    work = addDestinationWork.poll();
139                } while (work != null);
140            } finally {
141                addDestinationBarrier.writeLock().unlock();
142            }
143        }
144    }
145
146    public void debug(String s) {
147        LOG.debug(s);
148    }
149
150    public void info(String s) {
151        LOG.info(filterPasswords(s));
152        if (infoString != null) {
153            infoString += s;
154            infoString += ";";
155        }
156    }
157
158    public void info(String s, Throwable t) {
159        LOG.info(filterPasswords(s), t);
160        if (infoString != null) {
161            infoString += s;
162            infoString += ", " + t;
163            infoString += ";";
164        }
165    }
166
167    Pattern matchPassword = Pattern.compile("password=.*,");
168    protected String filterPasswords(Object toEscape) {
169        return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
170    }
171
172}