跳到主要内容

数据合并与连接

在数据分析中,数据通常分散在多个表格或数据源中,需要将它们合并在一起进行分析。Pandas 提供了强大的数据合并功能,类似于 SQL 的 JOIN 操作。本章将详细介绍如何使用 Pandas 进行数据合并与连接。

合并方法概述

Pandas 提供了多种数据合并方式:

方法用途类似 SQL 操作
pd.concat()沿轴向拼接多个对象UNION
pd.merge()基于列进行数据库风格连接JOIN
df.join()基于索引进行连接JOIN ON INDEX
pd.merge_ordered()有序合并ORDERED JOIN
pd.merge_asof()近似匹配合并ASOF JOIN

使用 concat 拼接数据

concat 是最基础的合并函数,用于沿指定轴将多个 Pandas 对象拼接在一起。

基本用法

import pandas as pd
import numpy as np

# 创建示例数据
df1 = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2'],
'C': ['C0', 'C1', 'C2']
}, index=[0, 1, 2])

df2 = pd.DataFrame({
'A': ['A3', 'A4', 'A5'],
'B': ['B3', 'B4', 'B5'],
'C': ['C3', 'C4', 'C5']
}, index=[3, 4, 5])

# 纵向拼接(默认 axis=0)
result = pd.concat([df1, df2])
print(result)
# A B C
# 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2
# 3 A3 B3 C3
# 4 A4 B4 C4
# 5 A5 B5 C5

concat 接受一个列表,列表中的元素可以是 DataFrame 或 Series。拼接后的结果保留原始索引。

横向拼接

# 创建不同列的数据
df3 = pd.DataFrame({
'D': ['D0', 'D1', 'D2'],
'E': ['E0', 'E1', 'E2']
}, index=[0, 1, 2])

# 横向拼接(axis=1)
result = pd.concat([df1, df3], axis=1)
print(result)
# A B C D E
# 0 A0 B0 C0 D0 E0
# 1 A1 B1 C1 D1 E1
# 2 A2 B2 C2 D2 E2

横向拼接时,按照索引对齐数据。如果索引不完全匹配,会产生缺失值。

处理索引冲突

# 创建索引重叠的数据
df4 = pd.DataFrame({
'A': ['A3', 'A4', 'A5'],
'B': ['B3', 'B4', 'B5']
}, index=[0, 1, 2]) # 索引与 df1 重复

# 直接拼接会保留重复索引
result = pd.concat([df1, df4])
print(result.index) # Int64Index([0, 1, 2, 0, 1, 2], dtype='int64')

# 忽略原始索引,重新生成
result = pd.concat([df1, df4], ignore_index=True)
print(result.index) # RangeIndex(start=0, stop=6, step=1)

当数据源没有有意义的索引时,使用 ignore_index=True 可以避免索引冲突。

添加层级标识

当需要区分数据来源时,可以使用 keys 参数:

# 为每个数据源添加层级标识
result = pd.concat([df1, df2], keys=['第一组', '第二组'])
print(result)
# A B C
# 第一组 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2
# 第二组 3 A3 B3 C3
# 4 A4 B4 C4
# 5 A5 B5 C5

# 通过层级选择数据
print(result.loc['第一组'])
# A B C
# 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2

这会创建一个多级索引(MultiIndex),便于追踪数据来源。

处理列名不一致

df5 = pd.DataFrame({
'A': ['A0', 'A1'],
'B': ['B0', 'B1'],
'C': ['C0', 'C1']
})

df6 = pd.DataFrame({
'B': ['B2', 'B3'],
'C': ['C2', 'C3'],
'D': ['D2', 'D3']
})

# 外连接(默认):保留所有列,缺失值填充 NaN
result_outer = pd.concat([df5, df6], axis=0)
print(result_outer)
# A B C D
# 0 A0 B0 C0 NaN
# 1 A1 B1 C1 NaN
# 0 NaN B2 C2 D2
# 1 NaN B3 C3 D3

# 内连接:只保留共有列
result_inner = pd.concat([df5, df6], axis=0, join='inner')
print(result_inner)
# B C
# 0 B0 C0
# 1 B1 C1
# 0 B2 C2
# 1 B3 C3

join 参数控制如何处理不匹配的列:

  • 'outer'(默认):取并集,缺失值填充 NaN
  • 'inner':取交集,只保留共有列

拼接 Series 与 DataFrame

# 将 Series 拼接到 DataFrame
s1 = pd.Series(['X0', 'X1', 'X2'], name='X')
result = pd.concat([df1, s1], axis=1)
print(result)
# A B C X
# 0 A0 B0 C0 X0
# 1 A1 B1 C1 X1
# 2 A2 B2 C2 X2

# 未命名的 Series 会自动编号
s2 = pd.Series(['Y0', 'Y1', 'Y2'])
result = pd.concat([df1, s2], axis=1)
print(result)
# A B C 0
# 0 A0 B0 C0 Y0
# 1 A1 B1 C1 Y1
# 2 A2 B2 C2 Y2

使用 merge 进行数据库风格连接

merge 函数用于基于共同列将两个 DataFrame 连接起来,类似于 SQL 的 JOIN 操作。

基本用法

# 创建示例数据
employees = pd.DataFrame({
'emp_id': [1, 2, 3, 4],
'name': ['张三', '李四', '王五', '赵六'],
'dept_id': [10, 20, 10, 30]
})

departments = pd.DataFrame({
'dept_id': [10, 20, 40],
'dept_name': ['技术部', '市场部', '人事部']
})

# 基于 dept_id 连接
result = pd.merge(employees, departments, on='dept_id')
print(result)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 3 王五 10 技术部
# 2 2 李四 20 市场部

默认情况下,merge 执行内连接(inner join),只保留两个表中都存在的键值。

连接类型

how 参数控制连接方式:

# 内连接(默认):只保留两表共有的键
inner = pd.merge(employees, departments, on='dept_id', how='inner')
print("内连接结果:")
print(inner)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 3 王五 10 技术部
# 2 2 李四 20 市场部

# 左连接:保留左表所有行
left = pd.merge(employees, departments, on='dept_id', how='left')
print("\n左连接结果:")
print(left)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 2 李四 20 市场部
# 2 3 王五 10 技术部
# 3 4 赵六 30 NaN

# 右连接:保留右表所有行
right = pd.merge(employees, departments, on='dept_id', how='right')
print("\n右连接结果:")
print(right)
# emp_id name dept_id dept_name
# 0 1.0 张三 10 技术部
# 1 3.0 王五 10 技术部
# 2 2.0 李四 20 市场部
# 3 NaN NaN 40 人事部

# 外连接:保留两表所有行
outer = pd.merge(employees, departments, on='dept_id', how='outer')
print("\n外连接结果:")
print(outer)
# emp_id name dept_id dept_name
# 0 1.0 张三 10 技术部
# 1 3.0 王五 10 技术部
# 2 2.0 李四 20 市场部
# 3 4.0 赵六 30 NaN
# 4 NaN NaN 40 人事部

各连接类型的含义:

连接类型SQL 等价描述
innerINNER JOIN只保留两表共有的键
leftLEFT OUTER JOIN保留左表所有行
rightRIGHT OUTER JOIN保留右表所有行
outerFULL OUTER JOIN保留两表所有行
crossCROSS JOIN笛卡尔积

指定连接键

当两个表的连接列名不同时,使用 left_onright_on

orders = pd.DataFrame({
'order_id': [101, 102, 103],
'customer_no': [1, 2, 1],
'amount': [1000, 2000, 1500]
})

customers = pd.DataFrame({
'cust_id': [1, 2, 3],
'cust_name': ['客户A', '客户B', '客户C']
})

# 列名不同时指定各自的连接列
result = pd.merge(orders, customers,
left_on='customer_no',
right_on='cust_id')
print(result)
# order_id customer_no amount cust_id cust_name
# 0 101 1 1000 1 客户A
# 1 103 1 1500 1 客户A
# 2 102 2 2000 2 客户B

多列连接

可以基于多个列进行连接:

sales = pd.DataFrame({
'year': [2023, 2023, 2024, 2024],
'product': ['A', 'B', 'A', 'B'],
'sales': [100, 150, 120, 180]
})

targets = pd.DataFrame({
'year': [2023, 2023, 2024, 2024],
'product': ['A', 'B', 'A', 'B'],
'target': [90, 140, 110, 170]
})

# 基于年份和产品两个列连接
result = pd.merge(sales, targets, on=['year', 'product'])
print(result)
# year product sales target
# 0 2023 A 100 90
# 1 2023 B 150 140
# 2 2024 A 120 110
# 3 2024 B 180 170

处理重复列名

当两个表有相同名称的非连接列时,merge 会自动添加后缀:

df_a = pd.DataFrame({
'key': ['K0', 'K1'],
'value': ['A0', 'A1']
})

df_b = pd.DataFrame({
'key': ['K0', 'K1'],
'value': ['B0', 'B1']
})

# 自动添加后缀
result = pd.merge(df_a, df_b, on='key')
print(result)
# key value_x value_y
# 0 K0 A0 B0
# 1 K1 A1 B1

# 自定义后缀
result = pd.merge(df_a, df_b, on='key', suffixes=('_左表', '_右表'))
print(result)
# key value_左表 value_右表
# 0 K0 A0 B0
# 1 K1 A1 B1

基于索引连接

df_left = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])

df_right = pd.DataFrame({
'C': ['C0', 'C1', 'C2'],
'D': ['D0', 'D1', 'D2']
}, index=['K0', 'K2', 'K3'])

# 使用 left_index 和 right_index 基于索引连接
result = pd.merge(df_left, df_right,
left_index=True,
right_index=True,
how='outer')
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1
# K3 NaN NaN C2 D2

验证连接关系

validate 参数用于检查连接键的唯一性,防止意外的数据膨胀:

# 一对一连接验证
df_one = pd.DataFrame({'key': ['A', 'B'], 'value': [1, 2]})
df_one2 = pd.DataFrame({'key': ['A', 'B'], 'info': ['X', 'Y']})

# 验证是否为一对一关系
result = pd.merge(df_one, df_one2, on='key', validate='one_to_one')
print(result)
# key value info
# 0 A 1 X
# 1 B 2 Y

validate 参数的可选值:

  • 'one_to_one''1:1':两表的连接键都是唯一的
  • 'one_to_many''1:m':左表连接键唯一
  • 'many_to_one''m:1':右表连接键唯一
  • 'many_to_many''m:m':允许重复键

使用 join 方法

join 方法是基于索引进行连接的便捷方法,语法更简洁:

# 创建示例数据
df_left = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])

df_right = pd.DataFrame({
'C': ['C0', 'C1', 'C2'],
'D': ['D0', 'D1', 'D2']
}, index=['K0', 'K2', 'K3'])

# 使用 join(默认左连接)
result = df_left.join(df_right)
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1

# 指定连接方式
result = df_left.join(df_right, how='outer')
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1
# K3 NaN NaN C2 D2

连接多个 DataFrame

df_right2 = pd.DataFrame({
'E': ['E0', 'E1', 'E2']
}, index=['K0', 'K1', 'K2'])

# 一次性连接多个 DataFrame
result = df_left.join([df_right, df_right2])
print(result)
# A B C D E
# K0 A0 B0 C0 D0 E0
# K1 A1 B1 NaN NaN E1
# K2 A2 B2 C1 D1 E2

基于列连接

join 也可以基于列进行连接:

df_left = pd.DataFrame({
'key': ['K0', 'K1', 'K2'],
'A': ['A0', 'A1', 'A2']
})

df_right = pd.DataFrame({
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])

# 指定左表的连接列
result = df_left.join(df_right, on='key')
print(result)
# key A B
# 0 K0 A0 B0
# 1 K1 A1 B1
# 2 K2 A2 B2

高级合并操作

merge_ordered 有序合并

用于合并有序数据,可以填充缺失值:

# 创建有序数据
left = pd.DataFrame({
'key': ['A', 'B', 'C', 'D'],
'value_left': [1, 2, 3, 4]
})

right = pd.DataFrame({
'key': ['B', 'D', 'E'],
'value_right': [20, 40, 50]
})

# 有序合并(默认外连接)
result = pd.merge_ordered(left, right, on='key', fill_method='ffill')
print(result)
# key value_left value_right
# 0 A 1.0 NaN
# 1 B 2.0 20.0
# 2 C 3.0 20.0 # 向前填充
# 3 D 4.0 40.0
# 4 E NaN 50.0

merge_ordered 特别适合合并时间序列数据。

merge_asof 近似匹配

用于基于最近键值进行合并,常用于时间序列对齐:

# 创建时间序列数据
quotes = pd.DataFrame({
'time': pd.to_datetime([
'2024-01-01 09:00:00',
'2024-01-01 09:30:00',
'2024-01-01 10:00:00',
'2024-01-01 10:30:00'
]),
'price': [100, 101, 102, 103]
})

trades = pd.DataFrame({
'time': pd.to_datetime([
'2024-01-01 09:15:00',
'2024-01-01 09:45:00',
'2024-01-01 10:15:00'
]),
'quantity': [100, 200, 150]
})

# 查找每笔交易时刻的最近报价(向前查找)
result = pd.merge_asof(trades, quotes, on='time')
print(result)
# time quantity price
# 0 2024-01-01 09:15:00 100 100
# 1 2024-01-01 09:45:00 200 101
# 2 2024-01-01 10:15:00 150 102

# 设置时间容差
result = pd.merge_asof(trades, quotes, on='time',
tolerance=pd.Timedelta('20min'))
print(result)
# time quantity price
# 0 2024-01-01 09:15:00 100 100
# 1 2024-01-01 09:45:00 200 101
# 2 2024-01-01 10:15:00 150 102

merge_asof 的关键参数:

  • direction:查找方向,'backward'(默认)、'forward''nearest'
  • tolerance:允许的最大时间差
  • allow_exact_matches:是否允许精确匹配

实战示例

示例:电商订单分析

import pandas as pd

# 创建示例数据
orders = pd.DataFrame({
'order_id': [1, 2, 3, 4, 5],
'customer_id': [101, 102, 101, 103, 102],
'product_id': ['P1', 'P2', 'P1', 'P3', 'P2'],
'quantity': [2, 1, 3, 1, 2],
'order_date': pd.date_range('2024-01-01', periods=5)
})

customers = pd.DataFrame({
'customer_id': [101, 102, 103, 104],
'name': ['张三', '李四', '王五', '赵六'],
'city': ['北京', '上海', '广州', '深圳']
})

products = pd.DataFrame({
'product_id': ['P1', 'P2', 'P3', 'P4'],
'product_name': ['手机', '电脑', '平板', '耳机'],
'price': [5000, 8000, 3000, 500]
})

# 计算订单金额
order_details = pd.merge(orders, products, on='product_id')
order_details['total'] = order_details['quantity'] * order_details['price']

# 关联客户信息
full_data = pd.merge(order_details, customers, on='customer_id')

print("完整订单数据:")
print(full_data[['order_id', 'name', 'product_name', 'quantity', 'total']])

# 分析:各客户消费总额
customer_total = full_data.groupby('name')['total'].sum().sort_values(ascending=False)
print("\n客户消费总额:")
print(customer_total)

# 分析:各城市销售情况
city_sales = full_data.groupby('city')['total'].sum()
print("\n城市销售情况:")
print(city_sales)

示例:多数据源整合

# 模拟多个数据源
sales_2023 = pd.DataFrame({
'month': range(1, 7),
'sales': [100, 120, 130, 125, 140, 150]
})

sales_2024 = pd.DataFrame({
'month': range(1, 7),
'sales': [110, 135, 145, 140, 160, 170]
})

# 合并两年数据并标记年份
sales_2023['year'] = 2023
sales_2024['year'] = 2024

all_sales = pd.concat([sales_2023, sales_2024], ignore_index=True)

# 计算同比增长
comparison = pd.merge(
sales_2023[['month', 'sales']].rename(columns={'sales': 'sales_2023'}),
sales_2024[['month', 'sales']].rename(columns={'sales': 'sales_2024'}),
on='month'
)
comparison['growth_rate'] = (comparison['sales_2024'] - comparison['sales_2023']) / comparison['sales_2023'] * 100

print("同比增长分析:")
print(comparison)

示例:处理重复键问题

# 创建有重复键的数据
students = pd.DataFrame({
'class_id': ['A', 'A', 'B', 'B'],
'student_name': ['张三', '李四', '王五', '赵六']
})

scores = pd.DataFrame({
'class_id': ['A', 'B'],
'teacher': ['王老师', '李老师']
})

# 直接合并会产生笛卡尔积(一对多)
result = pd.merge(students, scores, on='class_id')
print("一对多合并结果:")
print(result)
# class_id student_name teacher
# 0 A 张三 王老师
# 1 A 李四 王老师
# 2 B 王五 李老师
# 3 B 赵六 李老师

小结

本章我们学习了:

  1. concat 拼接:纵向和横向拼接、处理索引冲突、添加层级标识
  2. merge 连接:内连接、左连接、右连接、外连接、交叉连接
  3. join 方法:基于索引的便捷连接方式
  4. 高级合并:有序合并、近似匹配合并
  5. 实战应用:电商订单分析、多数据源整合

选择合适的合并方法:

  • 简单拼接数据:使用 concat
  • 基于列值连接:使用 merge
  • 基于索引连接:使用 join
  • 时间序列对齐:使用 merge_asof

练习

  1. 创建两个 DataFrame,分别包含学生信息和成绩信息,使用 merge 进行连接
  2. 使用 concat 将三个月的销售数据合并为一个 DataFrame
  3. 比较左连接和内连接的区别,并用代码演示

参考资源