
6. Query Processing Implementation Techniques

1. Basic Architecture of a Relational Database

Three-Layer Architecture Model

┌─────────────────────┐
│   Compute Layer     │ ← query parsing, optimization, execution
│  (Query Processor)  │
├─────────────────────┤
│   Storage Layer     │ ← data storage, indexes, buffers
│  (Storage Engine)   │
├─────────────────────┤
│   Disk Storage      │ ← data files, log files
│   (Disk)            │
└─────────────────────┘

Core modules of the compute layer:
- Query parser (Parser)
- Query optimizer (Optimizer)
- Query executor (Executor)

Core modules of the storage layer:
- Buffer manager (Buffer Manager)
- Index manager (Index Manager)
- Transaction manager (Transaction Manager)
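The split between the two layers can be sketched in code. The classes and method names below are illustrative only, not from any real engine, but they show the key contract: the compute layer never touches disk directly; it asks the storage layer for pages.

```python
# Illustrative sketch of the compute/storage split; all names are hypothetical.
class StorageEngine:
    """Storage layer: serves pages and counts I/Os."""
    def __init__(self, pages):
        self.disk = pages      # list of pages, each a list of tuples
        self.io_count = 0

    def read_page(self, page_id):
        self.io_count += 1     # every page fetch is one (simulated) I/O
        return self.disk[page_id]

class QueryProcessor:
    """Compute layer: parsing/optimization elided; only a scan executor."""
    def __init__(self, storage):
        self.storage = storage

    def scan(self, predicate):
        results = []
        for pid in range(len(self.storage.disk)):
            for t in self.storage.read_page(pid):   # data flows via the storage layer
                if predicate(t):
                    results.append(t)
        return results

storage = StorageEngine([[("Alice", "CS")], [("Bob", "EE")]])
qp = QueryProcessor(storage)
print(qp.scan(lambda t: t[1] == "CS"))
```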

2. SQL Query Execution in Detail

Three-Phase Execution Flow

-- Example query
SELECT s.name, avg(sc.grade) as avg_grade
FROM student s 
JOIN sc ON s.sno = sc.sno
WHERE s.department = 'CS'
GROUP BY s.sno, s.name
HAVING avg(sc.grade) > 85;

Phase 1: Interpretation (Translation)

SQL → parse tree → relational algebra expression

Original SQL:
SELECT ... FROM ... WHERE ...

Converted to relational algebra:
π_name,avg_grade(σ_avg_grade>85(
    γ_sno,name;avg(grade)→avg_grade(
        σ_department='CS'(student ⨝ sc)
    )
))

Phase 2: Query Optimization

-- Possible plan 1: join first, then filter
(student ⨝ sc) → σ_department='CS' → γ_avg(grade) → σ_avg>85

-- Possible plan 2: filter first, then join (usually better)
σ_department='CS'(student) ⨝ sc → γ_avg(grade) → σ_avg>85

Factors the optimizer considers:
- Table sizes and statistics
- Available indexes
- Join order
- Implementation algorithm for each operator

Phase 3: Query Evaluation (Execution)
- Execute according to the chosen query plan
- Invoke the corresponding operator implementations
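To make the "filter first" intuition concrete, here is a back-of-the-envelope comparison of intermediate result sizes for the two plans; all numbers (10,000 students, 50,000 sc rows, 10% of students in CS) are made up for illustration.

```python
# Hypothetical sizes, chosen only to illustrate the two plans above.
students, sc_rows, cs_fraction = 10_000, 50_000, 0.10

# Plan 1: join first - the join must process and emit every sc row.
plan1_join_output = sc_rows                      # 50,000 intermediate rows

# Plan 2: filter first - the join only sees CS students and their rows.
plan2_students = int(students * cs_fraction)     # 1,000 students enter the join
plan2_join_output = int(sc_rows * cs_fraction)   # 5,000 intermediate rows

print(plan1_join_output, plan2_join_output)
```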

3. In-Depth Analysis of Data Processing Performance

Storage Hierarchy Performance Comparison

Level               Access Time   Capacity   Characteristics
CPU registers       ~1 ns         KB         Fastest, smallest capacity
CPU cache           ~10 ns        MB         High-speed cache
Main memory (DRAM)  ~100 ns       GB         Volatile storage
SSD                 ~100 μs       TB         Non-volatile, fairly fast
HDD                 ~10 ms        TB         Non-volatile, slow
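The ratios in this table matter more than the absolute numbers: normalizing to DRAM shows that one HDD access costs roughly as much as 100,000 memory accesses, which is why I/O dominates every cost model that follows.

```python
# Access latencies from the table above, in nanoseconds.
latency_ns = {
    "register": 1,
    "cache": 10,
    "dram": 100,
    "ssd": 100_000,       # ~100 μs
    "hdd": 10_000_000,    # ~10 ms
}

# Cost of one access at each level, in units of one DRAM access.
relative = {level: ns / latency_ns["dram"] for level, ns in latency_ns.items()}
print(relative["hdd"], relative["ssd"])
```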

I/O Cost Model

# Simplified I/O cost calculation
import math

def calculate_io_cost(algorithm, table_size, buffer_size):
    """
    algorithm:   name of the algorithm
    table_size:  size of the table, in pages (N)
    buffer_size: size of the buffer, in pages (M)
    """
    if algorithm == "sequential_scan":
        return table_size  # a sequential scan reads every page exactly once

    elif algorithm == "external_sort":
        # Simplified external-sort cost: ~ 2N * log_M(N)
        # (each pass reads and writes all N pages)
        passes = math.ceil(math.log(table_size, buffer_size))
        return 2 * table_size * passes

    raise ValueError(f"unknown algorithm: {algorithm}")

Optimization Principles

  1. Minimize the number of I/Os: disk I/O is the main performance bottleneck
  2. Exploit locality: sequential access beats random access
  3. Manage the buffer pool: use memory wisely to cut disk accesses

4. Implementation Strategies for the Selection Operator

Comparison of Implementation Methods

-- Query: find the students in the CS department
SELECT * FROM student WHERE department = 'CS';

Method 1: Full Table Scan

def table_scan(file, condition):
    """Full table scan."""
    io_count = 0
    results = []

    for page in file.pages:
        io_count += 1  # one I/O per page read
        for tuple in page:
            if condition(tuple):  # check the predicate
                results.append(tuple)

    return results, io_count

- Best for: high selectivity (>15%) or no usable index
- I/O cost: B (total number of pages in the table)

Method 2: Index Scan

def index_scan(index, condition):
    """Index scan."""
    # Use the index to find pointers to matching records
    pointers = index.lookup(condition)

    # Sort pointers by page to reduce random I/O
    sorted_pointers = sort_by_page(pointers)

    # Fetch the data pages
    results = []
    io_count = 0
    current_page = None
    page_data = None

    for pointer in sorted_pointers:
        if pointer.page != current_page:
            io_count += 1  # read a new page
            current_page = pointer.page
            page_data = read_page(current_page)

        results.append(page_data[pointer.slot])

    return results, io_count

- Best for: low selectivity with a suitable index
- I/O cost: index height + number of result pages

Impact of Selectivity

# selectivity = number of matching tuples / total number of tuples
selectivity = matching_tuples / total_tuples

if selectivity > 0.15:  # high selectivity: a large fraction matches
    use_table_scan()
else:                   # low selectivity
    use_index_scan()

5. Implementation of the Projection Operator

Duplicate Elimination Algorithms

Method 1: Sort-Based Deduplication

def sort_based_distinct(relation, attributes):
    """Sort-based duplicate elimination."""
    # External sort on the projected attributes
    sorted_relation = external_sort(relation, attributes)

    # Remove adjacent duplicates
    results = []
    prev_key = None

    for tuple in sorted_relation:
        key = extract_key(tuple, attributes)
        if key != prev_key:
            results.append(tuple)
            prev_key = key

    return results

def external_sort(relation, sort_attributes):
    """External merge sort."""
    # Phase 1: generate sorted runs
    runs = []
    buffer = []

    for page in relation:
        buffer.extend(page.tuples)
        if len(buffer) >= BUFFER_SIZE:
            # Sort in memory and write out a run
            buffer.sort(key=lambda x: extract_key(x, sort_attributes))
            runs.append(write_run(buffer))
            buffer = []

    # Flush the final partial run
    if buffer:
        buffer.sort(key=lambda x: extract_key(x, sort_attributes))
        runs.append(write_run(buffer))

    # Phase 2: multi-way merge
    while len(runs) > 1:
        new_runs = []
        for i in range(0, len(runs), MERGE_WAYS):
            merged = merge_runs(runs[i:i+MERGE_WAYS], sort_attributes)
            new_runs.append(merged)
        runs = new_runs

    return runs[0]

Method 2: Hash-Based Deduplication

def hash_based_distinct(relation, attributes):
    """Hash-based duplicate elimination."""
    hash_table = {}

    for page in relation:
        for tuple in page:
            # Use the projected attribute values themselves as the key;
            # hashing them first (hash(...)) would merge colliding keys.
            key = extract_key(tuple, attributes)
            if key not in hash_table:
                hash_table[key] = tuple

    return list(hash_table.values())

The Principle of Data Locality

# Good: exploit spatial locality
def sequential_processing(file):
    """Sequential processing - high locality"""
    for page in file.pages:  # read pages in order
        process_page(page)   # process a whole page at a time

def random_processing(pointers):
    """Random processing - low locality"""
    for pointer in pointers:            # pointers in arbitrary order
        page = read_page(pointer.page)  # may fetch a different page every time
        process_tuple(page[pointer.slot])

6. Implementation of the Join Operator

1. Nested Loop Join

Basic version (simple but inefficient)
def nested_loop_join(outer, inner, condition):
    """Nested loop join - basic version."""
    results = []

    # For every page of the outer table
    for outer_page in outer:
        # For every tuple on that page
        for outer_tuple in outer_page:
            # Scan the entire inner table for each outer tuple
            for inner_page in inner:  # problem: the inner table is scanned repeatedly!
                for inner_tuple in inner_page:
                    if condition(outer_tuple, inner_tuple):
                        results.append(combine(outer_tuple, inner_tuple))

    return results

# I/O cost: B(outer) + |outer| × B(inner)
# where B(outer) is the outer table's page count and |outer| its tuple count
Optimized version: Block Nested Loop Join
def block_nested_loop_join(outer, inner, condition, buffer_size):
    """Block nested loop join - optimized version."""
    results = []

    # Read several outer pages into the buffer at a time
    for outer_block in read_blocks(outer, buffer_size - 1):  # reserve 1 page for the inner table
        # Scan the entire inner table once per block
        for inner_page in inner:
            # Compare every tuple in the outer block with every tuple on the inner page
            for outer_tuple in outer_block:
                for inner_tuple in inner_page:
                    if condition(outer_tuple, inner_tuple):
                        results.append(combine(outer_tuple, inner_tuple))

    return results

# I/O cost: B(outer) + ⌈B(outer)/(M-1)⌉ × B(inner)
# where M is the number of buffer pages - far better than the basic version

2. Hash Join

Basic hash join
def hash_join(outer, inner, join_attr):
    """Basic hash join - assumes the inner table fits entirely in memory."""
    # Phase 1: build - scan the inner table and build a hash table on the join key
    hash_table = {}

    for inner_page in inner:
        for inner_tuple in inner_page:
            key = get_attr(inner_tuple, join_attr)
            if key not in hash_table:
                hash_table[key] = []
            hash_table[key].append(inner_tuple)

    # Phase 2: probe - scan the outer table and probe the hash table
    results = []

    for outer_page in outer:
        for outer_tuple in outer_page:
            key = get_attr(outer_tuple, join_attr)
            if key in hash_table:
                for inner_tuple in hash_table[key]:
                    results.append(combine(outer_tuple, inner_tuple))

    return results

# I/O cost: B(inner) + B(outer)
# precondition: the hash table fits entirely in memory
Grace Hash Join (partitioned hash join) - for large datasets
def grace_hash_join(outer, inner, join_attr, num_partitions):
    """Partitioned hash join - for tables that don't fit in memory."""

    # Phase 1: partition both inputs by a hash of the join key.
    # (Shown with in-memory lists for clarity; a real implementation
    # writes each partition out to disk.)
    outer_partitions = [[] for _ in range(num_partitions)]
    inner_partitions = [[] for _ in range(num_partitions)]

    # Partition the outer table
    for outer_page in outer:
        for outer_tuple in outer_page:
            key = get_attr(outer_tuple, join_attr)
            partition_id = hash(key) % num_partitions
            outer_partitions[partition_id].append(outer_tuple)

    # Partition the inner table
    for inner_page in inner:
        for inner_tuple in inner_page:
            key = get_attr(inner_tuple, join_attr)
            partition_id = hash(key) % num_partitions
            inner_partitions[partition_id].append(inner_tuple)

    # Phase 2: join each pair of partitions with an in-memory hash join
    results = []

    for i in range(num_partitions):
        # Build a hash table on the inner partition
        partition_hash_table = {}
        for inner_tuple in inner_partitions[i]:
            key = get_attr(inner_tuple, join_attr)
            if key not in partition_hash_table:
                partition_hash_table[key] = []
            partition_hash_table[key].append(inner_tuple)

        # Probe with the outer partition
        for outer_tuple in outer_partitions[i]:
            key = get_attr(outer_tuple, join_attr)
            if key in partition_hash_table:
                for inner_tuple in partition_hash_table[key]:
                    results.append(combine(outer_tuple, inner_tuple))

    return results

# I/O cost: 3 × (B(outer) + B(inner))   (read + write during partitioning, read during join)
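The source does not say how to pick num_partitions; a common rule of thumb (stated here as an assumption, not as this course's method) is that each inner partition of roughly B(inner)/k pages must fit the in-memory build phase of about M-2 pages, and one partitioning pass can write to at most M-1 output buffers:

```python
import math

# Rule-of-thumb partition count for Grace hash join (an assumption for
# illustration, not from the source text).
def grace_partition_count(b_inner, m):
    """b_inner: inner table size in pages; m: available buffer pages."""
    k = math.ceil(b_inner / (m - 2))   # each inner partition must fit in ~m-2 pages
    if k > m - 1:                      # only m-1 output buffers in one pass
        raise ValueError("one partitioning pass is not enough; partition recursively")
    return k

print(grace_partition_count(1000, 102))
```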

3. Index Join

def index_join(outer, inner, join_attr, index_on_inner):
    """Index join - uses an index on the inner table."""
    results = []

    for outer_page in outer:
        for outer_tuple in outer_page:
            key = get_attr(outer_tuple, join_attr)

            # Look up matching inner tuples through the index
            matching_pointers = index_on_inner.lookup(key)

            # Sort pointers by page to reduce random I/O
            sorted_pointers = sort_pointers_by_page(matching_pointers)

            # Fetch the matching inner tuples
            current_page = None
            page_data = None

            for pointer in sorted_pointers:
                if pointer.page != current_page:
                    # Read a new page
                    current_page = pointer.page
                    page_data = read_page(current_page)

                inner_tuple = page_data[pointer.slot]
                results.append(combine(outer_tuple, inner_tuple))

    return results

# I/O cost: B(outer) + |outer| × (index height + matching tuples / tuples per page)

4. Merge Join - Detailed Implementation

def merge_join(outer, inner, join_attr):
    """Sort-merge join."""
    # Sort the inputs first if they are not already sorted
    outer_sorted = external_sort(outer, join_attr) if not is_sorted(outer) else outer
    inner_sorted = external_sort(inner, join_attr) if not is_sorted(inner) else inner

    outer_iter = iter(outer_sorted)
    inner_iter = iter(inner_sorted)

    results = []
    outer_tuple = next(outer_iter, None)
    inner_tuple = next(inner_iter, None)

    # Buffer for handling duplicate join keys on the inner side
    inner_buffer = []
    current_inner_key = None

    while outer_tuple is not None and inner_tuple is not None:
        outer_key = get_attr(outer_tuple, join_attr)
        inner_key = get_attr(inner_tuple, join_attr)

        if outer_key == inner_key:
            # Collect all inner tuples with the current key
            if inner_key != current_inner_key:
                # New key: refill the inner buffer
                inner_buffer = []
                temp_inner = inner_tuple
                while temp_inner is not None and get_attr(temp_inner, join_attr) == inner_key:
                    inner_buffer.append(temp_inner)
                    temp_inner = next(inner_iter, None)
                current_inner_key = inner_key
                inner_tuple = temp_inner

            # Join every outer tuple with the current key against the buffer
            temp_outer = outer_tuple
            while temp_outer is not None and get_attr(temp_outer, join_attr) == outer_key:
                for inner_match in inner_buffer:
                    results.append(combine(temp_outer, inner_match))
                temp_outer = next(outer_iter, None)

            outer_tuple = temp_outer

        elif outer_key < inner_key:
            outer_tuple = next(outer_iter, None)
        else:
            inner_tuple = next(inner_iter, None)

    return results

# I/O cost: B(outer) + B(inner) if already sorted;
#           sorting cost + B(outer) + B(inner) otherwise

5. Join Algorithm Performance Comparison

Algorithm          Best Use Case                I/O Cost                    Memory Need         Precondition
Nested loop        Small tables                 B(O) + |O|×B(I)             Minimal             None
Block nested loop  Medium-sized tables          B(O) + ⌈B(O)/(M-1)⌉×B(I)    M buffer pages      None
Hash join          Equijoin, enough memory      B(O) + B(I)                 High (holds inner)  Equality predicate
Grace hash         Large-table equijoin         3×(B(O)+B(I))               Moderate            Equality predicate
Index join         Index on inner table         B(O) + |O|×(H + C/P)        Minimal             Index exists
Merge join         Sorted (or sortable) inputs  B(O) + B(I) + sort cost     Moderate            Sortable join keys

(H: index height; C: matching inner tuples per outer tuple; P: tuples per page)
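The formulas in the table can be compared numerically. The sizes below (B(O)=100 pages, B(I)=500 pages, M=12 buffer pages, 50 tuples per page) are hypothetical, chosen only to show how dramatically the algorithms differ:

```python
import math

# Hypothetical inputs for evaluating the cost formulas from the table.
B_O, B_I, M = 100, 500, 12        # pages of outer/inner table, buffer pages
O_tuples = B_O * 50               # assume 50 tuples per page

costs = {
    "nested_loop":       B_O + O_tuples * B_I,
    "block_nested_loop": B_O + math.ceil(B_O / (M - 1)) * B_I,
    "hash_join":         B_O + B_I,          # valid only if the inner table fits in memory
    "grace_hash":        3 * (B_O + B_I),
}

# Print from cheapest to most expensive
for name, cost in sorted(costs.items(), key=lambda kv: kv[1]):
    print(f"{name}: {cost} page I/Os")
```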

6. Join Algorithm Selection Strategy

def choose_join_algorithm(outer, inner, join_condition,
                          available_indexes, buffer_size):
    """Pick the best join algorithm for the given inputs."""

    B_outer = outer.pages
    B_inner = inner.pages

    # Is this an equijoin?
    is_equijoin = is_equality_condition(join_condition)
    join_attr = get_join_attribute(join_condition) if is_equijoin else None

    # Selection logic
    if B_inner <= buffer_size - 2 and is_equijoin:
        # The inner table fits in memory: use a hash join
        return "Hash Join"

    elif available_indexes and join_attr in available_indexes:
        # A usable index exists: use an index join
        return "Index Join"

    elif is_sorted(outer, join_attr) and is_sorted(inner, join_attr):
        # Both inputs already sorted on the join key: use a merge join
        return "Merge Join"

    elif B_outer * B_inner < 10000:  # small inputs
        # Use a block nested loop join
        return "Block Nested Loop Join"

    else:
        # Large equijoin: Grace hash join; otherwise fall back to block NLJ
        return "Grace Hash Join" if is_equijoin else "Block Nested Loop Join"