当前位置: 首页 > news >正文

再了解一点Ribbon的重试机制

简介

在我们通过http的各种客户端进行调用的时候,难免会出现网络等各种偶发性的问题,如果没有重试的操作,当前这次请求只能是失败,从而在数据上或者用户页面上带来的数据和体验问题。
如果通过程序来设置出现异常的情况下,进行接口重试,从而保证它的可用性。

原理

我们使用ribbon来实现负载均衡以及远程服务接口的重试。关于ribbon负载均衡的大家可以看这篇文档Ribbon负载均衡原理分析仅供参考。

LoadBalancerFeignClient

我们还是要从ribbon和feign的整合的client地方说起。

public Response execute(Request request, Request.Options options) throws IOException {
		try {
			URI asUri = URI.create(request.url());
			String clientName = asUri.getHost();
			URI uriWithoutHost = cleanUrl(request.url(), clientName);
			//这里的delegate就是client的代理类
			//比如okhttpclient
			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
					this.delegate, request, uriWithoutHost);

			IClientConfig requestConfig = getClientConfig(options, clientName);
			//lbClient获取的就是负载均衡器 FeignLoadBalancer
			return lbClient(clientName)
					.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (ClientException e) {
			//省略。。。。
		}
	}

接着调用AbstractLoadBalancerAwareClient.executeWithLoadBalancer

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
            	//这里是负载均衡后或者重试后调用的回调函数
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                    	//封装最后调用的URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                        	//这里调用的是FeignLoadBalancer负载均衡器,最终是在它里面调用具体的client
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                //阻塞等待
                .toBlocking()
                //订阅Observable,也就是command.submit返回的Observable
                .single();
        } catch (Exception e) {
            //省略...
        }
        
    }

ribbon的负载均衡和重试逻辑都在submit方法体内,LoadBalancerCommand内完成重试逻辑的处理。
submit方法是真的长,其实里面全是回调函数,我们省略一些回调函数,看一些主要的业务逻辑。

public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        //省略
        //下面这2个值都是ribbon可配置的,我们设置重试次数就是通过这2个参数设置的
		//同一个节点,重试次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        //不同节点重试次数
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // selectServer负载均衡,从服务列表中,按照irule实现,选择对应可用的server
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        //省略...
                                        
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        //调用远程服务,最终调用command.submit中的回调函数,往上面看
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            //省略...
                                    }
                                });
                        
                        //设置retry
                        //同一个节点重试
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
         //如果未选择到server,那么进行其他节点的重试,会重新调用selectServer()的流程
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            //省略....
    }

client调用

submit的回调函数最终会调到FeignLoadBalancer的execute方法

public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
			throws IOException {
		Request.Options options;
		if (configOverride != null) {
			//设置request的参数
			RibbonProperties override = RibbonProperties.from(configOverride);
			options = new Request.Options(override.connectTimeout(this.connectTimeout),
					override.readTimeout(this.readTimeout));
		}
		else {
			options = new Request.Options(this.connectTimeout, this.readTimeout);
		}
		//使用不同的client调用远程服务
		Response response = request.client().execute(request.toRequest(), options);
		return new RibbonResponse(request.getUri(), response);
	}

可根据需要配置不同的客户端,默认是ApacheHttpClient

feign.okhttp.enabled=true
feign.httpclient.enabled=false
feign.httpclient.connection-timeout=1000
feign.httpclient.max-connections=1000

可根据具体业务场景配置不同的参数。

重试

ribbon的重试是通过RetryHandler实现的。

public interface RetryHandler {

    public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
    
    //判断当前这个异常情况下,是否需要重试
    public boolean isRetriableException(Throwable e, boolean sameServer);
    public boolean isCircuitTrippingException(Throwable e);
        
    //同一个节点重试次数
    public int getMaxRetriesOnSameServer();

    //其他节点重试次数
    public int getMaxRetriesOnNextServer();
}

再来看一下Ribbon调用的时候使用的RetryHandler使用的是哪个实现。

protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
		RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
		LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
				.withLoadBalancerContext(this)
				.withRetryHandler(handler)
				.withLoadBalancerURI(request.getUri());
		customizeLoadBalancerCommandBuilder(request, config, builder);
		return builder.build();
	}

从这里我们可以看到RetryHandler使用的是RequestSpecificRetryHandler,继续再看下它的内部逻辑是什么?

public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
			RibbonRequest request, IClientConfig requestConfig) {
		if (this.ribbon.isOkToRetryOnAllOperations()) {
			return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
					requestConfig);
		}
		if (!request.toRequest().httpMethod().name().equals("GET")) {
			return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
					requestConfig);
		}
		else {
			return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
					requestConfig);
		}
	}

从上述代码我们可以判断出3个点

  • 如果设置了OkToRetryOnAllOperations=true, 那么连接异常和其他的异常全部重试
  • 如果OkToRetryOnAllOperations=false,并且不是get请求的情况下,只重试连接异常场景,其他情况下不进行重试
  • 如果OkToRetryOnAllOperations=false, 并且是get请求的情况,那么连接异常和其他的异常全部可重试

继续看它的判断是否可以重试的方法逻辑

public boolean isRetriableException(Throwable e, boolean sameServer) {
        if (okToRetryOnAllErrors) {
            return true;
        } 
        else if (e instanceof ClientException) {
            ClientException ce = (ClientException) e;
            if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
                return !sameServer;
            } else {
                return false;
            }
        } 
        else  {
            return okToRetryOnConnectErrors && isConnectionException(e);
        }
    }
  • 如果okToRetryOnAllErrors=true, 全部重试
  • 如果okToRetryOnAllErrors=false,并且是ClientException,并且错误code是SERVER_THROTTLED,其他异常code的情况下不重试
  • 如果也不是ClientException,判断是否要处理连接异常场景
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));

是否需要重试是根据submit中的上面截图的部分进行处理的。
附上重试相关的配置参数

ribbon.ConnectTimeout=1000
ribbon.ReadTimeout=1000
ribbon.MaxAutoRetries=1
ribbon.MaxAutoRetriesNextServer=0
ribbon.OkToRetryOnAllOperations=false

IClientConfig类可参考RibbonClientConfiguration自动装配的地方。

RxJava

说一点题外话,刚刚我们在看submit相关代码的时候,看的真是头疼,各种Function,各种回调函数,如果仅仅是看代码实在是太难了,必须通过参数以及debug才能一步一步的看明白。
RxJava是个什么呢?
它是一个做异步的框架,类似于AsyncTask。与java设计模式中的观察者模式一样,或者就是观察者模式,它是有Observer(观察者)和Observable(被观察者)对象组成。
在其回调方法中,有如下方法:

  • onCompleted: 事件队列完结,不会再触发onNext的事件
  • onNext: 普通事件信息
  • onError: 异常了
    我们在上面代码的时候,submit返回的就是一个Obserable对象,那么后续肯定要subscirbe方法,订阅这个事件,或者添加观察者对象,当触发对应事件后,就会监听到事件的变化,做对应的逻辑操作。
    观察者模式是解耦了,但是在看代码的时候着实难受,有兴趣的朋友大家仔细研究吧。
    重试就到这里了,喜欢的朋友点个收藏,多谢。

相关文章:

  • MySQL5.7安装教程
  • Python 3.9 有什么新变化 - 新的特性
  • 为什么我们认为GPT是一个技术爆炸
  • JavaSE进阶之(十六)枚举
  • [数据结构]直接插入排序、希尔排序
  • 10个杀手级应用的Python自动化脚本
  • Python截图自动化工具
  • avue强大的upload表单项内置方法的使用,用这个可以配合$DialogForm.show()弹框完美使用上传控件
  • C语言高级操作(四)——exex函数族
  • 一行代码“黑”掉任意网站
  • 蓝桥杯刷题冲刺 | 倒计时18天
  • 软件测试拿了几个20K offer,分享一波面经
  • 【面试题】大厂面试官:你做过什么有亮点的项目吗?
  • vue后台管理系统
  • ChatGPT-4.0 : 未来已来,你来不来
  • Servlet的详细使用
  • 按键修改阈值功能、报警功能、空气质量功能实现(STM32)
  • 第十四届蓝桥杯模拟赛【第三期】Python
  • 【C++】初识模板
  • 南京邮电大学数据库第三次课后作业
  • HTB-baby~
  • Docker使用DockerFile部署Go项目
  • 自己设计的网站,如何实现分页功能?(详细代码+注释)
  • O2O、C2C、B2B、B2C是什么意思
  • 带你一文透彻学习【PyTorch深度学习实践】分篇——加载数据集(Dataset类、DataLoader类核心参数讲解)附:实例源代码
  • 基于html+css的图片居中排列
  • Python通用验证码识别OCR库ddddocr的安装使用
  • 「无服务器架构」无服务器架构是应用程序的正确选择?需要考虑利弊。
  • Pycharm中修改git提交代码的账户和用户名【Git追溯注解,git blame】
  • 中介变量、调节变量与协变量