001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.AsyncProcessor; 021 import org.apache.camel.CamelException; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Message; 024 import org.apache.camel.util.AsyncProcessorHelper; 025 026 public class HandleFaultProcessor extends DelegateProcessor implements AsyncProcessor { 027 028 @Override 029 public String toString() { 030 return "HandleFaultProcessor(" + processor + ")"; 031 } 032 033 @Override 034 public void process(Exchange exchange) throws Exception { 035 AsyncProcessorHelper.process(this, exchange); 036 } 037 038 public boolean process(final Exchange exchange, final AsyncCallback callback) { 039 if (processor == null) { 040 // no processor so nothing to process, so return 041 callback.done(true); 042 return true; 043 } 044 045 if (processor instanceof AsyncProcessor) { 046 return ((AsyncProcessor)processor).process(exchange, new AsyncCallback() { 047 048 public void done(boolean doneSynchronously) { 049 // Take the fault message out before we keep on going 050 Message faultMessage = exchange.getFault(false); 051 if (faultMessage != null) { 052 final Object faultBody = faultMessage.getBody(); 053 if (faultBody != null) { 054 faultMessage.setBody(null); // Reset it since we are handling it. 055 if (faultBody instanceof Throwable) { 056 exchange.setException((Throwable)faultBody); 057 } else { 058 if (exchange.getException() == null) { 059 exchange.setException(new CamelException("Message contains fault of type " 060 + faultBody.getClass().getName() + ":\n" + faultBody)); 061 } 062 } 063 } 064 } 065 callback.done(doneSynchronously); 066 } 067 }); 068 } 069 070 try { 071 processor.process(exchange); 072 } catch (Throwable e) { 073 exchange.setException(e); 074 } 075 076 final Message faultMessage = exchange.getFault(false); 077 if (faultMessage != null) { 078 final Object faultBody = faultMessage.getBody(); 079 if (faultBody != null) { 080 faultMessage.setBody(null); // Reset it since we are handling it. 081 if (faultBody instanceof Throwable) { 082 exchange.setException((Throwable)faultBody); 083 } else { 084 if (exchange.getException() == null) { 085 exchange.setException(new CamelException("Message contains fault of type " 086 + faultBody.getClass().getName() + ":\n" + faultBody)); 087 } 088 } 089 } 090 } 091 callback.done(true); 092 return true; 093 } 094 }