Spring Cloud: How FeignClient Works

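Introduction

Overview

This article explains how FeignClient works in Spring Cloud.

A Feign service call can be summarized in the following steps:

  1. Feign clients are enabled with the @EnableFeignClients annotation.
  2. An interface is written according to Feign's rules and annotated with @FeignClient. After the application starts, all types carrying @FeignClient are scanned and registered in the IoC container.
  3. At injection time, the FeignClient instance is obtained from FeignClientFactoryBean.
  4. When a method of the interface is called, a JDK dynamic proxy builds the concrete RequestTemplate, and the RequestTemplate produces the HTTP Request.
  5. The Request is handed to a Client, which can be backed by HttpURLConnection, HttpClient, or OkHttp.
  6. The Client is wrapped in LoadBalancerFeignClient, which works with Ribbon to provide load balancing.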
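Steps 4 and 5 can be illustrated with a small, JDK-only sketch. It is not Feign's code: Template, Client, and call are hypothetical stand-ins, assumed here only to show how a template with placeholders becomes a concrete request that is then handed to a pluggable client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of steps 4 and 5; none of these types are Feign's real classes.
class FeignFlowSketch {

    /** Hypothetical stand-in for a request template: HTTP method plus a path with {placeholders}. */
    static final class Template {
        final String method;
        final String path;

        Template(String method, String path) {
            this.method = method;
            this.path = path;
        }

        /** Fills the placeholders and prepends the target URL, yielding a concrete request line. */
        String toRequest(String baseUrl, Map<String, Object> variables) {
            String resolved = path;
            for (Map.Entry<String, Object> e : variables.entrySet()) {
                resolved = resolved.replace("{" + e.getKey() + "}", String.valueOf(e.getValue()));
            }
            return method + " " + baseUrl + resolved;
        }
    }

    /** Hypothetical stand-in for the Client abstraction: takes a request, returns a response. */
    interface Client {
        String execute(String request);
    }

    static String call(Client client, Template template, String baseUrl, Map<String, Object> vars) {
        String request = template.toRequest(baseUrl, vars); // step 4: template -> request
        return client.execute(request);                      // step 5: request -> client
    }

    public static void main(String[] args) {
        Map<String, Object> vars = new LinkedHashMap<>();
        vars.put("id", 42);
        Client echo = request -> "200 OK for [" + request + "]";
        System.out.println(call(echo, new Template("GET", "/users/{id}"), "http://user-service", vars));
    }
}
```

Because the client is just an interface, swapping the transport does not affect the code that builds the request, which is the property the steps above rely on.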

[Figure: overall flow diagram]

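Default configuration

Decoder: ResponseEntityDecoder (wraps SpringDecoder; in the configuration class it is in turn wrapped in OptionalDecoder)
Encoder: SpringEncoder
Logger: Slf4jLogger
Contract: SpringMvcContract
Feign.Builder: Feign.Builder (HystrixFeign.Builder if Hystrix is enabled)

Configuration class

FeignClientsConfiguration in spring-cloud-openfeign-core (spring-cloud-netflix-core in older releases):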

@Configuration
public class FeignClientsConfiguration {

	@Autowired
	private ObjectFactory<HttpMessageConverters> messageConverters;

	@Autowired(required = false)
	private List<AnnotatedParameterProcessor> parameterProcessors = new ArrayList<>();

	@Autowired(required = false)
	private List<FeignFormatterRegistrar> feignFormatterRegistrars = new ArrayList<>();

	@Autowired(required = false)
	private Logger logger;

	@Bean
	@ConditionalOnMissingBean
	public Decoder feignDecoder() {
		return new OptionalDecoder(
				new ResponseEntityDecoder(new SpringDecoder(this.messageConverters)));
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnMissingClass("org.springframework.data.domain.Pageable")
	public Encoder feignEncoder() {
		return new SpringEncoder(this.messageConverters);
	}

	@Bean
	@ConditionalOnClass(name = "org.springframework.data.domain.Pageable")
	@ConditionalOnMissingBean
	public Encoder feignEncoderPageable() {
		return new PageableSpringEncoder(new SpringEncoder(this.messageConverters));
	}

	@Bean
	@ConditionalOnMissingBean
	public Contract feignContract(ConversionService feignConversionService) {
		return new SpringMvcContract(this.parameterProcessors, feignConversionService);
	}

	@Bean
	public FormattingConversionService feignConversionService() {
		FormattingConversionService conversionService = new DefaultFormattingConversionService();
		for (FeignFormatterRegistrar feignFormatterRegistrar : this.feignFormatterRegistrars) {
			feignFormatterRegistrar.registerFormatters(conversionService);
		}
		return conversionService;
	}

	@Bean
	@ConditionalOnMissingBean
	public Retryer feignRetryer() {
		return Retryer.NEVER_RETRY;
	}

	@Bean
	@Scope("prototype")
	@ConditionalOnMissingBean
	public Feign.Builder feignBuilder(Retryer retryer) {
		return Feign.builder().retryer(retryer);
	}

	@Bean
	@ConditionalOnMissingBean(FeignLoggerFactory.class)
	public FeignLoggerFactory feignLoggerFactory() {
		return new DefaultFeignLoggerFactory(this.logger);
	}

	@Bean
	@ConditionalOnClass(name = "org.springframework.data.domain.Page")
	public Module pageJacksonModule() {
		return new PageJacksonModule();
	}

	@Configuration
	@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
	protected static class HystrixFeignConfiguration {
		@Bean
		@Scope("prototype")
		@ConditionalOnMissingBean
		@ConditionalOnProperty(name = "feign.hystrix.enabled")
		public Feign.Builder feignHystrixBuilder() {
			return HystrixFeign.builder();
		}
	}
}

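Part 1. Scanning at startup

1. @EnableFeignClients

Spring Cloud Feign is enabled through the @EnableFeignClients annotation. At startup, the application first checks whether @EnableFeignClients is present. If it is, scanning is turned on and the interfaces annotated with @FeignClient are collected. Starting from @EnableFeignClients, we now analyze how Feign performs remote service calls.

The source of the @EnableFeignClients annotation is as follows.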

package org.springframework.cloud.openfeign;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({FeignClientsRegistrar.class})
public @interface EnableFeignClients {
    String[] value() default {};
    String[] basePackages() default {};
    Class<?>[] basePackageClasses() default {};
    Class<?>[] defaultConfiguration() default {};
    Class<?>[] clients() default {};
}

2. FeignClientsRegistrar

As the source above shows, @EnableFeignClients imports the FeignClientsRegistrar class. Following that class, we find that it defines a registerBeanDefinitions method:

public void registerBeanDefinitions(AnnotationMetadata metadata, 
BeanDefinitionRegistry registry) {
        this.registerDefaultConfiguration(metadata, registry);
        this.registerFeignClients(metadata, registry);
}

registerDefaultConfiguration()

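Registers the configuration classes listed in the defaultConfiguration attribute of @EnableFeignClients. These are classes annotated with @Configuration, in which feign.codec.Decoder, feign.codec.Encoder, and feign.Contract can be customized.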

registerFeignClients()

Scans for and registers all interfaces annotated with @FeignClient.

Here we focus on the implementation details of registerFeignClients().

The source of registerFeignClients() is as follows.

public void registerFeignClients(AnnotationMetadata metadata,
                                 BeanDefinitionRegistry registry) {
    ClassPathScanningCandidateComponentProvider scanner = getScanner();//1
    scanner.setResourceLoader(this.resourceLoader);

    Set<String> basePackages;

    Map<String, Object> attrs = metadata
            .getAnnotationAttributes(EnableFeignClients.class.getName());//2
    AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
            FeignClient.class);//3
    final Class<?>[] clients = attrs == null ? null
            : (Class<?>[]) attrs.get("clients");
    if (clients == null || clients.length == 0) {//4
        scanner.addIncludeFilter(annotationTypeFilter);
        basePackages = getBasePackages(metadata);
    }
    else {//5
        final Set<String> clientClasses = new HashSet<>();
        basePackages = new HashSet<>();
        for (Class<?> clazz : clients) {
            basePackages.add(ClassUtils.getPackageName(clazz));
            clientClasses.add(clazz.getCanonicalName());
        }
        AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
            @Override
            protected boolean match(ClassMetadata metadata) {
                String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                return clientClasses.contains(cleaned);
            }
        };
        scanner.addIncludeFilter(
                new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
    }

    for (String basePackage : basePackages) {
        Set<BeanDefinition> candidateComponents = scanner
                .findCandidateComponents(basePackage);
        for (BeanDefinition candidateComponent : candidateComponents) {
            if (candidateComponent instanceof AnnotatedBeanDefinition) {
                // verify annotated class is an interface
                AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                Assert.isTrue(annotationMetadata.isInterface(),
                        "@FeignClient can only be specified on an interface");

                Map<String, Object> attributes = annotationMetadata
                        .getAnnotationAttributes(
                                FeignClient.class.getCanonicalName());

                String name = getClientName(attributes);
                registerClientConfiguration(registry, name,
                        attributes.get("configuration"));//6

                registerFeignClient(registry, annotationMetadata, attributes);//7
            }
        }
    }
}
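  1. At //1, a classpath scanner is obtained for scanning the classpath.
  2. At //2, all attributes of @EnableFeignClients are read.
  3. At //3, an AnnotationTypeFilter is constructed with FeignClient as its argument, so that only types carrying @FeignClient pass the filter.
  4. The clients attribute of @EnableFeignClients is read. At //4, if it is empty, the base packages come from getBasePackages(): the value, basePackages, and basePackageClasses attributes, falling back to the package of the class carrying @EnableFeignClients when none of them is set.
  5. At //5, the clients attribute is not empty, so each listed class is added to a set and the package of each client is added to basePackages. The AbstractClassTestingTypeFilter adds one more condition: an interface annotated with @FeignClient must also appear in the clients attribute of @EnableFeignClients.
  6. basePackages is then traversed to find the matching types in each package and obtain their bean definitions. At //6, the configuration value of @FeignClient is read and registered in the Spring container as a FeignClientSpecification. Note that the code asserts that a type annotated with @FeignClient is an interface; otherwise an error is raised.
  7. At //7, the interface annotated with @FeignClient is wrapped in a FeignClientFactoryBean. FactoryBean is Spring's standard mechanism for producing a bean instance on demand, which is how an interface without an implementation class can become a bean.

Summary

The code above first creates a classpath-based component scanner. Using the configured scan locations and the attributes of @EnableFeignClients, the scanner finds every Feign client defined by the developer, that is, every interface annotated with @FeignClient. The actual registration of each Feign client is then delegated to registerFeignClient().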
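The filtering rules of the scan can be sketched without Spring. The example below is a simplified model under stated assumptions: RemoteClient is a hypothetical annotation standing in for @FeignClient, and select is a hypothetical helper that receives its candidates as a list instead of scanning packages.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the candidate filtering; it does not scan the classpath.
class ClientScanSketch {

    /** Hypothetical marker annotation standing in for @FeignClient. */
    @Retention(RetentionPolicy.RUNTIME)
    @interface RemoteClient {
        String name();
    }

    @RemoteClient(name = "user-service")
    interface UserClient {}

    @RemoteClient(name = "order-service")
    interface OrderClient {}

    interface NotAClient {}

    /**
     * Mirrors the two branches of the scan: with no explicit clients, every annotated
     * candidate is accepted; otherwise a candidate must be annotated AND listed.
     */
    static List<String> select(List<Class<?>> candidates, Class<?>... clients) {
        Set<Class<?>> explicit = new HashSet<>(Arrays.asList(clients));
        List<String> names = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            RemoteClient annotation = candidate.getAnnotation(RemoteClient.class);
            if (annotation == null) {
                continue; // rejected by the annotation filter
            }
            if (!explicit.isEmpty() && !explicit.contains(candidate)) {
                continue; // rejected by the explicit-clients filter
            }
            if (!candidate.isInterface()) {
                throw new IllegalStateException("client annotation can only be specified on an interface");
            }
            names.add(annotation.name());
        }
        return names;
    }

    public static void main(String[] args) {
        List<Class<?>> candidates = Arrays.asList(UserClient.class, OrderClient.class, NotAClient.class);
        System.out.println(select(candidates));                    // both annotated interfaces
        System.out.println(select(candidates, OrderClient.class)); // only the listed one
    }
}
```

Requiring both conditions in the second branch corresponds to combining the two filters with AllTypeFilter in the real method.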

Part 2. registerFeignClient()

The source of registerFeignClient() is as follows.

private void registerFeignClient(BeanDefinitionRegistry registry,
		         AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
	String className = annotationMetadata.getClassName();//1
	BeanDefinitionBuilder definition = BeanDefinitionBuilder
			.genericBeanDefinition(FeignClientFactoryBean.class);//2
	validate(attributes);
	definition.addPropertyValue("url", getUrl(attributes));//3
	definition.addPropertyValue("path", getPath(attributes));
	String name = getName(attributes);
	definition.addPropertyValue("name", name);
	String contextId = getContextId(attributes);
	definition.addPropertyValue("contextId", contextId);
	definition.addPropertyValue("type", className);
	definition.addPropertyValue("decode404", attributes.get("decode404"));
	definition.addPropertyValue("fallback", attributes.get("fallback"));
	definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
	definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

	String alias = contextId + "FeignClient";
	AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();

	boolean primary = (Boolean) attributes.get("primary"); // has a default, won't be
															// null
	beanDefinition.setPrimary(primary);

	String qualifier = getQualifier(attributes);
	if (StringUtils.hasText(qualifier)) {
		alias = qualifier;
	}

	BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
			new String[] { alias });
	// register the bean definition dynamically
	BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

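1. At //1: the name of the type annotated with @FeignClient is obtained.

2. At //2: a BeanDefinitionBuilder is created, specifying that the bean to create is a FeignClientFactoryBean.

3. At //3: the attribute values of the @FeignClient annotation are bound to FeignClientFactoryBean.

Summary:

The name of the type annotated with @FeignClient and its annotation attributes are read and assigned to a BeanDefinitionBuilder (which specifies FeignClientFactoryBean as the bean to create). A BeanDefinition is then obtained from the builder and registered in the IoC container.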
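The idea of registering a definition that names a factory plus property values, and only producing the object when it is requested, can be sketched with plain JDK types. Everything here is hypothetical (FactoryRegistrySketch, Definition, the property keys); it is a toy model of the pattern, not Spring's BeanDefinitionRegistry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy registry: stores "how to build" at registration time, builds lazily on first lookup.
class FactoryRegistrySketch {

    /** Hypothetical bean definition: which factory to use plus the property values to hand it. */
    static final class Definition {
        final Function<Map<String, Object>, Object> factory;
        final Map<String, Object> properties = new HashMap<>();

        Definition(Function<Map<String, Object>, Object> factory) {
            this.factory = factory;
        }

        Definition property(String key, Object value) {
            properties.put(key, value);
            return this;
        }
    }

    private final Map<String, Definition> definitions = new HashMap<>();
    private final Map<String, Object> singletons = new HashMap<>();

    void register(String beanName, Definition definition) {
        definitions.put(beanName, definition);
    }

    /** The object is produced by the factory only when first requested, then cached. */
    Object getBean(String beanName) {
        return singletons.computeIfAbsent(beanName, name -> {
            Definition definition = definitions.get(name);
            if (definition == null) {
                throw new IllegalArgumentException("No bean named " + name);
            }
            return definition.factory.apply(definition.properties);
        });
    }

    public static void main(String[] args) {
        FactoryRegistrySketch registry = new FactoryRegistrySketch();
        registry.register("com.example.UserClient",
                new Definition(props -> "client for " + props.get("name") + " at " + props.get("url"))
                        .property("name", "user-service")
                        .property("url", "http://user-service"));
        System.out.println(registry.getBean("com.example.UserClient"));
    }
}
```

In the real code the factory role is played by FeignClientFactoryBean and the properties are the annotation attributes copied with addPropertyValue.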

Part 3. Obtaining the client from FeignClientFactoryBean at injection time

Interfaces implemented by FeignClientFactoryBean

FactoryBean: the getObject, getObjectType, and isSingleton methods;

InitializingBean: the afterPropertiesSet method;

ApplicationContextAware: the setApplicationContext method.

class FeignClientFactoryBean
		implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
    //xxx
}

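The FactoryBean interface

Because FeignClientFactoryBean implements FactoryBean#getObject(), the Spring framework uses it to produce the FeignClient (FeignClientFactoryBean in turn produces the client through a Targeter).

FeignClientFactoryBean.class  //org.springframework.cloud.openfeign.FeignClientFactoryBean.java

The following snippet omits secondary code.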

package org.springframework.cloud.openfeign;

class FeignClientFactoryBean
		implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {

	private Class<?> type;
	private String name;
	private ApplicationContext applicationContext;

	@Override
	public Object getObject() throws Exception {
		return getTarget();
	}

	/**
	 * @param <T> the target type of the Feign client
	 * @return a {@link Feign} client created with the specified data and the context information
	 */
	<T> T getTarget() {
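		// FeignContext is registered in the Spring container by FeignAutoConfiguration; see src1 below.
		FeignContext context = applicationContext.getBean(FeignContext.class);
		
		// Obtain Feign.Builder through FeignContext (it comes from the Spring container; the bean is
		// defined in FeignClientsConfiguration, shown under Introduction => Configuration class).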
		Feign.Builder builder = feign(context);

		if (!StringUtils.hasText(url)) {
			if (!name.startsWith("http")) {
				url = "http://" + name;
			}
			else {
				url = name;
			}
			url += cleanPath();
			return (T) loadBalance(builder, context,
					new HardCodedTarget<>(type, name, url));
		}
		if (StringUtils.hasText(url) && !url.startsWith("http")) {
			url = "http://" + url;
		}
		String url = this.url + cleanPath();
		Client client = getOptional(context, Client.class);
		if (client != null) {
			if (client instanceof LoadBalancerFeignClient) {
				// not load balancing because we have a url,
				// but ribbon is on the classpath, so unwrap
				client = ((LoadBalancerFeignClient) client).getDelegate();
			}
			if (client instanceof FeignBlockingLoadBalancerClient) {
				// not load balancing because we have a url,
				// but Spring Cloud LoadBalancer is on the classpath, so unwrap
				client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
			}
			builder.client(client);
		}
		Targeter targeter = get(context, Targeter.class);
		return (T) targeter.target(this, builder, context,
				new HardCodedTarget<>(type, name, url));
	}

	@Override
	public Class<?> getObjectType() {
		return type;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	@Override
	public void setApplicationContext(ApplicationContext context) throws BeansException {
		this.applicationContext = context;
	}
	
	@Override
	public void afterPropertiesSet() {
		Assert.hasText(contextId, "Context id must be set");
		Assert.hasText(name, "Name must be set");
	}
}
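The URL decision made in getTarget() can be restated as a small pure function. This is a sketch under assumptions: TargetUrlSketch, Resolved, and resolve are hypothetical names, and path is assumed to be already cleaned (the job of cleanPath() in the real class).

```java
// Restates the branch in getTarget(): no url -> load-balanced by name; url -> direct call.
class TargetUrlSketch {

    /** Result of the decision: the final URL and whether the call goes through the load balancer. */
    static final class Resolved {
        final String url;
        final boolean loadBalanced;

        Resolved(String url, boolean loadBalanced) {
            this.url = url;
            this.loadBalanced = loadBalanced;
        }
    }

    static Resolved resolve(String name, String url, String path) {
        if (url == null || url.trim().isEmpty()) {
            // No explicit url: the service name acts as the host and is resolved by the load balancer.
            String base = name.startsWith("http") ? name : "http://" + name;
            return new Resolved(base + path, true);
        }
        // Explicit url: call it directly, adding a scheme if it is missing.
        String base = url.startsWith("http") ? url : "http://" + url;
        return new Resolved(base + path, false);
    }

    public static void main(String[] args) {
        Resolved byName = resolve("user-service", "", "/api");
        Resolved byUrl = resolve("user-service", "localhost:8080", "/api");
        System.out.println(byName.url + " loadBalanced=" + byName.loadBalanced);
        System.out.println(byUrl.url + " loadBalanced=" + byUrl.loadBalanced);
    }
}
```

This is why setting url on @FeignClient bypasses load balancing: the second branch unwraps the load-balancing client and uses its delegate.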

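Where setType is called: the constructor FeignClientBuilder.Builder#Builder(final ApplicationContext applicationContext, final Class<T> type, final String name). (When the bean is registered through FeignClientsRegistrar, the type property is instead set with definition.addPropertyValue("type", className), as shown in Part 2.)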

private Builder(final ApplicationContext applicationContext, final Class<T> type,
		final String name) {
	this.feignClientFactoryBean = new FeignClientFactoryBean();

	this.feignClientFactoryBean.setApplicationContext(applicationContext);
	this.feignClientFactoryBean.setType(type);
	this.feignClientFactoryBean.setName(FeignClientsRegistrar.getName(name));
	this.feignClientFactoryBean.setContextId(FeignClientsRegistrar.getName(name));
	// preset default values - these values resemble the default values on the
	// FeignClient annotation
	this.url("").path("").decode404(false).fallback(void.class)
			.fallbackFactory(void.class);
}

src1

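All FeignClientSpecification beans in the Spring container are put into FeignContext. These specifications are the ones registered during the @EnableFeignClients scan described in Part 1: the defaultConfiguration of @EnableFeignClients and the configuration attribute of each @FeignClient.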

public class FeignAutoConfiguration {
	@Autowired(required = false)
	private List<FeignClientSpecification> configurations = new ArrayList<>();

	@Bean
	public HasFeatures feignFeature() {
		return HasFeatures.namedFeature("Feign", Feign.class);
	}

	@Bean
	public FeignContext feignContext() {
		FeignContext context = new FeignContext();
		context.setConfigurations(this.configurations);
		return context;
	}
}

Targeter

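Targeter has two implementations: HystrixTargeter and DefaultTargeter.

  1. When the class feign.hystrix.HystrixFeign is present, HystrixTargeter is registered as the Targeter bean.
  2. When the class feign.hystrix.HystrixFeign is absent, DefaultTargeter is used.
  3. It might look as if a custom Targeter could replace the Hystrix or default one in order to add custom behavior. In practice this is not possible, because Targeter is package-private.

Building the Feign instance

As noted under "The FactoryBean interface" in Part 3, FeignClientFactoryBean#getTarget ends by calling: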

Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context,
        new HardCodedTarget<>(this.type, this.name, url));

The analysis here uses HystrixTargeter as the example:

class HystrixTargeter implements Targeter {
	@Override
	public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
			FeignContext context, Target.HardCodedTarget<T> target) {
		if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
			return feign.target(target);
		}
		feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
		SetterFactory setterFactory = getOptional(factory.getName(), context,
				SetterFactory.class);
		if (setterFactory != null) {
			builder.setterFactory(setterFactory);
		}
		Class<?> fallback = factory.getFallback();
		if (fallback != void.class) {
			return targetWithFallback(factory.getName(), context, target, builder,
					fallback);
		}
		Class<?> fallbackFactory = factory.getFallbackFactory();
		if (fallbackFactory != void.class) {
			return targetWithFallbackFactory(factory.getName(), context, target, builder,
					fallbackFactory);
		}
        // Note: if both fallback and fallbackFactory are defined, fallback wins (fallbackFactory is ignored)

		return feign.target(target);
	}
	
	// other code
}
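The precedence between fallback and fallbackFactory follows directly from the order of the two checks. A minimal sketch, assuming a hypothetical Strategy enum and choose helper, with void.class meaning "not set" as in the code above:

```java
// Models only the order of the two checks in HystrixTargeter#target.
class FallbackChoiceSketch {

    enum Strategy { FALLBACK, FALLBACK_FACTORY, NONE }

    static Strategy choose(Class<?> fallback, Class<?> fallbackFactory) {
        if (fallback != void.class) {
            return Strategy.FALLBACK; // checked first, so it wins when both are set
        }
        if (fallbackFactory != void.class) {
            return Strategy.FALLBACK_FACTORY;
        }
        return Strategy.NONE;
    }

    public static void main(String[] args) {
        System.out.println(choose(String.class, Integer.class)); // both set
        System.out.println(choose(void.class, Integer.class));   // only the factory set
        System.out.println(choose(void.class, void.class));      // neither set
    }
}
```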

Tracing feign.target(target) leads to Feign.Builder in the feign package (HystrixFeign.Builder does not override this method):

feign.target(target):  // Feign.Builder#target

public abstract class Feign {
	public static class Builder {
		private InvocationHandlerFactory invocationHandlerFactory =
			new InvocationHandlerFactory.Default();
	
		public <T> T target(Target<T> target) {
			return build().newInstance(target);
		}
		
		public Feign build() {
			SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
			  new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
				  logLevel, decode404, closeAfterDecode, propagationPolicy);
			ParseHandlersByName handlersByName =
			  new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
				  errorDecoder, synchronousMethodHandlerFactory);
			return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
		}

		public Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
			this.invocationHandlerFactory = invocationHandlerFactory;
			return this;
		}
		// xxx
	}
	// xxx
}

build().newInstance(target):  // ReflectiveFeign#newInstance

public class ReflectiveFeign extends Feign {
  private final ParseHandlersByName targetToHandlersByName;
  private final InvocationHandlerFactory factory;
  private final QueryMapEncoder queryMapEncoder;
  
  ReflectiveFeign(ParseHandlersByName targetToHandlersByName, InvocationHandlerFactory factory,
      QueryMapEncoder queryMapEncoder) {
    this.targetToHandlersByName = targetToHandlersByName;
    this.factory = factory;
    this.queryMapEncoder = queryMapEncoder;
  }

  public <T> T newInstance(Target<T> target) {
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
      if (method.getDeclaringClass() == Object.class) {
        continue;
      } else if (Util.isDefault(method)) {
        DefaultMethodHandler handler = new DefaultMethodHandler(method);
        defaultMethodHandlers.add(handler);
        methodToHandler.put(method, handler);
      } else {
        methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
      }
    }
    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
        new Class<?>[] {target.type()}, handler);

    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
      defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
  }
  // xxx
}

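The proxy class (the core of Feign)

The code above contains this line: InvocationHandler handler = factory.create(target, methodToHandler);

What is this factory? It is the invocationHandlerFactory field of Feign.Builder shown above.

By default it is InvocationHandlerFactory.Default: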

public interface InvocationHandlerFactory {
  InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch);
  interface MethodHandler {
    Object invoke(Object[] argv) throws Throwable;
  }

  static final class Default implements InvocationHandlerFactory {
    @Override
    public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
      return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
    }
  }
}

It can be overridden, however, and when Hystrix is used it is replaced:  // HystrixFeign.Builder#build

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
  super.invocationHandlerFactory(new InvocationHandlerFactory() {
	@Override
	public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
	  return new HystrixInvocationHandler(target, dispatch, setterFactory,
		  nullableFallbackFactory);
	}
  });
  super.contract(new HystrixDelegatingContract(contract));
  return super.build();
}

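So with Hystrix in use, the InvocationHandler is HystrixInvocationHandler.

Continuing the trace: once ReflectiveFeign#newInstance has determined the InvocationHandler, it calls Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler);

Conclusion

When an interface annotated with @FeignClient is injected, what is actually injected is a JDK dynamic proxy for that interface. With Hystrix enabled its InvocationHandler is HystrixInvocationHandler; otherwise it is ReflectiveFeign.FeignInvocationHandler.

Targeter auto-configuration classes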
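The proxy-plus-dispatch-map structure can be reproduced with the JDK alone. The following is a sketch, not Feign's implementation: Handler, GreetingClient, newInstance, and create are hypothetical names, and the handlers return strings instead of performing HTTP calls.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

// One InvocationHandler looks up a per-method handler in a map, as the proxy described above does.
class ProxyDispatchSketch {

    /** Hypothetical per-method handler, playing the role of MethodHandler. */
    interface Handler {
        Object invoke(Object[] args);
    }

    /** Hypothetical client interface; no implementation class is ever written for it. */
    interface GreetingClient {
        String greet(String name);
        String ping();
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<Method, Handler> dispatch) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString are answered locally, never dispatched.
                switch (method.getName()) {
                    case "equals": return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return "proxy for " + type.getSimpleName();
                }
            }
            return dispatch.get(method).invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    static GreetingClient create() {
        Map<Method, Handler> dispatch = new LinkedHashMap<>();
        for (Method method : GreetingClient.class.getMethods()) {
            if (method.getName().equals("greet")) {
                dispatch.put(method, args -> "GET /greet?name=" + args[0]);
            } else {
                dispatch.put(method, args -> "GET /ping");
            }
        }
        return newInstance(GreetingClient.class, dispatch);
    }

    public static void main(String[] args) {
        GreetingClient client = create();
        System.out.println(client.greet("bob"));
        System.out.println(client.ping());
    }
}
```

Swapping the InvocationHandler while keeping the same dispatch map is what the Hystrix variant does: the map of method handlers stays the same and only the wrapper around each call changes.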

@Configuration
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public Targeter feignTargeter() {
		return new HystrixTargeter();
	}
}
@Configuration
@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public Targeter feignTargeter() {
		return new DefaultTargeter();
	}
}

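Miscellaneous

Adding an interceptor

A class can implement the RequestInterceptor interface and be registered as a bean in the Spring container; Feign then applies the interceptor automatically. This is also implemented in FeignClientFactoryBean#configureUsingConfiguration, as follows: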

Map<String, RequestInterceptor> requestInterceptors = context
.getInstances(this.contextId, RequestInterceptor.class);
if (requestInterceptors != null) {
    builder.requestInterceptors(requestInterceptors.values());
}
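What "applying the interceptors" amounts to can be sketched as a chain of callbacks that each mutate the outgoing request before it is sent. The types below are hypothetical simplifications: Interceptor stands in for RequestInterceptor and a header map stands in for the request template.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Each interceptor gets a chance to modify the request headers, in registration order.
class InterceptorSketch {

    /** Hypothetical stand-in for a request interceptor. */
    interface Interceptor {
        void apply(Map<String, String> headers);
    }

    static Map<String, String> buildHeaders(List<Interceptor> interceptors) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Interceptor interceptor : interceptors) {
            interceptor.apply(headers);
        }
        return headers;
    }

    public static void main(String[] args) {
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add(headers -> headers.put("Authorization", "Bearer demo-token"));
        interceptors.add(headers -> headers.put("X-Trace-Id", "abc123"));
        System.out.println(buildHeaders(interceptors));
    }
}
```

Typical uses are adding authentication or tracing headers to every call of a client without touching the interface methods.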

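Part 4. Intercepting the methods of a @FeignClient interface

As analyzed in Part 3 => Targeter => Conclusion, an interface annotated with @FeignClient is backed by a dynamic proxy whose handler is HystrixInvocationHandler. This part looks at what that handler actually does.

If you are not familiar with dynamic proxies, see the article "Java设计模式系列–代理模式(静态代理与动态代理的使用)" (Java Design Patterns Series: the Proxy Pattern, using static and dynamic proxies) on 自学精灵.

HystrixInvocationHandler#invoke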

final class HystrixInvocationHandler implements InvocationHandler {
  private final Map<Method, MethodHandler> dispatch;
  // other code
  HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
      SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
    this.dispatch = checkNotNull(dispatch, "dispatch");
    // other code
  }

  @Override
  public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {
    // other code
    HystrixCommand<Object> hystrixCommand =
        new HystrixCommand<Object>(setterMethodMap.get(method)) {
          @Override
          protected Object run() throws Exception {
            try {
              // delegate to the MethodHandler registered for this method
              return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            } catch (Exception e) {
              throw e;
            } catch (Throwable t) {
              throw (Error) t;
            }
          }

          @Override
          protected Object getFallback() {
            // other code
          }
        };

    // other code
    return hystrixCommand.execute();
  }
}

Key point: HystrixInvocationHandler.this.dispatch.get(method).invoke(args). The dispatch map is passed in through the constructor, so we keep tracing.

Where the HystrixInvocationHandler constructor is called: HystrixFeign.Builder#build

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
  super.invocationHandlerFactory(new InvocationHandlerFactory() {
	@Override
	public InvocationHandler create(Target target,
									Map<Method, MethodHandler> dispatch) {
	  return new HystrixInvocationHandler(target, dispatch, setterFactory,
		  nullableFallbackFactory);
	}
  });
  super.contract(new HystrixDelegatingContract(contract));
  return super.build();
}

create passes dispatch to the constructor. Next, trace where create is called.

Where create is called: ReflectiveFeign#newInstance

public class ReflectiveFeign extends Feign {
  // A static nested class, defined further down
  private final ParseHandlersByName targetToHandlersByName;
  private final InvocationHandlerFactory factory;

  @Override
  public <T> T newInstance(Target<T> target) {
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
      if (method.getDeclaringClass() == Object.class) {
        continue;
      } else if (Util.isDefault(method)) {
        DefaultMethodHandler handler = new DefaultMethodHandler(method);
        defaultMethodHandlers.add(handler);
        methodToHandler.put(method, handler);
      } else {
        // Key point: trace nameToHandler
        methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
      }
    }
    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
        new Class<?>[] {target.type()}, handler);

    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
      defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
  }
  
  static final class ParseHandlersByName {
    private final Contract contract;
    private final Options options;
    private final Encoder encoder;
    private final Decoder decoder;
    private final ErrorDecoder errorDecoder;
    private final QueryMapEncoder queryMapEncoder;
    private final SynchronousMethodHandler.Factory factory;

    ParseHandlersByName(
        Contract contract,
        Options options,
        Encoder encoder,
        Decoder decoder,
        QueryMapEncoder queryMapEncoder,
        ErrorDecoder errorDecoder,
        SynchronousMethodHandler.Factory factory) {
      this.contract = contract;
      this.options = options;
      this.factory = factory;
      this.errorDecoder = errorDecoder;
      this.queryMapEncoder = queryMapEncoder;
      this.encoder = checkNotNull(encoder, "encoder");
      this.decoder = checkNotNull(decoder, "decoder");
    }

    public Map<String, MethodHandler> apply(Target key) {
      List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type());
      Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
      for (MethodMetadata md : metadata) {
        BuildTemplateByResolvingArgs buildTemplate;
        if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
          buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder);
        } else if (md.bodyIndex() != null) {
          buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder);
        } else {
          buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder);
        }
        // Key point: trace factory.create (that is, SynchronousMethodHandler.Factory#create)
        result.put(md.configKey(), factory.create(key, md, buildTemplate, options, decoder, errorDecoder));
      }
      return result;
    }
  }
}

SynchronousMethodHandler.Factory#create

import feign.InvocationHandlerFactory.MethodHandler;

final class SynchronousMethodHandler implements MethodHandler {
  private final MethodMetadata metadata;
  private final Target<?> target;
  private final Client client;
  private final RequestTemplate.Factory buildTemplateFromArgs;
  private final List<RequestInterceptor> requestInterceptors;
  // other code

  private SynchronousMethodHandler(Target<?> target, Client client, Retryer retryer,
      List<RequestInterceptor> requestInterceptors, Logger logger,
      Logger.Level logLevel, MethodMetadata metadata,
      RequestTemplate.Factory buildTemplateFromArgs, Options options,
      Decoder decoder, ErrorDecoder errorDecoder, boolean decode404,
      boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy) {
    this.target = checkNotNull(target, "target");
    this.client = checkNotNull(client, "client for %s", target);
    this.requestInterceptors =
        checkNotNull(requestInterceptors, "requestInterceptors for %s", target);
    this.metadata = checkNotNull(metadata, "metadata for %s", target);
    this.buildTemplateFromArgs = checkNotNull(buildTemplateFromArgs, "metadata for %s", target);
    // other code
  }

  @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
        try {
          retryer.continueOrPropagate(e);
        } catch (RetryableException th) {
          Throwable cause = th.getCause();
          if (propagationPolicy == UNWRAP && cause != null) {
            throw cause;
          } else {
            throw th;
          }
        }
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
  
  static class Factory {
    // other code
	
    public MethodHandler create(Target<?> target,
                                MethodMetadata md,
                                RequestTemplate.Factory buildTemplateFromArgs,
                                Options options,
                                Decoder decoder,
                                ErrorDecoder errorDecoder) {
      return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
          logLevel, md, buildTemplateFromArgs, options, decoder,
          errorDecoder, decode404, closeAfterDecode, propagationPolicy);
    }
  }
  
  Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);
    Response response;
    // other code
    response = client.execute(request, options);
    // other code
  }
}

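When a method of an interface annotated with @FeignClient is called, the proxy dispatches the call to the SynchronousMethodHandler for that method, whose invoke method (shown above) builds a RequestTemplate.

executeAndDecode() then turns the RequestTemplate into a Request object, and the Request is handed to Client#execute.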
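The while (true) loop in invoke is a retry loop: a retryable failure is handed to the Retryer, which either returns (try again) or rethrows (give up). A JDK-only sketch of that control flow, with hypothetical names (RetryableFailure, Budget) and a simple attempt counter as the assumed retry policy:

```java
import java.util.function.Supplier;

// Same control flow as the invoke loop: attempt, and on a retryable failure ask the budget.
class RetryLoopSketch {

    /** Hypothetical stand-in for a retryable exception. */
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical retry policy: allows a fixed number of attempts, then rethrows. */
    static final class Budget {
        private final int maxAttempts;
        private int attempt = 1;

        Budget(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e; // budget exhausted: propagate to the caller
            }
        }
    }

    static <T> T invoke(Supplier<T> call, int maxAttempts) {
        Budget budget = new Budget(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                budget.continueOrPropagate(e); // returns normally if another attempt is allowed
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableFailure("timeout on attempt " + calls[0]);
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With the default configuration shown earlier (Retryer.NEVER_RETRY), the first retryable failure is propagated immediately, which corresponds to a budget of one attempt in this sketch.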

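Part 5. Invoking the Client component

We now trace the interface seen above: Client client (defined in the feign package).

In the end, Feign sends the request and receives the response through the Client component. The default implementation is Client.Default (in the feign package), which performs network requests with HttpURLConnection. HttpClient and OkHttp are also supported.

First, look at the Ribbon auto-configuration class for Feign, FeignRibbonClientAutoConfiguration.

It mainly registers a few beans when the application starts.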

@ConditionalOnClass({ ILoadBalancer.class, Feign.class })
@Configuration
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FeignRibbonClientAutoConfiguration {
        @Bean
	@ConditionalOnMissingBean
	public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
			SpringClientFactory clientFactory) {
		return new LoadBalancerFeignClient(new Client.Default(null, null),
				cachingFactory, clientFactory);
	}
}
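The shape of this bean, a load-balancing client wrapping a plain delegate, is a decorator. The sketch below illustrates the pattern with hypothetical types and a round-robin choice; it is an assumption for illustration and says nothing about which rule Ribbon actually applies. Query strings are ignored to keep it short.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Decorator: replace the logical service name with a concrete server, then delegate.
class LoadBalancedClientSketch {

    /** Hypothetical stand-in for the Client abstraction. */
    interface Client {
        String execute(URI uri);
    }

    static final class RoundRobinClient implements Client {
        private final Client delegate;
        private final List<String> servers;
        private final AtomicInteger next = new AtomicInteger();

        RoundRobinClient(Client delegate, List<String> servers) {
            this.delegate = delegate;
            this.servers = servers;
        }

        @Override
        public String execute(URI uri) {
            // Pick the next server and rebuild the URI with it as the authority.
            String server = servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
            URI resolved = URI.create(uri.getScheme() + "://" + server + uri.getRawPath());
            return delegate.execute(resolved);
        }

        /** Lets callers bypass load balancing, as the unwrap step in getTarget() does. */
        Client getDelegate() {
            return delegate;
        }
    }

    public static void main(String[] args) {
        Client plain = uri -> "sent to " + uri;
        Client balanced = new RoundRobinClient(plain, Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        URI logical = URI.create("http://user-service/users/1");
        System.out.println(balanced.execute(logical));
        System.out.println(balanced.execute(logical));
    }
}
```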

When no Client bean has been configured, a LoadBalancerFeignClient wrapping Client.Default (from the feign package) is registered automatically:

public interface Client {
  Response execute(Request request, Options options) throws IOException;

  class Default implements Client {

    private final SSLSocketFactory sslContextFactory;
    private final HostnameVerifier hostnameVerifier;
	
    public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
      this.sslContextFactory = sslContextFactory;
      this.hostnameVerifier = hostnameVerifier;
    }

    @Override
    public Response execute(Request request, Options options) throws IOException {
      HttpURLConnection connection = convertAndSend(request, options);
      return convertResponse(connection, request);
    }
	
	HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
      final HttpURLConnection connection =
          (HttpURLConnection) new URL(request.url()).openConnection();
		  
      // For HTTPS, configure SSL
      if (connection instanceof HttpsURLConnection) {
        HttpsURLConnection sslCon = (HttpsURLConnection) connection;
        if (sslContextFactory != null) {
          sslCon.setSSLSocketFactory(sslContextFactory);
        }
        if (hostnameVerifier != null) {
          sslCon.setHostnameVerifier(hostnameVerifier);
        }
      }
	  
      // Set the connect timeout, read timeout, request method, etc.
      connection.setConnectTimeout(options.connectTimeoutMillis());
      connection.setReadTimeout(options.readTimeoutMillis());
      connection.setAllowUserInteraction(false);
      connection.setInstanceFollowRedirects(options.isFollowRedirects());
      connection.setRequestMethod(request.httpMethod().name());
	  
      // Check the headers for "Content-Encoding"; if present, use that encoding ("gzip" or "deflate")
      Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
      boolean gzipEncodedRequest =
          contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
      boolean deflateEncodedRequest =
          contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);

      // Set the request headers
      boolean hasAcceptHeader = false;
      Integer contentLength = null;
      for (String field : request.headers().keySet()) {
        if (field.equalsIgnoreCase("Accept")) {
          hasAcceptHeader = true;
        }
        for (String value : request.headers().get(field)) {
          if (field.equals(CONTENT_LENGTH)) {
            if (!gzipEncodedRequest && !deflateEncodedRequest) {
              contentLength = Integer.valueOf(value);
              connection.addRequestProperty(field, value);
            }
          } else {
            connection.addRequestProperty(field, value);
          }
        }
      }
      // Some servers choke on the default accept string.
      if (!hasAcceptHeader) {
        connection.addRequestProperty("Accept", "*/*");
      }
      // If there is a request body, choose fixed-length or chunked streaming and apply gzip/deflate compression if requested
      if (request.requestBody().asBytes() != null) {
        if (contentLength != null) {
          connection.setFixedLengthStreamingMode(contentLength);
        } else {
          connection.setChunkedStreamingMode(8196);
        }
        connection.setDoOutput(true);
        OutputStream out = connection.getOutputStream();
        if (gzipEncodedRequest) {
          out = new GZIPOutputStream(out);
        } else if (deflateEncodedRequest) {
          out = new DeflaterOutputStream(out);
        }
        try {
          out.write(request.requestBody().asBytes());
        } finally {
          try {
            out.close();
          } catch (IOException suppressed) { // NOPMD
          }
        }
      }
      return connection;
    }

    Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
      int status = connection.getResponseCode();
      String reason = connection.getResponseMessage();

      if (status < 0) {
        throw new IOException(format("Invalid status(%s) executing %s %s", status,
            connection.getRequestMethod(), connection.getURL()));
      }

      Map<String, Collection<String>> headers = new LinkedHashMap<String, Collection<String>>();
      for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
        // response message
        if (field.getKey() != null) {
          headers.put(field.getKey(), field.getValue());
        }
      }

      Integer length = connection.getContentLength();
      if (length == -1) {
        length = null;
      }
      InputStream stream;
      if (status >= 400) {
        stream = connection.getErrorStream();
      } else {
        stream = connection.getInputStream();
      }
      return Response.builder()
          .status(status)
          .reason(reason)
          .headers(headers)
          .request(request)
          .body(stream, length)
          .build();
    }
  }
}

The default client therefore performs its network requests with the JDK's HttpURLConnection.
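The request-body branch of convertAndSend can be tried out in isolation. The following is a minimal JDK-only sketch, not Feign code: the class BodyEncodingSketch and its methods are hypothetical names introduced here for illustration. It mirrors two decisions from the listing above: wrapping the output stream according to Content-Encoding, and treating Content-Length as usable only when the body is sent uncompressed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class; it only imitates the body handling shown above.
public class BodyEncodingSketch {

    // Wrap the sink in a compressor only when Content-Encoding asks for it.
    static byte[] encodeBody(byte[] body, Collection<String> contentEncodings) {
        boolean gzip = contentEncodings != null && contentEncodings.contains("gzip");
        boolean deflate = contentEncodings != null && contentEncodings.contains("deflate");
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            OutputStream out = sink;
            if (gzip) {
                out = new GZIPOutputStream(out);
            } else if (deflate) {
                out = new DeflaterOutputStream(out);
            }
            try {
                out.write(body);
            } finally {
                out.close(); // closing finishes the compressed stream
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    // Compression changes the number of bytes on the wire, so a declared
    // Content-Length is only honored for uncompressed bodies.
    static boolean canUseFixedLength(Collection<String> contentEncodings) {
        return contentEncodings == null
                || (!contentEncodings.contains("gzip") && !contentEncodings.contains("deflate"));
    }

    // Decompress a gzip payload, used here only to check the round trip.
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "{\"name\":\"feign\"}".getBytes(StandardCharsets.UTF_8);
        byte[] gzipped = encodeBody(body, Arrays.asList("gzip"));
        System.out.println("round trip ok: " + Arrays.equals(body, gunzip(gzipped)));
        System.out.println("fixed length allowed: " + canUseFixedLength(Arrays.asList("gzip")));
    }
}
```

When the fixed length cannot be used, the original code falls back to chunked streaming, which is why the Content-Length header is skipped for compressed requests.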

How to use HttpClient in Feign

Look at the source of FeignRibbonClientAutoConfiguration:

@ConditionalOnClass({ ILoadBalancer.class, Feign.class })
@Configuration
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FeignRibbonClientAutoConfiguration {
    // code omitted

    @Configuration
	@ConditionalOnClass(ApacheHttpClient.class)
	@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
	protected static class HttpClientFeignLoadBalancedConfiguration {

		@Autowired(required = false)
		private HttpClient httpClient;

		@Bean
		@ConditionalOnMissingBean(Client.class)
		public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
				SpringClientFactory clientFactory) {
			ApacheHttpClient delegate;
			if (this.httpClient != null) {
				delegate = new ApacheHttpClient(this.httpClient);
			}
			else {
				delegate = new ApacheHttpClient();
			}
			return new LoadBalancerFeignClient(delegate, cachingFactory, clientFactory);
		}
	}

    // code omitted
}

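The @ConditionalOnClass(ApacheHttpClient.class) annotation shows that this configuration becomes active as soon as feign-httpclient is on the classpath, that is, once the dependency is added to the pom file. The property feign.httpclient.enabled must also evaluate to true, but because @ConditionalOnProperty declares matchIfMissing = true, it can be left out of the configuration file: a missing property counts as true.

Add the following to the pom file (recent Feign releases are published under the groupId io.github.openfeign rather than com.netflix.feign, and pinning an explicit version is preferable to RELEASE):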

<dependency>
    <groupId>com.netflix.feign</groupId>
    <artifactId>feign-httpclient</artifactId>
    <version>RELEASE</version>
</dependency>

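Similarly, to have Feign use OkHttp, add the feign-okhttp dependency to the pom file. As far as I can tell, the corresponding OkHttp configuration in Spring Cloud is guarded by feign.okhttp.enabled without matchIfMissing, so that property probably has to be set to true explicitly: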

<dependency>
    <groupId>com.netflix.feign</groupId>
    <artifactId>feign-okhttp</artifactId>
    <version>RELEASE</version>
</dependency>
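The two conditions discussed above (@ConditionalOnClass plus @ConditionalOnProperty with matchIfMissing = true) can be imitated in a few lines of plain Java. This is only a rough sketch of the idea, not how Spring evaluates conditions internally; ClientSelectionSketch, isPresent, enabled and chooseDelegate are hypothetical names, and the property map stands in for the application configuration.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration of the conditions on HttpClientFeignLoadBalancedConfiguration.
public class ClientSelectionSketch {

    // Roughly what @ConditionalOnClass asks: can this class be loaded?
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClientSelectionSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Roughly what @ConditionalOnProperty asks: the property must not be "false";
    // when it is absent, matchIfMissing decides.
    static boolean enabled(Map<String, String> properties, String key, boolean matchIfMissing) {
        String value = properties.get(key);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    // Pick the delegate the way the auto-configuration shown above would.
    static String chooseDelegate(Map<String, String> properties) {
        if (isPresent("feign.httpclient.ApacheHttpClient")
                && enabled(properties, "feign.httpclient.enabled", true)) {
            return "ApacheHttpClient";
        }
        return "Client.Default (HttpURLConnection)";
    }

    public static void main(String[] args) {
        // Without feign-httpclient on the classpath this falls back to the default client.
        System.out.println(chooseDelegate(Collections.<String, String>emptyMap()));
    }
}
```

The point of the sketch is the asymmetry: with matchIfMissing = true, adding the jar is enough, whereas a condition without it needs the property to be set explicitly.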

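6. Load Balancing in Feign

The FeignRibbonClientAutoConfiguration class above shows that, whichever Client type is configured (HttpURLConnection, OkHttp or HttpClient), the bean finally registered in the container is a LoadBalancerFeignClient, that is, a load-balancing client that wraps the real one.

As seen at the end of "Part 4: Intercepting the methods of an interface annotated with @FeignClient", the call ultimately reaches Client#execute.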

LoadBalancerFeignClient

@Override
public Response execute(Request request, Request.Options options) throws IOException {
	try {
		URI asUri = URI.create(request.url());
		String clientName = asUri.getHost();
		URI uriWithoutHost = cleanUrl(request.url(), clientName);
		FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
				this.delegate, request, uriWithoutHost);

		IClientConfig requestConfig = getClientConfig(options, clientName);
		return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
				requestConfig).toResponse();
	}
	catch (ClientException e) {
		IOException io = findIOException(e);
		if (io != null) {
			throw io;
		}
		throw new RuntimeException(e);
	}
}
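In the execute method above, the host part of the request URL is not a real machine but the logical service name; Ribbon later swaps in the address of a concrete instance. The following JDK-only sketch illustrates that idea under stated assumptions: ServiceUriSketch, serviceName and withServer are hypothetical names, and withServer is a simplified stand-in for what cleanUrl and reconstructURIWithServer achieve together, not their actual implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper showing how a service name in a URL is replaced by a real address.
public class ServiceUriSketch {

    // For http://user-service/users/1 the "host" is the logical service name.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    // Rebuild the URL against the chosen instance, keeping scheme, path, query and fragment.
    static URI withServer(String url, String host, int port) {
        URI original = URI.create(url);
        try {
            return new URI(original.getScheme(), null, host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rebuild " + url, e);
        }
    }

    public static void main(String[] args) {
        String url = "http://user-service/users/1?verbose=true";
        System.out.println(serviceName(url));
        System.out.println(withServer(url, "10.0.0.12", 8080));
    }
}
```

One practical consequence of relying on URI#getHost: service names that are not valid host names (for example, names containing an underscore) make getHost return null, so such names are best avoided.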

The executeWithLoadBalancer() method (AbstractLoadBalancerAwareClient#executeWithLoadBalancer) sends the request through the load balancer:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
	RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
	LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
			.withLoadBalancerContext(this)
			.withRetryHandler(handler)
			.withLoadBalancerURI(request.getUri())
			.build();

	try {
		return command.submit(
			new ServerOperation<T>() {
				@Override
				public Observable<T> call(Server server) {
					URI finalUri = reconstructURIWithServer(server, request.getUri());
					S requestForServer = (S) request.replaceUri(finalUri);
					try {
						return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
					} 
					catch (Exception e) {
						return Observable.error(e);
					}
				}
			})
			.toBlocking()
			.single();
	} catch (Exception e) {
		Throwable t = e.getCause();
		if (t instanceof ClientException) {
			throw (ClientException) t;
		} else {
			throw new ClientException(e);
		}
	}
}	
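Stripped of RxJava, the control flow of executeWithLoadBalancer can be approximated as: take a candidate server, run the operation against it, and wrap the failure if nothing succeeds. The sketch below is a deliberately simplified, blocking illustration under that assumption. LoadBalancedCallSketch and CallFailedException are hypothetical names, servers are plain strings, and trying the next server on failure only loosely imitates what the retry handler configures.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, blocking simplification of "execute with load balancer".
public class LoadBalancedCallSketch {

    // Stand-in for Ribbon's ClientException (unchecked here to keep the sketch short).
    static class CallFailedException extends RuntimeException {
        CallFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Run the operation against each candidate server until one call succeeds.
    static <T> T executeWithLoadBalancer(List<String> servers, Function<String, T> operation) {
        RuntimeException last = null;
        for (String server : servers) {
            try {
                return operation.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
        }
        throw new CallFailedException("no server could handle the request", last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.11:8080", "10.0.0.12:8080");
        String result = executeWithLoadBalancer(servers, server -> {
            if (server.startsWith("10.0.0.11")) {
                throw new IllegalStateException("connection refused");
            }
            return "handled by " + server;
        });
        System.out.println(result);
    }
}
```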

submit()  //LoadBalancerCommand#submit

Note that LoadBalancerCommand lives in ribbon-loadbalancer-xxx.jar, which shows that Feign performs load balancing by delegating to Ribbon.

Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server))
		                .concatMap(new Func1<Server, Observable<T>>() {
	// Called for each server being selected
	public Observable<T> call(Server server) {
		context.setServer(server); 
		final ServerStats stats = loadBalancerContext.getServerStats(server);
		// Called for each attempt and retry
		Observable<T> o = Observable
				.just(server)
				.concatMap(new Func1<Server, Observable<T>>() {
			// set up onCompleted, onError, onNext, recordStats, etc. (omitted)
			// apply the retry policy, etc. (omitted)
		}
	}
}

The code above calls selectServer() (LoadBalancerCommand#selectServer()), which is where a server instance is chosen for load balancing. Its code is as follows:

private Observable<Server> selectServer() {
	return Observable.create(new OnSubscribe<Server>() {
		@Override
		public void call(Subscriber<? super Server> next) {
			try {
				Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
				next.onNext(server);
				next.onCompleted();
			} catch (Exception e) {
				next.onError(e);
			}
		}
	});
}
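getServerFromLoadBalancer ultimately asks the load balancer's rule for an instance. As a rough idea of what such a rule does, here is a minimal round-robin chooser in plain Java. It is a sketch of one common strategy (Ribbon ships a RoundRobinRule among others), not Ribbon's implementation; RoundRobinSketch and choose are hypothetical names and servers are represented as strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser; not Ribbon code.
public class RoundRobinSketch {

    private final List<String> servers;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = servers;
    }

    // Return the next server in rotation; floorMod keeps the index valid
    // even after the counter overflows into negative values.
    String choose() {
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch(Arrays.asList("10.0.0.11:8080", "10.0.0.12:8080"));
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose());
        }
    }
}
```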

For details, see "Spring Cloud Ribbon: How Load Balancing Works" (Spring Cloud Ribbon–负载均衡的原理, 自学精灵).
