001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util; 018 019 import java.util.HashMap; 020 import java.util.Iterator; 021 import java.util.Map; 022 import java.util.Set; 023 import java.util.SortedSet; 024 import java.util.TreeSet; 025 import java.util.concurrent.ScheduledExecutorService; 026 import java.util.concurrent.TimeUnit; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * Default implementation of the {@link TimeoutMap}. 033 * 034 * @version $Revision: 585 $ 035 */ 036 public class DefaultTimeoutMap implements TimeoutMap, Runnable { 037 038 private static final transient Log LOG = LogFactory.getLog(DefaultTimeoutMap.class); 039 040 private final Map map = new HashMap(); 041 private SortedSet index = new TreeSet(); 042 private ScheduledExecutorService executor; 043 private long purgePollTime; 044 045 public DefaultTimeoutMap() { 046 this(null, 1000L); 047 } 048 049 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 050 this.executor = executor; 051 this.purgePollTime = requestMapPollTimeMillis; 052 schedulePoll(); 053 } 054 055 public Object get(Object key) { 056 TimeoutMapEntry entry = null; 057 synchronized (map) { 058 entry = (TimeoutMapEntry) map.get(key); 059 if (entry == null) { 060 return null; 061 } 062 index.remove(entry); 063 updateExpireTime(entry); 064 index.add(entry); 065 } 066 return entry.getValue(); 067 } 068 069 public void put(Object key, Object value, long timeoutMillis) { 070 TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis); 071 synchronized (map) { 072 Object oldValue = map.put(key, entry); 073 if (oldValue != null) { 074 index.remove(oldValue); 075 } 076 updateExpireTime(entry); 077 index.add(entry); 078 } 079 } 080 081 public void remove(Object id) { 082 synchronized (map) { 083 TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id); 084 if (entry != null) { 085 index.remove(entry); 086 } 087 } 088 } 089 090 public Object[] getKeys() { 091 Object[] keys = null; 092 synchronized (map) { 093 Set keySet = map.keySet(); 094 keys = new Object[keySet.size()]; 095 keySet.toArray(keys); 096 } 097 return keys; 098 } 099 100 public int size() { 101 synchronized (map) { 102 return map.size(); 103 } 104 } 105 106 /** 107 * The timer task which purges old requests and schedules another poll 108 */ 109 public void run() { 110 purge(); 111 schedulePoll(); 112 } 113 114 public void purge() { 115 long now = currentTime(); 116 synchronized (map) { 117 for (Iterator iter = index.iterator(); iter.hasNext();) { 118 TimeoutMapEntry entry = (TimeoutMapEntry) iter.next(); 119 if (entry == null) { 120 break; 121 } 122 if (entry.getExpireTime() < now) { 123 if (isValidForEviction(entry)) { 124 if (LOG.isDebugEnabled()) { 125 LOG.debug("Evicting inactive request for correlationID: " + entry); 126 } 127 map.remove(entry.getKey()); 128 iter.remove(); 129 } 130 } else { 131 break; 132 } 133 } 134 } 135 } 136 137 // Properties 138 // ------------------------------------------------------------------------- 139 public long getPurgePollTime() { 140 return purgePollTime; 141 } 142 143 /** 144 * Sets the next purge poll time in milliseconds 145 */ 146 public void setPurgePollTime(long purgePollTime) { 147 this.purgePollTime = purgePollTime; 148 } 149 150 public ScheduledExecutorService getExecutor() { 151 return executor; 152 } 153 154 /** 155 * Sets the executor used to schedule purge events of inactive requests 156 */ 157 public void setExecutor(ScheduledExecutorService executor) { 158 this.executor = executor; 159 } 160 161 // Implementation methods 162 // ------------------------------------------------------------------------- 163 164 /** 165 * lets schedule each time to allow folks to change the time at runtime 166 */ 167 protected void schedulePoll() { 168 if (executor != null) { 169 executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS); 170 } 171 } 172 173 /** 174 * A hook to allow derivations to avoid evicting the current entry 175 */ 176 protected boolean isValidForEviction(TimeoutMapEntry entry) { 177 return true; 178 } 179 180 protected void updateExpireTime(TimeoutMapEntry entry) { 181 long now = currentTime(); 182 entry.setExpireTime(entry.getTimeout() + now); 183 } 184 185 protected long currentTime() { 186 return System.currentTimeMillis(); 187 } 188 }