001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.Set;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.BrokerServiceAware;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.scheduler.JobSchedulerStore;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQQueue;
034import org.apache.activemq.command.ActiveMQTempQueue;
035import org.apache.activemq.command.ActiveMQTempTopic;
036import org.apache.activemq.command.ActiveMQTopic;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.ProducerId;
041import org.apache.activemq.command.SubscriptionInfo;
042import org.apache.activemq.command.TransactionId;
043import org.apache.activemq.command.XATransactionId;
044import org.apache.activemq.openwire.OpenWireFormat;
045import org.apache.activemq.protobuf.Buffer;
046import org.apache.activemq.store.AbstractMessageStore;
047import org.apache.activemq.store.MessageRecoveryListener;
048import org.apache.activemq.store.MessageStore;
049import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
050import org.apache.activemq.store.PersistenceAdapter;
051import org.apache.activemq.store.TopicMessageStore;
052import org.apache.activemq.store.TransactionRecoveryListener;
053import org.apache.activemq.store.TransactionStore;
054import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
055import org.apache.activemq.store.kahadb.data.KahaDestination;
056import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
057import org.apache.activemq.store.kahadb.data.KahaLocation;
058import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
059import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
060import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
061import org.apache.activemq.store.kahadb.disk.journal.Location;
062import org.apache.activemq.store.kahadb.disk.page.Transaction;
063import org.apache.activemq.usage.MemoryUsage;
064import org.apache.activemq.usage.SystemUsage;
065import org.apache.activemq.util.ByteSequence;
066import org.apache.activemq.wireformat.WireFormat;
067
068public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
069
070    private final WireFormat wireFormat = new OpenWireFormat();
071    private BrokerService brokerService;
072
073    @Override
074    public void setBrokerName(String brokerName) {
075    }
076    @Override
077    public void setUsageManager(SystemUsage usageManager) {
078    }
079
080    @Override
081    public TransactionStore createTransactionStore() throws IOException {
082        return new TransactionStore(){
083
084            @Override
085            public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
086                if (preCommit != null) {
087                    preCommit.run();
088                }
089                processCommit(txid);
090                if (postCommit != null) {
091                    postCommit.run();
092                }
093            }
094            @Override
095            public void prepare(TransactionId txid) throws IOException {
096                processPrepare(txid);
097            }
098            @Override
099            public void rollback(TransactionId txid) throws IOException {
100                processRollback(txid);
101            }
102            @Override
103            public void recover(TransactionRecoveryListener listener) throws IOException {
104                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
105                    XATransactionId xid = (XATransactionId)entry.getKey();
106                    ArrayList<Message> messageList = new ArrayList<Message>();
107                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
108
109                    for (Operation op : entry.getValue()) {
110                        if( op.getClass() == AddOpperation.class ) {
111                            AddOpperation addOp = (AddOpperation)op;
112                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
113                            messageList.add(msg);
114                        } else {
115                            RemoveOpperation rmOp = (RemoveOpperation)op;
116                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
117                            ackList.add(ack);
118                        }
119                    }
120
121                    Message[] addedMessages = new Message[messageList.size()];
122                    MessageAck[] acks = new MessageAck[ackList.size()];
123                    messageList.toArray(addedMessages);
124                    ackList.toArray(acks);
125                    listener.recover(xid, addedMessages, acks);
126                }
127            }
128            @Override
129            public void start() throws Exception {
130            }
131            @Override
132            public void stop() throws Exception {
133            }
134        };
135    }
136
137    public class KahaDBMessageStore extends AbstractMessageStore {
138        protected KahaDestination dest;
139
140        public KahaDBMessageStore(ActiveMQDestination destination) {
141            super(destination);
142            this.dest = convert( destination );
143        }
144
145        @Override
146        public ActiveMQDestination getDestination() {
147            return destination;
148        }
149
150        @Override
151        public void addMessage(ConnectionContext context, Message message) throws IOException {
152            KahaAddMessageCommand command = new KahaAddMessageCommand();
153            command.setDestination(dest);
154            command.setMessageId(message.getMessageId().toProducerKey());
155            processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
156        }
157
158        @Override
159        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
160            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
161            command.setDestination(dest);
162            command.setMessageId(ack.getLastMessageId().toProducerKey());
163            processRemove(command, ack.getTransactionId());
164        }
165
166        @Override
167        public void removeAllMessages(ConnectionContext context) throws IOException {
168            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
169            command.setDestination(dest);
170            process(command);
171        }
172
173        @Override
174        public Message getMessage(MessageId identity) throws IOException {
175            final String key = identity.toProducerKey();
176
177            // Hopefully one day the page file supports concurrent read operations... but for now we must
178            // externally synchronize...
179            ByteSequence data;
180            synchronized(indexMutex) {
181                data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
182                    @Override
183                    public ByteSequence execute(Transaction tx) throws IOException {
184                        StoredDestination sd = getStoredDestination(dest, tx);
185                        Long sequence = sd.messageIdIndex.get(tx, key);
186                        if( sequence ==null ) {
187                            return null;
188                        }
189                        return sd.orderIndex.get(tx, sequence).data;
190                    }
191                });
192            }
193            if( data == null ) {
194                return null;
195            }
196
197            Message msg = (Message)wireFormat.unmarshal( data );
198            return msg;
199        }
200
201        @Override
202        public void recover(final MessageRecoveryListener listener) throws Exception {
203            synchronized(indexMutex) {
204                pageFile.tx().execute(new Transaction.Closure<Exception>(){
205                    @Override
206                    public void execute(Transaction tx) throws Exception {
207                        StoredDestination sd = getStoredDestination(dest, tx);
208                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
209                            Entry<Long, MessageRecord> entry = iterator.next();
210                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
211                        }
212                    }
213                });
214            }
215        }
216
217        long cursorPos=0;
218
219        @Override
220        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
221            synchronized(indexMutex) {
222                pageFile.tx().execute(new Transaction.Closure<Exception>(){
223                    @Override
224                    public void execute(Transaction tx) throws Exception {
225                        StoredDestination sd = getStoredDestination(dest, tx);
226                        Entry<Long, MessageRecord> entry=null;
227                        int counter = 0;
228                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
229                            entry = iterator.next();
230                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
231                            counter++;
232                            if( counter >= maxReturned ) {
233                                break;
234                            }
235                        }
236                        if( entry!=null ) {
237                            cursorPos = entry.getKey()+1;
238                        }
239                    }
240                });
241            }
242        }
243
244        @Override
245        public void resetBatching() {
246            cursorPos=0;
247        }
248
249
250        @Override
251        public void setBatch(MessageId identity) throws IOException {
252            final String key = identity.toProducerKey();
253
254            // Hopefully one day the page file supports concurrent read operations... but for now we must
255            // externally synchronize...
256            Long location;
257            synchronized(indexMutex) {
258                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
259                    @Override
260                    public Long execute(Transaction tx) throws IOException {
261                        StoredDestination sd = getStoredDestination(dest, tx);
262                        return sd.messageIdIndex.get(tx, key);
263                    }
264                });
265            }
266            if( location!=null ) {
267                cursorPos=location+1;
268            }
269
270        }
271
272        @Override
273        public void setMemoryUsage(MemoryUsage memoryUsage) {
274        }
275        @Override
276        public void start() throws Exception {
277        }
278        @Override
279        public void stop() throws Exception {
280        }
281
282        @Override
283        public void recoverMessageStoreStatistics() throws IOException {
284            int count = 0;
285            synchronized(indexMutex) {
286                count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
287                    @Override
288                    public Integer execute(Transaction tx) throws IOException {
289                        // Iterate through all index entries to get a count of messages in the destination.
290                        StoredDestination sd = getStoredDestination(dest, tx);
291                        int rc=0;
292                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
293                            iterator.next();
294                            rc++;
295                        }
296                        return rc;
297                    }
298                });
299            }
300            getMessageStoreStatistics().getMessageCount().setCount(count);
301        }
302
303    }
304
305    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
306        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
307            super(destination);
308        }
309
310        @Override
311        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
312                                MessageId messageId, MessageAck ack) throws IOException {
313            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
314            command.setDestination(dest);
315            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
316            command.setMessageId(messageId.toProducerKey());
317            // We are not passed a transaction info.. so we can't participate in a transaction.
318            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
319            // to pass back to the XA recover method.
320            // command.setTransactionInfo();
321            processRemove(command, null);
322        }
323
324        @Override
325        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
326            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
327            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
328            command.setDestination(dest);
329            command.setSubscriptionKey(subscriptionKey);
330            command.setRetroactive(retroactive);
331            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
332            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
333            process(command);
334        }
335
336        @Override
337        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
338            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
339            command.setDestination(dest);
340            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
341            process(command);
342        }
343
344        @Override
345        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
346
347            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
348            synchronized(indexMutex) {
349                pageFile.tx().execute(new Transaction.Closure<IOException>(){
350                    @Override
351                    public void execute(Transaction tx) throws IOException {
352                        StoredDestination sd = getStoredDestination(dest, tx);
353                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
354                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
355                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
356                            subscriptions.add(info);
357
358                        }
359                    }
360                });
361            }
362
363            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
364            subscriptions.toArray(rc);
365            return rc;
366        }
367
368        @Override
369        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
370            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
371            synchronized(indexMutex) {
372                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
373                    @Override
374                    public SubscriptionInfo execute(Transaction tx) throws IOException {
375                        StoredDestination sd = getStoredDestination(dest, tx);
376                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
377                        if( command ==null ) {
378                            return null;
379                        }
380                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
381                    }
382                });
383            }
384        }
385
386        @Override
387        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
388            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
389            synchronized(indexMutex) {
390                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
391                    @Override
392                    public Integer execute(Transaction tx) throws IOException {
393                        StoredDestination sd = getStoredDestination(dest, tx);
394                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
395                        if ( cursorPos==null ) {
396                            // The subscription might not exist.
397                            return 0;
398                        }
399                        cursorPos += 1;
400
401                        int counter = 0;
402                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
403                            iterator.next();
404                            counter++;
405                        }
406                        return counter;
407                    }
408                });
409            }
410        }
411
412        @Override
413        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
414            return 0;
415        }
416
417        private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
418
419        @Override
420        public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
421            return stats;
422        }
423
424        @Override
425        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
426            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
427            synchronized(indexMutex) {
428                pageFile.tx().execute(new Transaction.Closure<Exception>(){
429                    @Override
430                    public void execute(Transaction tx) throws Exception {
431                        StoredDestination sd = getStoredDestination(dest, tx);
432                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
433                        cursorPos += 1;
434
435                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
436                            Entry<Long, MessageRecord> entry = iterator.next();
437                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
438                        }
439                    }
440                });
441            }
442        }
443
444        @Override
445        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
446            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
447            synchronized(indexMutex) {
448                pageFile.tx().execute(new Transaction.Closure<Exception>(){
449                    @Override
450                    public void execute(Transaction tx) throws Exception {
451                        StoredDestination sd = getStoredDestination(dest, tx);
452                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
453                        if( cursorPos == null ) {
454                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
455                            cursorPos += 1;
456                        }
457
458                        Entry<Long, MessageRecord> entry=null;
459                        int counter = 0;
460                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
461                            entry = iterator.next();
462                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
463                            counter++;
464                            if( counter >= maxReturned ) {
465                                break;
466                            }
467                        }
468                        if( entry!=null ) {
469                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
470                        }
471                    }
472                });
473            }
474        }
475
476        @Override
477        public void resetBatching(String clientId, String subscriptionName) {
478            try {
479                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
480                synchronized(indexMutex) {
481                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
482                        @Override
483                        public void execute(Transaction tx) throws IOException {
484                            StoredDestination sd = getStoredDestination(dest, tx);
485                            sd.subscriptionCursors.remove(subscriptionKey);
486                        }
487                    });
488                }
489            } catch (IOException e) {
490                throw new RuntimeException(e);
491            }
492        }
493    }
494
495    String subscriptionKey(String clientId, String subscriptionName){
496        return clientId+":"+subscriptionName;
497    }
498
499    @Override
500    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
501        return new KahaDBMessageStore(destination);
502    }
503
504    @Override
505    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
506        return new KahaDBTopicMessageStore(destination);
507    }
508
509    /**
510     * Cleanup method to remove any state associated with the given destination.
511     * This method does not stop the message store (it might not be cached).
512     *
513     * @param destination Destination to forget
514     */
515    @Override
516    public void removeQueueMessageStore(ActiveMQQueue destination) {
517    }
518
519    /**
520     * Cleanup method to remove any state associated with the given destination
521     * This method does not stop the message store (it might not be cached).
522     *
523     * @param destination Destination to forget
524     */
525    @Override
526    public void removeTopicMessageStore(ActiveMQTopic destination) {
527    }
528
529    @Override
530    public void deleteAllMessages() throws IOException {
531    }
532
533
534    @Override
535    public Set<ActiveMQDestination> getDestinations() {
536        try {
537            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
538            synchronized(indexMutex) {
539                pageFile.tx().execute(new Transaction.Closure<IOException>(){
540                    @Override
541                    public void execute(Transaction tx) throws IOException {
542                        for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
543                            Entry<String, StoredDestination> entry = iterator.next();
544                            rc.add(convert(entry.getKey()));
545                        }
546                    }
547                });
548            }
549            return rc;
550        } catch (IOException e) {
551            throw new RuntimeException(e);
552        }
553    }
554
555    @Override
556    public long getLastMessageBrokerSequenceId() throws IOException {
557        return 0;
558    }
559
560    @Override
561    public long size() {
562        if ( !started.get() ) {
563            return 0;
564        }
565        try {
566            return pageFile.getDiskSize();
567        } catch (IOException e) {
568            throw new RuntimeException(e);
569        }
570    }
571
572    @Override
573    public void beginTransaction(ConnectionContext context) throws IOException {
574        throw new IOException("Not yet implemented.");
575    }
576    @Override
577    public void commitTransaction(ConnectionContext context) throws IOException {
578        throw new IOException("Not yet implemented.");
579    }
580    @Override
581    public void rollbackTransaction(ConnectionContext context) throws IOException {
582        throw new IOException("Not yet implemented.");
583    }
584
585    @Override
586    public void checkpoint(boolean sync) throws IOException {
587    }
588
589    ///////////////////////////////////////////////////////////////////
590    // Internal conversion methods.
591    ///////////////////////////////////////////////////////////////////
592
593
594
595    KahaLocation convert(Location location) {
596        KahaLocation rc = new KahaLocation();
597        rc.setLogId(location.getDataFileId());
598        rc.setOffset(location.getOffset());
599        return rc;
600    }
601
602    KahaDestination convert(ActiveMQDestination dest) {
603        KahaDestination rc = new KahaDestination();
604        rc.setName(dest.getPhysicalName());
605        switch( dest.getDestinationType() ) {
606        case ActiveMQDestination.QUEUE_TYPE:
607            rc.setType(DestinationType.QUEUE);
608            return rc;
609        case ActiveMQDestination.TOPIC_TYPE:
610            rc.setType(DestinationType.TOPIC);
611            return rc;
612        case ActiveMQDestination.TEMP_QUEUE_TYPE:
613            rc.setType(DestinationType.TEMP_QUEUE);
614            return rc;
615        case ActiveMQDestination.TEMP_TOPIC_TYPE:
616            rc.setType(DestinationType.TEMP_TOPIC);
617            return rc;
618        default:
619            return null;
620        }
621    }
622
623    ActiveMQDestination convert(String dest) {
624        int p = dest.indexOf(":");
625        if( p<0 ) {
626            throw new IllegalArgumentException("Not in the valid destination format");
627        }
628        int type = Integer.parseInt(dest.substring(0, p));
629        String name = dest.substring(p+1);
630
631        switch( KahaDestination.DestinationType.valueOf(type) ) {
632        case QUEUE:
633            return new ActiveMQQueue(name);
634        case TOPIC:
635            return new ActiveMQTopic(name);
636        case TEMP_QUEUE:
637            return new ActiveMQTempQueue(name);
638        case TEMP_TOPIC:
639            return new ActiveMQTempTopic(name);
640        default:
641            throw new IllegalArgumentException("Not in the valid destination format");
642        }
643    }
644
645    @Override
646    public long getLastProducerSequenceId(ProducerId id) {
647        return -1;
648    }
649
650    @Override
651    public void allowIOResumption() {
652        if (pageFile != null) {
653            pageFile.allowIOResumption();
654        }
655    }
656
657    @Override
658    public void setBrokerService(BrokerService brokerService) {
659        this.brokerService = brokerService;
660    }
661
662    @Override
663    public void load() throws IOException {
664        if( brokerService!=null ) {
665            wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
666        }
667        super.load();
668    }
669    @Override
670    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
671        throw new UnsupportedOperationException();
672    }
673}