6. 查询处理实现技术
1. 关系数据库基本架构
三层架构模型
┌─────────────────┐
│ 计算层 │ ← 查询处理、优化、执行
│ (Query Processor)│
├─────────────────┤
│ 存储层 │ ← 数据存储、索引、缓冲区
│ (Storage Engine) │
├─────────────────┤
│ 磁盘存储 │ ← 数据文件、日志文件
│ (Disk) │
└─────────────────┘
计算层核心模块: - 查询解析器(Parser) - 查询优化器(Optimizer) - 查询执行器(Executor)
存储层核心模块: - 缓冲区管理器(Buffer Manager) - 索引管理器(Index Manager) - 事务管理器(Transaction Manager)
2. SQL查询执行过程详解
三阶段执行流程
-- 示例查询
SELECT s.name, avg(sc.grade) as avg_grade
FROM student s
JOIN sc ON s.sno = sc.sno
WHERE s.department = 'CS'
GROUP BY s.sno, s.name
HAVING avg(sc.grade) > 85;
阶段1:Interpretation(翻译)
SQL → 解析树 → 关系代数表达式
原始SQL:
SELECT ... FROM ... WHERE ...
转换为关系代数:
π_name,avg_grade(σ_avg_grade>85(
γ_sno,name;avg(grade)→avg_grade(
σ_department='CS'(student ⨝ sc)
)
))
阶段2:Query Optimization(查询优化)
-- 可能的执行计划1:先连接后过滤
(student ⨝ sc) → σ_department='CS' → γ_avg(grade) → σ_avg>85
-- 可能的执行计划2:先过滤后连接(通常更优)
σ_department='CS'(student) ⨝ sc → γ_avg(grade) → σ_avg>85
优化器考虑因素: - 表的大小和统计信息 - 可用的索引 - 连接顺序 - 操作符的实现算法
阶段3:Query Evaluation(查询执行) - 按照选定的查询计划执行 - 调用相应的操作符实现算法
3. 数据处理性能问题深度分析
存储层次性能对比
层级 访问时间 容量 特点
CPU寄存器 ~1ns KB 最快,容量最小
CPU缓存 ~10ns MB 高速缓存
主内存(DRAM) ~100ns GB 易失性存储
固态硬盘(SSD) ~100μs TB 非易失,较快
机械硬盘(HDD) ~10ms TB 非易失,慢速
I/O成本模型
# 简化的I/O成本计算
def calculate_io_cost(algorithm, table_size, buffer_size):
    """Estimate the I/O cost (in page reads/writes) of an access algorithm.

    Args:
        algorithm: algorithm name ("sequential_scan" or "external_sort").
        table_size: size of the table in pages (N).
        buffer_size: size of the buffer pool in pages (M).

    Returns:
        Estimated number of page I/Os.

    Raises:
        ValueError: if `algorithm` is not a recognised name (the original
            version silently returned None here).
    """
    import math  # local import: this snippet file has no import section

    if algorithm == "sequential_scan":
        # A sequential scan reads every page exactly once.
        return table_size
    elif algorithm == "external_sort":
        # External sort I/O cost ~= 2N * ceil(log_M(N)):
        # every pass reads and writes all N pages once.
        # At least one pass is needed even when the table fits in the
        # buffer (the original yielded 0 passes and hence 0 cost there).
        passes = max(1, math.ceil(math.log(table_size, buffer_size)))
        return 2 * table_size * passes
    raise ValueError(f"unknown algorithm: {algorithm!r}")
优化原则
- 减少I/O次数:主要性能瓶颈
- 利用局部性原理:顺序访问优于随机访问
- 缓冲区管理:合理利用内存减少磁盘访问
4. 选择算子(Selection)的实现策略
实现方法对比
方法1:全表扫描(Table Scan)
def table_scan(file, condition):
    """Full table scan: read every page and test every tuple.

    Returns (matching tuples, number of page reads).
    """
    pages_read = 0
    matches = []
    for page in file.pages:
        # Every page costs exactly one read, match or not.
        pages_read += 1
        matches.extend(t for t in page if condition(t))
    return matches, pages_read
方法2:索引扫描(Index Scan)
def index_scan(index, condition):
    """Index scan: fetch only the data pages that hold matching tuples.

    Args:
        index: secondary index supporting lookup(condition) -> record pointers.
        condition: selection predicate handed to the index.

    Returns:
        (results, io_count): matching tuples and the number of data-page
        reads (index I/O is not counted in this simplified model).
    """
    # Find pointers to all qualifying records via the index.
    pointers = index.lookup(condition)
    # Sort pointers by page id so each data page is read at most once.
    sorted_pointers = sort_by_page(pointers)
    results = []
    io_count = 0  # BUG FIX: was never initialised in the original
    current_page = None
    page_data = None
    for pointer in sorted_pointers:
        if pointer.page != current_page:
            # Pointer moved to a new page: one more data-page read.
            io_count += 1
            current_page = pointer.page
            page_data = read_page(current_page)
        results.append(page_data[pointer.slot])
    return results, io_count
选择率(Selectivity)的影响
# Selectivity = (tuples satisfying the predicate) / (total tuples).
selectivity = matching_tuples / total_tuples
if selectivity > 0.15:  # high selectivity: many tuples qualify
    # Most pages contain matches anyway, so a full scan is cheaper than
    # chasing index pointers with random I/O.
    use_table_scan()
else:  # low selectivity: few tuples qualify
    # An index lets us skip pages that contain no matches.
    use_index_scan()
5. 投影算子(Projection)的实现
去重(Duplicate Elimination)算法
方法1:排序去重
def sort_based_distinct(relation, attributes):
    """Sort-based duplicate elimination: sort, then drop adjacent repeats."""
    # External sort brings equal keys next to each other.
    ordered = external_sort(relation, attributes)
    unique = []
    last_key = None
    for row in ordered:
        row_key = extract_key(row, attributes)
        # After sorting, duplicates are adjacent: emit only on key change.
        if row_key != last_key:
            unique.append(row)
            last_key = row_key
    return unique
def external_sort(relation, sort_attributes):
    """Two-phase external merge sort.

    Phase 1 builds sorted runs that fit in the buffer; phase 2 repeatedly
    merges MERGE_WAYS runs at a time until a single run remains.

    Args:
        relation: iterable of pages, each exposing a `.tuples` list.
        sort_attributes: attributes to sort by (passed to extract_key).

    Returns:
        The single fully-sorted run.
    """
    # Phase 1: produce initial sorted runs.
    runs = []
    buffer = []
    for page in relation:
        buffer.extend(page.tuples)
        if len(buffer) >= BUFFER_SIZE:
            # Buffer full: sort in memory and spill as one run.
            buffer.sort(key=lambda x: extract_key(x, sort_attributes))
            runs.append(write_run(buffer))
            buffer = []
    # BUG FIX: the original silently dropped a final partial buffer;
    # flush any leftover tuples as one last run.
    if buffer:
        buffer.sort(key=lambda x: extract_key(x, sort_attributes))
        runs.append(write_run(buffer))
    # BUG FIX: an empty relation made `runs[0]` raise IndexError;
    # return an empty run of the same kind instead.
    if not runs:
        return write_run([])
    # Phase 2: multi-way merge passes until one run remains.
    while len(runs) > 1:
        merged_level = []
        for i in range(0, len(runs), MERGE_WAYS):
            merged_level.append(merge_runs(runs[i:i + MERGE_WAYS], sort_attributes))
        runs = merged_level
    return runs[0]
方法2:哈希去重
def hash_based_distinct(relation, attributes):
    """Hash-based duplicate elimination.

    Keeps the first tuple seen for each distinct projection key and
    returns the surviving tuples.
    """
    seen = {}
    for page in relation:
        for row in page:
            key = extract_key(row, attributes)
            # BUG FIX: the original used hash(key) as the dict key, so two
            # distinct keys whose hashes collide were wrongly deduplicated.
            # Using the key itself lets the dict resolve collisions by
            # equality, as a hash table must.
            if key not in seen:
                seen[key] = row
    return list(seen.values())
数据局部性(Data Locality)原理
# 好的算法:利用空间局部性
def sequential_processing(file):
    """Sequential processing - high spatial locality (page at a time)."""
    # Pages are visited in file order, so each read benefits from
    # OS/drive read-ahead.
    for current_page in file.pages:
        process_page(current_page)
def random_processing(pointers):
    """Random processing - low locality (tuple at a time via pointers)."""
    for ptr in pointers:
        # Each pointer may land on a different page -> one read per tuple.
        target_page = read_page(ptr.page)
        process_tuple(target_page[ptr.slot])
6. 连接算子(Join)的实现
1. 嵌套循环连接(Nested Loop Join)
基础版本(简单但低效)
def nested_loop_join(outer, inner, condition):
    """Tuple-at-a-time nested loop join (simple but I/O-expensive).

    For every tuple of the outer relation the WHOLE inner relation is
    re-scanned, which is the source of the |outer| x B(inner) I/O term.
    """
    joined = []
    for o_page in outer:
        for o_tup in o_page:
            # Full inner scan per outer tuple -- the costly part.
            for i_page in inner:
                joined.extend(
                    combine(o_tup, i_tup)
                    for i_tup in i_page
                    if condition(o_tup, i_tup)
                )
    return joined
# I/O成本:B(outer) + |outer| × B(inner)
# 其中:B(outer)是外表的页数,|outer|是外表的元组数
优化版本:块嵌套循环连接(Block Nested Loop Join)
def block_nested_loop_join(outer, inner, condition, buffer_size):
    """Block nested loop join: buffer several outer pages per inner scan.

    Scanning the inner relation once per outer BLOCK (instead of once per
    outer tuple) cuts the inner-scan count to ceil(B(outer)/(M-1)).
    """
    joined = []
    # Reserve one buffer page for the inner relation; the rest hold the
    # current block of outer pages.
    for outer_block in read_blocks(outer, buffer_size - 1):
        for inner_page in inner:
            # Join every buffered outer tuple against this inner page.
            for o_tup in outer_block:
                joined.extend(
                    combine(o_tup, i_tup)
                    for i_tup in inner_page
                    if condition(o_tup, i_tup)
                )
    return joined
# I/O成本:B(outer) + ⌈B(outer)/(M-1)⌉ × B(inner)
# 其中M是缓冲区页数,显著优于基础版本
2. 散列连接(Hash Join)
基础散列连接
def hash_join(outer, inner, join_attr):
    """Classic in-memory hash join (assumes the inner relation fits in RAM).

    Phase 1 (build): hash every inner tuple on the join attribute.
    Phase 2 (probe): stream the outer relation and emit matches.

    I/O cost: B(inner) + B(outer) -- each relation is scanned exactly once.
    """
    # Build phase: map join-key -> list of inner tuples carrying that key.
    hash_table = {}
    for inner_page in inner:
        for inner_tuple in inner_page:
            key = get_attr(inner_tuple, join_attr)
            # setdefault replaces the hand-rolled "if key not in" dance.
            hash_table.setdefault(key, []).append(inner_tuple)
    # Probe phase: look up each outer tuple's key in the hash table.
    results = []
    for outer_page in outer:
        for outer_tuple in outer_page:
            key = get_attr(outer_tuple, join_attr)
            # .get with an empty default folds the membership test away.
            for inner_tuple in hash_table.get(key, ()):
                results.append(combine(outer_tuple, inner_tuple))
    return results
# I/O成本:B(inner) + B(outer)
# 前提:哈希表能完全放入内存
分区散列连接(Grace Hash Join)- 处理大数据集
def grace_hash_join(outer, inner, join_attr, num_partitions):
    """Partitioned (Grace) hash join for inputs too large for memory.

    Phase 1 scatters both relations into num_partitions buckets by a hash
    of the join key; phase 2 runs an in-memory hash join per bucket pair.
    Matching tuples always land in the same bucket index on both sides.
    """
    def _scatter(relation):
        # Route every tuple of `relation` to its hash bucket.
        buckets = [[] for _ in range(num_partitions)]
        for page in relation:
            for tup in page:
                slot = hash(get_attr(tup, join_attr)) % num_partitions
                buckets[slot].append(tup)
        return buckets

    # Phase 1: partition both inputs.
    outer_parts = _scatter(outer)
    inner_parts = _scatter(inner)

    # Phase 2: hash-join each (inner bucket, outer bucket) pair.
    joined = []
    for build_side, probe_side in zip(inner_parts, outer_parts):
        # Build an in-memory table from the inner bucket.
        table = {}
        for tup in build_side:
            table.setdefault(get_attr(tup, join_attr), []).append(tup)
        # Probe with the matching outer bucket.
        for tup in probe_side:
            for match in table.get(get_attr(tup, join_attr), []):
                joined.append(combine(tup, match))
    return joined
# I/O成本:3 × (B(outer) + B(inner)) (读+写+读)
3. 索引连接(Index Join)
def index_join(outer, inner, join_attr, index_on_inner):
    """Index join: probe the inner relation's index once per outer tuple.

    Pointers returned by each lookup are sorted by page id so a data page
    is read at most once per probe.
    """
    results = []
    for outer_page in outer:
        for probe_tuple in outer_page:
            probe_key = get_attr(probe_tuple, join_attr)
            # Index lookup yields pointers to matching inner tuples;
            # page-order them to turn random I/O into near-sequential I/O.
            hits = sort_pointers_by_page(index_on_inner.lookup(probe_key))
            cached_page_id = None
            cached_page = None
            for hit in hits:
                if hit.page != cached_page_id:
                    # New page: one more data-page read.
                    cached_page_id = hit.page
                    cached_page = read_page(cached_page_id)
                results.append(combine(probe_tuple, cached_page[hit.slot]))
    return results
# I/O成本:B(outer) + |outer| × (索引高度 + 匹配元组数/每页元组数)
4. 归并连接(Merge Join)- 详细实现
def merge_join(outer, inner, join_attr):
    """Sort-merge join with duplicate-key handling.

    Both inputs are sorted on the join attribute (sorting first if needed),
    then merged with two cursors. Inner tuples sharing the current key are
    buffered so every outer tuple with that key joins against all of them.

    Returns the list of combined tuples.
    """
    # Sort the inputs unless already ordered on the join attribute.
    outer_sorted = external_sort(outer, join_attr) if not is_sorted(outer) else outer
    inner_sorted = external_sort(inner, join_attr) if not is_sorted(inner) else inner
    outer_iter = iter(outer_sorted)
    inner_iter = iter(inner_sorted)
    results = []
    outer_tuple = next(outer_iter, None)
    inner_tuple = next(inner_iter, None)
    # Buffer of inner tuples sharing the current join key (duplicate keys).
    inner_buffer = []
    current_inner_key = None
    while outer_tuple and inner_tuple:
        outer_key = get_attr(outer_tuple, join_attr)
        inner_key = get_attr(inner_tuple, join_attr)
        if outer_key == inner_key:
            # Collect all inner tuples carrying this key (once per key).
            if inner_key != current_inner_key:
                # New key: re-collect the inner tuples for it.
                inner_buffer = []
                temp_inner = inner_tuple
                while temp_inner and get_attr(temp_inner, join_attr) == inner_key:
                    inner_buffer.append(temp_inner)
                    temp_inner = next(inner_iter, None)
                current_inner_key = inner_key
                # Inner cursor now rests on the first tuple of the NEXT key.
                inner_tuple = temp_inner
            # Join every outer tuple carrying this key against the buffer.
            temp_outer = outer_tuple
            while temp_outer and get_attr(temp_outer, join_attr) == outer_key:
                # Pair with every buffered inner tuple for this key.
                for inner_match in inner_buffer:
                    results.append(combine(temp_outer, inner_match))
                temp_outer = next(outer_iter, None)
            outer_tuple = temp_outer
        elif outer_key < inner_key:
            # Outer cursor is behind: advance it.
            outer_tuple = next(outer_iter, None)
        else:
            # Inner cursor is behind: advance it.
            inner_tuple = next(inner_iter, None)
    return results
# I/O成本:如果已排序:B(outer) + B(inner)
# 如果需要排序:排序成本 + B(outer) + B(inner)
5. 连接算法性能对比
| 算法 | 最佳适用场景 | I/O成本 | 内存需求 | 前提条件 |
|---|---|---|---|---|
| 嵌套循环 | 小表连接 | B(O) + \|O\|×B(I) | 低 | 无 |
| 块嵌套循环 | 中等表连接 | B(O) + ⌈B(O)/(M-1)⌉×B(I) | M页缓冲区 | 无 |
| 散列连接 | 等值连接,内存足够 | B(O) + B(I) | 高(容纳内表) | 等值条件 |
| 分区散列 | 大表等值连接 | 3×(B(O)+B(I)) | 中等 | 等值条件 |
| 索引连接 | 内表有索引 | B(O) + \|O\|×(H + C/P) | 低 | 索引存在 |
| 归并连接 | 已排序或需排序 | B(O) + B(I) + 排序成本 | 中等 | 可排序 |
6. 连接算法选择策略
def choose_join_algorithm(outer_size, inner_size, join_condition,
                          available_indexes, buffer_size,
                          outer=None, inner=None):
    """Pick a join algorithm from table statistics.

    Args:
        outer_size / inner_size: statistics objects exposing `.pages`.
        join_condition: the join predicate.
        available_indexes: collection of attributes that carry an index.
        buffer_size: buffer pool size in pages (M).
        outer / inner: optional actual relations. When both are supplied,
            their sort order is checked to enable merge join. (BUG FIX:
            the original referenced undefined names `outer`/`inner` in
            that branch, raising NameError whenever it was reached; the
            new optional parameters keep the signature backward-compatible.)

    Returns:
        The chosen algorithm name as a string.
    """
    B_outer = outer_size.pages
    B_inner = inner_size.pages
    # Only equi-joins unlock the hash-based algorithms.
    is_equijoin = is_equality_condition(join_condition)
    join_attr = get_join_attribute(join_condition) if is_equijoin else None
    if B_inner <= buffer_size - 2 and is_equijoin:
        # Inner relation fits in memory: classic hash join.
        return "Hash Join"
    elif available_indexes and join_attr in available_indexes:
        # An index on the join attribute lets us probe instead of scan.
        return "Index Join"
    elif (outer is not None and inner is not None
          and is_sorted(outer, join_attr) and is_sorted(inner, join_attr)):
        # Both inputs already sorted on the join key: merge join is cheap.
        return "Merge Join"
    elif B_outer * B_inner < 10000:
        # Small data volume: block nested loop is simple and good enough.
        return "Block Nested Loop Join"
    else:
        # Large equi-join: partitioned (Grace) hash join; otherwise the
        # only general fallback is block nested loop.
        return "Grace Hash Join" if is_equijoin else "Block Nested Loop Join"