实时订阅
Supabase Realtime 提供了基于 WebSocket 的实时数据同步功能,让客户端可以即时接收数据库变更通知。本章节介绍如何使用 Realtime 构建实时应用。
实时订阅概述
Supabase Realtime 基于 PostgreSQL 的逻辑复制功能,监听数据库的写入前日志(WAL),将数据变更实时推送给订阅的客户端。
支持的事件类型
- INSERT:新数据插入
- UPDATE:数据更新
- DELETE:数据删除
- ALL:所有变更事件
适用场景
- 实时聊天应用
- 协作编辑工具
- 实时仪表盘
- 在线游戏
- 实时通知系统
启用 Realtime
在 Dashboard 中启用
- 进入 Database → Replication
- 找到要启用实时订阅的表
- 勾选 "Realtime" 选项
使用 SQL 启用
-- 为表启用实时订阅
alter publication supabase_realtime add table posts;
-- 查看已启用的表
select * from pg_publication_tables where pubname = 'supabase_realtime';
-- 移除表的实时订阅
alter publication supabase_realtime drop table posts;
基本订阅
订阅表变更
import { createClient } from '@supabase/supabase-js'
const supabase = createClient(
'https://your-project.supabase.co',
'your-anon-key'
)
// 订阅 posts 表的所有变更
const channel = supabase
.channel('posts-changes')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'posts'
},
(payload) => {
console.log('变更事件:', payload)
}
)
.subscribe()
// payload 结构
// {
// eventType: 'INSERT' | 'UPDATE' | 'DELETE',
// schema: 'public',
// table: 'posts',
// new: { ... }, // 新数据(INSERT/UPDATE)
// old: { ... }, // 旧数据(UPDATE/DELETE)
// errors: null
// }
订阅特定事件
// 只订阅插入事件
const channel = supabase
.channel('posts-inserts')
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'posts'
},
(payload) => {
console.log('新文章:', payload.new)
}
)
.subscribe()
// 只订阅更新事件
const channel = supabase
.channel('posts-updates')
.on(
'postgres_changes',
{
event: 'UPDATE',
schema: 'public',
table: 'posts'
},
(payload) => {
console.log('更新:', payload.old, '->', payload.new)
}
)
.subscribe()
// 只订阅删除事件
const channel = supabase
.channel('posts-deletes')
.on(
'postgres_changes',
{
event: 'DELETE',
schema: 'public',
table: 'posts'
},
(payload) => {
console.log('删除:', payload.old)
}
)
.subscribe()
取消订阅
// 取消订阅
channel.unsubscribe()
// 或使用 supabase 移除所有频道
supabase.removeAllChannels()
过滤订阅
按列值过滤
只订阅特定条件的数据变更:
// 只订阅 author_id = 1 的文章变更
const channel = supabase
.channel('author-posts')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'posts',
filter: 'author_id=eq.1'
},
(payload) => {
console.log('作者1的文章变更:', payload)
}
)
.subscribe()
过滤操作符
// 等于
filter: 'status=eq.published'
// 不等于
filter: 'status=neq.draft'
// 大于
filter: 'views=gt.100'
// 大于等于
filter: 'views=gte.100'
// 小于
filter: 'views=lt.1000'
// 小于等于
filter: 'views=lte.1000'
// 包含在列表中
filter: 'status=in.(published,featured)'
// 模糊匹配
filter: 'title=ilike.*supabase*'
组合过滤
目前不支持多个过滤条件,可以通过创建数据库视图解决:
-- 创建视图
create view published_posts as
select * from posts where status = 'published';
-- 启用实时订阅
alter publication supabase_realtime add view published_posts;
// 订阅视图
const channel = supabase
.channel('published-posts')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'published_posts'
},
(payload) => console.log(payload)
)
.subscribe()
广播和状态同步
除了数据库变更,Realtime 还支持客户端之间的直接通信。
广播消息
// 发送广播消息
const channel = supabase
.channel('room-1')
.on('broadcast', { event: 'cursor-move' }, (payload) => {
console.log('收到消息:', payload)
})
.subscribe()
// 发送消息
channel.send({
type: 'broadcast',
event: 'cursor-move',
payload: { x: 100, y: 200, userId: 'user-1' }
})
状态同步
共享状态给所有订阅者:
const channel = supabase
.channel('room-1')
.on('presence', { event: 'sync' }, () => {
const state = channel.presenceState()
console.log('当前在线用户:', state)
})
.on('presence', { event: 'join' }, ({ newPresences }) => {
console.log('用户加入:', newPresences)
})
.on('presence', { event: 'leave' }, ({ leftPresences }) => {
console.log('用户离开:', leftPresences)
})
.subscribe()
// 加入房间
channel.track({
user: 'user-1',
online_at: new Date().toISOString()
})
// 离开房间
channel.untrack()
React 集成
自定义 Hook
import { useEffect, useState } from 'react'
import { supabase } from '../lib/supabase'
export function useRealtimePosts(userId) {
const [posts, setPosts] = useState([])
useEffect(() => {
// 初始加载
fetchPosts()
// 订阅变更
const channel = supabase
.channel('posts-realtime')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'posts',
filter: userId ? `author_id=eq.${userId}` : undefined
},
(payload) => {
if (payload.eventType === 'INSERT') {
setPosts((prev) => [...prev, payload.new])
} else if (payload.eventType === 'UPDATE') {
setPosts((prev) =>
prev.map((p) =>
p.id === payload.new.id ? payload.new : p
)
)
} else if (payload.eventType === 'DELETE') {
setPosts((prev) =>
prev.filter((p) => p.id !== payload.old.id)
)
}
}
)
.subscribe()
async function fetchPosts() {
let query = supabase.from('posts').select('*')
if (userId) {
query = query.eq('author_id', userId)
}
const { data } = await query
setPosts(data || [])
}
return () => {
channel.unsubscribe()
}
}, [userId])
return posts
}
实时聊天组件
import { useEffect, useState, useRef } from 'react'
import { supabase } from '../lib/supabase'
export function ChatRoom({ roomId, userId }) {
const [messages, setMessages] = useState([])
const [input, setInput] = useState('')
const messagesEndRef = useRef(null)
useEffect(() => {
// 加载历史消息
fetchMessages()
// 订阅新消息
const channel = supabase
.channel(`room-${roomId}`)
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'messages',
filter: `room_id=eq.${roomId}`
},
(payload) => {
setMessages((prev) => [...prev, payload.new])
}
)
.subscribe()
return () => channel.unsubscribe()
}, [roomId])
useEffect(() => {
// 滚动到底部
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
}, [messages])
async function fetchMessages() {
const { data } = await supabase
.from('messages')
.select('*')
.eq('room_id', roomId)
.order('created_at', { ascending: true })
setMessages(data || [])
}
async function sendMessage() {
if (!input.trim()) return
await supabase.from('messages').insert({
room_id: roomId,
user_id: userId,
content: input.trim()
})
setInput('')
}
return (
<div className="chat-room">
<div className="messages">
{messages.map((msg) => (
<div key={msg.id} className={`message ${msg.user_id === userId ? 'own' : ''}`}>
<span className="user">{msg.user_id}</span>
<span className="content">{msg.content}</span>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
placeholder="输入消息..."
/>
<button onClick={sendMessage}>发送</button>
</div>
</div>
)
}
在线状态指示
import { useEffect, useState } from 'react'
import { supabase } from '../lib/supabase'
export function OnlineUsers({ roomId }) {
const [users, setUsers] = useState([])
useEffect(() => {
const channel = supabase.channel(`room-${roomId}-presence`)
channel
.on('presence', { event: 'sync' }, () => {
const state = channel.presenceState()
const users = Object.values(state).flat()
setUsers(users)
})
.on('presence', { event: 'join' }, ({ newPresences }) => {
setUsers((prev) => [...prev, ...newPresences])
})
.on('presence', { event: 'leave' }, ({ leftPresences }) => {
const leftIds = leftPresences.map((p) => p.user_id)
setUsers((prev) =>
prev.filter((u) => !leftIds.includes(u.user_id))
)
})
.subscribe(async () => {
// 加入房间
const user = (await supabase.auth.getUser()).data.user
await channel.track({
user_id: user.id,
name: user.user_metadata.name,
online_at: new Date().toISOString()
})
})
return () => channel.unsubscribe()
}, [roomId])
return (
<div className="online-users">
<span className="count">{users.length} 在线</span>
<div className="avatars">
{users.map((user) => (
<div key={user.user_id} className="avatar" title={user.name}>
{user.name?.charAt(0).toUpperCase()}
</div>
))}
</div>
</div>
)
}
实时协作编辑
构建协作编辑功能:
import { useEffect, useState, useRef } from 'react'
import { supabase } from '../lib/supabase'
export function CollaborativeEditor({ documentId, userId }) {
const [content, setContent] = useState('')
const [cursors, setCursors] = useState({})
const channelRef = useRef(null)
const textareaRef = useRef(null)
useEffect(() => {
// 加载文档
loadDocument()
// 创建频道
const channel = supabase.channel(`doc-${documentId}`)
channelRef.current = channel
// 监听内容变更
channel
.on('broadcast', { event: 'content-change' }, ({ payload }) => {
if (payload.userId !== userId) {
setContent(payload.content)
}
})
// 监听光标位置
.on('broadcast', { event: 'cursor-move' }, ({ payload }) => {
if (payload.userId !== userId) {
setCursors((prev) => ({
...prev,
[payload.userId]: payload.position
}))
}
})
.subscribe()
return () => channel.unsubscribe()
}, [documentId, userId])
async function loadDocument() {
const { data } = await supabase
.from('documents')
.select('content')
.eq('id', documentId)
.single()
if (data) setContent(data.content)
}
async function handleChange(e) {
const newContent = e.target.value
setContent(newContent)
// 广播变更
channelRef.current?.send({
type: 'broadcast',
event: 'content-change',
payload: { content: newContent, userId }
})
// 保存到数据库(防抖)
debouncedSave(newContent)
}
function handleCursorMove(e) {
const position = e.target.selectionStart
channelRef.current?.send({
type: 'broadcast',
event: 'cursor-move',
payload: { position, userId }
})
}
const debouncedSave = debounce(async (content) => {
await supabase
.from('documents')
.update({ content, updated_at: new Date() })
.eq('id', documentId)
}, 1000)
return (
<div className="editor">
<textarea
ref={textareaRef}
value={content}
onChange={handleChange}
onSelect={handleCursorMove}
onKeyUp={handleCursorMove}
/>
<div className="cursors">
{Object.entries(cursors).map(([uid, pos]) => (
<span
key={uid}
className="cursor-indicator"
style={{ left: `${pos % 50}ch` }}
>
{uid}
</span>
))}
</div>
</div>
)
}
function debounce(fn, delay) {
let timer
return (...args) => {
clearTimeout(timer)
timer = setTimeout(() => fn(...args), delay)
}
}
性能优化
批量更新
频繁的数据更新会产生大量事件,考虑批量处理:
let pendingUpdates = []
let updateTimer = null
channel.on('postgres_changes', { ... }, (payload) => {
pendingUpdates.push(payload)
if (!updateTimer) {
updateTimer = setTimeout(() => {
// 批量处理
processUpdates(pendingUpdates)
pendingUpdates = []
updateTimer = null
}, 100)
}
})
条件订阅
只在需要时订阅:
useEffect(() => {
if (!isActive) return
const channel = supabase.channel('...')
.on('postgres_changes', { ... }, handler)
.subscribe()
return () => channel.unsubscribe()
}, [isActive])
限制订阅范围
使用过滤条件减少不必要的事件:
// 不推荐:订阅所有变更
.on('postgres_changes', { table: 'messages' }, ...)
// 推荐:只订阅相关数据
.on('postgres_changes', {
table: 'messages',
filter: `room_id=eq.${roomId}`
}, ...)
错误处理
const channel = supabase
.channel('posts-changes')
.on('postgres_changes', { ... }, (payload) => {
if (payload.errors) {
console.error('订阅错误:', payload.errors)
return
}
// 处理正常数据
})
.subscribe((status) => {
if (status === 'SUBSCRIBED') {
console.log('订阅成功')
}
if (status === 'CLOSED') {
console.log('连接关闭')
}
if (status === 'CHANNEL_ERROR') {
console.error('频道错误')
}
})
下一步
掌握实时订阅后,你可以继续学习: