开发者社区> 开源小秘书> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

PolarDB-X 源码解读系列:DML 之 INSERT IGNORE 流程

简介: 本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程,其根据插入的表是否有 GSI 也有所变化。
+关注继续查看
福利推荐:阿里云、腾讯云、华为云等大品牌云产品全线2折优惠活动来袭,4核8G云服务器899元/3年,新老用户共享优惠,点击这里立即抢购>>>

作者:潜璟

上一篇源码阅读中,我们介绍了 INSERT 的执行流程。而 INSERT IGNORE 与 INSERT 不同,需要对插入值判断是否有 Unique Key 的冲突,并忽略有冲突的插入值。因此本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程,其根据插入的表是否有 GSI 也有所变化。

下推执行
如果插入的表只有一张主表,没有 GSI,那么只需要将 INSERT IGNORE 直接发送到对应的物理表上,由 DN 自行忽略存在冲突的值。在这种情况下,INSERT IGNORE 的执行过程和 INSERT 基本上相同,读者可以参考之前的源码阅读文章。

逻辑执行
而在有 GSI 的情况下,就不能简单地将 INSERT IGNORE 分别下发到主表和 GSI 对应的物理分表上,否则有可能出现主表和 GSI 数据不一致的情况。举个例子:

create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a);
insert ignore into t1 values (1,1),(1,2);

对于插入的两条记录,它们在主表上位于同一个物理表(a 相同),但是在 GSI 上位于不同的物理表(b 不相同),如果直接下发 INSERT IGNORE 的话,主表上只有 (1,1) 能够成功插入(主键冲突),而在 GSI 上 (1,1) 和 (1,2) 都能成功插入,于是 GSI 比主表多了一条数据。

针对这种情况,一种解决方案是根据插入值中的 Unique Key,先到数据库中 SELECT 出有可能冲突的数据到 CN,然后在 CN 判断冲突的值并删除。

进行 SELECT 的时候,最简单的方式就是将所有的 SELECT 直接发送到主表上,但是主表上可能没有对应的 Unique Key,这就导致 SELECT 的时候会进行全表扫描,影响性能。所以在优化器阶段,我们会根据 Unique Key 是在主表还是 GSI 上定义的来确定相应的 SELECT 需要发送到主表还是 GSI,具体代码位置:

com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTable

protected Map<String, List<List<String>>> groupUkByTable(LogicalInsertIgnore insertIgnore,
                                                             ExecutionContext executionContext) {
        // 找到每个 Unique Key 在主表和哪些 GSI 中存在
        Map<Integer, List<String>> ukAllTableMap = new HashMap<>();
        for (int i = 0; i < uniqueKeys.size(); i++) {
            List<String> uniqueKey = uniqueKeys.get(i);
            for (Map.Entry<String, Map<String, Set<String>>> e : writableTableUkMap.entrySet()) {
                String currentTableName = e.getKey().toUpperCase();
                Map<String, Set<String>> currentUniqueKeys = e.getValue();
                boolean found = false;
                for (Set<String> currentUniqueKey : currentUniqueKeys.values()) {
                    if (currentUniqueKey.size() != uniqueKey.size()) {
                        continue;
                    }
                    boolean match = currentUniqueKey.containsAll(uniqueKey);
                    if (match) {
                        found = true;
                        break;
                    }
                }
                if (found) {
                    ukAllTableMap.computeIfAbsent(i, k -> new ArrayList<>()).add(currentTableName);
                }
            }
        }

        // 确定是在哪一个表上进行 SELECT
        for (Map.Entry<Integer, List<String>> e : ukAllTableMap.entrySet()) {
            List<String> tableNames = e.getValue();

            if (tableNames.contains(primaryTableName.toUpperCase())) {
                tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k -> new ArrayList<>())
                    .add(uniqueKeys.get(e.getKey()));
            } else {
                final boolean onlyNonPublicGsi =
                    tableNames.stream().noneMatch(tn -> GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn)));

                boolean found = false;
                for (String tableName : tableNames) {
                    if (!onlyNonPublicGsi && GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) {
                        tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
                        found = true;
                        break;
                    } else if (onlyNonPublicGsi && GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) {
                        tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
                        found = true;
                        break;
                    }
                }
            }
        }

        return tableUkMap;
    }

而到了执行阶段,我们在 LogicalInsertIgnoreHandler 中处理 INSERT IGNORE。我们首先会进入 getDuplicatedValues 函数,其通过下发 SELECT 的方式查找表中已有的冲突的 Unique Key 的记录。我们将下发的 SELECT 语句中选择的列设置为 (value_index, uk_index, pk)。其中 value_index 和 uk_index 均为的常量。

举个例子,假设有表:

CREATE TABLE `t` (
    `id` int(11) NOT NULL,
    `a` int(11) NOT NULL,
    `b` int(11) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)
) DBPARTITION BY HASH(`id`)

以及一条 INSERT IGNORE 语句:

INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);

假设在 PolarDB-X 中执行时,其会将 Unique Key 编号为

0: id
1: g_i_a

INSERT IGNORE 语句中插入的每个值分别编号为

0: (1,2,3)
1: (2,3,4)
2: (3,4,5)

那么对于 (2,3,4) 的 UNIQUE KEY 构造的 GSI 上的 SELECT 即为:

查询 GSI

SELECT 1 as `value_index`, 1 as `uk_index`, `id`
FROM `g_i_a_xxxx`
WHERE `a` in 3;

假设表中已经存在 (5,3,6),那么这条 SELECT 的返回结果即为 (1,1,5)。此外,由于不同的 Unique Key 的 SELECT 返回格式是相同的,所以我们会将同一个物理库上不同的SELECT 查询 UNION 起来发送,以一次性得到多个结果,减少 CN 和 DN 之间的交互次数。只要某个 Unique Key 有重复值,我们就能根据 value_index 和 uk_index 确定是插入值的哪一行的哪个 Unique Key 是重复的。

当得到所有的返回结果之后,我们对数据进行去重。我们将上一步得到的冲突的的值放入一个 SET 中,然后顺序扫描所有的每一行插入值,如果发现有重复的就跳过该行,否则就将该行也加入到 SET 中(因为插入值之间也有可能存在相互冲突)。去重完毕之后,我们就得到了所有不存在冲突的值,将这些值插入到表中之后就完成了一条 INSERT IGNORE 的执行。

逻辑执行的执行流程:

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute

protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,
                            LogicalInsert.HandlerParams handlerParams) {
        // ...

        try {
            Map<String, List<List<String>>> ukGroupByTable = insertIgnore.getUkGroupByTable();
            List<Map<Integer, ParameterContext>> deduplicated;
            List<List<Object>> duplicateValues;
            // 获取表中已有的 Unique Key 冲突值
            duplicateValues = getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable,
                (rowCount) -> memoryAllocator.allocateReservedMemory(
                    MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true,
                handlerParams);

            final List<Map<Integer, ParameterContext>> batchParameters =
                executionContext.getParams().getBatchParameters();

            // 根据上一步得到的结果,去掉 INSERT IGNORE 中的冲突值
            deduplicated = getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(),
                insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues,
                batchParameters, executionContext);

            if (!deduplicated.isEmpty()) {
                insertEc.setParams(new Parameters(deduplicated));
            } else {
                // All duplicated
                return affectRows;
            }

            // 执行 INSERT
            try {
                if (gsiConcurrentWrite) {
                    affectRows = concurrentExecute(insertIgnore, insertEc);
                } else {
                    affectRows = sequentialExecute(insertIgnore, insertEc);
                }
            } catch (Throwable e) {
                handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters()));
            }
        } finally {
            selectValuesPool.destroy();
        }
        return affectRows;
    }

RETURNING 优化
上一节提到的 INSERT IGNORE 的逻辑执行方式,虽然保证了数据的正确性,但是也使得一条 INSERT IGNORE 语句至少需要 CN 和 DN 的两次交互才能完成(第一次 SELECT,第二次 INSERT),影响了 INSERT IGNORE 的执行性能。

目前的 DN 已经支持了 AliSQL 的 RETURNING 优化,其可以在 DN 的 INSERT IGNORE 执行完毕之后返回成功插入的值。利用这一功能,PolarDB-X 对 INSERT IGNORE 进行了进一步的优化:直接将 INSERT IGNORE 下发,如果在主表和 GSI 上全部成功返回,那么就说明插入值中没有冲突,于是就成功完成该条 INSERT IGNORE 的执行;否则就将多插入的值删除。

执行时,CN 首先会根据上文中的语法下发带有 RETURNING 的物理 INSERT IGNORE 语句到 DN,比如:

call dbms_trans.returning("a", "insert into t1_xxxx values(1,1)");

其中返回列是主键,用来标识插入的一批数据中哪些被成功插入了;t1_xxxx 是逻辑表 t1 的一个物理分表。当主表和 GSI 上的所有 INSERT IGNORE 执行完毕之后,我们计算主表和 GSI 中成功插入值的交集作为最后的结果,然后删除多插入的值。这部分代码在

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemoved

private Map<String, List<List<Object>>> getRowsToBeRemoved(String tableName,
                                                               Map<String, List<List<Object>>> tableInsertedValues,
                                                               List<Integer> beforePkMapping,
                                                               List<ColumnMeta> pkColumnMetas) {
        final Map<String, Set<GroupKey>> tableInsertedPks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        final Map<String, List<Pair<GroupKey, List<Object>>>> tablePkRows =
            new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        tableInsertedValues.forEach((tn, insertedValues) -> {
            final Set<GroupKey> insertedPks = new TreeSet<>();
            final List<Pair<GroupKey, List<Object>>> pkRows = new ArrayList<>();
            for (List<Object> inserted : insertedValues) {
                final Object[] groupKeys = beforePkMapping.stream().map(inserted::get).toArray();
                final GroupKey pk = new GroupKey(groupKeys, pkColumnMetas);
                insertedPks.add(pk);
                pkRows.add(Pair.of(pk, inserted));
            }
            tableInsertedPks.put(tn, insertedPks);
            tablePkRows.put(tn, pkRows);
        });

        // Get intersect of inserted values
        final Set<GroupKey> distinctPks = new TreeSet<>();
        for (GroupKey pk : tableInsertedPks.get(tableName)) {
            if (tableInsertedPks.values().stream().allMatch(pks -> pks.contains(pk))) {
                distinctPks.add(pk);
            }
        }

        // Remove values which not exists in at least one insert results
        final Map<String, List<List<Object>>> tableDeletePks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        tablePkRows.forEach((tn, pkRows) -> {
            final List<List<Object>> deletePks = new ArrayList<>();
            pkRows.forEach(pkRow -> {
                if (!distinctPks.contains(pkRow.getKey())) {
                    deletePks.add(pkRow.getValue());
                }
            });
            if (!deletePks.isEmpty()) {
                tableDeletePks.put(tn, deletePks);
            }
        });
        return tableDeletePks;
    }

与上一节的逻辑执行的“悲观执行”相比,使用 RETURNING 优化的 INSERT IGNORE 相当于“乐观执行”,如果插入的值本身没有冲突,那么一条 INSERT IGNORE 语句 CN 和 DN 间只需要一次交互即可;而在有冲突的情况下,我们需要下发 DELETE 语句将主表或 GSI 中多插入的值删除,于是 CN 和 DN 间需要两次交互。可以看出,即便是有冲突的情况,CN 和 DN 间的交互次数也不会超过上一节的逻辑执行。因此在无法直接下推的情况下,INSERT IGNORE 的执行策略是默认使用 RETURNING 优化执行。

当然 RETURNING 优化的使用也有一些限制,比如插入的 Value 有重复主键时就不能使用,因为这种情况下无法判断具体是哪一行被成功插入,哪一行需要删除;具体可以阅读代码中的条件判断。当不能使用 RETURNING 优化时,系统会自动选择上一节中的逻辑执行方式执行该条 INSERT IGNORE 语句以保证数据的正确性。

使用 RETURNING 优化的执行流程:

com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute

protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,
                            LogicalInsert.HandlerParams handlerParams) {
        // ...

        // 判断能否使用 RETURNING 优化
        boolean canUseReturning =
            executorContext.getStorageInfoManager().supportsReturning() && executionContext.getParamManager()
                .getBoolean(ConnectionParams.DML_USE_RETURNING) && allDnUseXDataSource && gsiCanUseReturning
                && !isBroadcast && !ComplexTaskPlanUtils.canWrite(tableMeta);

        if (canUseReturning) {
            canUseReturning = noDuplicateValues(insertIgnore, insertEc);
        }

        if (canUseReturning) {
            // 执行 INSERT IGNORE 并获得返回结果
            final List<RelNode> allPhyPlan =
                new ArrayList<>(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams));
            getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan);
            final Map<String, List<List<Object>>> tableInsertedValues =
                executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator,
                    selectRowType);

            // ...

            // 生成 DELETE
            final boolean removeAllInserted =
                targetTableNames.stream().anyMatch(tn -> !tableInsertedValues.containsKey(tn));

            if (removeAllInserted) {
                affectedRows -=
                    removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues);
                if (returnIgnored) {
                    ignoredRows = totalRows;
                }
            } else {
                final List<Integer> beforePkMapping = insertIgnore.getBeforePkMapping();
                final List<ColumnMeta> pkColumnMetas = insertIgnore.getPkColumnMetas();

                // 计算所有插入值的交集
                final Map<String, List<List<Object>>> tableDeletePks =
                    getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas);

                affectedRows -=
                    removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks);
                if (returnIgnored) {
                    ignoredRows +=
                        Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTableName())).map(List::size)
                            .orElse(0);
                }

            }

            handlerParams.optimizedWithReturning = true;

            if (returnIgnored) {
                return ignoredRows;
            } else {
                return affectedRows;
            }
        } else {
            handlerParams.optimizedWithReturning = false;
        }

        // ... 
    }

最后以一个例子来展现 RETURNING 优化的执行流程与逻辑执行的不同。通过 /+TDDL:CMD_EXTRA(DML_USE_RETURNING=TRUE)/ 这条 HINT,用户可以手动控制是否使用 RETURNING 优化。

首先建表并插入一条数据:

CREATE TABLE `t` (
    `id` int(11) NOT NULL,
    `a` int(11) NOT NULL,
    `b` int(11) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)
) DBPARTITION BY HASH(`id`);

INSERT INTO t VALUES (1,3,3);

再执行一条 INSERT IGNORE:

INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);

其中 (1,2,3) 与 (1,3,3) 主键冲突,(2,3,4) 与 (1,3,3) 对于 Unique Key g_i_a 冲突。如果是 RETURNING 优化:

1.jpeg
可以看到 PolarDB-X 先进行了 INSERT IGNORE,再将多插入的数据删除:(1,2,3) 在主表上冲突在 UGSI 上成功插入,(2,3,4) 在 UGSI 上冲突在主表上成功插入,因此分别下发对应的 DELETE 到 UGSI 和主表上。

如果关闭 RETURNING 优化,逻辑执行:
2.jpg
可以看到 PolarDB-X 先进行了 SELECT,再将没有冲突的数据 (3,4,5) 插入。

小结
本文介绍了 PolarDB-X 中 INSERT IGNORE 的执行流程。除了 INSERT IGNORE 之外,还有一些 DML 语句在执行时也需要进行重复值的判断,比如 REPLACE、INSERT ON DUPLICATE KEY UPDATE 等,这些语句在有 GSI 的情况下均采用了逻辑执行的方式,即先进行 SELECT 再进行判重、更新等操作,感兴趣的读者可以自行阅读相关代码。

欢迎关注PolarDB-X知乎机构号,阅读更多技术好文。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
PolarDB-X源码解读:DDL的一生(下)
在《DDL的一生(上)》中,我们以添加全局二级索引为例,从DDL开发者的视角介绍了如何在DDL引擎框架下实现一个逻辑DDL。在本篇,作者将从DDL引擎的视角出发,向读者介绍DDL引擎的架构、实现,以及DDL引擎与DDL Job的交互逻辑。
0 0
PolarDB-X源码解读系列:DML之Insert流程
Insert类的SQL语句的流程可初略分为:解析、校验、优化器、执行器、物理执行(GalaxyEngine执行)。本文将以一条简单的Insert语句通过调试的方式进行解读。
0 0
PolarDB-X 源码解读:事务的一生
本文将主要解读 PolarDB-X 中事务部分的相关代码,着重解读事务的一生在计算节点(CN)中的关键代码:从开始、执行、到最后提交这一整个生命周期。
0 0
PolarDB-X 源码解读系列:Global Binlog的一生
本篇为PolarDB-X“全局Binlog解读”系列文章的代码解读篇,将对binlog的产生,以及如何通过系统处理并最终生成Global Binlog的过程进行分析。
0 0
PolarDB-X源码解读系列:DDL的一生(上)
本文主要解读了 PolarDB-X 中 CN 端的DDL Job定义相关的代码,以添加全局二级索引为例,对DDL Job定义和执行的整体流程进行了梳理,并着重阐述了DDL job定义中涉及online和crash safe特性的关键逻辑。
0 0
PolarDB-X 源码解读:DDL的一生(上)
一条SQL语句进入PolarDB-X的CN后,将经历协议层、优化器、执行器的完整处理流程。首先经过解析、鉴权、校验,被解析为关系代数树后,在优化器中经历RBO和CBO生成执行计划,最终在DN上执行完成。与DML不同的是,逻辑DDL语句还涉及对元数据的读写和物理DDL,直接影响系统状态一致性。
0 0
PolarDB-X迎来开源后首个重大版本升级,2.1版本新增5大特色功能
2022 年 5 月25日,阿里云开源 PolarDB-X 升级发布新版本!PolarDB-X 从 2009 年开始服务于阿里巴巴电商核心系统, 2015 年开始对外提供商业化服务,并于 2021 年10月正式开源。本次发布是开源后首个重大版本升级,重磅推出在稳定性、生态融合以及易用性上有了长足进步的 2.1 版本,该版本在内核能力上首次对齐商业版,新增 X-Paxos、自动分区、OSS 冷热数据分离等诸多重要特性,并在 MySQL 生态融合、K8S 生态融合方向持续迭代。
0 0
PolarDB-X 发布 2.1.0 版本,Paxos 重磅开源
2022年4月1号,PolarDB-X 正式开源X-Paxos,基于原生MySQL存储节点,提供Paxos三副本共识协议,可以做到金融级数据库的高可用和容灾能力,做到RPO=0的生产级别可用性,可以满足同城三机房、两地三中心等容灾架构。
0 0
每周问答精选:PolarDB-X 存储资源包是抵扣哪个版本的按量付费?
为了方便各位小伙伴能够方便、快速的了解到 PolarDB 开源数据库的相关的使用问题,社区每周将精选群内高质量的问题通过该栏目予以统一答复,希望能够对大家有所帮助。
0 0
PolarDB-X 1.0-用户指南-实例管理-升级版本
您可以升级更新PolarDB-X 1.0实例到最新版本,快速体验实例新特性,本文将介绍如何在控制台上升级PolarDB-X 1.0实例版本。
0 0
+关注
开源小秘书
开源数据库运营小编~
文章
问答
文章排行榜
最热
最新
相关电子书
更多
PolarDB-X 2.0核心技术能力解读
立即下载
PolarDB-X 基于 X-Paxos 一致性协议的高可用
立即下载
PolarDB-X (开源版)从入门到实战
立即下载


http://www.vxiaotou.com