跳到主要内容

Keras 模型构建

Keras 是 TensorFlow 的高级 API,提供了简洁的方式来构建和训练神经网络。本章介绍三种构建模型的方式。

Sequential API

Sequential API 适用于简单的层堆叠模型,是最简单的模型构建方式。

基本用法

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 创建 Sequential 模型
model = keras.Sequential([
layers.Dense(64, activation='relu', input_shape=(784,)),
layers.Dense(32, activation='relu'),
layers.Dense(10, activation='softmax')
])

# 查看模型结构
model.summary()

输出:

Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense (Dense) (None, 64) 50240

dense_1 (Dense) (None, 32) 2080

dense_2 (Dense) (None, 10) 330

=================================================================
Total params: 52,650
Trainable params: 52,650
Non-trainable params: 0
_________________________________________________________________

逐步添加层

model = keras.Sequential()

# 逐层添加
model.add(layers.Dense(64, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.BatchNormalization())
model.add(layers.Dense(10, activation='softmax'))

model.summary()

常用层类型

# 全连接层
layers.Dense(64, activation='relu')

# Dropout 层(防止过拟合)
layers.Dropout(0.5)

# 批归一化层
layers.BatchNormalization()

# 展平层
layers.Flatten()

# 输入层(用于定义输入形状)
layers.Input(shape=(28, 28, 1))

# 重塑层
layers.Reshape((784,))

# 激活层
layers.Activation('relu')

Functional API

Functional API 适用于复杂的模型结构,如多输入、多输出、共享层等。

基本用法

from tensorflow.keras import Model

# 定义输入
inputs = keras.Input(shape=(784,))

# 定义层
x = layers.Dense(64, activation='relu')(inputs)
x = layers.Dense(32, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)

# 创建模型
model = Model(inputs=inputs, outputs=outputs)
model.summary()

多输入模型

# 两个输入分支
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')

# 分支 A
x_a = layers.Dense(64, activation='relu')(input_a)
x_a = layers.Dense(32, activation='relu')(x_a)

# 分支 B
x_b = layers.Dense(64, activation='relu')(input_b)
x_b = layers.Dense(32, activation='relu')(x_b)

# 合并
concatenated = layers.Concatenate()([x_a, x_b])
outputs = layers.Dense(10, activation='softmax')(concatenated)

model = Model(inputs=[input_a, input_b], outputs=outputs)
model.summary()

多输出模型

inputs = keras.Input(shape=(784,))

# 共享层
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)

# 输出 1:分类
output_a = layers.Dense(10, activation='softmax', name='classification')(x)

# 输出 2:回归
output_b = layers.Dense(1, activation='linear', name='regression')(x)

model = Model(inputs=inputs, outputs=[output_a, output_b])
model.summary()

# 编译时为不同输出指定不同的损失函数
model.compile(
optimizer='adam',
loss={
'classification': 'categorical_crossentropy',
'regression': 'mse'
},
loss_weights={
'classification': 1.0,
'regression': 0.5
}
)

共享层

# 共享的嵌入层
shared_embedding = layers.Embedding(1000, 64)

input_a = keras.Input(shape=(10,))
input_b = keras.Input(shape=(10,))

# 两个输入共享同一个嵌入层
embedded_a = shared_embedding(input_a)
embedded_b = shared_embedding(input_b)

# 后续处理
x_a = layers.LSTM(32)(embedded_a)
x_b = layers.LSTM(32)(embedded_b)

# 合并
merged = layers.Concatenate()([x_a, x_b])
outputs = layers.Dense(1, activation='sigmoid')(merged)

model = Model(inputs=[input_a, input_b], outputs=outputs)

残差连接

inputs = keras.Input(shape=(64,))

# 主路径
x = layers.Dense(64, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)

# 残差连接(跳跃连接)
residual = layers.Dense(64)(inputs) # 如果维度不匹配需要投影
outputs = layers.Add()([x, residual])

model = Model(inputs=inputs, outputs=outputs)

子类化 Model

通过继承 Model 类可以完全自定义模型的行为。

基本用法

class MyModel(Model):
def __init__(self, hidden_units):
super(MyModel, self).__init__()
self.dense1 = layers.Dense(hidden_units, activation='relu')
self.dense2 = layers.Dense(hidden_units, activation='relu')
self.output_layer = layers.Dense(10, activation='softmax')

def call(self, inputs):
x = self.dense1(inputs)
x = self.dense2(x)
return self.output_layer(x)

model = MyModel(64)
model.build(input_shape=(None, 784))
model.summary()

自定义训练逻辑

class CustomModel(Model):
def __init__(self):
super(CustomModel, self).__init__()
self.dense1 = layers.Dense(64, activation='relu')
self.dense2 = layers.Dense(10, activation='softmax')

def call(self, inputs):
x = self.dense1(inputs)
return self.dense2(x)

def train_step(self, data):
x, y = data

with tf.GradientTape() as tape:
y_pred = self(x, training=True)
loss = self.compiled_loss(y, y_pred)

gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

self.compiled_metrics.update_state(y, y_pred)
return {m.name: m.result() for m in self.metrics}

model = CustomModel()
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)

模型编译

创建模型后需要编译,指定优化器、损失函数和评估指标:

model.compile(
optimizer='adam', # 优化器
loss='sparse_categorical_crossentropy', # 损失函数
metrics=['accuracy'] # 评估指标
)

常用优化器

# SGD
keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)

# Adam(推荐)
keras.optimizers.Adam(learning_rate=0.001)

# RMSprop
keras.optimizers.RMSprop(learning_rate=0.001)

# AdamW(带权重衰减)
keras.optimizers.AdamW(learning_rate=0.001, weight_decay=0.01)

# 自定义优化器参数
optimizer = keras.optimizers.Adam(
learning_rate=0.001,
beta_1=0.9,
beta_2=0.999,
epsilon=1e-07
)

常用损失函数

# 分类
'sparse_categorical_crossentropy' # 整数标签
'categorical_crossentropy' # one-hot 编码标签
'binary_crossentropy' # 二分类

# 回归
'mse' # 均方误差
'mae' # 平均绝对误差
'huber_loss' # Huber 损失

# 使用函数形式
loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

评估指标

# 内置指标
metrics = ['accuracy', 'mae', 'mse']

# 自定义指标
metrics = [
keras.metrics.Precision(),
keras.metrics.Recall(),
keras.metrics.AUC()
]

# 自定义指标类
class F1Score(keras.metrics.Metric):
def __init__(self, name='f1_score', **kwargs):
super().__init__(name=name, **kwargs)
self.precision = keras.metrics.Precision()
self.recall = keras.metrics.Recall()

def update_state(self, y_true, y_pred, sample_weight=None):
self.precision.update_state(y_true, y_pred, sample_weight)
self.recall.update_state(y_true, y_pred, sample_weight)

def result(self):
p = self.precision.result()
r = self.recall.result()
return 2 * p * r / (p + r + 1e-7)

def reset_state(self):
self.precision.reset_state()
self.recall.reset_state()

模型训练

使用 fit 方法

# 基本训练
history = model.fit(
x_train, y_train,
epochs=10,
batch_size=32,
validation_data=(x_val, y_val)
)

# 使用验证集分割
history = model.fit(
x_train, y_train,
epochs=10,
batch_size=32,
validation_split=0.2
)

# 使用回调函数
history = model.fit(
x_train, y_train,
epochs=100,
batch_size=32,
validation_data=(x_val, y_val),
callbacks=[
keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
keras.callbacks.ModelCheckpoint('best_model.keras', save_best_only=True),
keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)
]
)

训练历史

import matplotlib.pyplot as plt

history = model.fit(x_train, y_train, epochs=10, validation_split=0.2)

# 查看历史数据
print(history.history.keys()) # dict_keys(['loss', 'accuracy', 'val_loss', 'val_accuracy'])

# 绘制训练曲线
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='val')
plt.title('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='train')
plt.plot(history.history['val_accuracy'], label='val')
plt.title('Accuracy')
plt.legend()

plt.show()

使用 tf.data.Dataset

# 创建数据集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(10000).batch(32).prefetch(tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(32)

# 训练
history = model.fit(
train_dataset,
epochs=10,
validation_data=val_dataset
)

模型评估和预测

# 评估模型
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"测试准确率: {test_acc:.4f}")

# 预测
predictions = model.predict(x_test)
print(predictions.shape) # (num_samples, 10)

# 获取预测类别
predicted_classes = tf.argmax(predictions, axis=1)

小结

本章介绍了使用 Keras 构建模型的三种方式:

  1. Sequential API:适用于简单的层堆叠模型
  2. Functional API:适用于复杂模型结构,支持多输入多输出
  3. Model 子类化:完全自定义模型行为

选择哪种方式取决于模型复杂度和需求:

  • 简单模型使用 Sequential API
  • 需要多输入输出或共享层使用 Functional API
  • 需要完全自定义训练逻辑使用 Model 子类化

下一章我们将学习卷积神经网络,用于图像分类任务。