实际系统架构案例分析
本章节深入分析业界知名系统的架构设计,从实际案例中学习和理解大规模分布式系统的设计原则和最佳实践。
案例一:Twitter/X 社交网络架构
系统概况
Twitter/X 是一个全球性的实时社交网络和微博客平台,核心功能包括:
- 发布短消息(推文)
- 关注/粉丝关系管理
- 实时时间线(Timeline)
- 搜索和发现
- 通知系统
核心挑战
| 挑战 | 描述 |
|---|---|
| 写入扇出 | 一条推文可能需要推送给数百万粉丝 |
| 读取延迟 | 时间线必须在毫秒级返回 |
| 实时性 | 新推文需要实时推送给关注者 |
| 一致性 | 最终一致性下的用户体验保证 |
架构演进
Twitter 的架构经历了多次重大演进:
阶段1: 单体应用 (2006-2010)
┌─────────────────────────────────────┐
│ Ruby on Rails │
│ MySQL + Memcached │
└─────────────────────────────────────┘
阶段2: 服务拆分 (2010-2012)
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Tweet │ │ User │ │ Timeline│
│ Service │ │ Service │ │ Service │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────┴───────────┘
│
┌──────┴──────┐
│ MySQL │
│ Redis │
└─────────────┘
阶段3: 分布式架构 (2012-至今)
┌─────────────────────────────────────────────────────────────┐
│ Twitter 分布式架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ API Gateway│ │ Timeline │ │ Search │ │
│ │ │ │ Service │ │ Service │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────┴─────────────────┴─────────────────┴──────┐ │
│ │ 服务网格 (Service Mesh) │ │
│ └──────┬─────────────────┬─────────────────┬──────┘ │
│ │ │ │ │
│ ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐ │
│ │ Tweet │ │ User │ │ Graph │ │
│ │ Service │ │ Service │ │ Service │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────┴─────────────────┴─────────────────┴──────┐ │
│ │ 数据层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Manhattan│ │ Redis │ │Gizmoduck│ │ │
│ │ │ (NoSQL) │ │ (Cache) │ │(Graph) │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
核心技术组件
1. 时间线服务(Timeline Service)
Twitter 使用混合模式处理时间线:
@Service
public class TimelineService {
private final RedisTemplate<String, Tweet> redisTemplate;
private final TweetRepository tweetRepository;
private final UserGraphService userGraphService;
/**
* 获取用户时间线 - 扇出读模式
*/
public List<Tweet> getHomeTimeline(Long userId, int count) {
String timelineKey = "timeline:" + userId;
// 1. 从 Redis 获取预计算的时间线
List<Tweet> cachedTweets = redisTemplate.opsForList()
.range(timelineKey, 0, count - 1);
if (cachedTweets != null && cachedTweets.size() >= count) {
return cachedTweets;
}
// 2. 缓存未命中,从数据库获取
List<Long> followingIds = userGraphService.getFollowingIds(userId);
List<Tweet> tweets = tweetRepository.findByUserIds(
followingIds,
PageRequest.of(0, count, Sort.by("createdAt").descending())
);
// 3. 回填缓存
cacheTimeline(userId, tweets);
return tweets;
}
/**
* 发布推文 - 扇出写模式
*/
public void postTweet(Long userId, String content) {
// 1. 保存推文
Tweet tweet = Tweet.builder()
.userId(userId)
.content(content)
.createdAt(Instant.now())
.build();
tweetRepository.save(tweet);
// 2. 推送到粉丝的时间线
fanoutTweet(userId, tweet);
}
/**
* 扇出推文到粉丝时间线
*/
private void fanoutTweet(Long userId, Tweet tweet) {
List<Long> followerIds = userGraphService.getFollowerIds(userId);
// 对于普通用户(粉丝<1000),直接同步推送
if (followerIds.size() < 1000) {
for (Long followerId : followerIds) {
String timelineKey = "timeline:" + followerId;
redisTemplate.opsForList().leftPush(timelineKey, tweet);
redisTemplate.opsForList().trim(timelineKey, 0, 999); // 保留最近1000条
}
} else {
// 对于大V用户,采用异步批量推送
asyncFanoutToFollowers(userId, tweet, followerIds);
}
}
}
2. 社交图谱服务(Graph Service)
Twitter 使用 Gizmoduck 管理社交关系:
@Service
public class SocialGraphService {
private final GraphDatabase graphDb;
/**
* 获取用户的关注列表
*/
public List<Long> getFollowingIds(Long userId) {
// 从图数据库查询
return graphDb.findNeighbors(userId, EdgeType.FOLLOWS);
}
/**
* 获取用户的粉丝列表
*/
public List<Long> getFollowerIds(Long userId) {
return graphDb.findReverseNeighbors(userId, EdgeType.FOLLOWS);
}
/**
* 关注用户
*/
public void follow(Long userId, Long targetUserId) {
graphDb.addEdge(userId, targetUserId, EdgeType.FOLLOWS);
// 发布关注事件
eventPublisher.publish(new FollowEvent(userId, targetUserId));
}
}
3. 推文存储(Manhattan)
Twitter 自研的分布式键值存储:
@Repository
public class ManhattanTweetRepository implements TweetRepository {
private final ManhattanClient manhattan;
@Override
public void save(Tweet tweet) {
// 主键:tweet_id
String key = "tweet:" + tweet.getId();
byte[] value = serialize(tweet);
manhattan.put(key, value, WriteConsistency.QUORUM);
}
@Override
public Optional<Tweet> findById(Long tweetId) {
String key = "tweet:" + tweetId;
byte[] value = manhattan.get(key, ReadConsistency.ONE);
return value != null ? Optional.of(deserialize(value)) : Optional.empty();
}
@Override
public List<Tweet> findByUserIds(List<Long> userIds, Pageable pageable) {
// 并行查询多个用户的推文
List<CompletableFuture<List<Tweet>>> futures = userIds.stream()
.map(userId -> CompletableFuture.supplyAsync(
() -> findByUserId(userId, pageable.getPageSize())
))
.collect(Collectors.toList());
// 合并结果并排序
return futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.sorted(Comparator.comparing(Tweet::getCreatedAt).reversed())
.limit(pageable.getPageSize())
.collect(Collectors.toList());
}
}
关键设计决策
1. 扇出策略
┌─────────────────────────────────────────────────────────────┐
│ 扇出策略选择 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 扇出读 (Fan-out on Read) │
│ ┌─────────┐ │
│ │ 用户请求 │──> 查询所有关注者的推文 ──> 合并排序 │
│ └─────────┘ │
│ 优点:写入快,节省存储 │
│ 缺点:读取慢,需要实时计算 │
│ │
│ 扇出写 (Fan-out on Write) │
│ ┌─────────┐ │
│ │ 发布推文 │──> 推送到所有粉丝的时间线 ──> 读取直接返回 │
│ └─────────┘ │
│ 优点:读取极快 │
│ 缺点:写入慢,存储需求大 │
│ │
│ Twitter 的混合策略: │
│ - 普通用户:扇出写 │
│ - 大V用户:扇出读 + 异步推送 │
│ │
└─────────────────────────────────────────────────────────────┘
2. 缓存策略
@Configuration
public class CacheConfig {
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.withCacheConfiguration("timelines",
config.entryTtl(Duration.ofMinutes(5)))
.withCacheConfiguration("userProfiles",
config.entryTtl(Duration.ofHours(1)))
.build();
}
}
性能指标
| 指标 | 数值 |
|---|---|
| 日活跃用户 | 2亿+ |
| 每日推文 | 5亿+ |
| 时间线读取延迟 | < 50ms (P99) |
| 推文传播延迟 | < 5秒 |
案例二:YouTube 视频流媒体架构
系统概况
YouTube 是全球最大的视频分享平台,核心功能包括:
- 视频上传和存储
- 视频转码和处理
- 流媒体播放
- 推荐系统
- 搜索和发现
核心挑战
| 挑战 | 描述 |
|---|---|
| 存储规模 | 每分钟上传 500+ 小时视频 |
| 带宽成本 | 每天 PB 级数据传输 |
| 全球分发 | 覆盖全球 100+ 国家/地区 |
| 多设备支持 | 从手机到 4K 电视 |
架构概览
┌─────────────────────────────────────────────────────────────────────────────┐
│ YouTube 系统架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 接入层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Web App │ │ Mobile App │ │ TV App │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ └─────────┼────────────────┼────────────────┼────────────────────────┘ │
│ │ │ │ │
│ ┌─────────┴────────────────┴────────────────┴────────────────────────┐ │
│ │ 负载均衡 (GCLB) │ │
│ └──────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────┼─────────────────────────────────────┐ │
│ │ 服务层 │ │
│ │ ┌─────────────┐ ┌─────────┴─────────┐ ┌─────────────┐ │ │
│ │ │ Upload │ │ Playback │ │ Search │ │ │
│ │ │ Service │ │ Service │ │ Service │ │ │
│ │ └──────┬──────┘ └─────────┬─────────┘ └──────┬──────┘ │ │
│ │ │ │ │ │ │
│ │ ┌──────┴──────┐ ┌─────────┴─────────┐ ┌──────┴──────┐ │ │
│ │ │ Transcode │ │ Recommendation │ │ Ad Insert │ │ │
│ │ │ Service │ │ Service │ │ Service │ │ │
│ │ └─────────────┘ └───────────────────┘ └─────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────┼─────────────────────────────────────┐ │
│ │ 数据层 │ │
│ │ ┌─────────────┐ ┌─────────┴─────────┐ ┌─────────────┐ │ │
│ │ │ Video │ │ Metadata │ │ CDN │ │ │
│ │ │ Storage │ │ (Bigtable) │ │ (Edge PoP) │ │ │
│ │ │ (GCS) │ │ │ │ │ │ │
│ │ └─────────────┘ └───────────────────┘ └─────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心技术组件
1. 视频上传和处理流程
@Service
public class VideoUploadService {
private final StorageService storageService;
private final TranscodeService transcodeService;
private final MetadataService metadataService;
/**
* 处理视频上传
*/
public UploadResult uploadVideo(UploadRequest request, InputStream videoStream) {
// 1. 生成视频ID
String videoId = generateVideoId();
// 2. 上传到临时存储
String tempPath = "uploads/temp/" + videoId;
storageService.upload(tempPath, videoStream);
// 3. 提取元数据
VideoMetadata metadata = extractMetadata(tempPath);
// 4. 保存元数据
metadataService.save(Video.builder()
.id(videoId)
.title(request.getTitle())
.description(request.getDescription())
.uploaderId(request.getUserId())
.duration(metadata.getDuration())
.resolution(metadata.getResolution())
.status(VideoStatus.PROCESSING)
.build());
// 5. 发送转码任务
transcodeService.submitTranscodeJob(videoId, tempPath);
return UploadResult.builder()
.videoId(videoId)
.status(VideoStatus.PROCESSING)
.estimatedTime(calculateEstimatedTime(metadata))
.build();
}
}
@Service
public class TranscodeService {
private final JobQueue jobQueue;
private final StorageService storageService;
/**
* 提交转码任务
*/
public void submitTranscodeJob(String videoId, String sourcePath) {
// 定义输出格式
List<TranscodeProfile> profiles = Arrays.asList(
new TranscodeProfile("1080p", 1920, 1080, 5000000), // 5Mbps
new TranscodeProfile("720p", 1280, 720, 2500000), // 2.5Mbps
new TranscodeProfile("480p", 854, 480, 1000000), // 1Mbps
new TranscodeProfile("360p", 640, 360, 500000) // 500Kbps
);
TranscodeJob job = TranscodeJob.builder()
.videoId(videoId)
.sourcePath(sourcePath)
.profiles(profiles)
.build();
jobQueue.submit(job);
}
/**
* 执行转码
*/
@JobWorker(queue = "transcode")
public void processTranscodeJob(TranscodeJob job) {
for (TranscodeProfile profile : job.getProfiles()) {
try {
// 转码视频
String outputPath = String.format("videos/%s/%s.mp4",
job.getVideoId(), profile.getName());
transcodeVideo(job.getSourcePath(), outputPath, profile);
// 生成缩略图
generateThumbnails(job.getSourcePath(), job.getVideoId());
} catch (Exception e) {
log.error("Transcode failed for profile: {}", profile.getName(), e);
}
}
// 更新视频状态
metadataService.updateStatus(job.getVideoId(), VideoStatus.READY);
}
}
2. 视频流媒体播放
@Service
public class PlaybackService {
private final CDNService cdnService;
private final MetadataService metadataService;
/**
* 获取视频播放信息
*/
public PlaybackInfo getPlaybackInfo(String videoId, ClientInfo client) {
Video video = metadataService.findById(videoId)
.orElseThrow(() -> new VideoNotFoundException(videoId));
// 选择最佳CDN节点
CDNNode cdnNode = cdnService.selectOptimalNode(client.getLocation());
// 根据客户端能力选择可用格式
List<StreamFormat> availableFormats = selectFormats(video, client);
// 生成流媒体URL
List<StreamUrl> streamUrls = availableFormats.stream()
.map(format -> StreamUrl.builder()
.quality(format.getQuality())
.url(generateStreamUrl(cdnNode, videoId, format))
.bandwidth(format.getBandwidth())
.build())
.collect(Collectors.toList());
return PlaybackInfo.builder()
.videoId(videoId)
.title(video.getTitle())
.duration(video.getDuration())
.streams(streamUrls)
.cdnNode(cdnNode.getId())
.build();
}
/**
* 生成自适应流媒体清单 (HLS/DASH)
*/
public String generateManifest(String videoId) {
Video video = metadataService.findById(videoId)
.orElseThrow(() -> new VideoNotFoundException(videoId));
StringBuilder manifest = new StringBuilder();
manifest.append("#EXTM3U\n");
for (TranscodeProfile profile : video.getTranscodedProfiles()) {
manifest.append(String.format(
"#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d\n",
profile.getBandwidth(),
profile.getWidth(),
profile.getHeight()
));
manifest.append(String.format("%s/%s/playlist.m3u8\n",
videoId, profile.getName()));
}
return manifest.toString();
}
}
3. CDN 内容分发
@Service
public class CDNService {
private final GeoDNS geoDNS;
private final CDNNodeRepository nodeRepository;
/**
* 选择最优 CDN 节点
*/
public CDNNode selectOptimalNode(Location clientLocation) {
// 获取距离最近的节点
List<CDNNode> nearbyNodes = nodeRepository.findNearbyNodes(
clientLocation.getLatitude(),
clientLocation.getLongitude(),
500 // 500km 范围内
);
// 考虑节点负载和健康状况
return nearbyNodes.stream()
.filter(node -> node.isHealthy() && node.getLoad() < 0.8)
.min(Comparator.comparingInt(node ->
calculateScore(node, clientLocation)))
.orElseGet(() -> getGlobalFallbackNode());
}
/**
* 预热热门视频到边缘节点
*/
public void prefetchPopularVideos() {
List<String> popularVideos = getPopularVideos(1000);
for (String videoId : popularVideos) {
List<CDNNode> edgeNodes = nodeRepository.findAllEdgeNodes();
for (CDNNode node : edgeNodes) {
if (!node.hasVideo(videoId)) {
node.prefetchVideo(videoId);
}
}
}
}
}
视频存储优化
@Service
public class VideoStorageOptimizer {
/**
* 视频存储分层策略
*/
public void optimizeStorage() {
// 1. 热数据:SSD 存储,最近7天上传的视频
List<Video> hotVideos = getVideosUploadedInLastDays(7);
moveToSSD(hotVideos);
// 2. 温数据:标准存储,7-90天的视频
List<Video> warmVideos = getVideosUploadedBetweenDays(7, 90);
moveToStandardStorage(warmVideos);
// 3. 冷数据:低频访问存储,90天-1年的视频
List<Video> coldVideos = getVideosUploadedBetweenDays(90, 365);
moveToNearlineStorage(coldVideos);
// 4. 归档数据:归档存储,1年以上的视频
List<Video> archiveVideos = getVideosOlderThanDays(365);
moveToColdlineStorage(archiveVideos);
}
/**
* 视频去重和压缩
*/
public void deduplicateAndCompress() {
// 使用感知哈希检测重复视频
List<VideoGroup> duplicateGroups = findDuplicateVideos();
for (VideoGroup group : duplicateGroups) {
// 保留最高质量的版本
Video bestQuality = group.getVideos().stream()
.max(Comparator.comparing(Video::getResolution))
.orElseThrow();
// 其他版本重定向到主版本
for (Video duplicate : group.getVideos()) {
if (!duplicate.getId().equals(bestQuality.getId())) {
createRedirect(duplicate.getId(), bestQuality.getId());
markForDeletion(duplicate);
}
}
}
}
}
性能指标
| 指标 | 数值 |
|---|---|
| 月活跃用户 | 25亿+ |
| 每日观看时长 | 10亿+ 小时 |
| 视频库规模 | 数十亿视频 |
| 支持分辨率 | 144p - 8K |
| 全球 CDN 节点 | 数千个 |
案例三:电商系统架构(Amazon/淘宝)
系统概况
电商平台核心功能包括:
- 商品目录管理
- 购物车与订单
- 支付处理
- 库存管理
- 推荐系统
- 物流跟踪
架构设计
┌─────────────────────────────────────────────────────────────────────────────┐
│ 电商系统架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 网关层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ WAF │ │ Rate │ │ CDN │ │ │
│ │ │ │ │ Limiter │ │ │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ └─────────┼────────────────┼────────────────┼────────────────────────┘ │
│ │ │ │ │
│ ┌─────────┴────────────────┴────────────────┴────────────────────────┐ │
│ │ API Gateway │ │
│ │ 认证 │ 路由 │ 限流 │ 熔断 │ 日志 │ │
│ └──────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────┼─────────────────────────────────────┐ │
│ │ 业务服务层 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ User │ │ Product │ │ Order │ │ Payment │ │Inventory│ │ │
│ │ │ Service │ │ Service │ │ Service │ │ Service │ │ Service │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │ │
│ │ ┌────┴────────────┴────────────┴────────────┴────────────┘ │ │
│ │ │ 消息队列 (Kafka) │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────┼─────────────────────────────────────┐ │
│ │ 数据层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ User │ │ Product │ │ Order │ │ Payment │ │Inventory│ │ │
│ │ │ DB │ │ DB │ │ DB │ │ DB │ │ DB │ │ │
│ │ │(MySQL) │ │(ES) │ │(MySQL) │ │(PG) │ │(Redis) │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心服务实现
1. 订单服务
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final EventPublisher eventPublisher;
/**
* 创建订单
*/
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 1. 验证库存
for (OrderItemRequest item : request.getItems()) {
boolean available = inventoryService.checkAndReserve(
item.getProductId(),
item.getQuantity()
);
if (!available) {
throw new InsufficientStockException(item.getProductId());
}
}
// 2. 创建订单
Order order = Order.builder()
.id(generateOrderId())
.userId(request.getUserId())
.items(request.getItems().stream()
.map(this::toOrderItem)
.collect(Collectors.toList()))
.totalAmount(calculateTotal(request.getItems()))
.status(OrderStatus.CREATED)
.createdAt(Instant.now())
.build();
orderRepository.save(order);
// 3. 发送订单创建事件
eventPublisher.publish(new OrderCreatedEvent(order));
return order;
}
/**
* 处理支付回调
*/
@Transactional
public void handlePaymentCallback(String orderId, PaymentResult result) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
if (result.isSuccess()) {
order.confirmPayment(result.getTransactionId());
orderRepository.save(order);
// 扣减库存
for (OrderItem item : order.getItems()) {
inventoryService.deductStock(item.getProductId(), item.getQuantity());
}
eventPublisher.publish(new OrderPaidEvent(order));
} else {
order.cancel();
orderRepository.save(order);
// 释放库存
for (OrderItem item : order.getItems()) {
inventoryService.releaseStock(item.getProductId(), item.getQuantity());
}
}
}
}
2. 库存服务
@Service
public class InventoryService {
private final RedisTemplate<String, Integer> redisTemplate;
private final InventoryRepository inventoryRepository;
/**
* 检查并预留库存
*/
public boolean checkAndReserve(String productId, int quantity) {
String key = "inventory:" + productId;
// 使用 Redis Lua 脚本保证原子性
String luaScript =
"local stock = tonumber(redis.call('get', KEYS[1]));" +
"if stock == nil then return -1 end;" +
"if stock >= tonumber(ARGV[1]) then" +
" redis.call('decrby', KEYS[1], ARGV[1]);" +
" return 1;" +
"else" +
" return 0;" +
"end;";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(key),
String.valueOf(quantity)
);
return result != null && result == 1;
}
/**
* 异步同步数据库库存到 Redis
*/
@Scheduled(fixedRate = 60000) // 每分钟同步
public void syncInventoryToCache() {
List<Inventory> inventories = inventoryRepository.findAll();
for (Inventory inventory : inventories) {
String key = "inventory:" + inventory.getProductId();
redisTemplate.opsForValue().set(key, inventory.getQuantity());
}
}
}
秒杀系统优化
@Service
public class SeckillService {
private final RedisTemplate<String, Object> redisTemplate;
private final KafkaTemplate<String, OrderMessage> kafkaTemplate;
/**
* 秒杀下单
*/
public SeckillResult seckill(Long userId, Long productId) {
String stockKey = "seckill:stock:" + productId;
String userKey = "seckill:user:" + productId + ":" + userId;
// 1. 检查是否已购买
Boolean hasBought = redisTemplate.hasKey(userKey);
if (Boolean.TRUE.equals(hasBought)) {
return SeckillResult.fail("您已经参与过该商品的秒杀");
}
// 2. 扣减库存(原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("商品已售罄");
}
// 3. 标记用户已购买
redisTemplate.opsForValue().set(userKey, "1", Duration.ofHours(24));
// 4. 发送异步订单消息
OrderMessage message = OrderMessage.builder()
.userId(userId)
.productId(productId)
.timestamp(Instant.now())
.build();
kafkaTemplate.send("seckill-orders", message);
return SeckillResult.success("秒杀成功,正在处理订单");
}
}
案例四:即时通讯系统(WhatsApp/微信)
系统概况
即时通讯系统核心功能:
- 一对一聊天
- 群聊
- 消息同步
- 已读回执
- 文件传输
- 语音/视频通话
架构设计
┌─────────────────────────────────────────────────────────────────────────────┐
│ 即时通讯系统架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 接入层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Mobile │ │ Web │ │ Desktop │ │ Mini │ │ │
│ │ │ App │ │ App │ │ App │ │ Program │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ └───────┼────────────┼────────────┼────────────┼───────────────────────┘ │
│ │ │ │ │ │
│ ┌───────┴────────────┴────────────┴────────────┴───────────────────────┐ │
│ │ 长连接网关 (Gateway) │ │
│ │ WebSocket │ TCP │ QUIC │ 负载均衡 │ 连接管理 │ │
│ └──────────────────────────────┬───────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────┼───────────────────────────────────────┐ │
│ │ 业务服务层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Msg │ │ User │ │ Group │ │ Push │ │ Media │ │ │
│ │ │ Service │ │ Service │ │ Service │ │ Service │ │ Service │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ └───────┼────────────┼────────────┼────────────┼────────────┼─────────┘ │
│ │ │ │ │ │ │
│ ┌───────┴────────────┴────────────┴────────────┴────────────┘ │
│ │ 消息队列 (RocketMQ/Kafka) │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ 数据层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Msg │ │ User │ │ Group │ │ Media │ │ │
│ │ │ DB │ │ DB │ │ DB │ │ Storage │ │ │
│ │ │(HBase) │ │(MySQL) │ │(MySQL) │ │ (OSS) │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Redis │ │ ES │ │ │
│ │ │ (Cache) │ │(Search) │ │ │
│ │ └─────────┘ └─────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心服务实现
1. 消息服务
@Service
public class MessageService {
private final MessageRepository messageRepository;
private final GatewayService gatewayService;
private final KafkaTemplate<String, MessageEvent> kafkaTemplate;
/**
* 发送消息
*/
public Message sendMessage(SendMessageRequest request) {
// 1. 生成消息ID
String messageId = generateMessageId();
// 2. 保存消息
Message message = Message.builder()
.id(messageId)
.fromUser(request.getFromUser())
.toUser(request.getToUser())
.content(request.getContent())
.type(request.getType())
.status(MessageStatus.SENDING)
.timestamp(Instant.now())
.build();
messageRepository.save(message);
// 3. 推送到接收方
boolean delivered = gatewayService.pushToUser(
request.getToUser(),
message
);
// 4. 更新状态
if (delivered) {
message.setStatus(MessageStatus.DELIVERED);
messageRepository.updateStatus(messageId, MessageStatus.DELIVERED);
}
// 5. 发送事件用于离线通知
kafkaTemplate.send("message-events", new MessageEvent(message));
return message;
}
/**
* 拉取历史消息
*/
public List<Message> pullMessages(Long userId, Long conversationId,
String lastMessageId, int limit) {
// 使用游标分页
return messageRepository.findByConversation(
conversationId,
lastMessageId,
limit
);
}
}
2. 消息ID生成
@Component
public class MessageIdGenerator {
private final Snowflake snowflake;
public MessageIdGenerator() {
// 使用雪花算法生成唯一ID
this.snowflake = new Snowflake(
getDatacenterId(), // 数据中心ID
getWorkerId() // 工作节点ID
);
}
public String generate() {
return String.valueOf(snowflake.nextId());
}
}
/**
* 雪花算法实现
*/
public class Snowflake {
private final long datacenterId;
private final long workerId;
private long sequence = 0L;
private long lastTimestamp = -1L;
// 时间戳偏移量
private final long timestampShift = 22;
private final long datacenterIdShift = 17;
private final long workerIdShift = 12;
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards");
}
if (timestamp == lastTimestamp) {
sequence = (sequence + 1) & 4095; // 12位序列号
if (sequence == 0) {
timestamp = waitNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp - 1288834974657L) << timestampShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
}
3. 消息同步机制
@Service
public class MessageSyncService {
private final MessageRepository messageRepository;
private final SyncCheckpointRepository checkpointRepository;
/**
* 增量同步消息
*/
public SyncResult syncMessages(Long userId, SyncRequest request) {
// 获取上次同步点
SyncCheckpoint checkpoint = checkpointRepository
.findByUserIdAndDevice(userId, request.getDeviceId())
.orElse(new SyncCheckpoint());
// 查询增量消息
List<Message> newMessages = messageRepository
.findMessagesAfter(userId, checkpoint.getLastSequence(), 1000);
// 更新同步点
if (!newMessages.isEmpty()) {
long newSequence = newMessages.get(newMessages.size() - 1).getSequence();
checkpoint.setLastSequence(newSequence);
checkpoint.setSyncTime(Instant.now());
checkpointRepository.save(checkpoint);
}
return SyncResult.builder()
.messages(newMessages)
.hasMore(newMessages.size() >= 1000)
.syncTime(Instant.now())
.build();
}
}
群聊实现
@Service
public class GroupChatService {
private final GroupRepository groupRepository;
private final MessageService messageService;
private final GatewayService gatewayService;
/**
* 发送群消息
*/
public void sendGroupMessage(Long groupId, Message message) {
// 1. 获取群成员
List<Long> members = groupRepository.getMemberIds(groupId);
// 2. 保存消息
messageService.saveMessage(message);
// 3. 批量推送(使用扇出)
for (List<Long> batch : Lists.partition(members, 100)) {
gatewayService.batchPush(batch, message);
}
// 4. 更新群消息游标
groupRepository.updateLastMessage(groupId, message.getId());
}
/**
* 获取群消息(支持漫游)
*/
public List<Message> getGroupMessages(Long groupId, Long userId,
String lastMessageId, int limit) {
// 检查用户是否在群中
if (!groupRepository.isMember(groupId, userId)) {
throw new NotGroupMemberException();
}
return messageService.getMessages(groupId, lastMessageId, limit);
}
}
架构模式总结
| 系统类型 | 核心挑战 | 关键技术 |
|---|---|---|
| 社交网络 | 扇出、实时性 | 时间线预计算、图数据库 |
| 视频流媒体 | 存储、带宽 | CDN、转码、自适应码率 |
| 电商 | 一致性、并发 | 分布式事务、库存预扣 |
| 即时通讯 | 实时推送、消息顺序 | 长连接、消息ID、漫游 |
学习建议
- 从简单开始:先理解单体架构,再学习分布式
- 关注数据流:理解数据如何在系统中流动
- 权衡取舍:没有完美的架构,只有适合的架构
- 实践验证:通过实际项目验证设计方案