001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.List; 020 import java.util.concurrent.ArrayBlockingQueue; 021 import java.util.concurrent.BlockingQueue; 022 import java.util.concurrent.RejectedExecutionException; 023 import java.util.concurrent.RejectedExecutionHandler; 024 import java.util.concurrent.ThreadFactory; 025 import java.util.concurrent.ThreadPoolExecutor; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.atomic.AtomicBoolean; 028 029 import org.apache.camel.AsyncCallback; 030 import org.apache.camel.AsyncProcessor; 031 import org.apache.camel.Exchange; 032 import org.apache.camel.Service; 033 import org.apache.camel.util.AsyncProcessorHelper; 034 035 /** 036 * A processor that forces async processing of the exchange using a thread pool. 037 * 038 * @version $Revision: 36321 $ 039 */ 040 public class ThreadProcessor implements AsyncProcessor, Service { 041 042 private ThreadPoolExecutor executor; 043 private long stackSize; 044 private ThreadGroup threadGroup; 045 private int priority = Thread.NORM_PRIORITY; 046 private boolean daemon = true; 047 private String name = "Thread Processor"; 048 private BlockingQueue<Runnable> taskQueue; 049 private long keepAliveTime; 050 private int maxSize = 1; 051 private int coreSize = 1; 052 private final AtomicBoolean shutdown = new AtomicBoolean(true); 053 054 class ProcessCall implements Runnable { 055 private final Exchange exchange; 056 private final AsyncCallback callback; 057 058 public ProcessCall(Exchange exchange, AsyncCallback callback) { 059 this.exchange = exchange; 060 this.callback = callback; 061 } 062 063 public void run() { 064 if (shutdown.get()) { 065 exchange.setException(new RejectedExecutionException()); 066 callback.done(false); 067 } else { 068 callback.done(false); 069 } 070 } 071 } 072 073 public void process(Exchange exchange) throws Exception { 074 AsyncProcessorHelper.process(this, exchange); 075 } 076 077 public boolean process(final Exchange exchange, final AsyncCallback callback) { 078 if (shutdown.get()) { 079 throw new IllegalStateException("ThreadProcessor is not running."); 080 } 081 ProcessCall call = new ProcessCall(exchange, callback); 082 executor.execute(call); 083 return false; 084 } 085 086 public void start() throws Exception { 087 shutdown.set(false); 088 getExecutor().setRejectedExecutionHandler(new RejectedExecutionHandler() { 089 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 090 ProcessCall call = (ProcessCall)runnable; 091 call.exchange.setException(new RejectedExecutionException()); 092 call.callback.done(false); 093 } 094 }); 095 } 096 097 public void stop() throws Exception { 098 shutdown.set(true); 099 executor.shutdown(); 100 executor.awaitTermination(0, TimeUnit.SECONDS); 101 } 102 103 public long getStackSize() { 104 return stackSize; 105 } 106 107 public void setStackSize(long stackSize) { 108 this.stackSize = stackSize; 109 } 110 111 public ThreadGroup getThreadGroup() { 112 return threadGroup; 113 } 114 115 public void setThreadGroup(ThreadGroup threadGroup) { 116 this.threadGroup = threadGroup; 117 } 118 119 public int getPriority() { 120 return priority; 121 } 122 123 public void setPriority(int priority) { 124 this.priority = priority; 125 } 126 127 public boolean isDaemon() { 128 return daemon; 129 } 130 131 public void setDaemon(boolean daemon) { 132 this.daemon = daemon; 133 } 134 135 public String getName() { 136 return name; 137 } 138 139 public void setName(String name) { 140 this.name = name; 141 } 142 143 public long getKeepAliveTime() { 144 return keepAliveTime; 145 } 146 147 public void setKeepAliveTime(long keepAliveTime) { 148 this.keepAliveTime = keepAliveTime; 149 } 150 151 public int getMaxSize() { 152 return maxSize; 153 } 154 155 public void setMaxSize(int maxSize) { 156 this.maxSize = maxSize; 157 } 158 159 public int getCoreSize() { 160 return coreSize; 161 } 162 163 public void setCoreSize(int coreSize) { 164 this.coreSize = coreSize; 165 } 166 167 public BlockingQueue<Runnable> getTaskQueue() { 168 if (taskQueue == null) { 169 taskQueue = new ArrayBlockingQueue<Runnable>(1000); 170 } 171 return taskQueue; 172 } 173 174 public void setTaskQueue(BlockingQueue<Runnable> taskQueue) { 175 this.taskQueue = taskQueue; 176 } 177 178 public ThreadPoolExecutor getExecutor() { 179 if (executor == null) { 180 executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() { 181 public Thread newThread(Runnable runnable) { 182 Thread thread; 183 if (getStackSize() > 0) { 184 thread = new Thread(getThreadGroup(), runnable, getName(), getStackSize()); 185 } else { 186 thread = new Thread(getThreadGroup(), runnable, getName()); 187 } 188 thread.setDaemon(isDaemon()); 189 thread.setPriority(getPriority()); 190 return thread; 191 } 192 }); 193 } 194 return executor; 195 } 196 197 public void setExecutor(ThreadPoolExecutor executor) { 198 this.executor = executor; 199 } 200 201 }