001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.util.HashMap;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.Set;
023    import java.util.SortedSet;
024    import java.util.TreeSet;
025    import java.util.concurrent.ScheduledExecutorService;
026    import java.util.concurrent.TimeUnit;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     *
033     * @version $Revision: 37863 $
034     */
035    public class DefaultTimeoutMap implements TimeoutMap, Runnable {
036    
037        private static final Log LOG = LogFactory.getLog(DefaultTimeoutMap.class);
038    
039        private Map map = new HashMap();
040        private SortedSet index = new TreeSet();
041        private ScheduledExecutorService executor;
042        private long purgePollTime;
043    
044        public DefaultTimeoutMap() {
045            this(null, 1000L);
046        }
047    
048        public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
049            this.executor = executor;
050            this.purgePollTime = requestMapPollTimeMillis;
051            schedulePoll();
052        }
053    
054        public Object get(Object key) {
055            TimeoutMapEntry entry = null;
056            synchronized (map) {
057                entry = (TimeoutMapEntry) map.get(key);
058                if (entry == null) {
059                    return null;
060                }
061                index.remove(entry);
062                updateExpireTime(entry);
063                index.add(entry);
064            }
065            return entry.getValue();
066        }
067    
068        public void put(Object key, Object value, long timeoutMillis) {
069            TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis);
070            synchronized (map) {
071                Object oldValue = map.put(key, entry);
072                if (oldValue != null) {
073                    index.remove(oldValue);
074                }
075                updateExpireTime(entry);
076                index.add(entry);
077            }
078        }
079    
080        public void remove(Object id) {
081            synchronized (map) {
082                TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id);
083                if (entry != null) {
084                    index.remove(entry);
085                }
086            }
087        }
088    
089        /**
090         * Returns a copy of the keys in the map
091         */
092        public Object[] getKeys() {
093            Object[] keys = null;
094            synchronized (map) {
095                Set keySet = map.keySet();
096                keys = new Object[keySet.size()];
097                keySet.toArray(keys);
098            }
099            return keys;
100        }
101        
102        public int size() {
103            synchronized (map) {
104                return map.size();
105            }
106        }
107    
108        /**
109         * The timer task which purges old requests and schedules another poll
110         */
111        public void run() {
112            purge();
113            schedulePoll();
114        }
115    
116        /**
117         * Purges any old entries from the map
118         */
119        public void purge() {
120            long now = currentTime();
121            synchronized (map) {
122                for (Iterator iter = index.iterator(); iter.hasNext();) {
123                    TimeoutMapEntry entry = (TimeoutMapEntry) iter.next();
124                    if (entry == null) {
125                        break;
126                    }
127                    if (entry.getExpireTime() < now) {
128                        if (isValidForEviction(entry)) {
129                            if (LOG.isDebugEnabled()) {
130                                LOG.debug("Evicting inactive request for correlationID: " + entry);
131                            }
132                            map.remove(entry.getKey());
133                            iter.remove();
134                        }
135                    } else {
136                        break;
137                    }
138                }
139            }
140        }
141    
142        // Properties
143        // -------------------------------------------------------------------------
144        public long getPurgePollTime() {
145            return purgePollTime;
146        }
147    
148        /**
149         * Sets the next purge poll time in milliseconds
150         */
151        public void setPurgePollTime(long purgePollTime) {
152            this.purgePollTime = purgePollTime;
153        }
154    
155        public ScheduledExecutorService getExecutor() {
156            return executor;
157        }
158    
159        /**
160         * Sets the executor used to schedule purge events of inactive requests
161         */
162        public void setExecutor(ScheduledExecutorService executor) {
163            this.executor = executor;
164        }
165    
166        // Implementation methods
167        // -------------------------------------------------------------------------
168    
169        /**
170         * lets schedule each time to allow folks to change the time at runtime
171         */
172        protected void schedulePoll() {
173            if (executor != null) {
174                executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS);
175            }
176        }
177    
178        /**
179         * A hook to allow derivations to avoid evicting the current entry
180         */
181        protected boolean isValidForEviction(TimeoutMapEntry entry) {
182            return true;
183        }
184    
185        protected void updateExpireTime(TimeoutMapEntry entry) {
186            long now = currentTime();
187            entry.setExpireTime(entry.getTimeout() + now);
188        }
189    
190        protected long currentTime() {
191            return System.currentTimeMillis();
192        }
193    }