当前位置: 首页 > news >正文

ServiceComb场景及其原理

文章目录

  • Java-Chassis
    • @EnableServiceComb初始化SCB
    • SPIServiceUtils自定义SPI加载器
    • 职责链管理器FilterChainsManager/ConsumerHandlerManager
    • @RpcSchema
    • 注册服务如何保活?
    • @RpcReference
    • PropertySourcesPlaceholderConfigurer
    • ThreadPoolExecutorEx/LinkedBlockingQueueEx
    • ConcurrentHashMapEx
    • AopProxyUtils/BeanUtils
    • ReflectionUtils
    • StringValueResolver

Java-Chassis

ServiceComb中是实现RPC的框架类似Dubbo,又称Java底座。

@EnableServiceComb初始化SCB

通过@ImportResource的方式导入ServiceComb的核心Bean定义文件"classpath*:META-INF/spring/scb-core-bean.xml";和"classpath*:META-INF/spring/*.bean.xml";,注入如下Bean定义。

  • ConfigurationSpringInitializer

  • CseApplicationListener,ContextRefreshedEvent事件监听器

    • 初始化相关的连接

      if (this.applicationContext == applicationContext) {
            // same object. avoid initialize many times.
            return;
          }
          this.applicationContext = applicationContext;
          BeanUtils.setContext(applicationContext);
          HttpClients.load(); // Http相关的
          RegistrationManager.INSTANCE.init(); // 服务注册 管理初始化,通过SPI获取。
          DiscoveryManager.INSTANCE.init(); // 服务发现 管理初始化,通过SPI获取。
      
    • ContextRefreshedEvent Bean实例化完事件触达,初始化ServiceComb引擎SCBEngine

      @Override
        public void onApplicationEvent(ApplicationEvent event) {
          if (initEventClass.isInstance(event)) {
            if (applicationContext instanceof AbstractApplicationContext) {
              ((AbstractApplicationContext) applicationContext).registerShutdownHook();
            }
      
            SCBEngine scbEngine = SCBEngine.getInstance(); // 获取单例
            //SCBEngine init first, hence we do not need worry that when other beans need use the
            //producer microserviceMeta, the SCBEngine is not inited.
      //        String serviceName = RegistryUtils.getMicroservice().getServiceName();
      //        SCBEngine.getInstance().setProducerMicroserviceMeta(new MicroserviceMeta(serviceName).setConsumer(false));
      //        SCBEngine.getInstance().setProducerProviderManager(applicationContext.getBean(ProducerProviderManager.class));
      //        SCBEngine.getInstance().setConsumerProviderManager(applicationContext.getBean(ConsumerProviderManager.class));
      //        SCBEngine.getInstance().setTransportManager(applicationContext.getBean(TransportManager.class));
            scbEngine.setApplicationContext(applicationContext);
            scbEngine.setPriorityPropertyManager(applicationContext.getBean(PriorityPropertyManager.class));
            scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class)); // 指责链管理类
            scbEngine.getConsumerProviderManager().getConsumerProviderList()
                .addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values()); // 获取支持的Consumer形式,如Rest/Pojo
            scbEngine.getProducerProviderManager().getProducerProviderList()
                .addAll(applicationContext.getBeansOfType(ProducerProvider.class).values()); // // 获取Provider注册的元信息,支持从Rest/Pojo中获取
            scbEngine.addBootListeners(applicationContext.getBeansOfType(BootListener.class).values()); // 获取全量SCB初始化监听器BootListener,用于监听SCB在生命周期内的各种事件。
      
            scbEngine.run();
          } else if (event instanceof ContextClosedEvent) {
            if (SCBEngine.getInstance() != null) {
              SCBEngine.getInstance().destroy();
            }
          }
        }
      
  • org.apache.servicecomb.core.executor.GroupExecutor

SPIServiceUtils.getOrLoadSortedService(Registration.class)

SPIServiceUtils自定义SPI加载器

ServiceComb大量使用SPI动态扩展对应的实现,SPI定义的使能及优先级,从全部jar包中

META-INF/services的路径文件中加载扩展类,文件名为接口类的全路径里面内容为接口实现。

  • SPIOrder,加载SPI的顺序,排序从小到大,order数值越小,优先级越高,调用越靠前。

  • SPIEnabled,当前SPI是否使能。

  • SPIServiceUtils,代理到JDK自带的ServiceLoader动态加载SPI类,并使用ReflectionUtils用于获取SPIOrder排序对应的实现类

    // 加载SPI全部类
    public static <T> List<T> loadSortedService(Class<T> serviceType) {
        List<Entry<Integer, T>> serviceEntries = new ArrayList<>();
        ServiceLoader<T> serviceLoader = ServiceLoader.load(serviceType); // 代理到JDK
        serviceLoader.forEach(service -> {
          int serviceOrder = 0;
          Method getOrder = ReflectionUtils.findMethod(service.getClass(), "getOrder"); // 默认为0,如果获取方法
          if (getOrder != null) {
            serviceOrder = (int) ReflectionUtils.invokeMethod(getOrder, service); // 触发方法
          }
    
          Entry<Integer, T> entry = new SimpleEntry<>(serviceOrder, service);
          serviceEntries.add(entry);
        });
    
        List<T> services = serviceEntries.stream()
            .sorted(Comparator.comparingInt(Entry::getKey))
            .map(Entry::getValue)
            .collect(Collectors.toList()); // 根据 getOrder 排序,并返回全部的serviceType。
    
        LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size());
        for (int idx = 0; idx < services.size(); idx++) {
          T service = services.get(idx);
          LOGGER.info("  {}. {}.", idx, service.getClass().getName());
        } // 输出加载的日志及其顺序,方便定位调试功能
    
        return services;
      }
    // Map<Class<?>, List<Object>> cache = new ConcurrentHashMap<>();
    // loadSortedService 并非线程安全,因此添加类锁
    public static <T> List<T> getOrLoadSortedService(Class<T> serviceType) {
        List<Object> services = cache.get(serviceType);
        if (services == null) {
          synchronized (LOCK) {
            services = cache.get(serviceType);
            if (services == null) {
              services = (List<Object>) loadSortedService(serviceType);
              cache.put(serviceType, services);
            }
          }
        }
    
    

SPI加载类的全量日志:


在这里插入图片描述
在这里插入图片描述

职责链管理器FilterChainsManager/ConsumerHandlerManager

不论在服务的提供方或者在服务的调用方,在触发请求或者响应请求的时候,都会通过职责链的方式顺序调用各种Handler,框架动态从handler.xml文件中读取Handler算子,支持在yaml文件中通过配置文件动态装配,非常灵活。

  • 实现了Handler仓库HandlerConfigUtils。PaaSResourceUtils继承Spring ResourceUtils,读取资源文件

    private static Config loadConfig() throws Exception {
        Config config = new Config();
    	// 1、读取配置文件 classpath* 代表在全量jar包中寻找
    	// 2、classpath*:config/cse.handler.xml   classpath*:config/cse.*.handler.xml
    	// 3、PathMatchingResourcePatternResolver 加载全部的Resource资源路径
        // 4、排序资源的路径(xxxx.handler.cse,根据名字xxx进行排序)
        List<Resource> resList =
            PaaSResourceUtils.getSortedResources("classpath*:config/cse.handler.xml", ".handler.xml");
        for (Resource res : resList) {
          // 5. 通过XmlMapper读取handler.xml文件映射到Config类中
          Config tmpConfig = XmlLoaderUtils.load(res, Config.class);
          config.mergeFrom(tmpConfig);
        }
    
  • Handler算子的编排,使用ConsumerHandlerManager和ProducerHandlerManager读取yaml中的配置文件,通过名字对Handler进行编排,Handler并未实现Order接口,调用顺序交给编排方。

    • Consumer/Provider支持自定义ConsumerHandlerManager/ProducerHandlerManager类生成职责链列表,最后的Consumer Handler-TransportClientHandler.INSTANCE, 最后的Provider Handler–ProducerOperationHandler.INSTANCE

      以下动态创建职责链配置:

      // 职责链Key,配置在yaml文件中
      protected List<Handler> create(String microserviceName) {
          // 是否定义服务对应的handler编排servicecomb.handler.chain.Provider.service.calculator
          // 没有则使用默认的 servicecomb.handler.chain.Provider.default
          String handlerChainKey = "servicecomb.handler.chain." + getName() + ".service." + microserviceName;
          String chainDef = DynamicPropertyFactory.getInstance()
              .getStringProperty(handlerChainKey,
                  defaultChainDef)
              .get();
          LOGGER.info("get handler chain for [{}]: [{}]", handlerChainKey, chainDef);
          return createHandlerChain(chainDef);
        }
      

BootListener

PojoInvocation保存List<Handler>职责链,并实现

@RpcSchema

@RpcSchema(schemaId = "hello")
public class HelloImpl implements Hello {

  @Override
  public String sayHi(String name) {
    return "Hello " + name;
  }

  @Override
  public String sayHello(Person person) {
    return "Hello person " + person.getName();
  }
}
  • 注解会被PojoProducers这个BeanPostProcessor处理,用于往注册中心注册服务。

    protected void processProvider(String beanName, Object bean) {
        // 1、aop后,新的实例的父类可能是原class,也可能只是个proxy,父类不是原class,所以,需要先取出原class,再取标注
        // 调用 AopProxyUtils.ultimateTargetClass
        Class<?> beanCls = BeanUtils.getImplClassFromBean(bean);
        if (beanCls == null) {
          return;
        }
        RpcSchema rpcSchema = beanCls.getAnnotation(RpcSchema.class);
        if (rpcSchema == null) {
          return;
        }
    
        // 2、获取schemaId,如果没有传递则获取接口名
        String schemaId = rpcSchema.schemaId();
        if (StringUtils.isEmpty(schemaId)) {
          Class<?>[] intfs = beanCls.getInterfaces();
          if (intfs.length == 1) {
            schemaId = intfs[0].getName();
          } else {
            throw new Error("Must be schemaId or implements only one interface");
          }
        }
    
        // 3、注册producer元信息
        PojoProducerMeta pojoProducerMeta = new PojoProducerMeta();
        pojoProducerMeta.setSchemaId(schemaId);
        pojoProducerMeta.setSchemaInterface(rpcSchema.schemaInterface());
        pojoProducerMeta.setInstance(bean);
    
        registerPojoProducer(pojoProducerMeta);
      }
    
  • 在SCB启动的时候,scbEngine.getProducerProviderManager会获取PojoProducerProvider -> 读取PojoProducers中注册的全部元信息。

    public List<ProducerMeta> init() {
        // for some test cases, there is no spring context
        if (BeanUtils.getContext() == null) {
          return Collections.emptyList();
        }
    
        PojoProducers pojoProducers = BeanUtils.getContext().getBean(PojoProducers.class);
        for (ProducerMeta producerMeta : pojoProducers.getProducerMetas()) {
          PojoProducerMeta pojoProducerMeta = (PojoProducerMeta) producerMeta;
          initPojoProducerMeta(pojoProducerMeta);
        }
    
        return pojoProducers.getProducerMetas();
      }
    
  • SCB.Run的时候,会通过获取到的元信息,将其注册到注册中心上对外提供服务。

注册服务如何保活?

@RpcReference

@RpcReference(microserviceName = "hello", schemaId = "hello")
private static Hello hello;

Pojo形式的Rpc消费方的注解,将自动从注册中心拉取对应的服务名,动态代理Compute并注入代理类。

  • 注解会被RpcReferenceProcessor这个BeanPostProcessor处理,动态代理增强Hello。十分注意StringValueResolver支持动态解析占位符。

    public class RpcReferenceProcessor implements BeanPostProcessor, EmbeddedValueResolverAware {
      private StringValueResolver resolver;
     @Override
      public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 扫描所有field,处理扩展的field标注
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
          public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
            processConsumerField(bean, field);
          }
        });
    
        return bean;
      }
      protected void processConsumerField(Object bean, Field field) {
        RpcReference reference = field.getAnnotation(RpcReference.class);
        if (reference == null) {
          return;
        }
    
        handleReferenceField(bean, field, reference);
      }
      private void handleReferenceField(Object obj, Field field,
          RpcReference reference) {
        String microserviceName = reference.microserviceName();
        microserviceName = resolver.resolveStringValue(microserviceName);
    
        PojoReferenceMeta pojoReference = new PojoReferenceMeta();
        pojoReference.setMicroserviceName(microserviceName);
        pojoReference.setSchemaId(reference.schemaId());
        pojoReference.setConsumerIntf(field.getType());
    
    	// proxy = Invoker.createProxy(microserviceName, schemaId, consumerIntf); 动态代理
        pojoReference.afterPropertiesSet();
    
        ReflectionUtils.makeAccessible(field);
        ReflectionUtils.setField(field, obj, pojoReference.getProxy());
      }
     
     
     }
    

PropertySourcesPlaceholderConfigurer

BeanFactoryPostProcessor,用于解析Bean对应的Property。

  • @Value注解,会被AutowiredAnnotationBeanPostProcessor解析,并替换占位符注入合适的值。
  • XML配置如果含有属性注入配置,则PropertySourcesPlaceholderConfigurer#postProcessBeanFactory会被替换占位符注入合适的值。

ThreadPoolExecutorEx/LinkedBlockingQueueEx

ThreadPoolExecutorEx

背景:扩展的ThreadPool,在coreSize到达的时候,并发继续增加,会优先创建Thread直到达到maxSize,才会放入BlockQueue,Dubbo和Tomcat都扩展了类似的功能,当BlockQueue的size为无限大的时候,避免Thread永远无法到达maxSize。

实现:默认ThreadPool按照如下逻辑执行Task。

public void execute(Runnable command) {		
		int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 1、workTask小于coreSize,则直接创建新的Thread执行command。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 2、已达到coreSize,则放入BlockQueue调用offer方法。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 3、BlockQueue放满,则继续添加worker,直到达到最大。
            reject(command);
}

如上要点在第二步,只需要修改workQueue的offer逻辑,就可以优先创建Thead,可使用扩展的LinkedBlockingQueueEx。

在offer的时候,获取TheadPool的size,当小于最大的时候,直接创建新的Thead,执行Task。

public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {
  private transient volatile ThreadPoolExecutorEx owner = null;
  public void setOwner(ThreadPoolExecutorEx owner) {
    this.owner = owner;
  }

  @Override
  public boolean offer(Runnable runnable) {
    // task can come before owner available
    if (owner == null) {
      return super.offer(runnable);
    }
    // can not create more thread, just queue the task
    if (owner.getPoolSize() == owner.getMaximumPoolSize()) {
      return super.offer(runnable);
    }
    // no need to create more thread, just queue the task
    if (owner.getNotFinished() <= owner.getPoolSize()) {
      return super.offer(runnable);
    }
    // all threads are busy, and can create new thread, not queue the task
    if (owner.getPoolSize() < owner.getMaximumPoolSize()) {
      return false;
    }
    return super.offer(runnable);
  }

  /*
   * when task is rejected (thread pool if full), force the item onto queue.
   */
  public boolean force(Runnable runnable) {
    if (owner == null || owner.isShutdown()) {
      throw new RejectedExecutionException("queue is not running.");
    }
    return super.offer(runnable);
  }
}

相应的扩展的TheadPool也override部分方法,用于当前提交的任务总数,完成的人数总数,拒绝的任务总数。

public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
  private AtomicInteger submittedCount = new AtomicInteger();

  private AtomicInteger finishedCount = new AtomicInteger();

  private AtomicInteger rejectedCount = new AtomicInteger();

  public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,
      BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
    super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);
    if (queue instanceof LinkedBlockingQueueEx) {
      ((LinkedBlockingQueueEx) queue).setOwner(this);
    }
    setRejectedExecutionHandler(this::rejectedExecution);
  }

  @Override
  public void execute(Runnable command) {
    submittedCount.incrementAndGet();
    try {
      super.execute(command);
    } catch (RejectedExecutionException e) {
      if (getQueue() instanceof LinkedBlockingQueueEx) {
        final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();
        if (!queue.force(command)) {
          throw new RejectedExecutionException("thread pool queue is full");
        }
      } else {
        throw e;
      }
    }
  }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    rejectedCount.incrementAndGet();
    finishedCount.incrementAndGet();

    throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " +
        e.toString());
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    finishedCount.incrementAndGet();
  }

  public int getNotFinished() {
    return submittedCount.get() - finishedCount.get();
  }

  public int getRejectedCount() {
    return rejectedCount.get();
  }
}

ConcurrentHashMapEx

默认的ConcurrentHashMap当Key都在同一个桶的时候,调用computeIfAbsent的时候,都会对当前的桶加锁(分段锁),当在高并发读多余写的场景,这里的加锁会影响单个桶的性能,因此可以优先通过CAS获取下元素,然后再调用默认方法,扩展性能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dyFvn88b-1670145579656)(images/image-20221204161851865.png)]

  // ConcurrentHashMap.computeIfAbsent always do "synchronized" operation
  // so we wrap it to improve performance
  @Override
  public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    V value = get(key); // CAS GET
    if (value != null) {
      return value;
    }

    return super.computeIfAbsent(key, mappingFunction);
  }

AopProxyUtils/BeanUtils

ReflectionUtils

StringValueResolver

相关文章:

  • 三菱FX5U PLSV指令-可变速度输出
  • ES6的symbol及es2021
  • leetcode每天5题-Day55-动态规划1
  • QDir(目录)
  • LeetCode每日一题——1774. 最接近目标价格的甜点成本
  • [附源码]JAVA毕业设计口腔医院网站(系统+LW)
  • 电子学会2021年3月青少年软件编程(图形化)等级考试试卷(四级)答案解析
  • 微服务架构
  • 研发效能工程实践-利用Superset快速打造大数据BI平台
  • Springboot RabbitMq源码解析之RabbitListener注解 (四)
  • MedNeRF:用于从单个X射线重建3D感知CT投影的医学神经辐射场
  • 性能测试工具:如何录制脚本?
  • Dubbo SPI扩展机制源码详解(基于2.7.10)
  • 创建你的第⼀个XXL-Job分布式调度任务
  • 基于X86的运算板卡加速边缘智能应用
  • 基于MCMC的交通量逆建模(Matlab代码实现)
  • [附源码]计算机毕业设计校园代取快递系统Springboot程序
  • MapStruct与lombok加载顺序问题与annotationProcessorPaths的关系?
  • Express:CORS 跨域资源共享
  • SpringBoot+Vue项目便捷洗衣服务平台
  • 5 年经验年薪百万,一位阿里 P8 分享自己的成长干货
  • 【C++笔试强训】第三天
  • brew换源
  • G银行借助光伏互联网平台发展户用光伏金融业务,加速绿色转型 | 案例研究
  • JAVA基础语法以及一些常见的练习
  • 如何在 Spring Boot 项目中使用 Thymeleaf 和 Bootstrap 实现文件上传
  • 智能电销机器人《各版本机器人部署》
  • 微信小程序的生命周期概览
  • 【华为上机真题 2022】数组组成的最小数字
  • Pr:导出设置之音频
  • 2022年福建美术联考考试题目
  • 复旦大学分数线2020 多少分能考上
  • 西安大学现在叫什么
  • 10岁女孩学声乐好吗 要注意什么
  • 上海视觉艺术学院2022艺术类专业各省级统考子科类对照表
  • 宁夏高考591分能上什么大学,高考591分左右可以上的学校有哪些
  • 2022专科提前批适合什么学生 有哪些条件
  • 2022黑龙江高考报名人数预测 报考人数预计多少
  • 生命的
  • 00到我们学校来上课