001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.lang.reflect.ParameterizedType; 020 import java.lang.reflect.Type; 021 import java.util.Map; 022 import java.util.concurrent.ScheduledExecutorService; 023 import java.util.concurrent.ScheduledThreadPoolExecutor; 024 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Component; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.ExchangePattern; 030 import org.apache.camel.PollingConsumer; 031 import org.apache.camel.util.ObjectHelper; 032 033 /** 034 * A default endpoint useful for implementation inheritance 035 * 036 * @version $Revision: 40913 $ 037 */ 038 public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> { 039 private String endpointUri; 040 private CamelContext context; 041 private Component component; 042 private ScheduledExecutorService executorService; 043 private ExchangePattern exchangePattern = ExchangePattern.InOnly; 044 045 protected DefaultEndpoint(String endpointUri, Component component) { 046 this(endpointUri, component.getCamelContext()); 047 this.component = component; 048 } 049 050 protected DefaultEndpoint(String endpointUri, CamelContext context) { 051 this.endpointUri = endpointUri; 052 this.context = context; 053 } 054 055 public int hashCode() { 056 return endpointUri.hashCode() * 37 + 1; 057 } 058 059 @Override 060 public boolean equals(Object object) { 061 if (object instanceof DefaultEndpoint) { 062 DefaultEndpoint that = (DefaultEndpoint) object; 063 return ObjectHelper.equal(this.endpointUri, that.endpointUri); 064 } 065 return false; 066 } 067 068 @Override 069 public String toString() { 070 return "Endpoint[" + endpointUri + "]"; 071 } 072 073 public String getEndpointUri() { 074 return endpointUri; 075 } 076 077 public CamelContext getContext() { 078 return context; 079 } 080 081 public Component getComponent() { 082 return component; 083 } 084 085 public void setContext(CamelContext context) { 086 this.context = context; 087 } 088 089 /** 090 * @return the executor 091 */ 092 public synchronized ScheduledExecutorService getExecutorService() { 093 if (executorService == null) { 094 Component c = getComponent(); 095 if (c != null && c instanceof DefaultComponent) { 096 DefaultComponent dc = (DefaultComponent) c; 097 executorService = dc.getExecutorService(); 098 } 099 if (executorService == null) { 100 executorService = createExecutorService(); 101 } 102 } 103 return executorService; 104 } 105 106 /** 107 * @param executorService the executor to set 108 */ 109 public synchronized void setExecutorService(ScheduledExecutorService executorService) { 110 this.executorService = executorService; 111 } 112 113 public PollingConsumer<E> createPollingConsumer() throws Exception { 114 return new EventDrivenPollingConsumer<E>(this); 115 } 116 117 /** 118 * Converts the given exchange to the specified exchange type 119 */ 120 public E convertTo(Class<E> type, Exchange exchange) { 121 // TODO we could infer type parameter 122 if (type.isInstance(exchange)) { 123 return type.cast(exchange); 124 } 125 return getContext().getExchangeConverter().convertTo(type, exchange); 126 } 127 128 public E createExchange(Exchange exchange) { 129 Class<E> exchangeType = getExchangeType(); 130 if (exchangeType != null) { 131 if (exchangeType.isInstance(exchange)) { 132 return exchangeType.cast(exchange); 133 } 134 } 135 E answer = createExchange(); 136 answer.copyFrom(exchange); 137 return answer; 138 } 139 140 /** 141 * Returns the type of the exchange which is generated by this component 142 */ 143 public Class<E> getExchangeType() { 144 Type type = getClass().getGenericSuperclass(); 145 if (type instanceof ParameterizedType) { 146 ParameterizedType parameterizedType = (ParameterizedType) type; 147 Type[] arguments = parameterizedType.getActualTypeArguments(); 148 if (arguments.length > 0) { 149 Type argumentType = arguments[0]; 150 if (argumentType instanceof Class) { 151 return (Class<E>) argumentType; 152 } 153 } 154 } 155 return null; 156 } 157 158 public E createExchange() { 159 return createExchange(getExchangePattern()); 160 } 161 162 public E createExchange(ExchangePattern pattern) { 163 return (E) new DefaultExchange(getContext(), pattern); 164 } 165 166 public ExchangePattern getExchangePattern() { 167 return exchangePattern; 168 } 169 170 public void setExchangePattern(ExchangePattern exchangePattern) { 171 this.exchangePattern = exchangePattern; 172 } 173 174 protected ScheduledThreadPoolExecutor createExecutorService() { 175 return new ScheduledThreadPoolExecutor(10); 176 } 177 178 public void configureProperties(Map options) { 179 } 180 }