/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker;
018
019import java.net.URI;
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.ThreadPoolExecutor;
023
024import org.apache.activemq.Service;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.broker.region.Region;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.broker.region.virtual.VirtualDestination;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.BrokerId;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionInfo;
034import org.apache.activemq.command.DestinationInfo;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.ProducerInfo;
037import org.apache.activemq.command.SessionInfo;
038import org.apache.activemq.command.TransactionId;
039import org.apache.activemq.store.PListStore;
040import org.apache.activemq.thread.Scheduler;
041import org.apache.activemq.usage.Usage;
042
043/**
044 * The Message Broker which routes messages, maintains subscriptions and
045 * connections, acknowledges messages and handles transactions.
046 */
047public interface Broker extends Region, Service {
048
049    /**
050     * Get a Broker from the Broker Stack that is a particular class
051     *
052     * @param type
053     * @return a Broker instance.
054     */
055    Broker getAdaptor(Class type);
056
057    /**
058     * Get the id of the broker
059     */
060    BrokerId getBrokerId();
061
062    /**
063     * Get the name of the broker
064     */
065    String getBrokerName();
066
067    /**
068     * A remote Broker connects
069     */
070    void addBroker(Connection connection, BrokerInfo info);
071
072    /**
073     * Remove a BrokerInfo
074     *
075     * @param connection
076     * @param info
077     */
078    void removeBroker(Connection connection, BrokerInfo info);
079
080    /**
081     * A client is establishing a connection with the broker.
082     *
083     * @throws Exception TODO
084     */
085    void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
086
087    /**
088     * A client is disconnecting from the broker.
089     *
090     * @param context the environment the operation is being executed under.
091     * @param info
092     * @param error null if the client requested the disconnect or the error
093     *                that caused the client to disconnect.
094     * @throws Exception TODO
095     */
096    void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;
097
098    /**
099     * Adds a session.
100     *
101     * @param context
102     * @param info
103     * @throws Exception TODO
104     */
105    void addSession(ConnectionContext context, SessionInfo info) throws Exception;
106
107    /**
108     * Removes a session.
109     *
110     * @param context
111     * @param info
112     * @throws Exception TODO
113     */
114    void removeSession(ConnectionContext context, SessionInfo info) throws Exception;
115
116    /**
117     * Adds a producer.
118     *
119     * @param context the environment the operation is being executed under.
120     * @throws Exception TODO
121     */
122    @Override
123    void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
124
125    /**
126     * Removes a producer.
127     *
128     * @param context the environment the operation is being executed under.
129     * @throws Exception TODO
130     */
131    @Override
132    void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
133
134    /**
135     * @return all clients added to the Broker.
136     * @throws Exception TODO
137     */
138    Connection[] getClients() throws Exception;
139
140    /**
141     * @return all destinations added to the Broker.
142     * @throws Exception TODO
143     */
144    ActiveMQDestination[] getDestinations() throws Exception;
145
146    /**
147     * return a reference destination map of a region based on the destination type
148     *
149     * @param destination
150     *
151     * @return destination Map
152     */
153    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination);
154
155    /**
156     * Gets a list of all the prepared xa transactions.
157     *
158     * @param context transaction ids
159     * @return array of TransactionId values
160     * @throws Exception TODO
161     */
162    TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
163
164    /**
165     * Starts a transaction.
166     *
167     * @param context
168     * @param xid
169     * @throws Exception TODO
170     */
171    void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception;
172
173    /**
174     * Prepares a transaction. Only valid for xa transactions.
175     *
176     * @param context
177     * @param xid
178     * @return id
179     * @throws Exception TODO
180     */
181    int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
182
183    /**
184     * Rollsback a transaction.
185     *
186     * @param context
187     * @param xid
188     * @throws Exception TODO
189     */
190    void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
191
192    /**
193     * Commits a transaction.
194     *
195     * @param context
196     * @param xid
197     * @param onePhase
198     * @throws Exception TODO
199     */
200    void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception;
201
202    /**
203     * Forgets a transaction.
204     *
205     * @param context
206     * @param transactionId
207     * @throws Exception
208     */
209    void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
210
211    /**
212     * Get the BrokerInfo's of any connected Brokers
213     *
214     * @return array of peer BrokerInfos
215     */
216    BrokerInfo[] getPeerBrokerInfos();
217
218    /**
219     * Notify the Broker that a dispatch is going to happen
220     *
221     * @param messageDispatch
222     */
223    void preProcessDispatch(MessageDispatch messageDispatch);
224
225    /**
226     * Notify the Broker that a dispatch has happened
227     *
228     * @param messageDispatch
229     */
230    void postProcessDispatch(MessageDispatch messageDispatch);
231
232    /**
233     * @return true if the broker has stopped
234     */
235    boolean isStopped();
236
237    /**
238     * @return a Set of all durable destinations
239     */
240    Set<ActiveMQDestination> getDurableDestinations();
241
242    /**
243     * Add and process a DestinationInfo object
244     *
245     * @param context
246     * @param info
247     * @throws Exception
248     */
249    void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
250
251    /**
252     * Remove and process a DestinationInfo object
253     *
254     * @param context
255     * @param info
256     *
257     * @throws Exception
258     */
259    void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
260
261    /**
262     * @return true if fault tolerant
263     */
264    boolean isFaultTolerantConfiguration();
265
266    /**
267     * @return the connection context used to make administration operations on
268     *         startup or via JMX MBeans
269     */
270    ConnectionContext getAdminConnectionContext();
271
272    /**
273     * Sets the default administration connection context used when configuring
274     * the broker on startup or via JMX
275     *
276     * @param adminConnectionContext
277     */
278    void setAdminConnectionContext(ConnectionContext adminConnectionContext);
279
280    /**
281     * @return the temp data store
282     */
283    PListStore getTempDataStore();
284
285    /**
286     * @return the URI that can be used to connect to the local Broker
287     */
288    URI getVmConnectorURI();
289
290    /**
291     * called when the brokerService starts
292     */
293    void brokerServiceStarted();
294
295    /**
296     * @return the BrokerService
297     */
298    BrokerService getBrokerService();
299
300    /**
301     * Ensure we get the Broker at the top of the Stack
302     *
303     * @return the broker at the top of the Stack
304     */
305    Broker getRoot();
306
307    /**
308     * Determine if a message has expired -allows default behaviour to be
309     * overriden - as the timestamp set by the producer can be out of sync with
310     * the broker
311     *
312     * @param messageReference
313     * @return true if the message is expired
314     */
315    boolean isExpired(MessageReference messageReference);
316
317    /**
318     * A Message has Expired
319     *
320     * @param context
321     * @param messageReference
322     * @param subscription (may be null)
323     */
324    void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription);
325
326    /**
327     * A message needs to go the a DLQ
328     *
329     *
330     * @param context
331     * @param messageReference
332     * @param poisonCause reason for dlq submission, may be null
333     * @return true if Message was placed in a DLQ false if discarded.
334     */
335    boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause);
336
337    /**
338     * @return the broker sequence id
339     */
340    long getBrokerSequenceId();
341
342    /**
343     * called when message is consumed
344     * @param context
345     * @param messageReference
346     */
347    void messageConsumed(ConnectionContext context, MessageReference messageReference);
348
349    /**
350     * Called when message is delivered to the broker
351     * @param context
352     * @param messageReference
353     */
354    void messageDelivered(ConnectionContext context, MessageReference messageReference);
355
356    /**
357     * Called when a message is discarded - e.g. running low on memory
358     * This will happen only if the policy is enabled - e.g. non durable topics
359     * @param context
360     * @param sub
361     * @param messageReference
362     */
363    void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
364
365    /**
366     * Called when there is a slow consumer
367     * @param context
368     * @param destination
369     * @param subs
370     */
371    void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
372
373    /**
374     * Called to notify a producer is too fast
375     * @param context
376     * @param producerInfo
377     * @param destination
378     */
379    void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
380
381    /**
382     * Called when a Usage reaches a limit
383     * @param context
384     * @param destination
385     * @param usage
386     */
387    void isFull(ConnectionContext context,Destination destination,Usage usage);
388
389    void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination);
390
391    void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination);
392
393    /**
394     *  called when the broker becomes the master in a master/slave
395     *  configuration
396     */
397    void nowMasterBroker();
398
399    Scheduler getScheduler();
400
401    ThreadPoolExecutor getExecutor();
402
403    void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp);
404
405    void networkBridgeStopped(BrokerInfo brokerInfo);
406
407
408}