001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.xmpp;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.Socket;
023    import java.net.URI;
024    
025    import javax.net.SocketFactory;
026    import javax.xml.bind.JAXBContext;
027    import javax.xml.bind.JAXBException;
028    import javax.xml.bind.Marshaller;
029    import javax.xml.bind.Unmarshaller;
030    import javax.xml.namespace.QName;
031    import javax.xml.stream.Location;
032    import javax.xml.stream.XMLEventReader;
033    import javax.xml.stream.XMLInputFactory;
034    import javax.xml.stream.XMLOutputFactory;
035    import javax.xml.stream.XMLReporter;
036    import javax.xml.stream.XMLStreamException;
037    import javax.xml.stream.XMLStreamWriter;
038    import javax.xml.stream.events.Attribute;
039    import javax.xml.stream.events.StartElement;
040    import javax.xml.stream.events.XMLEvent;
041    
042    import org.apache.activemq.command.BrokerInfo;
043    import org.apache.activemq.command.Command;
044    import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
045    import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
046    import org.apache.activemq.transport.tcp.TcpTransport;
047    import org.apache.activemq.util.IOExceptionSupport;
048    import org.apache.activemq.util.ServiceStopper;
049    import org.apache.activemq.wireformat.WireFormat;
050    import org.jabber.etherx.streams.Features;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * @deprecated
056     */
057    @Deprecated
058    public class XmppTransport extends TcpTransport {
059        protected static final QName ATTRIBUTE_TO = new QName("to");
060    
061        private static final transient Logger LOG = LoggerFactory.getLogger(XmppTransport.class);
062    
063        protected OutputStream outputStream;
064        protected InputStream inputStream;
065    
066        private static JAXBContext context;
067        private XMLEventReader xmlReader;
068        private Unmarshaller unmarshaller;
069        private Marshaller marshaller;
070        private XMLStreamWriter xmlWriter;
071        private String to = "client";
072        private ProtocolConverter converter;
073        private String from = "localhost";
074        private String brokerId = "broker-id-1";
075    
076        public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
077            super(wireFormat, socket);
078            init();
079        }
080    
081        public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri1) throws IOException {
082            super(wireFormat, socketFactory, uri, uri1);
083            init();
084        }
085    
086        private void init() {
087            LOG.debug("Creating new instance of XmppTransport");
088            converter = new ProtocolConverter(this);
089        }
090    
091        @Override
092        public void oneway(Object object) throws IOException {
093            if (object instanceof Command) {
094                Command command = (Command)object;
095    
096                if (command instanceof BrokerInfo) {
097                    BrokerInfo brokerInfo = (BrokerInfo)command;
098    
099                    brokerId = brokerInfo.getBrokerId().toString();
100                    from = brokerInfo.getBrokerName();
101                    try {
102                        writeOpenStream(brokerId, from);
103                    } catch (XMLStreamException e) {
104                        throw IOExceptionSupport.create(e);
105                    }
106                } else {
107                    try {
108                        converter.onActiveMQCommand(command);
109                    } catch (IOException e) {
110                        throw e;
111                    } catch (Exception e) {
112                        throw IOExceptionSupport.create(e);
113                    }
114                }
115            } else {
116                LOG.warn("Unkown command: " + object);
117            }
118        }
119    
120        /**
121         * Marshalls the given POJO to the client
122         */
123        public void marshall(Object command) throws IOException {
124            if (isStopped() || isStopping()) {
125                LOG.warn("Not marshalling command as shutting down: " + command);
126                return;
127            }
128            try {
129                marshaller.marshal(command, xmlWriter);
130                xmlWriter.flush();
131                outputStream.flush();
132            } catch (JAXBException e) {
133                throw IOExceptionSupport.create(e);
134            } catch (XMLStreamException e) {
135                throw IOExceptionSupport.create(e);
136            }
137        }
138    
139        @Override
140        public void doRun() throws IOException {
141            LOG.debug("XMPP consumer thread starting");
142            try {
143                XMLInputFactory xif = XMLInputFactory.newInstance();
144                xif.setXMLReporter(new XMLReporter() {
145                    @Override
146                    public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException {
147                        LOG.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation);
148                    }
149                });
150    
151                xmlReader = xif.createXMLEventReader(inputStream);
152    
153                XMLEvent docStart = xmlReader.nextEvent();
154    
155                XMLEvent rootElement = xmlReader.nextTag();
156    
157                if (rootElement instanceof StartElement) {
158                    StartElement startElement = (StartElement)rootElement;
159                    Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO);
160                    if (toAttribute != null) {
161                        to = toAttribute.getValue();
162                    }
163                }
164                while (true) {
165                    if (isStopped()) {
166                        break;
167                    }
168    
169                    XMLEvent event = xmlReader.peek();
170                    if (event.isStartElement()) {
171                        // unmarshal a new object
172                        Object object = unmarshaller.unmarshal(xmlReader);
173                        if (object != null) {
174                            LOG.debug("Unmarshalled new incoming event - " + object.getClass().getName());
175                            converter.onXmppCommand(object);
176                        }
177                    } else {
178                        if (event.getEventType() == XMLEvent.END_ELEMENT) {
179                            break;
180                        } else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
181                            break;
182                        } else {
183                            xmlReader.nextEvent();
184                        }
185    
186                    }
187                }
188            } catch (Exception e) {
189                throw IOExceptionSupport.create(e);
190            }
191        }
192    
193        public String getFrom() {
194            return from;
195        }
196    
197        @Override
198        protected void doStop(ServiceStopper stopper) throws Exception {
199            if (xmlWriter != null) {
200                try {
201                    xmlWriter.writeEndElement();
202                    xmlWriter.writeEndDocument();
203                    xmlWriter.close();
204                } catch (XMLStreamException e) {
205                    // the client may have closed first so ignore this
206                    LOG.info("Caught trying to close transport: " + e, e);
207                }
208            }
209            if (xmlReader != null) {
210                try {
211                    xmlReader.close();
212                } catch (XMLStreamException e) {
213                    // the client may have closed first so ignore this
214                    LOG.info("Caught trying to close transport: " + e, e);
215                }
216            }
217            super.doStop(stopper);
218        }
219    
220        @Override
221        protected void initializeStreams() throws Exception {
222            // TODO it would be preferable to use class discovery here!
223            if ( context == null ) {
224                context = JAXBContext.newInstance(
225                        "jabber.server:" +
226                        "jabber.server.dialback:" +
227                        "jabber.client:" +
228                        "jabber.iq._private:" +
229                        "jabber.iq.auth:" +
230                        "jabber.iq.gateway:" +
231                        "jabber.iq.version:" +
232                        "jabber.iq.roster:" +
233                        "jabber.iq.pass:" +
234                        "jabber.iq.last:" +
235                        "jabber.iq.oob:" +
236                        "jabber.iq.time:" +
237                        "storage.rosternotes:" +
238                        "ietf.params.xml.ns.xmpp_streams:" +
239                        "ietf.params.xml.ns.xmpp_sasl:" +
240                        "ietf.params.xml.ns.xmpp_stanzas:" +
241                        "ietf.params.xml.ns.xmpp_bind:" +
242                        "ietf.params.xml.ns.xmpp_tls:" +
243                        "org.jabber.protocol.muc:" +
244                        "org.jabber.protocol.rosterx:" +
245                        "org.jabber.protocol.disco_info:" +
246                        "org.jabber.protocol.disco_items:" +
247                        "org.jabber.protocol.activity:" +
248                        "org.jabber.protocol.amp_errors:" +
249                        "org.jabber.protocol.amp:" +
250                        "org.jabber.protocol.address:" +
251                        "org.jabber.protocol.muc_user:" +
252                        "org.jabber.protocol.muc_admin:" +
253                        "org.jabber.etherx.streams");
254            }
255            inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
256            outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
257    
258            unmarshaller = context.createUnmarshaller();
259            marshaller = context.createMarshaller();
260            marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
261        }
262    
263        protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
264            LOG.debug("Sending initial stream element");
265    
266            XMLOutputFactory factory = XMLOutputFactory.newInstance();
267            // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
268            xmlWriter = factory.createXMLStreamWriter(outputStream);
269            xmlWriter.writeStartDocument();
270            xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams");
271            xmlWriter.writeDefaultNamespace("jabber:client");
272            xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams");
273            xmlWriter.writeAttribute("version", "1.0");
274            xmlWriter.writeAttribute("id", id);
275            if (to == null) {
276                to = "client";
277            }
278            xmlWriter.writeAttribute("to", to);
279            xmlWriter.writeAttribute("from", from);
280    
281            // now lets write the features
282            Features features = new Features();
283    
284            // TODO support TLS
285            // features.getAny().add(new Starttls());
286    
287            //Mechanisms mechanisms = new Mechanisms();
288    
289            // TODO support SASL
290            // mechanisms.getMechanism().add("DIGEST-MD5");
291            // mechanisms.getMechanism().add("PLAIN");
292            //features.getAny().add(mechanisms);
293            features.getAny().add(new ietf.params.xml.ns.xmpp_bind.ObjectFactory().createBind());
294            features.getAny().add(new ietf.params.xml.ns.xmpp_session.ObjectFactory().createSession(""));
295            marshall(features);
296    
297            LOG.debug("Initial stream element sent!");
298        }
299    
300    }