跳到主要内容

实时订阅

Supabase Realtime 提供了基于 WebSocket 的实时数据同步功能,让客户端可以即时接收数据库变更通知。本章节介绍如何使用 Realtime 构建实时应用。

实时订阅概述

Supabase Realtime 基于 PostgreSQL 的逻辑复制功能,监听数据库的预写式日志(WAL,Write-Ahead Log),将数据变更实时推送给订阅的客户端。

支持的事件类型

  • INSERT:新数据插入
  • UPDATE:数据更新
  • DELETE:数据删除
  • *(通配符):所有变更事件

适用场景

  • 实时聊天应用
  • 协作编辑工具
  • 实时仪表盘
  • 在线游戏
  • 实时通知系统

启用 Realtime

在 Dashboard 中启用

  1. 进入 Database → Replication
  2. 找到要启用实时订阅的表
  3. 勾选 "Realtime" 选项

使用 SQL 启用

-- Enable realtime change events for the table by adding it to the publication
ALTER PUBLICATION supabase_realtime ADD TABLE posts;

-- List the tables currently included in the publication
SELECT * FROM pg_publication_tables WHERE pubname = 'supabase_realtime';

-- Stop publishing change events for the table
ALTER PUBLICATION supabase_realtime DROP TABLE posts;

基本订阅

订阅表变更

import { createClient } from '@supabase/supabase-js'

// Build the client from the project URL and the anonymous API key
const projectUrl = 'https://your-project.supabase.co'
const anonKey = 'your-anon-key'
const supabase = createClient(projectUrl, anonKey)

// Which changes to listen for: every event type on public.posts
const postsChanges = { event: '*', schema: 'public', table: 'posts' }

// Called once for each change event delivered on the channel
function logChange(payload) {
  console.log('变更事件:', payload)
}

// Subscribe to all changes on the posts table
const channel = supabase
  .channel('posts-changes')
  .on('postgres_changes', postsChanges, logChange)
  .subscribe()

// Shape of the payload passed to the handler:
// {
//   eventType: 'INSERT' | 'UPDATE' | 'DELETE',
//   schema: 'public',
//   table: 'posts',
//   new: { ... }, // new row (INSERT/UPDATE)
//   old: { ... }, // previous row (UPDATE/DELETE)
//   errors: null
// }

订阅特定事件

// Each subscription gets its own variable: the three declarations share one
// scope, so reusing a single `const channel` name would be a redeclaration
// SyntaxError when the snippet is run as a whole.

// Subscribe to INSERT events only
const insertChannel = supabase
  .channel('posts-inserts')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'public',
      table: 'posts'
    },
    (payload) => {
      console.log('新文章:', payload.new)
    }
  )
  .subscribe()

// Subscribe to UPDATE events only
// NOTE(review): how much of the previous row `payload.old` carries appears to
// depend on the table's REPLICA IDENTITY setting — confirm against the
// Realtime documentation before relying on full old rows.
const updateChannel = supabase
  .channel('posts-updates')
  .on(
    'postgres_changes',
    {
      event: 'UPDATE',
      schema: 'public',
      table: 'posts'
    },
    (payload) => {
      console.log('更新:', payload.old, '->', payload.new)
    }
  )
  .subscribe()

// Subscribe to DELETE events only
const deleteChannel = supabase
  .channel('posts-deletes')
  .on(
    'postgres_changes',
    {
      event: 'DELETE',
      schema: 'public',
      table: 'posts'
    },
    (payload) => {
      console.log('删除:', payload.old)
    }
  )
  .subscribe()

取消订阅

// Unsubscribe this one channel
// NOTE(review): supabase-js also exposes `supabase.removeChannel(channel)` for
// removing a single channel from the client — confirm whether that is the
// preferred cleanup call here.
channel.unsubscribe()

// Or remove every channel registered on the supabase client
supabase.removeAllChannels()

过滤订阅

按列值过滤

只订阅特定条件的数据变更:

// Only receive changes for posts whose author_id equals 1
const authorPostsChanges = {
  event: '*',
  schema: 'public',
  table: 'posts',
  filter: 'author_id=eq.1'
}

// Called once for each matching change event
const onAuthorPostChange = (payload) => {
  console.log('作者1的文章变更:', payload)
}

const channel = supabase
  .channel('author-posts')
  .on('postgres_changes', authorPostsChanges, onAuthorPostChange)
  .subscribe()

过滤操作符

// Equal to
filter: 'status=eq.published'

// Not equal to
filter: 'status=neq.draft'

// Greater than
filter: 'views=gt.100'

// Greater than or equal to
filter: 'views=gte.100'

// Less than
filter: 'views=lt.1000'

// Less than or equal to
filter: 'views=lte.1000'

// Contained in a list of values
filter: 'status=in.(published,featured)'

// Fuzzy (case-insensitive pattern) match
// NOTE(review): the Realtime postgres_changes filter documentation appears to
// list only eq, neq, lt, lte, gt, gte and in — verify that `ilike` is actually
// accepted before relying on this form.
filter: 'title=ilike.*supabase*'

组合过滤

目前不支持多个过滤条件,可以通过创建数据库视图解决:

-- Create a view that exposes only published posts
create view published_posts as
select * from posts where status = 'published';

-- Enable realtime for the view
-- NOTE(review): PostgreSQL's ALTER PUBLICATION syntax adds tables (ADD TABLE)
-- or schemas, not views, so this statement is expected to fail as written —
-- confirm, and replace this workaround if so.
alter publication supabase_realtime add view published_posts;
// Subscribe to the view, addressing it by name like a table
const publishedPostsChanges = {
  event: '*',
  schema: 'public',
  table: 'published_posts'
}

const channel = supabase
  .channel('published-posts')
  .on('postgres_changes', publishedPostsChanges, (payload) => console.log(payload))
  .subscribe()

广播和状态同步

除了数据库变更,Realtime 还支持客户端之间的直接通信。

广播消息

// Listen for broadcast messages on the room channel
const channel = supabase.channel('room-1')

channel
  .on('broadcast', { event: 'cursor-move' }, (payload) => {
    console.log('收到消息:', payload)
  })
  .subscribe((status) => {
    // Send only once the channel has joined; sending right after subscribe()
    // would run before the websocket subscription is established.
    if (status !== 'SUBSCRIBED') return

    // Send a broadcast message to the other subscribers of this channel
    channel.send({
      type: 'broadcast',
      event: 'cursor-move',
      payload: { x: 100, y: 200, userId: 'user-1' }
    })
  })

状态同步

通过 Presence 在所有订阅者之间共享在线状态:

const channel = supabase.channel('room-1')

channel
  .on('presence', { event: 'sync' }, () => {
    // Read the current presence state from the channel on every sync
    const state = channel.presenceState()
    console.log('当前在线用户:', state)
  })
  .on('presence', { event: 'join' }, ({ newPresences }) => {
    console.log('用户加入:', newPresences)
  })
  .on('presence', { event: 'leave' }, ({ leftPresences }) => {
    console.log('用户离开:', leftPresences)
  })
  .subscribe(async (status) => {
    // Join the room only after the channel reports it is subscribed
    if (status !== 'SUBSCRIBED') return

    await channel.track({
      user: 'user-1',
      online_at: new Date().toISOString()
    })
  })

// Leave the room (call this later, when the user should no longer be shown)
channel.untrack()

React 集成

自定义 Hook

import { useEffect, useState } from 'react'
import { supabase } from '../lib/supabase'

/**
 * React hook that loads posts and keeps the list in sync through Realtime.
 *
 * Performs an initial query against the `posts` table, then applies INSERT,
 * UPDATE and DELETE events to the local list.
 *
 * @param {string|number} [userId] - When truthy, only posts whose `author_id`
 *   equals this value are loaded and subscribed to.
 * @returns {Array<object>} The current list of post rows.
 */
export function useRealtimePosts(userId) {
  const [posts, setPosts] = useState([])

  useEffect(() => {
    // Set by the cleanup so a query that resolves after unmount, or after
    // userId changed, cannot overwrite newer state.
    let cancelled = false

    async function fetchPosts() {
      let query = supabase.from('posts').select('*')
      if (userId) {
        query = query.eq('author_id', userId)
      }
      const { data, error } = await query
      if (cancelled) return
      if (error) {
        console.error('加载文章失败:', error)
      }
      setPosts(data || [])
    }

    function applyChange(payload) {
      if (payload.eventType === 'INSERT') {
        // The initial query may already contain this row; avoid adding it twice
        setPosts((prev) =>
          prev.some((p) => p.id === payload.new.id) ? prev : [...prev, payload.new]
        )
      } else if (payload.eventType === 'UPDATE') {
        setPosts((prev) =>
          prev.map((p) => (p.id === payload.new.id ? payload.new : p))
        )
      } else if (payload.eventType === 'DELETE') {
        setPosts((prev) => prev.filter((p) => p.id !== payload.old.id))
      }
    }

    // Initial load
    fetchPosts()

    // One channel name per filter, so subscriptions for different users do
    // not share a topic.
    const channelName = userId ? `posts-realtime-${userId}` : 'posts-realtime'

    // Subscribe to changes
    const channel = supabase
      .channel(channelName)
      .on(
        'postgres_changes',
        {
          event: '*',
          schema: 'public',
          table: 'posts',
          filter: userId ? `author_id=eq.${userId}` : undefined
        },
        applyChange
      )
      .subscribe()

    return () => {
      cancelled = true
      supabase.removeChannel(channel)
    }
  }, [userId])

  return posts
}

实时聊天组件

import { useEffect, useState, useRef } from 'react'
import { supabase } from '../lib/supabase'

export function ChatRoom({ roomId, userId }) {
const [messages, setMessages] = useState([])
const [input, setInput] = useState('')
const messagesEndRef = useRef(null)

useEffect(() => {
// 加载历史消息
fetchMessages()

// 订阅新消息
const channel = supabase
.channel(`room-${roomId}`)
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'messages',
filter: `room_id=eq.${roomId}`
},
(payload) => {
setMessages((prev) => [...prev, payload.new])
}
)
.subscribe()

return () => channel.unsubscribe()
}, [roomId])

useEffect(() => {
// 滚动到底部
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
}, [messages])

async function fetchMessages() {
const { data } = await supabase
.from('messages')
.select('*')
.eq('room_id', roomId)
.order('created_at', { ascending: true })
setMessages(data || [])
}

async function sendMessage() {
if (!input.trim()) return

await supabase.from('messages').insert({
room_id: roomId,
user_id: userId,
content: input.trim()
})

setInput('')
}

return (
<div className="chat-room">
<div className="messages">
{messages.map((msg) => (
<div key={msg.id} className={`message ${msg.user_id === userId ? 'own' : ''}`}>
<span className="user">{msg.user_id}</span>
<span className="content">{msg.content}</span>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
placeholder="输入消息..."
/>
<button onClick={sendMessage}>发送</button>
</div>
</div>
)
}

在线状态指示

import { useEffect, useState } from 'react'
import { supabase } from '../lib/supabase'

export function OnlineUsers({ roomId }) {
const [users, setUsers] = useState([])

useEffect(() => {
const channel = supabase.channel(`room-${roomId}-presence`)

channel
.on('presence', { event: 'sync' }, () => {
const state = channel.presenceState()
const users = Object.values(state).flat()
setUsers(users)
})
.on('presence', { event: 'join' }, ({ newPresences }) => {
setUsers((prev) => [...prev, ...newPresences])
})
.on('presence', { event: 'leave' }, ({ leftPresences }) => {
const leftIds = leftPresences.map((p) => p.user_id)
setUsers((prev) =>
prev.filter((u) => !leftIds.includes(u.user_id))
)
})
.subscribe(async () => {
// 加入房间
const user = (await supabase.auth.getUser()).data.user
await channel.track({
user_id: user.id,
name: user.user_metadata.name,
online_at: new Date().toISOString()
})
})

return () => channel.unsubscribe()
}, [roomId])

return (
<div className="online-users">
<span className="count">{users.length} 在线</span>
<div className="avatars">
{users.map((user) => (
<div key={user.user_id} className="avatar" title={user.name}>
{user.name?.charAt(0).toUpperCase()}
</div>
))}
</div>
</div>
)
}

实时协作编辑

构建协作编辑功能:

import { useEffect, useState, useRef } from 'react'
import { supabase } from '../lib/supabase'

export function CollaborativeEditor({ documentId, userId }) {
const [content, setContent] = useState('')
const [cursors, setCursors] = useState({})
const channelRef = useRef(null)
const textareaRef = useRef(null)

useEffect(() => {
// 加载文档
loadDocument()

// 创建频道
const channel = supabase.channel(`doc-${documentId}`)
channelRef.current = channel

// 监听内容变更
channel
.on('broadcast', { event: 'content-change' }, ({ payload }) => {
if (payload.userId !== userId) {
setContent(payload.content)
}
})
// 监听光标位置
.on('broadcast', { event: 'cursor-move' }, ({ payload }) => {
if (payload.userId !== userId) {
setCursors((prev) => ({
...prev,
[payload.userId]: payload.position
}))
}
})
.subscribe()

return () => channel.unsubscribe()
}, [documentId, userId])

async function loadDocument() {
const { data } = await supabase
.from('documents')
.select('content')
.eq('id', documentId)
.single()

if (data) setContent(data.content)
}

async function handleChange(e) {
const newContent = e.target.value
setContent(newContent)

// 广播变更
channelRef.current?.send({
type: 'broadcast',
event: 'content-change',
payload: { content: newContent, userId }
})

// 保存到数据库(防抖)
debouncedSave(newContent)
}

function handleCursorMove(e) {
const position = e.target.selectionStart
channelRef.current?.send({
type: 'broadcast',
event: 'cursor-move',
payload: { position, userId }
})
}

const debouncedSave = debounce(async (content) => {
await supabase
.from('documents')
.update({ content, updated_at: new Date() })
.eq('id', documentId)
}, 1000)

return (
<div className="editor">
<textarea
ref={textareaRef}
value={content}
onChange={handleChange}
onSelect={handleCursorMove}
onKeyUp={handleCursorMove}
/>
<div className="cursors">
{Object.entries(cursors).map(([uid, pos]) => (
<span
key={uid}
className="cursor-indicator"
style={{ left: `${pos % 50}ch` }}
>
{uid}
</span>
))}
</div>
</div>
)
}

/**
 * Returns a wrapper that postpones calling `fn` until `delay` milliseconds
 * have passed without another call to the wrapper. Only the arguments of the
 * most recent call are forwarded.
 *
 * @param {Function} fn - Function to invoke after the quiet period.
 * @param {number} delay - Quiet period in milliseconds.
 * @returns {Function} The debounced wrapper; it returns nothing.
 */
function debounce(fn, delay) {
  let pendingId
  const fire = (callArgs) => fn(...callArgs)

  return function debounced(...callArgs) {
    // Each call cancels the previously scheduled invocation
    clearTimeout(pendingId)
    pendingId = setTimeout(fire, delay, callArgs)
  }
}

性能优化

批量更新

频繁的数据更新会产生大量事件,考虑批量处理:

let pendingUpdates = []
let updateTimer = null

channel.on('postgres_changes', { ... }, (payload) => {
pendingUpdates.push(payload)

if (!updateTimer) {
updateTimer = setTimeout(() => {
// 批量处理
processUpdates(pendingUpdates)
pendingUpdates = []
updateTimer = null
}, 100)
}
})

条件订阅

只在需要时订阅:

useEffect(() => {
if (!isActive) return

const channel = supabase.channel('...')
.on('postgres_changes', { ... }, handler)
.subscribe()

return () => channel.unsubscribe()
}, [isActive])

限制订阅范围

使用过滤条件减少不必要的事件:

// Not recommended: subscribes to every change on the table
.on('postgres_changes', { table: 'messages' }, ...)

// Recommended: subscribe only to the relevant rows
.on('postgres_changes', {
table: 'messages',
filter: `room_id=eq.${roomId}`
}, ...)

错误处理

const channel = supabase
.channel('posts-changes')
.on('postgres_changes', { ... }, (payload) => {
if (payload.errors) {
console.error('订阅错误:', payload.errors)
return
}
// 处理正常数据
})
.subscribe((status) => {
if (status === 'SUBSCRIBED') {
console.log('订阅成功')
}
if (status === 'CLOSED') {
console.log('连接关闭')
}
if (status === 'CHANNEL_ERROR') {
console.error('频道错误')
}
})

下一步

掌握实时订阅后,你可以继续学习: