分布式事务的几种实现(阿里seata)

wuchangjian2021-11-04 10:10:23编程学习

seata的部署和配置

Seata中文官网

TCC模式的实现

通过阿里的Seata去运用TCC模式

一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

定义TCC接口

由于我们使用的是SpringCloud+Feign,Feign的调用基于http,因此此处我们使用LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可。

  • @LocalTCC 适用于SpringCloud+Feign模式下的TCC
  • @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(记得全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
  • @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
    BusinessActionContext 便是指TCC事务上下文

下面根据模型定义Service和实现,定义三个方法,prepare行为,commit和rollback方法。

@LocalTCC
public interface TccService {
    /**
     * 定义两阶段提交prepare方法
     * name = 该tcc的bean名称,全局唯一
     * commitMethod = commit 为二阶段确认方法
     * rollbackMethod = rollback 为二阶段取消方法
     * BusinessActionContextParameter注解 传递参数到二阶段中
     *
     * @return String
     */
    @TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
    String insert(Long productId,Integer count);


    /**
     * commit方法、可以另命名,但要保证与commitMethod一致
     * context可以传递try方法的参数
     *
     * @param context 上下文
     * @return boolean
     */
    boolean commitTcc(BusinessActionContext context);
    /**
     * 二阶段rollback方法
     *
     * @param context 上下文
     * @return boolean
     */
    boolean cancel(BusinessActionContext context);



}

@Slf4j
@Service
public class TccServiceImpl implements TccService {

    @Autowired
    private ResourceStorageService resourceStorageService;

    /**
     * tcc服务t(try)方法
     * 根据实际业务场景选择实际业务执行逻辑或者资源预留逻辑
     *
     * @return String
     */
    @Override
    @PostMapping("/tcc-insert")
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public String insert(Long productId,Integer count) {
        log.info("xid = " + RootContext.getXID());
        resourceStorageService.decrease(productId, count);
        //放开以下注解抛出异常
//        throw new RuntimeException("服务tcc测试回滚");
        return "success";
    }

    /**
     * tcc服务 confirm方法
     * 若一阶段采用资源预留,在二阶段确认时要提交预留的资源
     *
     * @param context 上下文
     * @return boolean
     */
    @Override
    public boolean commitTcc(BusinessActionContext context) {
        log.info("xid = " + context.getXid() + "提交成功");
        // 若try成功,一阶段资源预留,这里则要提交资源
        return true;
    }

    /**
     * tcc 服务 cancel方法
     *
     * @param context 上下文
     * @return boolean
     */
    @Override
    public boolean cancel(BusinessActionContext context) {
        //todo 这里写回滚操作
        System.out.println("please manually rollback this data:" + context.getActionContext("params"));
        return true;
    }

}

提供调用方法:

@Api(tags = "资源库存")
@RestController
@RequestMapping("/storage")
public class StorageController {
    

    @Autowired
    private TccService tccService;

    /**
     * 扣减库存
     */
    @RequestMapping("/decrease")
    public CommonResult decrease(Long productId, Integer count) {
        
        tccService.insert(productId,count);
        return new CommonResult("扣减库存成功!",200);
    }
}

下面通过feign调用,这里写调用方:

@Api(tags = "分布式-接口测试demo")
@RestController
@RequestMapping(value = "/v1/distributed")
public class DistributedController {


    /**
     * 库存服务,减少库存
     */
    @Resource
    private FeignStorageService feignStorageService;

    /**
     * 增加一条主题内容
     */
    @Resource
    private TopicService topicService;

    @GetMapping("/transaction")
    @ApiOperation(value = "分布式事务seata测试")
    @GlobalTransactional(name = "fsp-tra", rollbackFor = Exception.class)
    public CommonResult transaction() {
        //减少库存
        feignStorageService.decrease(1L, 1);
        //增加一条topic
        TopicDO topicDO = new TopicDO();
        topicDO.setName("购物热");
        topicService.save(topicDO);
        //执行出错
        int a = 1 / 0;
        return CommonResult.success("");
    }

}

库存的feign调用客户端

/**
 * @author haitao.li
 * @description: 库存
 * @date 2021/10/25 10:26
 */
@FeignClient(value = "tcly-storage")
public interface FeignStorageService {

    /**
     * 扣减库存
     * @param productId 查询id
     * @param count 产品数量
     * @return 扣除结果
     */
    @RequestMapping("/storage/decrease")
    CommonResult<Object> decrease(@RequestParam Long productId,@RequestParam  Integer count);
}

上面的代码,调用的时候,通过@GlobalTransactional(name = “fsp-tra”, rollbackFor = Exception.class)进行全局事务管理。

下面根据以上代码进行测试。

调用到最后的时候,一行代码出错: int a = 1 / 0; 正常情况 下,扣减库存和插入topic都会失败

执行前的storage:

执行前的topic

执行后结果:

topic没有添加,但是库存扣减成功了(因为我这里的cancel方法里没有做实际的回滚操作,看下面,如果实际执行了cancel则说明TCC正常)。

下面是库存服务 中的两阶段结果:

全局事务执行成功,并且库存服务的两阶段模型执行了第二阶段的cancel操作。

如果开启了@LocalTCC,那么必须通过cancel方法进行回滚。

XA/AT(2PC)

一个下订单的例子,进行扣减库存,生成订单,余额扣除。

通过业务模块business进行分布式 事务全局管控,storage,account,order三个微服务之间通过feign调用并处理事务。

样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

  @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxy for AT mode
        // return new DataSourceProxy(druidDataSource);

        // DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

完整代码(下载后根据以下步骤):

下载demo:seata-xa

准备工作

  1. 执行sql/all_in_one.sql

  2. 下载最新版本的 Seata Sever

  3. 解压并启动 Seata server

unzip seata-server-xxx.zip

cd distribution
sh ./bin/seata-server.sh 8091 file
  1. 启动 AccountXA, OrderXA, StorageXA, BusinessXA 服务

测试

  • 无错误成功提交
curl http://127.0.0.1:8084/purchase

具体调用参数请结合 BusinessController 的代码。

数据初始化逻辑,参见 BusinessService#initData() 方法。

基于初始化数据,和默认的调用逻辑,purchase 将可以被成功调用 3 次。

每次账户余额扣减 3000,由最初的 10000 减少到 1000。

第 4 次调用,因为账户余额不足,purchase 调用将失败。相应的:库存、订单、账户都回滚。

XA 模式与 AT 模式

只要切换数据源代理类型,该样例即可在 XA 模式和 AT 模式之间切换。

DataSourceConfiguration

XA 模式使用 DataSourceProxyXA


public class DataSourceProxy {

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }
}

AT 模式使用 DataSourceProxy


public class DataSourceProxy {

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxyXA for AT mode
        return new DataSourceProxy(druidDataSource);
    }
}

当然,AT 模式需要在数据库中建立 undo_log 表。(XA 模式是不需要这个表的)

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

saga模式

saga模式

相关文章

libuv 的Processes

        libuv提供了相当多的子进程管理函数,并且是跨平台的&#...

[react] react16跟之前的版本生命周期有哪些变化?

[react] react16跟之前的版本生命周期有哪些变化?...

图一
为了充分利用结构化信息,作者提出了一种基于三上下文的知识图嵌入模型(Triple-Context-based Knowledge Embedding (TCE))。TCE的目标是改进两个实体之间复杂关系的建模,并利用一个新的上下文,该上下文考虑了三元组周围的结构化信息。本文提出了一种新的基于图的上下文——三重上下文,它由邻居上下文和路径上下文组成。一个邻居上下文由一个实体的直接外向关系和它的邻居实体组成,这些邻居实体由外向关系连接。由于多步关系路径反映了实体对之间的语义关系,有助于处理复杂关系。在TCE中,同样的概念被选择来构建路径上下文。将这两种情境整合在一起,构建三元组上下文,从而提高TCE对三元组上下文中缺失的实体和关系的预测效率。考虑到这一目标,我们引入了一个新的分数函数来评估三元组的可能性并学习嵌入。本
作者的主要贡献总结如下:
1)不是利用一组独立的三元组,而是利用KG中三元组周围的局部结构来研究知识的表示;
2)基于三元组上下文,提出了一种新的知识图嵌入模型TCE,该模型能够很好地利用三元组上下文;
在以往的知识图谱嵌入模型中,KGs中的三元组是独立的,忽略了各三元组之间的相互关系,一个KG可以看作是一个具有图结构信息的图,具有知识推理的能力。PTransE是TransE的一个扩展模型,用于建模基于路径的表示学习模型。PTransE利用实体对之间的连接关系事实,而不是只考虑两个实体之间的关系。然而,由于PTransE仅仅将关系路径作为KG中的新关系,而忽略了图结构中包含的其他信息,因此很难为不同实体和关系的复杂上下文建模。与本文最相关的模型是GAKE。在GAKE中,它定义了三种包含不同kgs结构化信息的图形上下文,用于表示学习。GAKE的评分函数只考虑目标实体或关系与上下文的联系。因此,三元组的内部关系被忽略,结构化信息仍然丢失。这些研究表明结构化信息能够增强知识图的嵌入。
图嵌入模型和知识图嵌入模型在利用网络结构学习顶点的连续表示这一点上是相似的。然而,两种嵌入模型之间存在着巨大的差异。图嵌入模型不能直接了解实体和关系在KG中的嵌入。这是因为图嵌入模型的目标是学习的嵌入的顶点,但知识图嵌入模型需要学习两个顶点和边作为在知识图谱中实体和关系的嵌入。在图嵌入模型中,假设图中连通顶点都是相似的。然而,这个假设在知识图谱中并不成立。顶点的相似度取决于语义和局部结构。例如,在三元组(Barack_Obama,Birth_Place,Hawaii)中,头层Barack_Obama和尾层Hawaii之间就没有共同的属性。
本文旨在提出一个KG嵌入模型,并提出一个基于图的模型TCE,该模型可以通过将三元组上下文构建为图上下文来学习实体和关系的潜在表示。TCE利用三元上下文的结构化信息来获得更好的嵌入。
首先介绍了一些必要的符号。知识图K由一组三元组组成。在K中,每个三元组记为(h,r,t)。K中的所有头实体和尾实体都属于一组实体E。K中的所有关系都属于一组关系R。我们的模型旨在学习并嵌入E中的所有实体和R表示的关系在d维空间Rd中。下面简要介绍了知识图,然后定义了三重上下文。
传统的观点是,一个KG由一组三元组组成,三元组描述一组独立的事实。为了利用KG的结构化信息,我们将把KG看作一个有向图。所有实体和关系在KG分别被视为顶点和有向边。
为了利用结构化的信息,我们从一组三元组(或事实)构建了一个有向图。构造有向图的步骤如下:给定K的一组三元组,对于K中的每一个三元组(h,r,t),其中h和t分别表示头实体和尾实体。首先创建两个顶点vi和vj,这两个顶点分别对应于三元组中的h和t。然后,利用连接h和t的关系r生成有向边e。值得注意的是,将vj与vi的反向关系作为机器翻译中反向翻译的常用方法。反向关系最大的优点是可以充分利用KG的结构化信息来提高性能。在有向图G中包含了所有的实体和关系之后,上述建立图的过程就结束了。图1显示了由9个实体、10个关系和10个事实组成的知识示例。
这里解释一下反向翻译,具体请看(https://zhuanlan.zhihu.com/p/330866548)。
假如要训练一个中译英的模型:
首先使用数据集语料训练一个英译中的模型(与目标方向相反。)
使用目标语言(英语)单语语料通过英译中模型的翻译生成中文。
将生成的语料与数据集语料混合训练中英模型。
在这里插入图片描述在KGs中,一个实体的邻居上下文是它的周围环境,它是主要与实体相互作用的局部结构。具体来说,给定一个实体,e的邻居上下文是一个集合Cn(e)= {(r,t)|∀r,t,(e,r,t)∈K},其中r是e的出边(关系),t是它通过r到达的实体。换句话说,e的邻居上下文是所有以e为头的三元组中出现的关系尾对。例如,如图1所示,实体Steven Curry的邻居上下文是Cn(Steven Curry)={(国籍,美国),(born_in_city,Akron)}。在我们的模型中,实体的外观是基于它的邻居上下文来预测的,作为实体和它的邻居上下文的兼容性的度量。
路径上下文是KGs关系建模和关系之间复杂推理模式捕获的重要信息,对于每个三元组,其路径上下文由从头实体到尾实体的多步关系路径组成。设有一对实体(h, t),则他们的路径上下文是Cp (h t) ={pi" alt="论文阅读 Triple Context-Based Knowledge Graph Embedding">

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。