跳到主要内容

实战案例

本章通过完整的实战案例,综合运用前面学到的 Pandas 知识,解决实际数据分析问题。

案例 1:电商销售数据分析

场景描述

分析电商平台的销售数据,了解销售趋势、用户行为和产品表现。

import pandas as pd
import numpy as np

# 设置随机种子以便复现
np.random.seed(42)

# 生成模拟数据
n_orders = 10000

# 订单数据
orders = pd.DataFrame({
'order_id': range(1, n_orders + 1),
'user_id': np.random.randint(1, 1001, n_orders),
'order_date': pd.date_range('2023-01-01', periods=n_orders, freq='H'),
'product_id': np.random.randint(1, 101, n_orders),
'quantity': np.random.randint(1, 10, n_orders),
'unit_price': np.random.uniform(50, 500, n_orders),
'payment_method': np.random.choice(['支付宝', '微信', '信用卡', '银行转账'], n_orders),
'status': np.random.choice(['已完成', '已取消', '退款'], n_orders, p=[0.85, 0.10, 0.05])
})

# 计算订单金额
orders['amount'] = orders['quantity'] * orders['unit_price']

# 添加时间特征
orders['year'] = orders['order_date'].dt.year
orders['month'] = orders['order_date'].dt.month
orders['day'] = orders['order_date'].dt.day
orders['hour'] = orders['order_date'].dt.hour
orders['weekday'] = orders['order_date'].dt.dayofweek
orders['is_weekend'] = orders['weekday'].isin([5, 6])

print("订单数据概览:")
print(orders.head())
print(f"\n数据量: {len(orders)} 条")
print(f"时间范围: {orders['order_date'].min()}{orders['order_date'].max()}")

分析 1:销售趋势分析

# 按月统计销售情况
monthly_sales = orders.groupby(orders['order_date'].dt.to_period('M')).agg({
'order_id': 'count',
'amount': 'sum',
'user_id': 'nunique'
}).rename(columns={
'order_id': '订单数',
'amount': '销售额',
'user_id': '活跃用户数'
})

print("月度销售统计:")
print(monthly_sales)

# 日销售趋势
daily_sales = orders.set_index('order_date').resample('D').agg({
'order_id': 'count',
'amount': 'sum'
})

# 计算移动平均平滑数据
daily_sales['amount_ma7'] = daily_sales['amount'].rolling(7).mean()
daily_sales['amount_ma30'] = daily_sales['amount'].rolling(30).mean()

# 小时分布分析
hourly_dist = orders.groupby('hour').agg({
'order_id': 'count',
'amount': 'sum'
})

print("\n小时分布(订单最多的前5个小时):")
print(hourly_dist.nlargest(5, 'order_id'))

分析 2:用户行为分析

# 用户消费统计
user_stats = orders.groupby('user_id').agg({
'order_id': 'count',
'amount': ['sum', 'mean'],
'order_date': ['min', 'max']
})
user_stats.columns = ['订单数', '总消费', '平均订单金额', '首次购买', '最近购买']

# 计算用户生命周期
user_stats['生命周期天数'] = (user_stats['最近购买'] - user_stats['首次购买']).dt.days

# 用户分层(RFM 模型)
# R: 最近一次购买距今天数
# F: 购买频率
# M: 消费金额
latest_date = orders['order_date'].max()
user_rfm = orders.groupby('user_id').agg({
'order_date': lambda x: (latest_date - x.max()).days, # Recency
'order_id': 'count', # Frequency
'amount': 'sum' # Monetary
})
user_rfm.columns = ['R', 'F', 'M']

# 用户分层
user_rfm['R_score'] = pd.qcut(user_rfm['R'], q=5, labels=[5, 4, 3, 2, 1]) # 越近越好
user_rfm['F_score'] = pd.qcut(user_rfm['F'], q=5, labels=[1, 2, 3, 4, 5], duplicates='drop')
user_rfm['M_score'] = pd.qcut(user_rfm['M'], q=5, labels=[1, 2, 3, 4, 5], duplicates='drop')

user_rfm['RFM_score'] = user_rfm['R_score'].astype(str) + user_rfm['F_score'].astype(str) + user_rfm['M_score'].astype(str)

def classify_user(score):
"""根据 RFM 分数分类用户"""
r, f, m = int(score[0]), int(score[1]), int(score[2])
if r >= 4 and f >= 4 and m >= 4:
return '重要价值客户'
elif r >= 4 and f < 4 and m >= 4:
return '重要发展客户'
elif r < 4 and f >= 4 and m >= 4:
return '重要保持客户'
elif r < 4 and f < 4 and m >= 4:
return '重要挽留客户'
else:
return '一般客户'

user_rfm['用户类型'] = user_rfm['RFM_score'].apply(classify_user)

print("用户分层统计:")
print(user_rfm['用户类型'].value_counts())

分析 3:产品分析

# 产品销售统计
product_stats = orders.groupby('product_id').agg({
'order_id': 'count',
'quantity': 'sum',
'amount': 'sum'
}).rename(columns={
'order_id': '订单数',
'quantity': '销售量',
'amount': '销售额'
})

# 产品分类(ABC 分类法)
product_stats['销售额占比'] = product_stats['销售额'] / product_stats['销售额'].sum()
product_stats = product_stats.sort_values('销售额', ascending=False)
product_stats['累计占比'] = product_stats['销售额占比'].cumsum()

def abc_classify(cumulative_ratio):
if cumulative_ratio <= 0.8:
return 'A'
elif cumulative_ratio <= 0.95:
return 'B'
else:
return 'C'

product_stats['ABC分类'] = product_stats['累计占比'].apply(abc_classify)

print("产品ABC分类统计:")
print(product_stats.groupby('ABC分类').agg({
'订单数': 'sum',
'销售额': 'sum'
}))

# 畅销产品 TOP 10
print("\n销售额 TOP 10 产品:")
print(product_stats.nlargest(10, '销售额')[['订单数', '销售量', '销售额']])

案例 2:数据清洗与预处理

场景描述

处理从多个来源收集的脏数据,进行清洗、转换和整合。

# 模拟脏数据
np.random.seed(42)

# 客户数据(包含各种问题)
customers = pd.DataFrame({
'customer_id': ['C001', 'C002', 'C003', 'C004', 'C005', 'C001', 'C006', 'C007'],
'name': ['张三', '李 四', '王五 ', ' 赵六', '钱七', '张三', '孙八', None],
'phone': ['13812345678', '139-5678-9012', '136 1234 5678', '1581234567',
'13812345678', '13812345678', '18600001111', '17799998888'],
'email': ['[email protected]', '[email protected]', '[email protected] ',
'invalid-email', '[email protected]', '[email protected]', '[email protected]', '[email protected]'],
'age': [25, -5, 150, 30, None, 25, 45, 35],
'register_date': ['2023-01-15', '2023/02/20', '15-03-2023', '2023-04-01',
'2023-05-01', '2023-01-15', '2023-06-01', None]
})

print("原始数据:")
print(customers)

清洗步骤

# 1. 处理重复数据
print(f"去重前: {len(customers)} 条")
customers = customers.drop_duplicates(subset=['customer_id'], keep='first')
print(f"去重后: {len(customers)} 条")

# 2. 清理字符串空白
customers['name'] = customers['name'].str.strip()
customers['email'] = customers['email'].str.strip()

# 3. 标准化电话号码
def clean_phone(phone):
if pd.isna(phone):
return None
# 移除所有非数字字符
cleaned = ''.join(filter(str.isdigit, str(phone)))
# 检查长度
if len(cleaned) == 11 and cleaned.startswith('1'):
return cleaned
return None # 无效号码

customers['phone_clean'] = customers['phone'].apply(clean_phone)

# 4. 标准化邮箱
customers['email_clean'] = customers['email'].str.lower()

# 验证邮箱格式
import re
def validate_email(email):
if pd.isna(email):
return False
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))

customers['email_valid'] = customers['email_clean'].apply(validate_email)

# 5. 处理异常值
# 年龄应该在合理范围内
customers['age_clean'] = customers['age'].clip(lower=0, upper=120)
# 用中位数填充缺失值
customers['age_clean'] = customers['age_clean'].fillna(customers['age_clean'].median())

# 6. 标准化日期格式
def parse_date(date_str):
if pd.isna(date_str):
return None
formats = ['%Y-%m-%d', '%Y/%m/%d', '%d-%m-%Y']
for fmt in formats:
try:
return pd.to_datetime(date_str, format=fmt)
except:
continue
return None

customers['register_date_clean'] = customers['register_date'].apply(parse_date)

# 7. 查看清洗结果
print("\n清洗后的数据:")
print(customers[['customer_id', 'name', 'phone_clean', 'email_clean',
'age_clean', 'register_date_clean']])

数据质量报告

# 生成数据质量报告
def data_quality_report(df):
report = pd.DataFrame({
'列名': df.columns,
'非空数': df.notna().sum().values,
'空值数': df.isna().sum().values,
'空值率': (df.isna().sum() / len(df) * 100).round(2).values,
'唯一值数': df.nunique().values,
'数据类型': df.dtypes.values
})
return report

print("数据质量报告:")
print(data_quality_report(customers))

案例 3:财务报表分析

场景描述

分析公司的财务数据,生成各种财务报表和指标。

# 模拟财务数据
np.random.seed(42)

# 交易流水
transactions = pd.DataFrame({
'date': pd.date_range('2023-01-01', periods=365, freq='D').repeat(5),
'type': np.random.choice(['收入', '支出'], 1825, p=[0.4, 0.6]),
'category': np.random.choice(
['销售收入', '服务收入', '工资', '房租', '办公费用', '差旅费', '其他'],
1825
),
'amount': np.random.uniform(1000, 50000, 1825)
})

# 根据类型调整金额范围
transactions.loc[transactions['type'] == '收入', 'amount'] = np.random.uniform(10000, 100000,
len(transactions[transactions['type'] == '收入']))
transactions.loc[transactions['type'] == '支出', 'amount'] = np.random.uniform(1000, 30000,
len(transactions[transactions['type'] == '支出']))

print("交易流水概览:")
print(transactions.head(10))

生成财务报表

# 月度收支汇总
monthly_summary = transactions.groupby(
[transactions['date'].dt.to_period('M'), 'type']
)['amount'].sum().unstack(fill_value=0)
monthly_summary.columns = ['支出', '收入']
monthly_summary['净利润'] = monthly_summary['收入'] - monthly_summary['支出']

print("月度收支汇总:")
print(monthly_summary)

# 按类别统计
category_summary = transactions.pivot_table(
index='category',
columns='type',
values='amount',
aggfunc='sum',
fill_value=0
)
print("\n按类别统计:")
print(category_summary)

# 计算财务指标
total_income = transactions[transactions['type'] == '收入']['amount'].sum()
total_expense = transactions[transactions['type'] == '支出']['amount'].sum()
net_profit = total_income - total_expense
profit_margin = net_profit / total_income * 100

print(f"\n年度财务指标:")
print(f"总收入: {total_income:,.2f}")
print(f"总支出: {total_expense:,.2f}")
print(f"净利润: {net_profit:,.2f}")
print(f"利润率: {profit_margin:.2f}%")

# 计算现金流趋势
daily_cashflow = transactions.groupby(['date', 'type'])['amount'].sum().unstack(fill_value=0)
daily_cashflow.columns = ['支出', '收入']
daily_cashflow['净现金流'] = daily_cashflow['收入'] - daily_cashflow['支出']
daily_cashflow['累计现金流'] = daily_cashflow['净现金流'].cumsum()

print("\n现金流趋势(最近10天):")
print(daily_cashflow.tail(10))

案例 4:日志分析

场景描述

分析服务器访问日志,提取有价值的信息。

# 模拟访问日志
np.random.seed(42)

# IP 地址池
ips = [f"192.168.{np.random.randint(1,255)}.{np.random.randint(1,255)}" for _ in range(100)]

# URL 路径
urls = ['/api/users', '/api/products', '/api/orders', '/api/login',
'/api/search', '/static/js/main.js', '/static/css/style.css']

# HTTP 状态码
status_codes = [200, 200, 200, 200, 201, 301, 400, 404, 500]

logs = pd.DataFrame({
'timestamp': pd.date_range('2024-01-15 00:00:00', periods=10000, freq='S'),
'ip': np.random.choice(ips, 10000),
'method': np.random.choice(['GET', 'POST', 'PUT', 'DELETE'], 10000, p=[0.7, 0.2, 0.08, 0.02]),
'url': np.random.choice(urls, 10000),
'status': np.random.choice(status_codes, 10000, p=[0.6, 0.15, 0.1, 0.05, 0.03, 0.03, 0.02, 0.01, 0.01]),
'response_time': np.random.exponential(0.1, 10000) * 1000 # 毫秒
})

print("日志数据概览:")
print(logs.head())

日志分析

# 1. 流量分析
# 按小时统计请求量
hourly_traffic = logs.set_index('timestamp').resample('H').size()
print("每小时请求量(前10个小时):")
print(hourly_traffic.head(10))

# 2. 状态码分布
status_dist = logs['status'].value_counts()
status_dist_pct = (status_dist / len(logs) * 100).round(2)

print("\n状态码分布:")
for status, count in status_dist.items():
print(f" {status}: {count} ({status_dist_pct[status]}%)")

# 3. 接口性能分析
api_performance = logs.groupby('url').agg({
'response_time': ['mean', 'median', 'max', 'count']
})
api_performance.columns = ['平均响应时间', '中位数响应时间', '最大响应时间', '请求次数']
api_performance = api_performance.sort_values('平均响应时间', ascending=False)

print("\n接口性能分析:")
print(api_performance)

# 4. 慢请求分析
slow_requests = logs[logs['response_time'] > 1000] # 超过1秒
print(f"\n慢请求数量: {len(slow_requests)}")
print("慢请求分布:")
print(slow_requests['url'].value_counts())

# 5. IP 访问分析
ip_stats = logs.groupby('ip').agg({
'timestamp': 'count',
'response_time': 'mean'
}).rename(columns={'timestamp': '请求次数', 'response_time': '平均响应时间'})

# 找出异常 IP(请求次数过多)
threshold = ip_stats['请求次数'].mean() + 3 * ip_stats['请求次数'].std()
suspicious_ips = ip_stats[ip_stats['请求次数'] > threshold]
print(f"\n可疑 IP (请求次数 > {threshold:.0f}):")
print(suspicious_ips)

# 6. 错误分析
errors = logs[logs['status'] >= 400]
error_by_url = errors.groupby('url').size().sort_values(ascending=False)

print("\n错误最多的接口:")
print(error_by_url.head(5))

案例 5:数据合并与整合

场景描述

整合来自不同数据源的数据,构建完整的数据视图。

# 模拟多个数据源

# 订单数据
orders = pd.DataFrame({
'order_id': ['O001', 'O002', 'O003', 'O004', 'O005'],
'customer_id': ['C001', 'C002', 'C001', 'C003', 'C002'],
'order_date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04', '2024-01-05']),
'total_amount': [1000, 2500, 800, 3000, 1500]
})

# 客户信息
customers = pd.DataFrame({
'customer_id': ['C001', 'C002', 'C003', 'C004'],
'name': ['张三', '李四', '王五', '赵六'],
'city': ['北京', '上海', '广州', '深圳'],
'level': ['VIP', '普通', 'VIP', '普通']
})

# 订单明细
order_items = pd.DataFrame({
'order_id': ['O001', 'O001', 'O002', 'O003', 'O004', 'O004', 'O005'],
'product_id': ['P001', 'P002', 'P001', 'P003', 'P001', 'P002', 'P003'],
'quantity': [1, 2, 1, 1, 2, 1, 1],
'unit_price': [500, 250, 500, 800, 500, 250, 800]
})

# 产品信息
products = pd.DataFrame({
'product_id': ['P001', 'P002', 'P003', 'P004'],
'product_name': ['笔记本电脑', '鼠标', '键盘', '显示器'],
'category': ['电子产品', '配件', '配件', '电子产品']
})

print("订单数据:")
print(orders)
print("\n客户信息:")
print(customers)

数据整合

# 1. 合并订单和客户信息
orders_with_customer = pd.merge(orders, customers, on='customer_id', how='left')
print("订单 + 客户:")
print(orders_with_customer)

# 2. 合并订单明细和产品信息
items_with_product = pd.merge(order_items, products, on='product_id', how='left')
print("\n订单明细 + 产品:")
print(items_with_product)

# 3. 构建完整视图
# 先计算每个订单的明细汇总
order_summary = items_with_product.groupby('order_id').agg({
'quantity': 'sum',
'unit_price': lambda x: (x * items_with_product.loc[x.index, 'quantity']).sum()
}).reset_index()
order_summary.columns = ['order_id', '总数量', '明细金额']

# 合并到主表
full_view = pd.merge(orders_with_customer, order_summary, on='order_id', how='left')

# 验证数据一致性
full_view['金额差异'] = full_view['total_amount'] - full_view['明细金额']
print("\n完整视图:")
print(full_view[['order_id', 'name', 'total_amount', '明细金额', '金额差异']])

# 4. 生成分析报表
# 客户购买分析
customer_analysis = full_view.groupby(['customer_id', 'name', 'level']).agg({
'order_id': 'count',
'total_amount': ['sum', 'mean']
}).reset_index()
customer_analysis.columns = ['customer_id', 'name', 'level', '订单数', '总消费', '平均订单金额']

print("\n客户购买分析:")
print(customer_analysis)

# 产品销售分析
product_sales = items_with_product.groupby(['product_id', 'product_name', 'category']).agg({
'quantity': 'sum',
'unit_price': lambda x: (x * items_with_product.loc[x.index, 'quantity']).sum()
}).reset_index()
product_sales.columns = ['product_id', 'product_name', 'category', '销售数量', '销售金额']

print("\n产品销售分析:")
print(product_sales)

小结

本章节通过五个实战案例,展示了 Pandas 在实际数据分析中的应用:

案例 1:电商销售分析

  • 时间序列分析
  • 用户行为分析(RFM 模型)
  • 产品分析(ABC 分类)

案例 2:数据清洗

  • 重复数据处理
  • 字符串标准化
  • 异常值处理
  • 日期格式转换

案例 3:财务分析

  • 收支汇总
  • 财务指标计算
  • 现金流分析

案例 4:日志分析

  • 流量分析
  • 性能分析
  • 异常检测

案例 5:数据整合

  • 多表合并
  • 数据验证
  • 报表生成

关键技能

  1. 数据清洗与预处理
  2. 时间序列处理
  3. 分组聚合分析
  4. 数据合并与整合
  5. 异常检测与分析

练习

  1. 扩展电商分析案例,添加用户留存率分析
  2. 在数据清洗案例中,添加地址字段的标准化处理
  3. 为财务分析添加同比和环比分析
  4. 扩展日志分析,添加用户行为路径分析
  5. 设计一个完整的数据 ETL 流程

总结

恭喜你完成了 Pandas 教程的学习!现在你已经掌握了:

  • Pandas 核心数据结构(Series、DataFrame)
  • 数据读取与保存
  • 数据选择与过滤
  • 数据清洗与转换
  • 数据合并与整合
  • 分组聚合分析
  • 透视表与数据重塑
  • 时间序列处理
  • 高级索引技巧
  • 性能优化方法
  • 实际业务场景应用

继续实践,将 Pandas 应用到你的实际项目中!