第11章 数据增强
11-1 导学
什么是目标检测?
- 目标检测:用于定位图像中的目标,使用边框标记位置和大小
- 目标识别:用于识别目标的类别,如行人,车辆等
目标检测的挑战:光照变化、物体遮挡、背景复杂、尺度变化......
传统的方法无法有效解决上述挑战 深度学习才是终极大法
本章内容
- 迁移学习
- 数据增强
- 单目标物体的检测与识别
- R-CNN系列
- SSD与YOLO
- 锚框
- 多目标物体的检测与识别
- 其它算法...
11-2 迁移学习的工作原理
什么迁移学习?
对一个已经训练好的模型进行微调以适应新的任务
为什么目标检测需要使用迁移学习呢?
- 数据需求量大:几万张高质量的标注数据很难训练好
- 资源消耗大:需要很好的GPU、花很长时间才能训练好
- 底层特征的普遍性:迁移学习可以利用预训练模型学到底层特征
- 避免了重复训练和学习,加快的模型的收敛速度
什么迁移学习可以起作用
迁移学习可以在大规模数据集预训练的模型上学到通用特征,从而在目标检测任务上取得更好的性能,减少训练的时间和成本
工作原理

冻结参数

11-3 Tensorflow实现迁移学习
代码
# 导入必要的库
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import numpy as np
# 定义加载模型并替换全连接层的函数
def load_model(model_name, num_classes):
"""
加载预训练模型(VGG16或ResNet50),冻结卷积层,并替换顶层的全连接层。
参数:
model_name (str): 要使用的模型名称,可选 'VGG16' 或 'ResNet50'
num_classes (int): 分类任务的目标类别数
返回:
model (tf.keras.Model): 修改后的Keras模型
"""
if model_name == 'VGG16':
# 使用ImageNet预训练权重初始化VGG16模型
# include_top=False 表示不包含原始的全连接层
# input_shape=(224, 224, 3) 设置输入图像尺寸为224x224的RGB图像
# pooling='avg' 表示使用全局平均池化替代最后的空间维度
base_model = tf.keras.applications.VGG16(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3),
pooling='avg'
)
elif model_name == 'ResNet50':
# 使用ImageNet预训练权重初始化ResNet50模型
base_model = tf.keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3),
pooling='avg'
)
# 冻结base_model中的所有层,即在训练时不更新这些参数
base_model.trainable = False
# 构建新的模型:将base_model作为基础,在其顶部添加新的全连接层
model = models.Sequential([
base_model, # 基础模型部分(卷积层)
layers.Dense(num_classes, activation='softmax') # 新的输出层,用于分类
])
# 打印模型结构摘要
model.summary()
return model
# 超参数设置
num_classes = 3 # 分类数量,例如三分类问题
model_name = 'VGG16' # 使用的模型名称,也可以是'ResNet50'
learning_rate = 0.001 # 学习率
batch_size = 32 # 每个批次的数据量
image_size = (224, 224) # 图像输入尺寸,适配大多数预训练模型
# 加载并构建模型
model = load_model(model_name, num_classes)
# 接下来可以继续编译和训练模型,例如:
# model.compile(optimizer=optimizers.Adam(learning_rate=learning_rate),
# loss='categorical_crossentropy',
# metrics=['accuracy'])
# 然后准备数据集并调用 model.fit(...) 进行训练
Model: "sequential"
┌─────────────────────────────────┬────────────────────────┬───────────────┐
│ Layer (type) │ Output Shape │ Param # │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ vgg16 (Functional) │ (None, 512) │ 14,714,688 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ dense (Dense) │ (None, 3) │ 1,539 │
└─────────────────────────────────┴────────────────────────┴───────────────┘
Total params: 14,716,227 (56.14 MB)
Trainable params: 1,539 (6.01 KB)
Non-trainable params: 14,714,688 (56.13 MB)
11-4 Pytorch实现迁移学习
「实战」Pytorch实现迁移学习
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.models import VGG16_Weights, ResNet18_Weights
def load_model(model_name, num_classes):
"""
加载指定的预训练模型,并修改最后的分类层以适应新的分类任务。
参数:
model_name (str): 模型名称 ('vgg16' 或 'resnet18')
num_classes (int): 新的分类任务的类别数量
返回:
model (torch.nn.Module): 修改后的模型
"""
if model_name == 'vgg16':
# 加载预训练的 VGG16 模型
model = models.vgg16(weights=VGG16_Weights.DEFAULT)
# 冻结特征提取层(不计算梯度)
for param in model.features.parameters():
param.requires_grad = False
# 通过一次前向传播获取特征输出维度
with torch.no_grad():
dummy_input = torch.randn(1, 3, 224, 224)
features = model.features(dummy_input)
features_dim = features.view(features.size(0), -1).size(1) # 展平后的维度
# 替换最后的全连接层为自定义的分类器
model.classifier = nn.Sequential(
nn.Linear(features_dim, 256),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(256, num_classes)
)
elif model_name == 'resnet18':
# 加载预训练的 ResNet18 模型
model = models.resnet18(weights=ResNet18_Weights.DEFAULT)
# 冻结卷积层参数(只训练最后的全连接层)
for name, param in model.named_parameters():
if not isinstance(model._modules.get(name), nn.Linear):
param.requires_grad = False
# 获取最后一个全连接层的输入维度
in_features = model.fc.in_features
# 替换最后的全连接层为自定义的分类器
model.fc = nn.Sequential(
nn.Linear(in_features, 256),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(256, num_classes)
)
else:
raise ValueError(f'Unknown model name: {model_name}')
# 打印模型结构
print(model)
return model
# 超参数
num_classes = 3
model_name = 'vgg16'
model = load_model(model_name, num_classes)
11-5 Tensorflow数据增强-图片的导入与显示
数据增强
- 通过对现有训练数据进行各种变换,从而生成新的、与原始数据不同但相似的训练样本
为什么要使用图像增强?
- 解决数据量不足的问题:现实中,我们很难获取高质量数据
- 提高模型的鲁棒性:数据增强可以模拟真实世界的干扰
- 增加模型的泛化性
几何变换、颜色变换、添加噪声

如何进行数据增强
- 几何变换:翻转、旋转、缩放、平移
- 颜色变换:亮度、对比度、饱和度、色调
- 噪音:高斯噪音、椒盐噪音...
「实战」Tensorflow进行数据增强
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
def load_image(path):
"""
加载图像文件并将其转换为TensorFlow张量
参数:
path: 字符串,图像文件路径
返回:
归一化到[0,1]范围的float32类型图像张量
"""
# 读取图像文件
image = tf.io.read_file(path)
# 解码JPEG图像(3通道RGB)
decode_image = tf.image.decode_jpeg(image, channels=3)
# 将图像转换为float32类型并归一化到[0,1]
img = tf.image.convert_image_dtype(decode_image, tf.float32)
return img
def display_images(image_list=None, title=None):
"""
显示图像网格
参数:
image_list: 图像列表,支持多种嵌套格式
title: 图像显示的标题
"""
if image_list is None:
image_list = []
if title is None:
title = ""
# 检查是否是嵌套列表,如果不是则转换为嵌套列表
if not any(isinstance(i, list) for i in image_list):
image_list = [image_list] # 例如 [img, img, img] => [[img, img, img]]
# 计算行列数
rows = len(image_list)
cols = max(len(row) if isinstance(row, list) else 1 for row in image_list)
# 创建子图
plt.suptitle(title)
fig, ax = plt.subplots(rows, cols)
# 确保ax是2D数组(统一处理单行/单列情况)
ax = np.atleast_2d(ax)
# 遍历每个图像并显示
for i, row in enumerate(image_list):
if not isinstance(row, list):
row = [row]
for j, img in enumerate(row):
ax[i, j].imshow(img)
ax[i, j].axis("off") # 关闭坐标轴
# 填充空白子图
for j in range(len(row), cols):
ax[i, j].axis("off")
plt.tight_layout() # 自动调整子图间距
plt.show()
def resize_center_crop(img, target_size=224):
"""
保持宽高比调整大小后中心裁剪
参数:
img: 输入图像张量
target_size: 目标尺寸(正方形)
返回:
中心裁剪后的图像
"""
height, width, ch = img.shape
aspect_ratio = width / height
# 计算保持宽高比的临时尺寸
if aspect_ratio > 1: # 宽>高
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
else: # 高>=宽
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
# 调整图像大小
scale_img = tf.image.resize(img, [target_height, target_width])
# 计算裁剪偏移量
offset_height = (target_height - target_size) // 2
offset_width = (target_width - target_size) // 2
# 中心裁剪
crop_img = tf.image.crop_to_bounding_box(
scale_img,
offset_height,
offset_width,
target_size,
target_size
)
return crop_img
def resize_pad_crop(img, target_size=224):
"""
保持宽高比调整大小后填充为正方形
参数:
img: 输入图像张量
target_size: 目标尺寸(正方形)
返回:
填充后的图像
"""
height, width, ch = img.shape
aspect_ratio = width / height
# 计算保持宽高比的临时尺寸
if aspect_ratio > 1: # 宽>高
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
else: # 高>=宽
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
# 调整图像大小
scale_img = tf.image.resize(img, [target_height, target_width])
# 计算需要填充的像素数
pad_height = target_size - target_height
pad_width = target_size - target_width
# 计算上下左右的填充量
pad_top = pad_height // 2
pad_bottom = pad_height - pad_top
pad_left = pad_width // 2
pad_right = pad_width - pad_left
print(f"填充量: 上{pad_top}, 下{pad_bottom}, 左{pad_left}, 右{pad_right}")
# 定义填充模式(只在高度和宽度维度填充)
padding = [[pad_top, pad_bottom], [pad_left, pad_right], [0, 0]]
# 执行填充(用0填充)
pad_img = tf.pad(scale_img, padding, "CONSTANT", constant_values=0)
return pad_img
def gaussian_noise(img, mean=0, stddev=0.1):
"""
添加高斯噪声
参数:
img: 输入图像
mean: 噪声均值
stddev: 噪声标准差
返回:
添加噪声后的图像
"""
# 生成高斯噪声
noise = tf.random.normal(
shape=tf.shape(img),
mean=mean,
stddev=stddev,
dtype=tf.float32
)
# 添加噪声并裁剪到[0,1]范围
noise_img = tf.clip_by_value(img + noise, 0, 1)
return noise_img
def salt_and_pepper_noise(img, prob=0.1):
"""
添加椒盐噪声
参数:
img: 输入图像
prob: 噪声像素比例
返回:
添加噪声后的图像
"""
height, width, chs = img.shape
# 计算噪声像素数量
num_noise_pixels = int(height * width * prob)
# 随机生成噪声像素坐标
y_coords = np.random.randint(0, height, num_noise_pixels)
x_coords = np.random.randint(0, width, num_noise_pixels)
# 随机决定是盐噪声(1)还是胡椒噪声(0)
salt_or_pepper = np.random.choice([0, 1], size=num_noise_pixels * chs)
salt_or_pepper = salt_or_pepper.reshape(num_noise_pixels, chs)
# 创建图像副本并添加噪声
noise_img = img.numpy().copy()
for i in range(num_noise_pixels):
noise_img[y_coords[i], x_coords[i]] = salt_or_pepper[i]
return noise_img
def random_mask(img, mask_size=50, num_blocks=1):
"""
随机添加黑色遮挡块
参数:
img: 输入图像
mask_size: 遮挡块大小
num_blocks: 遮挡块数量
返回:
添加遮挡后的图像
"""
height, width, chs = img.shape
img_np = img.numpy()
# 添加指定数量的遮挡块
for _ in range(num_blocks):
# 随机生成遮挡块左上角坐标
y = np.random.randint(0, height - mask_size)
x = np.random.randint(0, width - mask_size)
# 将区域设置为黑色(0)
img_np[y:y + mask_size, x:x + mask_size] = 0
return img_np
img = load_image("./kpbl.PNG")
#水平翻转
img_flipped = tf.image.flip_left_right(img)
#上下翻转
img_updown = tf.image.flip_up_down(img)
#旋转
img_rotated = tf.image.rot90(img, k=4)
#缩放
img_resized = tf.image.resize(img, size=[300, 300])
#按中心裁剪
img_cropped = tf.image.central_crop(img, central_fraction=0.5)
#按尺寸进行裁剪
img_cropped = tf.image.crop_to_bounding_box(img, offset_height=100, offset_width=100, target_height=200, target_width=200)
#随机裁剪,使用随机种子
img_cropped = tf.image.random_crop(img, size=[400, 400, 3], seed=1)
# 颜色相关的增强
# 增加亮度
img_bright = tf.image.adjust_brightness(img, delta=0.3) #delta > 0,它会增加亮度,delta < 0,它会减少亮度
# 增加对比度
img_contrast1 = tf.image.adjust_contrast(img, contrast_factor=2)# contrast_factor > 1,增加对比度,contrast_factor < 1,减少对比度
img_contrast = tf.clip_by_value(img_contrast1, 0, 1)
# 色调
img_hue = tf.image.adjust_hue(img, delta=0.1) #delta > 0,增加色调,delta < 0,减少色调
# 饱和度
img_saturation = tf.image.adjust_saturation(img, saturation_factor=2) #saturation_factor > 1,增加饱和度,saturation_factor < 1,减少饱和度
# rc =resize_center_crop(img)
# rp = resize_pad_crop(img)
rp = tf.image.resize_with_pad(img, 300, 300)
# g_noise = gaussian_noise(img, 0, 0.1)
# noise = salt_and_pepper_noise(img)
# mask_img = random_mask(img, mask_size=200, num_blocks=3)
display_images([img,rp])
display_images([img_flipped])
display_images([img_updown])
display_images([img_rotated])
display_images([img_resized])
display_images([img_cropped])
display_images([img_bright])
display_images([img_contrast])
display_images([img_hue])
display_images([img_saturation])
# display_images([[img], [img, img],[img, img, img]], "Original Image")
# display_images([[img],[img, img]], "Original Image")
11-6 Tensorflow数据增强-基本变换操作
#水平翻转
img_flipped = tf.image.flip_left_right(img)
#上下翻转
img_updown = tf.image.flip_up_down(img)
#旋转
img_rotated = tf.image.rot90(img, k=4)
#缩放
img_resized = tf.image.resize(img, size=[300, 300])
#按中心裁剪
img_cropped = tf.image.central_crop(img, central_fraction=0.5)
#按尺寸进行裁剪
img_cropped = tf.image.crop_to_bounding_box(img, offset_height=100, offset_width=100, target_height=200, target_width=200)
#随机裁剪,使用随机种子
img_cropped = tf.image.random_crop(img, size=[400, 400, 3], seed=1)
# 颜色相关的增强
# 增加亮度
img_bright = tf.image.adjust_brightness(img, delta=0.3) #delta > 0,它会增加亮度,delta < 0,它会减少亮度
# 增加对比度
img_contrast1 = tf.image.adjust_contrast(img, contrast_factor=2)# contrast_factor > 1,增加对比度,contrast_factor < 1,减少对比度
img_contrast = tf.clip_by_value(img_contrast1, 0, 1)
# 色调
img_hue = tf.image.adjust_hue(img, delta=0.1) #delta > 0,增加色调,delta < 0,减少色调
# 饱和度
img_saturation = tf.image.adjust_saturation(img, saturation_factor=2) #saturation_factor > 1,增加饱和度,saturation_factor < 1,减少饱和度
# rc =resize_center_crop(img)
# rp = resize_pad_crop(img)
11-7 Tensorflow数据增强-高级裁剪
「实战」Tensorflow高级裁剪
def resize_center_crop(img, target_size=224):
"""
保持宽高比调整大小后中心裁剪
参数:
img: 输入图像张量
target_size: 目标尺寸(正方形)
返回:
中心裁剪后的图像
"""
height, width, ch = img.shape
aspect_ratio = width / height
# 计算保持宽高比的临时尺寸
if aspect_ratio > 1: # 宽>高
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
else: # 高>=宽
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
# 调整图像大小
scale_img = tf.image.resize(img, [target_height, target_width])
# 计算裁剪偏移量
offset_height = (target_height - target_size) // 2
offset_width = (target_width - target_size) // 2
# 中心裁剪
crop_img = tf.image.crop_to_bounding_box(
scale_img,
offset_height,
offset_width,
target_size,
target_size
)
return crop_img
def resize_pad_crop(img, target_size=224):
"""
保持宽高比调整大小后填充为正方形
参数:
img: 输入图像张量
target_size: 目标尺寸(正方形)
返回:
填充后的图像
"""
height, width, ch = img.shape
aspect_ratio = width / height
# 计算保持宽高比的临时尺寸
if aspect_ratio > 1: # 宽>高
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
else: # 高>=宽
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
# 调整图像大小
scale_img = tf.image.resize(img, [target_height, target_width])
# 计算需要填充的像素数
pad_height = target_size - target_height
pad_width = target_size - target_width
# 计算上下左右的填充量
pad_top = pad_height // 2
pad_bottom = pad_height - pad_top
pad_left = pad_width // 2
pad_right = pad_width - pad_left
print(f"填充量: 上{pad_top}, 下{pad_bottom}, 左{pad_left}, 右{pad_right}")
# 定义填充模式(只在高度和宽度维度填充)
padding = [[pad_top, pad_bottom], [pad_left, pad_right], [0, 0]]
# 执行填充(用0填充)
pad_img = tf.pad(scale_img, padding, "CONSTANT", constant_values=0)
return pad_img
11-8 tensorflow数据增强-噪音增强
高斯噪音/椒盐噪音/遮挡块

def gaussian_noise(img, mean=0, stddev=0.1):
"""
添加高斯噪声
参数:
img: 输入图像
mean: 噪声均值
stddev: 噪声标准差
返回:
添加噪声后的图像
"""
# 生成高斯噪声
noise = tf.random.normal(
shape=tf.shape(img),
mean=mean,
stddev=stddev,
dtype=tf.float32
)
# 添加噪声并裁剪到[0,1]范围
noise_img = tf.clip_by_value(img + noise, 0, 1)
return noise_img
def salt_and_pepper_noise(img, prob=0.1):
"""
添加椒盐噪声
参数:
img: 输入图像
prob: 噪声像素比例
返回:
添加噪声后的图像
"""
height, width, chs = img.shape
# 计算噪声像素数量
num_noise_pixels = int(height * width * prob)
# 随机生成噪声像素坐标
y_coords = np.random.randint(0, height, num_noise_pixels)
x_coords = np.random.randint(0, width, num_noise_pixels)
# 随机决定是盐噪声(1)还是胡椒噪声(0)
salt_or_pepper = np.random.choice([0, 1], size=num_noise_pixels * chs)
salt_or_pepper = salt_or_pepper.reshape(num_noise_pixels, chs)
# 创建图像副本并添加噪声
noise_img = img.numpy().copy()
for i in range(num_noise_pixels):
noise_img[y_coords[i], x_coords[i]] = salt_or_pepper[i]
return noise_img
def random_mask(img, mask_size=50, num_blocks=1):
"""
随机添加黑色遮挡块
参数:
img: 输入图像
mask_size: 遮挡块大小
num_blocks: 遮挡块数量
返回:
添加遮挡后的图像
"""
height, width, chs = img.shape
img_np = img.numpy()
# 添加指定数量的遮挡块
for _ in range(num_blocks):
# 随机生成遮挡块左上角坐标
y = np.random.randint(0, height - mask_size)
x = np.random.randint(0, width - mask_size)
# 将区域设置为黑色(0)
img_np[y:y + mask_size, x:x + mask_size] = 0
return img_np
11-9 Pytorch数据增强-基本变换
「实战」Pytorch数据增强的基本操作
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.io import read_image
import numpy as np
import matplotlib.pyplot as plt
def load_image(image_path):
image = read_image(image_path) / 255.0
return image
# image_list, [img1, img2, img3, ...]
# [[img1, img2], [img3, img4]]
# [[img1], [img2, img3]]
# [img1, [img2, img2]]
# [img1]
def display_images(image_list=None, title=None):
if image_list is None:
image_list = []
if title is None:
title = ""
# (False, True)
if not any(isinstance(i, list) for i in image_list):
image_list = [image_list] # [img, img, img] => [[img, img, img]]
rows = len(image_list)
cols = max(len(row) if isinstance(row, list) else 1 for row in image_list)
plt.suptitle(title)
fig, ax = plt.subplots(rows, cols)
# 确保ax是2D数组
# ax => [[ax]], [ax]=>[[ax]]
ax = np.atleast_2d(ax)
for i, row in enumerate(image_list):
if not isinstance(row, list):
row = [row]
for j, img in enumerate(row):
ax[i, j].imshow(img)
ax[i, j].axis("off")
for j in range(len(row), cols):
ax[i, j].axis("off")
# plt.tight_layout()
plt.show()
image = load_image("./kpbl.PNG").permute(1, 2, 0)
print(image)
display_images(image, "Original Image")
# 基本变换
# tans_h_flip = transforms.RandomHorizontalFlip(p=1)
# img_h_flip = tans_h_flip(img).permute(1, 2, 0)
# tans_v_flip = transforms.RandomVerticalFlip(p=1)
# img_h_flip = tans_v_flip(img).permute(1, 2, 0)
# 缩放
# resize_img = transforms.Resize(size=(300, 300))
# img = resize_img(img).permute(1, 2, 0)
# 旋转
# rotate_img = transforms.RandomRotation(degrees=90, expand=True)
# img = rotate_img(img).permute(1, 2, 0)
# print(img_h_flip.shape)
# old_img = img.permute(1, 2, 0)
11-10 Pytorch数据增强-基本裁剪
# 裁剪
# crop_img = transforms.CenterCrop(size=(300, 300))
# c_img = crop_img(img).permute(1, 2, 0)
# 随机裁剪
# random_crop = transforms.RandomCrop(size=(300, 300))
# c_img = random_crop(img).permute(1, 2, 0)
# 随机尺寸裁剪
#scale:其含义是:裁剪框的面积相对于原图像面积的比例范围
#ratio:其含义是:裁剪框的宽高比范围
# random_resized_crop = transforms.RandomResizedCrop(size=(300, 300), scale=(0.5, 1.0), ratio=(0.75, 1.333))
# c_img = random_resized_crop(img).permute(1, 2, 0)
11-11 Pytorch数据增强-高级裁剪
def resize_center_crop(img, target_size=224):
chs, height, width = img.shape
aspect_ratio = width / height
if aspect_ratio > 1:
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
else:
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
resize_transform = transforms.Resize((target_height, target_width))
resized_image = resize_transform(img)
center_crop_transform = transforms.CenterCrop(target_size)
cropped_image = center_crop_transform(resized_image)
return cropped_image
def resize_pad_crop(img, target_size=224):
chs, height, width = img.shape
aspect_ratio = width / height
if aspect_ratio > 1:
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
else:
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
resize_transform = transforms.Resize((target_height, target_width))
resized_image = resize_transform(img)
# Center crop
center_crop_transform = transforms.CenterCrop(target_size)
cropped_image = center_crop_transform(resized_image)
# RGB, RGBA,如果A全为0,表示透明,如果A全为255,表示不透明【0-255】
#【0-1.0】,0表示示透明,1表示不透明
if chs == 4:
cropped_image[3,:,:]=1.0
return cropped_image
# fivecrop
# five_crop = transforms.FiveCrop(size=(300, 300))
# ten crop
# ten_crop = transforms.TenCrop(size=(300, 300))
# c_img = ten_crop(img)
# img_list = []
# for i, img in enumerate(c_img):
# img_list.append(img.permute(1, 2, 0))
# crop_img = resize_pad_crop(img).permute(1,2,0)
# gau_img = gaussian_noise(img).permute(1,2,0)
# sp_img = salt_and_pepper_noise(img)
11-12 Pytorch数据增强-噪音增强
def gaussian_noise(img, mean=0, stddev=0.1):
noise = torch.randn(img.size()) * stddev + mean
noisy_image = img + noise
torch.clamp(noisy_image, 0, 1)
return noisy_image
def salt_and_pepper_noise(img, prob=0.1):
chs, height, width = img.shape
num_noise_pixels = int(height * width * prob)
y_coords = np.random.randint(0, height, num_noise_pixels) #参数分别:为最小值,最大值,采样个数
x_coords = np.random.randint(0, width, num_noise_pixels)
salt_or_pepper = np.random.choice([0, 1], size=num_noise_pixels*chs)
salt_or_pepper = salt_or_pepper.reshape(num_noise_pixels, chs)
noise_img = img.permute(1,2,0).numpy().copy()
for i in range(num_noise_pixels):
noise_img[y_coords[i], x_coords[i]] = salt_or_pepper[i]
return noise_img
def random_mask(img, mask_size=50, num_blocks=1):
chs, height, width = img.shape
img_np = img.permute(1,2,0).numpy()
for _ in range(num_blocks):
y = np.random.randint(0, height - mask_size)
x = np.random.randint(0, width - mask_size)
img_np[y:y+mask_size, x:x+mask_size] = 0
if chs == 4:
img_np[:,:,3]=1.0
return img_np
11-13 目标检测数据集
数据集
- ImageNet:李飞飞团队发起,主要用于图像分类
- Pascal VoC:Markus Everingham团队发起,用于目标识别
- COCO:微软发起,也用于目标识别,比voc大得多
VOC
- Pascal VOC 是一系列数据集
- VoC2007包括了20个类别
- voC2012包话了更多的类别
VOC数据集目录结构

COCO
- COCO是一系列数据集
- CoCo2014
- CoCO2017

11-14 下载VOC数据集
下载数据集
- VoC数据集可以通过pytorch下载
- 也可通过tensorflow_datasets下载,但下载的是非标准格式
- VOC官网已经不能访问,除非你知道具体的下载地址
- COCO只能手工下载 cocodataset.org
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
from torchvision.datasets import VOCDetection
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
transform = transforms.Compose([
transforms.ToTensor(),
])
#下载VOC 2007 数据集
train_data = VOCDetection(root='./datae',
year='2007',
image_set='train',
download=True,
transform=transform)
def display_images(image_list=None, title=None):
if image_list is None:
image_list = []
if title is None:
title = ""
# (False, True)
if not any(isinstance(i, list) for i in image_list):
image_list = [image_list] # [img, img, img] => [[img, img, img]]
rows = len(image_list)
cols = max(len(row) if isinstance(row, list) else 1 for row in image_list)
plt.suptitle(title)
fig, ax = plt.subplots(rows, cols)
# 确保ax是2D数组
# ax => [[ax]], [ax]=>[[ax]]
ax = np.atleast_2d(ax)
for i, row in enumerate(image_list):
if not isinstance(row, list):
row = [row]
for j, img in enumerate(row):
ax[i, j].imshow(img)
ax[i, j].axis("off")
for j in range(len(row), cols):
ax[i, j].axis("off")
# plt.tight_layout()
plt.show()
images = []
for i in range(5):
image, ann = train_data[i]
images.append(image.permute(1,2,0))
# print(image.shape, ann)
display_images(images)
11-15 tensorflow+albumentations实现数据增强(一)
albumentations
tf与pt图形变换的问题
- 对于目标检测图像进行变换时无法同步更新其标注
「实战」使用albumentations进行数据增强
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
import tensorflow as tf
from torchvision.datasets import VOCDetection
import torchvision.transforms as transforms
import albumentations as A
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# 数据集根目录以及子目录定义
VOC_ROOT_DIR = './data/VOCdevkit/VOC2007/'
ANNOTATIONS_DIR = 'Annotations'
IMAGE_DIR = 'JPEGImages'
# 构建注释和图像目录路径
anno_dir = os.path.join(VOC_ROOT_DIR, ANNOTATIONS_DIR) # 修正了变量名和拼写错误
image_dir = os.path.join(VOC_ROOT_DIR, IMAGE_DIR) # 修正了变量名和拼写错误
# 打印路径以检查正确性
print(anno_dir)
print(image_dir)
def load_image(path):
image = tf.io.read_file(path)
decode_image = tf.image.decode_jpeg(image, channels=3)
img = tf.image.convert_image_dtype(decode_image, tf.float32)
return img
# %%
# image_list, [img1, img2, img3, ...]
# [[img1, img2], [img3, img4]]
# [[img1], [img2, img3]]
# [img1, [img2, img2]]
# [img1]
def display_images(image_list=None, title=None):
if image_list is None:
image_list = []
if title is None:
title = ""
# (False, True)
if not any(isinstance(i, list) for i in image_list):
image_list = [image_list] # [img, img, img] => [[img, img, img]]
rows = len(image_list)
cols = max(len(row) if isinstance(row, list) else 1 for row in image_list)
plt.suptitle(title)
fig, ax = plt.subplots(rows, cols)
# 确保ax是2D数组
# ax => [[ax]], [ax]=>[[ax]]
ax = np.atleast_2d(ax)
for i, row in enumerate(image_list):
if not isinstance(row, list):
row = [row]
for j, img in enumerate(row):
ax[i, j].imshow(img)
ax[i, j].axis("off")
for j in range(len(row), cols):
ax[i, j].axis("off")
plt.tight_layout()
plt.show()
# %%
def resize_center_crop(img, target_size=224):
height, width, ch = img.shape
aspect_ratio = width / height
if aspect_ratio > 1:
target_height = target_size
target_width = int(round(target_size * aspect_ratio))
else:
target_width = target_size
target_height = int(round(target_size / aspect_ratio))
scale_img = tf.image.resize(img, [target_height, target_width])
offset_height = (target_height - target_size) // 2
offset_width = (target_width - target_size) // 2
crop_img = tf.image.crop_to_bounding_box(scale_img,
offset_height,
offset_width,
target_size,
target_size)
return crop_img
img_path = os.path. join(image_dir,'000005.jpg')
print(img_path)
load_image(image_dir)
display_images([image_dir])
11-16 tensorflow+albumentations实现数据增强(二)
第12章 目标检测算法与原理
12-1 目标检测的基本原理
目标检测的基本步骤
先将目标物体框出来
逻辑回归(硬train一发)
使用滑动窗口
基于卷积的滑动窗口
通过分类器判断框中的物体是什么
- 对于单目标检测
- 多目标检测,要对所有框出的物体进行分类
「实战」最最简单的单目标检测
什么是单目标检测?
- 图片中只有一个目标物体,其它都认为是背景
如何进行最简单的单目标检测?
硬train一发即可!
卷积+逻辑回归算法+训练数据集,训练一个可以找到目标物体bbox的模型
卷积用于获得图片中的各种特征 将各种特征送给逻辑回归找具体的特征点
图片归一化
首先我们要对图片进行归一化处理

训练数据集
- voc,coco 数据集比较大,不利于课程的讲解
- banana数据集(李沐)
banana数据集的结构

12-2 加载数据
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from pathlib import Path
from utils import display_images
import albumentations as A
from sklearn.model_selection import train_test_split
BASE_PATH=Path("./banana-detection/banana-detection/bananas_train/")
IMAGES_PATH= BASE_PATH/"images"
LABEL_PATH = BASE_PATH/"label.csv"
print(IMAGES_PATH, LABEL_PATH)
#df(DataFrame)
df = pd.read_csv(LABEL_PATH)
filenames = df['img_name'].tolist()
print(filenames)
#通过该函数读取的图片都是归一化处理后的图片
def load_image(path):
image = tf.io.read_file(path)
decode_image = tf.image.decode_jpeg(image, channels=3)
img = tf.image.convert_image_dtype(decode_image, tf.float32)
return img
images = []
bbox_list = []
for i in range(5):
if i == 4:
image_path = IMAGES_PATH / filenames[i]
img = load_image(str(image_path))
images.append(img)
bbox = [(df['xmin'][i],df['ymin'][i],df['xmax'][i],df['ymax'][i])]
bbox_list.append(bbox)
#bboxes_list = [[(), ()...]]
display_images(images, bboxes_list=bbox_list)
12-3 构建神经网络
「实战」构建单目标检测的神经网络
VGG+全连接层

def load_model(model_name, num_classes):
if model_name == 'VGG16':
# weights='imagenet'表示加载VGG16在ImageNet上预训练的权重
# include_top=False表示不包含全连接层
base_model = tf.keras.applications.VGG16(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
elif model_name == 'ResNet50':
base_model = tf.keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
# 冻结卷积层的参数
base_model.trainable = False
# 替换全连接层
model = models.Sequential([
base_model,
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32, activation='relu'),
layers.Dense(num_classes, activation='sigmoid')
])
model.summary()
return model
model_name = 'VGG16'
num_classes = 4
#
model = load_model(model_name, num_classes)
12-4 构造训练数据
统一训练数据格式
神经网络使用的预训练数据集是imagenet
其训练数据的宽度是224x224x3
banana训练数据的宽度是256x256x3
load_image,图片已经做了归一化
处理标注数据也要做归一化处理.
Python的 zip 函数
- zip([1,2,3...],[A,B,C...])
- 输出:[(1,A),(2,B),(3,C),...]
- zip((1, A), (2,B), (3,C),...]
- 输出:[(1,2,3,...),(A,B,C,...)]
zip总是输出一个迭代器
DataFrame的apply方法
- DataFrame相当于一个Excel表格
- 其apply方法可以对其中的每一行、每一列进行处理
- 相当于一个for循环,但比for高效,底层做了并行处理
# 导入必要的库
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from pathlib import Path
import albumentations as A
from sklearn.model_selection import train_test_split
# 定义基础路径和子目录
BASE_PATH = Path("./banana-detection/banana-detection/bananas_train/")
IMAGES_PATH = BASE_PATH / "images"
LABEL_PATH = BASE_PATH / "label.csv"
# 打印图像和标签文件夹路径以确认路径正确
print(IMAGES_PATH, LABEL_PATH)
# 从CSV文件中读取标签信息
df = pd.read_csv(LABEL_PATH)
filenames = df['img_name'].tolist() # 获取所有图像文件名列表
print(filenames)
# 定义函数用于加载并归一化图像
def load_image(path):
image = tf.io.read_file(path) # 读取图像文件
decode_image = tf.image.decode_jpeg(image, channels=3) # 解码JPEG格式的图像
img = tf.image.convert_image_dtype(decode_image, tf.float32) # 将图像转换为浮点类型并归一化到[0,1]
return img
# 初始化图像和边界框列表
images = []
bbox_list = []
# 加载前5个图像及其对应的边界框
for i in range(5):
if i == 4:
image_path = IMAGES_PATH / filenames[i]
img = load_image(str(image_path))
images.append(img)
bbox = [(df['xmin'][i], df['ymin'][i], df['xmax'][i], df['ymax'][i])]
bbox_list.append(bbox)
# 使用display_images函数显示图像和边界框(假设已定义)
# display_images(images, bboxes_list=bbox_list)
# 定义函数来加载预训练模型(VGG16或ResNet50)
def load_model(model_name, num_classes):
if model_name == 'VGG16':
base_model = tf.keras.applications.VGG16(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
elif model_name == 'ResNet50':
base_model = tf.keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
base_model.trainable = False # 冻结卷积层参数
# 添加自定义顶层
model = models.Sequential([
base_model,
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32, activation='relu'),
layers.Dense(num_classes, activation='sigmoid')
])
model.summary()
return model
model_name = 'VGG16'
num_classes = 4
model = load_model(model_name, num_classes)
# 处理每行数据以提取图像和边界框
def process_row(row):
image_path = IMAGES_PATH / row['img_name']
image = load_image(str(image_path))
bboxes = [(row['xmin'], row['ymin'], row['xmax'], row['ymax'])]
return image, bboxes
image_bboxes = df.apply(lambda row: process_row(row), axis=1)
images, all_bboxes = zip(*image_bboxes)
# 数据增强函数
def data_augmentation(image, bboxes):
transform = A.Compose([A.Resize(224, 224)], bbox_params=A.BboxParams(format='pascal_voc', label_fields=[]))
transformed = transform(image=image.numpy(), bboxes=bboxes)
return transformed['image'], transformed['bboxes']
# 归一化边界框
def normalize_bbox(bbox, image_width, image_height):
x_min, y_min, x_max, y_max = bbox
return [x_min / image_width, y_min / image_height, x_max / image_width, y_max / image_height]
# 应用数据增强和归一化
new_images, new_bbox_list = [], []
for i, img in enumerate(images):
bboxes = all_bboxes[i]
new_img, new_bboxes = data_augmentation(img, bboxes)
new_images.append(new_img)
bbox = new_bboxes[0]
new_bbox = normalize_bbox(bbox, new_img.shape[1], new_img.shape[0])
new_bbox_list.append(new_bbox)
# 转换为numpy数组并划分训练集和测试集
new_images = np.array(new_images, dtype=np.float32)
new_bbox_list = np.array(new_bbox_list, dtype=np.float32)
split = train_test_split(new_images, new_bbox_list, test_size=0.1, random_state=42)
train_images, test_images, train_labels, test_labels = split
# 编译模型
learning_rate = 1e-3
batch_size = 32
epochs = 25
opt = Adam(learning_rate=learning_rate)
model.compile(optimizer=opt, loss='mse', metrics=['accuracy'])
# 训练模型
H = model.fit(train_images, train_labels, validation_data=(test_images, test_labels),
batch_size=batch_size, epochs=epochs, verbose=1)
# 绘制训练过程中的损失变化
plt.plot(range(epochs), H.history['loss'], label='train loss')
plt.plot(range(epochs), H.history['val_loss'], label='val loss')
plt.title('Loss')
plt.xlabel('Epoch #')
plt.ylabel('Loss')
plt.legend(loc='upper right')
# 保存模型
BASE_OUTPUT = Path("./output")
MODEL_PATH = BASE_OUTPUT / "single_obj_detect.keras"
model.save(MODEL_PATH)
# 加载模型并进行预测
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
TEST_BASE_PATH = Path("./data/banana-detection/bananas_val")
TEST_IMAGE_PATH = TEST_BASE_PATH / "images"
test_image_path = TEST_IMAGE_PATH / "15.png"
test_image = load_image(str(test_image_path))
resized_image = tf.image.resize(test_image, (224, 224))
predict_image = tf.expand_dims(resized_image, axis=0)
preds = model.predict(predict_image)
(startX, startY, endX, endY) = preds[0]
startX, startY, endX, endY = int(startX * resized_image.shape[1]), int(startY * resized_image.shape[0]), int(endX * resized_image.shape[1]), int(endY * resized_image.shape[0])
# 显示预测结果
display_images(resized_image, bboxes_list=[[(startX, startY, endX, endY)]])
12-5 模型训练(一)
「实战」单目标检测的训练
训练参数
- 损失函数是最简单的均方差(mse)
- 学习率 0.001(1e-3)
- 优化器是 Adam
- 批量训练大小是 32
- 训练轮次 25
12-6 模型训练(二)
# 转换为numpy数组并划分训练集和测试集
new_images = np.array(new_images, dtype=np.float32)
new_bbox_list = np.array(new_bbox_list, dtype=np.float32)
split = train_test_split(new_images, new_bbox_list, test_size=0.1, random_state=42)
train_images, test_images, train_labels, test_labels = split
# 编译模型
learning_rate = 1e-3
batch_size = 32
epochs = 25
opt = Adam(learning_rate=learning_rate)
model.compile(optimizer=opt, loss='mse', metrics=['accuracy'])
# 训练模型
H = model.fit(train_images, train_labels, validation_data=(test_images, test_labels),
batch_size=batch_size, epochs=epochs, verbose=1)
# 绘制训练过程中的损失变化
plt.plot(range(epochs), H.history['loss'], label='train loss')
plt.plot(range(epochs), H.history['val_loss'], label='val loss')
plt.title('Loss')
plt.xlabel('Epoch #')
plt.ylabel('Loss')
plt.legend(loc='upper right')
# 保存模型
BASE_OUTPUT = Path("./output")
MODEL_PATH = BASE_OUTPUT / "single_obj_detect.keras"
model.save(MODEL_PATH)
# 加载模型并进行预测
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
TEST_BASE_PATH = Path("./data/banana-detection/bananas_val")
TEST_IMAGE_PATH = TEST_BASE_PATH / "images"
test_image_path = TEST_IMAGE_PATH / "15.png"
test_image = load_image(str(test_image_path))
resized_image = tf.image.resize(test_image, (224, 224))
predict_image = tf.expand_dims(resized_image, axis=0)
preds = model.predict(predict_image)
(startX, startY, endX, endY) = preds[0]
startX, startY, endX, endY = int(startX * resized_image.shape[1]), int(startY * resized_image.shape[0]), int(endX * resized_image.shape[1]), int(endY * resized_image.shape[0])
# 显示预测结果
display_images(resized_image, bboxes_list=[[(startX, startY, endX, endY)]])
12-7 使用模型
TEST_BASE_PATH = Path("./data/banana-detection/bananas_val")
TEST_IMAGE_PATH = TEST_BASE_PATH / "images"
test_image_path = TEST_IMAGE_PATH / "15.png"
print(test_image_path)
test_image = load_image(str(test_image_path))
resized_image = tf.image.resize(test_image, (224, 224))
predict_image = tf.expand_dims(resized_image, axis=0)
#predict要求的输出是【batch_size, (height, width, channels)】
preds = model.predict(predict_image)
(startX, startY, endX, endY) = preds[0]
startX = int(startX * resized_image.shape[1])
startY = int(startY * resized_image.shape[0])
endX = int(endX * resized_image.shape[1])
endY = int(endY * resized_image.shape[0])
display_images(resized_image, bboxes_list=[[(startX, startY, endX, endY)]])
12-8 Pytorch加载Dataset
「实战」Pytorch单目标检测-Dataset
# 设置环境变量以避免潜在的库冲突
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
# 导入必要的PyTorch模块
import torch
import torch.nn as nn # 神经网络模块
import torch.optim as optim # 优化算法
# 导入预训练模型及其权重
from torchvision import models
from torchvision.models import VGG16_Weights, ResNet18_Weights
# 数据加载工具
from torch.utils.data import DataLoader, Dataset
# 路径处理
from pathlib import Path
# 数据处理
import pandas as pd
# 图像读取
from torchvision.io import read_image
# 图像变换和增强
import albumentations as A
from albumentations.pytorch import ToTensorV2
def normalize_bbox(bbox, image_width, image_height):
"""
将边界框坐标归一化到[0,1]范围(相对于图像尺寸)
参数:
bbox: 边界框坐标 (x_min, y_min, x_max, y_max)
image_width: 图像宽度
image_height: 图像高度
返回:
归一化后的边界框坐标 [x_min, y_min, x_max, y_max]
"""
x_min, y_min, x_max, y_max = bbox
x_min = x_min / image_width
x_max = x_max / image_width
y_min = y_min / image_height
y_max = y_max / image_height
return [x_min, y_min, x_max, y_max]
# 使用Albumentations定义图像变换
transform = A.Compose([
A.Resize(224, 224), # 将图像调整为224x224(许多CNN模型的标准输入尺寸)
ToTensorV2() # 转换为PyTorch张量
], bbox_params=A.BboxParams(
format='pascal_voc', # 边界框格式 (xmin, ymin, xmax, ymax)
label_fields=[] # 不需要额外的标签字段
))
class CustomDataset(Dataset):
"""
自定义数据集类,用于加载图像和边界框
参数:
root_dir: 包含图像的根目录
csv_file: 包含标注的CSV文件(img_name, label, xmin, ymin, xmax, ymax)
transform: 要应用的图像变换
"""
def __init__(self, root_dir, csv_file, transform=None):
self.root_dir = root_dir # 图像根目录
self.annotations = pd.read_csv(csv_file) # 读取标注CSV文件
self.transform = transform # 图像变换
def __len__(self):
"""返回数据集中的样本数量"""
return len(self.annotations)
def __getitem__(self, idx):
"""
获取单个样本
参数:
idx: 样本索引
返回:
image: 图像张量
bbox: 归一化后的边界框坐标
"""
# 获取图像文件名并构建完整路径
img_name = self.annotations.iloc[idx, 0]
img_path = Path(self.root_dir) / img_name
# 读取图像并归一化到[0,1]范围
image = read_image(str(img_path)) / 255.0
# 获取边界框坐标并归一化
bbox = self.annotations.iloc[idx, 2:].values.astype(float)
bbox = normalize_bbox(bbox, image.shape[2], image.shape[1])
bboxes = [bbox] # 转换为列表形式(Albumentations要求)
# 应用图像变换(如果有)
if self.transform:
transformed = self.transform(
image=image.permute(1, 2, 0).numpy(), # 将CHW转为HWC并转为numpy
bboxes=bboxes
)
image = transformed['image'] # 变换后的图像
bboxes = transformed['bboxes'] # 变换后的边界框
return image, torch.tensor(bboxes[0], dtype=torch.float32)
class CustomModel(nn.Module):
"""
自定义模型类,基于预训练模型构建
参数:
model_name: 模型名称('vgg16'或'resnet18')
num_classes: 输出类别数(这里用于边界框回归,输出4个坐标值)
"""
def __init__(self, model_name, num_classes):
super(CustomModel, self).__init__()
if model_name == 'vgg16':
# 加载预训练的VGG16模型
self.model = models.vgg16(weights=VGG16_Weights.IMAGENET1K_V1)
# 冻结特征提取层的参数(不更新权重)
for param in self.model.features.parameters():
param.requires_grad = False
# 计算特征维度
with torch.no_grad():
features = self.model.features(torch.randn(1, 3, 224, 224))
features_dim = features.view(features.size(0), -1).size(1)
# 替换分类器部分
self.model.classifier = nn.Sequential(
nn.Linear(features_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, num_classes), # 输出4个坐标值
nn.Sigmoid() # 将输出限制在[0,1]范围
)
elif model_name == 'resnet18':
# 加载预训练的ResNet18模型
self.model = models.resnet18(weights=ResNet18_Weights.DEFAULT)
# 冻结除全连接层外的所有参数
for name, param in self.model.named_parameters():
if not isinstance(param, nn.Linear):
param.requires_grad = False
# 获取全连接层的输入特征数
in_features = self.model.fc.in_features
# 替换全连接层
self.model.fc = nn.Sequential(
nn.Linear(in_features, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, num_classes), # 输出4个坐标值
nn.Sigmoid() # 将输出限制在[0,1]范围
)
else:
raise ValueError('未知模型名称: {}'.format(model_name))
print(self.model) # 打印模型结构
def forward(self, x):
"""前向传播"""
return self.model(x)
# 创建训练数据集和数据加载器
dataset = CustomDataset(
root_dir='./data/banana-detection/bananas_train/images/',
csv_file='./data/banana-detection/bananas_train/label.csv',
transform=transform
)
# 批量加载数据(批量大小32,打乱顺序)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# 检测并设置设备(优先使用GPU,其次是MPS,最后是CPU)
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
print(f"使用设备: {device}")
# 模型配置
model_name = 'vgg16' # 选择模型架构
num_classes = 4 # 输出4个坐标值(xmin, ymin, xmax, ymax)
model = CustomModel(model_name=model_name, num_classes=num_classes)
model.to(device) # 将模型移动到指定设备
# 定义损失函数(均方误差损失)和优化器(Adam)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# 训练参数
num_epochs = 25 # 训练轮数
# 训练循环
for epoch in range(num_epochs):
for images, bboxes in dataloader:
# 将数据移动到指定设备
images, bboxes = images.to(device), bboxes.to(device)
# 前向传播
outputs = model(images) # 预测边界框
loss = loss_fn(outputs, bboxes) # 计算损失
# 反向传播和优化
optimizer.zero_grad() # 清空梯度
loss.backward() # 反向传播
optimizer.step() # 更新参数
# 打印每个epoch的损失
print(f'轮次 [{epoch+1}/{num_epochs}], 损失: {loss.item():.4f}')
# 创建输出目录
output_dir = Path('./output')
output_dir.mkdir(exist_ok=True) # 如果目录不存在则创建
# 保存模型权重
model_save_path = output_dir/'single_obj_detect.pth'
torch.save(model.state_dict(), model_save_path)
print(f"模型已保存到: {model_save_path}")
# 测试模型效果
from utils import display_images # 导入可视化工具
# 加载测试图像
test_image_path = './data/banana-detection/bananas_val/images/0.png'
test_image = read_image(test_image_path) / 255.0 # 读取并归一化
test_image = transform(image=test_image.permute(1, 2, 0).numpy())['image'] # 应用变换
# 准备模型输入
model.eval() # 设置为评估模式
pre_image = test_image.unsqueeze(0).to(device) # 添加批次维度并移动到设备
# 进行预测
with torch.no_grad():
outputs = model(pre_image)
(startX, startY, endX, endY) = outputs[0].cpu().numpy()
# 将归一化坐标转换回图像尺寸
height, width = test_image.shape[1], test_image.shape[2]
startX = int(startX * width)
startY = int(startY * height)
endX = int(endX * width)
endY = int(endY * height)
# 可视化结果
display_images(
test_image.permute(1, 2, 0), # 将CHW转为HWC
bboxes_list=[[(startX, startY, endX, endY)]] # 预测的边界框
)
12-9 Pytorch神经网络
12-10 Pytorch实现最简单的单目录检测-模型训练
# 创建训练数据集和数据加载器
dataset = CustomDataset(
root_dir='./data/banana-detection/bananas_train/images/',
csv_file='./data/banana-detection/bananas_train/label.csv',
transform=transform
)
# 批量加载数据(批量大小32,打乱顺序)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# 检测并设置设备(优先使用GPU,其次是MPS,最后是CPU)
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
print(f"使用设备: {device}")
# 模型配置
model_name = 'vgg16' # 选择模型架构
num_classes = 4 # 输出4个坐标值(xmin, ymin, xmax, ymax)
model = CustomModel(model_name=model_name, num_classes=num_classes)
model.to(device) # 将模型移动到指定设备
# 定义损失函数(均方误差损失)和优化器(Adam)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# 训练参数
num_epochs = 25 # 训练轮数
# 训练循环
for epoch in range(num_epochs):
for images, bboxes in dataloader:
# 将数据移动到指定设备
images, bboxes = images.to(device), bboxes.to(device)
# 前向传播
outputs = model(images) # 预测边界框
loss = loss_fn(outputs, bboxes) # 计算损失
# 反向传播和优化
optimizer.zero_grad() # 清空梯度
loss.backward() # 反向传播
optimizer.step() # 更新参数
# 打印每个epoch的损失
print(f'轮次 [{epoch+1}/{num_epochs}], 损失: {loss.item():.4f}')
12-11 Pytorch-模型的使用
# 创建输出目录
output_dir = Path('./output')
output_dir.mkdir(exist_ok=True) # 如果目录不存在则创建
# 保存模型权重
model_save_path = output_dir/'single_obj_detect.pth'
torch.save(model.state_dict(), model_save_path)
print(f"模型已保存到: {model_save_path}")
# 保存模型权重
model_save_path = output_dir / 'single_obj_detect.pth'
torch.save(model.state_dict(), model_save_path)
print(f"模型已保存到: {model_save_path}")
# 测试模型效果
from utils import display_images # 导入可视化工具
# 加载测试图像
test_image_path = './data/banana-detection/bananas_val/images/0.png'
test_image = read_image(test_image_path) / 255.0 # 读取并归一化
test_image = transform(image=test_image.permute(1, 2, 0).numpy())['image'] # 应用变换
# 准备模型输入
model.eval() # 设置为评估模式
pre_image = test_image.unsqueeze(0).to(device) # 添加批次维度并移动到设备
# 进行预测
with torch.no_grad():
outputs = model(pre_image)
(startX, startY, endX, endY) = outputs[0].cpu().numpy()
# 将归一化坐标转换回图像尺寸
height, width = test_image.shape[1], test_image.shape[2]
startX = int(startX * width)
startY = int(startY * height)
endX = int(endX * width)
endY = int(endY * height)
# 可视化结果
display_images(
test_image.permute(1, 2, 0), # 将CHW转为HWC
bboxes_list=[[(startX, startY, endX, endY)]] # 预测的边界框
)
12-12 核心算法-滑动窗口
滑动窗口


滑动窗口的缺点
- 滑动窗口太小,计算成本会显著增加
- 滑动窗口太大,预测的会不准确
- 多个滑动窗口中有可能会预测到同一个目标
基于卷积滑动窗口

基于卷积的滑窗




基于滑动窗口的优点
- 通过卷积核就可以控制滑动窗口的大小
- 图片在卷积的过程中无感知的就进行了扫描
- 大大提高了计算的效率
12-13 核心算法-AnchorBox
滑动窗口的问题
- 检测物体位置的准确度不够
- 严重依赖窗口的大小
为了解决滑窗的问题,引入了Anchor Box
什么是Anchor Box?
它是在每个特征图像素上预先定义好的一个框


特征图与Anchor Box
- 每个特征图像素都预制了多个锚框
- 每个像素的锚框形状、大小都不一样
- 锚框是由k-means(聚类算法)决定的
- 同尺寸特征图中每个像素的锚框都一样
- 不同尺寸特征图中像素的锚框可能不一同

Anchor Box为什么可以检测的更准?
- Anchor Box 提供了不同形状和大小的“候选框”
- 通过预测与真实框的偏移量,更容易框住不同形状的物体
- 相当于我们有了基准,在这个基准上做微调
- 因此,通过它可以提高找到物体位置的准确性
Anchor Box的重要性
- Anchor Box 是特别关键的技术
- 是我们理解YOLO算法的关键点
- 它既用于训练阶段也用于推理阶段
12-14 核心算法-AnchorBox的工作原理
Anchor Box是如何工作的?
获得Anchor Box的不同方法
- k-means是对真实框进行聚类得到Anchor Box
- 采用组合法:不同尺寸、比例进行组合

- 手工设置
IoU (Intersection over Union)

例子

IoU越大,锚框与真实框越接近
IoU越大,锚框中存在物体的可能性越大
IoU 0.5或0.6有效
IoU计算的问题
- Anchor Box是在特征图上定义的
- Bounding Box是标注数据,表示的是原始图像的大小
- 两者不能直接进行IoU操作
解决办法
- 将Bounding Box(Ground Truth Box)映射到特征图上

- 是否进行平移(如果卷积时使用padding需要平移)

让Anchor Box与BoundingBox做IoU可以轻松判断Anchor Box中是否有物体
12-15 目标检测的技术发展路线
关于Anchor Box还有很多疑问
- 比说AnchorBox在训练和推理阶段的不同点
- 多个AnchoBox框住同一目标物体
要想将它讲清楚,我们需要再从宏观上聊一聊.
目标检测的发展路线
这几个模型称为:两阶段目标检测

这几个模型称为:单阶段目标检测

详细信息



R-CNN, Fast R-CNN, Faster R-CNN
12-16 双阶段目标检测模型-RCNN-FastRCNN与FasterRCNN
R-CNN, Fast R-CNN, Faster R-CNN
- CNN

- R-CNN: Regions with CNN features

- Fast R-CNN

- Faster R-CNN

小结
- Fast R-CNN对每张图片使用一次卷积替代了多次卷积
- Fast R-CNN使用全连接+softmax代替了SVM
- Faster R-CNN使用RPN+AnchoBox代替selective search
12-17 SelectiveSearch算法(一)
Efficient Graph-Based Image Segmentation

工作原理

- 将图中的每个像素当作一个结点建立一张图
- 相邻结点之间通过边的权重来确定它们之间差异的大小
- 差异越小,边的权重值也就越小
- 边权重的计算可通过多种方式计算 如颜色、亮度、纹理...
- 将所有边权重按从小到大进行排序
- 使用最小生成树,决定两个区域之间的合并
- 每个像素就是一个最小的可合并的区域
- 判定条件:两个区域内部边的最大值大于等于区域间边的权重
边权重的计算公式

合并条件公式

12-18 SelectiveSearch算法(二)
Selective Search算法

核心算法
- 使用Efficient Graph-Based Image Segment 进行初始化
- 进行相似性计算:颜色相似性,纹理相似性,大小相似性….
- 区域合并,根据相似分数,逐步合并相邻区域
- 生成候选区,合成过程中记录区域的边框作为最后输出
颜色相似性计算
- 计算每个区域的颜色直方图,如每种颜色25个bins
- 25bins是指25个柱子,也就是每10(255/25)个一个跨度
- 将3个通道连在一起,形成75维的直方图
- 同相邻区域的直方图做交叉距离计算,得到相似性
直方图交叉计算

- H1和 H2 bin相同
- 其交叉值为同一位置bin的最小值
- 如H1=[2,3,1],H2=[1,4,2]
- 两者交叉值为[1,3,1]

纹理相似性计算
- 可以使用Sobel这种传统算子来计算纹理
- 当然对于纹理相似性计算来说一般会选Prewitt算子
- 拿到纹理后构建10个bins的直方图
- 同相邻区域的直方图做交叉距离计算,得到相似性
大小相似性计算
- 这个公式有利用两上小块区域的合并

填充相似性计算

最终的相似性计算

小结
- 使用Efficient Graph-Based Image Segment进行初始化
- 进行相似性计算:颜色相似性,纹理相似性,大小相似性…..
- 区域合并,根据相似分数,逐步合并相邻区域
- 生成候选区,合成过程中记录区域的边框作为最后输出
12-19 支持向量机
SVM(Support Vector Machine)
核心思想
- 找一个超平面(二维是一条线,三维是面,三维以上是超平面)
- 让离超平面最近的点到超平面的距离最大化

- 只有距离超平面最近的点对超平面位置有影响,称为支持向量
- 对于线性可分的数据,SVM直接找最大间隔的超平面
- 对于线性不可分数据,引入了软间隔和核技巧(核函数)

软/硬间隔:允许一定程度的误分类.

核技巧(核函数)
将低维空间数据映射到高维空间

算法


其中X是已知的,要求出W 和 b 就可以得到这个超平面



12-20 Faster-RCNN实现-RPN
Faster R-CNN的实现
- 主干网络CONV
- RPN
- 前景、背景的classifier
- 偏移量
- Proposals(RPN结果,anchorbox)
- RoI pooling
- Classifier
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, ops
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# 简单的 Backbone 网络
class SimpleBackbone(nn.Module):
def __init__(self):
super(SimpleBackbone, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
self.stride = 4 #1 * 2 * 2
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
return x
# RPN(区域提议网络)
class RPN(nn.Module):
def __init__(self, in_channels, mid_channels=256, num_anchors=9):
super(RPN, self).__init__()
self.conv = nn.Conv2d(in_channels, mid_channels, kernel_size=3, stride=1, padding=1)
self.cls_layer = nn.Conv2d(mid_channels, num_anchors * 2, kernel_size=1)
self.reg_layer = nn.Conv2d(mid_channels, num_anchors * 4, kernel_size=1)
def forward(self, x):
x = F.relu(self.conv(x))
cls_scores = self.cls_layer(x)
bbox_preds = self.reg_layer(x)
return cls_scores, bbox_preds
# 生成 Anchor Box
def generate_anchors(feature_map_size, stride=4, scales=[8, 16, 32], ratios=[0.5, 1, 2]):
anchors = []
h, w = feature_map_size
base_size = stride
for i in range(h):
for j in range(w):
cx = j * stride + stride / 2
cy = i * stride + stride / 2
for scale in scales:
for ratio in ratios:
w_box = base_size * scale * (ratio ** 0.5)
h_box = base_size * scale / (ratio ** 0.5)
anchors.append([cx - w_box / 2, cy - h_box / 2, cx + w_box / 2, cy + h_box / 2])
return torch.tensor(anchors, dtype=torch.float32)
# Faster R-CNN 主模型
class FasterRCNN(nn.Module):
def __init__(self, num_classes):
super(FasterRCNN, self).__init__()
self.backbone = SimpleBackbone()
self.rpn = RPN(in_channels=256, num_anchors=9)
self.roi_align = ops.RoIAlign(output_size=(7, 7), spatial_scale=1/4, sampling_ratio=-1)
self.fc1 = nn.Linear(256 * 7 * 7, 1024)
self.fc2 = nn.Linear(1024, 1024)
self.cls_score = nn.Linear(1024, num_classes)
self.bbox_pred = nn.Linear(1024, num_classes * 4)
self.stride = self.backbone.stride
#每个batch调用一次,batch_size=1
def forward(self, images, targets=None):
feature_map = self.backbone(images)
batch_size, _, h, w = feature_map.shape
#rpn_cls_scores的形状是(batch_size, 9 * 2, 224/4, 224/4),
#rpn_bbox_preds的形状是(batch_size, 9 * 4, 224/4, 224/4)
rpn_cls_scores, rpn_bbox_preds = self.rpn(feature_map)
anchors = generate_anchors((h, w), stride=self.stride).to(images.device)
if self.training:
assert targets is not None, "Targets must be provided during training"
losses = {}
for i in range(batch_size):
rpn_loss_cls, rpn_loss_bbox = self.compute_rpn_loss(
rpn_cls_scores[i], rpn_bbox_preds[i], anchors, targets[i]
)
#rpn_cls_scores[i:i+1]的形状是(i, 9 * 2, 224/4, 224/4)
#proposals的形状是(n, 4),其中n是proposals的数量,4是[x1, y1, x2, y2]
proposals = self.generate_proposals(rpn_cls_scores[i:i+1], rpn_bbox_preds[i:i+1], anchors)
#torch.full((proposals.shape[0], 1), i, device=proposals.device)表示创建一个形状为(n,1)的tensor,每一项为i
#然后将新创建的tensor与proposals进行按列拼接,得到rois,其形状为:(n,5)
#其中n是proposals的数量,5是[batch_index, x1, y1, x2, y2]
rois = torch.cat([torch.full((proposals.shape[0], 1), i, device=proposals.device), proposals], dim=1)
roi_features = self.roi_align(feature_map, rois)
#roi_features.size(0)表示获得roi_features的行数,也就是roi的数量
# 下面这行代码的意思是,将roi_features的形状从(N, 256, 7, 7)变成(N, 256 * 7 * 7)
x = roi_features.view(roi_features.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
cls_scores = self.cls_score(x)
bbox_deltas = self.bbox_pred(x)
det_loss_cls, det_loss_bbox = self.compute_detection_loss(cls_scores, bbox_deltas, targets[i], proposals)
losses[f"rpn_loss_cls_{i}"] = rpn_loss_cls
losses[f"rpn_loss_bbox_{i}"] = rpn_loss_bbox
losses[f"det_loss_cls_{i}"] = det_loss_cls
losses[f"det_loss_bbox_{i}"] = det_loss_bbox
return losses
else:
predictions = []
for i in range(batch_size):
proposals = self.generate_proposals(rpn_cls_scores[i:i+1], rpn_bbox_preds[i:i+1], anchors)
rois = torch.cat([torch.full((proposals.shape[0], 1), i, device=proposals.device), proposals], dim=1)
roi_features = self.roi_align(feature_map, rois)
x = roi_features.view(roi_features.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
cls_scores = self.cls_score(x)
bbox_deltas = self.bbox_pred(x)
boxes = self.apply_bbox_deltas(proposals, bbox_deltas, cls_scores.argmax(dim=1))
scores = cls_scores.softmax(dim=1)
max_scores, pred_labels = scores.max(dim=1)
predictions.append({
"boxes": boxes,
"labels": pred_labels,
"scores": max_scores
})
return predictions
#这个函数作用计算RPN的loss,包括分类损失和回归损失。
def compute_rpn_loss(self, cls_scores, bbox_preds, anchors, target):
#cls_scores.view类似于numpy中的reshpae,其中第一个参数-1表示自己计算,第二个参数指明列数
cls_scores = cls_scores.view(-1, 2)
bbox_preds = bbox_preds.view(-1, 4)
gt_boxes = target["boxes"]
gt_labels = target["labels"]
ious = ops.box_iou(anchors, gt_boxes)
max_ious, max_idx = ious.max(dim=1)
labels = torch.zeros(anchors.shape[0], dtype=torch.int64, device=cls_scores.device) - 1
labels[max_ious > 0.7] = 1
labels[max_ious < 0.3] = 0
valid_mask = labels >= 0
#valid_mask表示删除掉anchors中与gt_boxes不匹配的anchors。
cls_scores = cls_scores[valid_mask] #这行代码的含义是删除掉anchors中与gt_boxes不匹配的anchors。
labels = labels[valid_mask]
bbox_preds = bbox_preds[valid_mask]
anchors = anchors[valid_mask]
rpn_loss_cls = F.cross_entropy(cls_scores, labels, ignore_index=-1) if labels.numel() > 0 else torch.tensor(0.0, device=cls_scores.device)
pos_mask = labels == 1 #将所有正样本找到,其结果为pas_mask[0]=true?
if pos_mask.sum() > 0:
pos_anchors = anchors[pos_mask]
pos_preds = bbox_preds[pos_mask]
pos_gt = gt_boxes[max_idx[pos_mask]]
target_deltas = self.encode_boxes(pos_anchors, pos_gt) #得到真实位置与AnchorBox的偏移量。
rpn_loss_bbox = F.smooth_l1_loss(pos_preds, target_deltas, reduction="sum") / pos_mask.sum() #计算预测的偏移量与真实偏移量的差异。
else:
rpn_loss_bbox = torch.tensor(0.0, device=cls_scores.device)
return rpn_loss_cls, rpn_loss_bbox
def compute_detection_loss(self, cls_scores, bbox_deltas, target, proposals):
gt_boxes = target["boxes"]
gt_labels = target["labels"]
#box_iou 函数计算 proposals 中的每个 box 与 gt_boxes 中的每个 box 之间的 IoU值
#返回的是(N,M)的张量,dim=1表示按行寻找最大值
ious = ops.box_iou(proposals, gt_boxes)
#找出每个 proposal 的最大 IoU 及对应的 ground-truth box 索引
max_ious, max_idx = ious.max(dim=1)
#创建一个与proposals相同维度的tensor,并将其初始化为0。
labels = torch.full((proposals.shape[0],), 0, dtype=torch.int64, device=cls_scores.device)
pos_mask = max_ious >= 0.5
labels[pos_mask] = gt_labels[max_idx[pos_mask]]
det_loss_cls = F.cross_entropy(cls_scores, labels)
if pos_mask.sum() > 0:
pos_proposals = proposals[pos_mask]
pos_deltas = bbox_deltas[pos_mask]
pos_gt_boxes = gt_boxes[max_idx[pos_mask]]
target_deltas = self.encode_boxes(pos_proposals, pos_gt_boxes)
det_loss_bbox = F.smooth_l1_loss(pos_deltas, target_deltas, reduction="sum") / pos_mask.sum()
else:
det_loss_bbox = torch.tensor(0.0, device=cls_scores.device)
return det_loss_cls, det_loss_bbox
def encode_boxes(self, proposals, gt_boxes):
#proposals中的数据格式是[x1, y1, x2, y2],gt_boxes中的数据格式是[x1, y1, x2, y2]。
proposals_w = proposals[:, 2] - proposals[:, 0]
proposals_h = proposals[:, 3] - proposals[:, 1]
proposals_cx = proposals[:, 0] + proposals_w / 2
proposals_cy = proposals[:, 1] + proposals_h / 2
gt_w = gt_boxes[:, 2] - gt_boxes[:, 0]
gt_h = gt_boxes[:, 3] - gt_boxes[:, 1]
gt_cx = gt_boxes[:, 0] + gt_w / 2
gt_cy = gt_boxes[:, 1] + gt_h / 2
dx = (gt_cx - proposals_cx) / proposals_w
dy = (gt_cy - proposals_cy) / proposals_h
#这里使用log的原因是为了让dw,dh发生微小的变化,从而更容易通过损失函数的计算更新参数
#当gt_w 等于 proposals_w时,log(gt_w / proposals_w) = 0,所以dw = 0
#当gt_w 大于 proposals_w时,log(gt_w / proposals_w) 是一个很小的正值
#当gt_w 小于 proposals_w时,log(gt_w / proposals_w) 是一个很小的负值
dw = torch.log(gt_w / proposals_w)
dh = torch.log(gt_h / proposals_h)
return torch.stack([dx, dy, dw, dh], dim=1)
def apply_bbox_deltas(self, proposals, deltas, labels=None):
#proposals中的数据格式是[x1, y1, x2, y2],
# deltas中的数据格式是[dx, dy, dw, dh]。
proposals_w = proposals[:, 2] - proposals[:, 0]
proposals_h = proposals[:, 3] - proposals[:, 1]
proposals_cx = proposals[:, 0] + proposals_w / 2
proposals_cy = proposals[:, 1] + proposals_h / 2
if labels is not None:
#labels是一个一维张量,其长度等于proposals的长度。
#labels中的每个元素表示对应的proposal的类别标签。
batch_size = proposals.shape[0]
indices = torch.arange(batch_size, device=deltas.device) * self.cls_score.out_features + labels
dx = deltas.view(-1, 4)[indices, 0]
dy = deltas.view(-1, 4)[indices, 1]
dw = deltas.view(-1, 4)[indices, 2]
dh = deltas.view(-1, 4)[indices, 3]
else:
dx = deltas[:, 0]
dy = deltas[:, 1]
dw = deltas[:, 2]
dh = deltas[:, 3]
#dx 表示预测的候选框的偏移比例,比如0.1表示向右移动10%的宽度
pred_cx = dx * proposals_w + proposals_cx
pred_cy = dy * proposals_h + proposals_cy
pred_w = torch.exp(dw) * proposals_w
pred_h = torch.exp(dh) * proposals_h
return torch.stack([pred_cx - pred_w / 2, pred_cy - pred_h / 2, pred_cx + pred_w / 2, pred_cy + pred_h / 2], dim=1)
def generate_proposals(self, cls_scores, bbox_preds, anchors):
#cls_scores原始形状是(batch_size, 9 * 2, 224/4, 224/4)
#cls_scores.view(-1, 2)相当于将一个tensor reshape 成二维tensor, 即转成n行2列
#dim=-1表示按前面参数的最后一个维度计算softmax, 最后一个维度是2
#[:, 1]表示取c前面参数ls_scores的第二列
cls_probs = torch.softmax(cls_scores.view(-1, 2), dim=-1)[:, 1]
bbox_preds = bbox_preds.view(-1, 4)
proposals = self.apply_bbox_deltas(anchors, bbox_preds)
scores = cls_probs
keep = ops.nms(proposals, scores, iou_threshold=0.7)
proposals = proposals[keep]
scores = scores[keep]
#proposals是候选区,共有4列,其中第0列表示cx, 1列表示cy, 2列表示w, 3列表示h
#为了不让预测的框超出图像范围,所以对proposals进行裁剪
proposals[:, 0] = torch.clamp(proposals[:, 0], min=0)
proposals[:, 1] = torch.clamp(proposals[:, 1], min=0)
proposals[:, 2] = torch.clamp(proposals[:, 2], max=224)
proposals[:, 3] = torch.clamp(proposals[:, 3], max=224)
#numel函数的作用是返回张量中元素的总数
if scores.numel() == 0:
return torch.empty((0, 4), device=anchors.device)
#scores = torch.tensor([0.2, 0.8, 0.1, 0.9, 0.5, 0.7, 0.3, 0.6, 0.4, 0.95, 0.85, 0.15])
#取值最大的10个值的索引值
top_n = torch.topk(scores, min(10, scores.shape[0])).indices
return proposals[top_n]
# 数据集
class CustomDataset(Dataset):
def __init__(self):
self.images = [Image.open("example.jpg").convert("RGB").resize((224, 224))]
self.targets = [{"boxes": torch.tensor([[50, 50, 150, 150]], dtype=torch.float32),
"labels": torch.tensor([1], dtype=torch.int64)}]
def __len__(self):
return len(self.images)
def __getitem__(self, idx):
img = transforms.ToTensor()(self.images[idx])
return img, self.targets[idx]
# 训练函数
def train_model(model, data_loader, optimizer, num_epochs, device):
model.train()
for epoch in range(num_epochs):
total_loss = 0.0
for images, targets in data_loader:
images = images.to(device)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
losses = model(images, targets)
loss = sum(loss for loss in losses.values())
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")
# 推理函数
def inference(model, image_path, device):
model.eval()
img = transforms.ToTensor()(Image.open(image_path).convert("RGB").resize((224, 224))).unsqueeze(0).to(device)
with torch.no_grad():
predictions = model(img)[0]
img = transforms.ToPILImage()(img.squeeze(0).cpu())
fig, ax = plt.subplots(1)
ax.imshow(img)
for box, label, score in zip(predictions["boxes"], predictions["labels"], predictions["scores"]):
if score > 0.5:
x_min, y_min, x_max, y_max = box.cpu().numpy()
rect = patches.Rectangle((x_min, y_min), x_max - x_min, y_max - y_max, linewidth=2, edgecolor="r", facecolor="none")
ax.add_patch(rect)
plt.text(x_min, y_min, f"Label: {label.item()}, Score: {score.item():.2f}", color="white", fontsize=12,
bbox=dict(facecolor="red", alpha=0.5))
plt.axis("off")
plt.show()
# 主函数
def main():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FasterRCNN(num_classes=2).to(device)
dataset = CustomDataset()
data_loader = DataLoader(dataset, batch_size=1, shuffle=True)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
train_model(model, data_loader, optimizer, num_epochs=5, device=device)
inference(model, "example.jpg", device)
if __name__ == "__main__":
main()
RPN
- 一层3x3卷积,增强对周围的感知能力,适配功能
- 两个检测头,1x1卷积
- Classifier header,分为前景和背景
- box header,用于预测box偏移量
12-21 Faster-RCNN实现-FasterRCNN类
FasterRCNN类
RPN中的3x3卷积是否可以移到主干网络?

FasterRCNN类

12-22 Faste的-RCNN实现-为什么BBox的预测是线性回归
BBox的预测为什么是线性回归问题?
线性回归



12-23 Faster-RCNN实现-AnchorBox的实现
Anchor box

12-24 Faster-RCNN实现-生成候选框
12-25 计算机视频核心算法-RoIPooling
Rol Pooling技术

Rol Pooling核心算法

import torch
from math import floor, ceil
class RoIPooling:
def __init__(self, pooled_height, pooled_width, spatial_scale):
"""
初始化ROI Pooling层
:param pooled_height: 池化输出的高度
:param pooled_width: 池化输出的宽度
:param spatial_scale: 特征图相对于输入图像的缩放比例
"""
self.pooled_height = pooled_height
self.pooled_width = pooled_width
self.spatial_scale = spatial_scale
def forward(self, feature_map, rois):
"""
执行ROI Pooling前向传播
:param feature_map: 输入特征图,形状为 (batch_size, channels, height, width)
:param rois: 输入ROI,形状为 (num_rois, 5),每行格式为 [batch_idx, x1, y1, x2, y2]
:return: 池化后的输出,形状为 (num_rois, channels, pooled_height, pooled_width)
"""
batch_size, channels, height, width = feature_map.size()
num_rois = rois.size(0)
# 初始化输出张量
output = torch.zeros(num_rois, channels, self.pooled_height, self.pooled_width,
device=feature_map.device, dtype=feature_map.dtype)
for i in range(num_rois):
roi = rois[i]
batch_idx = int(roi[0]) # ROI对应的batch索引
x1, y1, x2, y2 = roi[1:] * self.spatial_scale # 由原始图缩放到特征图坐标系
roi_width = max(x2 - x1, 1.0) # 确保宽度至少为1
roi_height = max(y2 - y1, 1.0) # 确保高度至少为1
# 计算每个池化网格的大小
bin_height = roi_height / self.pooled_height
bin_width = roi_width / self.pooled_width
for h in range(self.pooled_height):
for w in range(self.pooled_width):
# 计算当前网格在特征图上的坐标范围
h_start = floor(y1 + h * bin_height)
h_end = ceil(y1 + (h + 1) * bin_height)
w_start = floor(x1 + w * bin_width)
w_end = ceil(x1 + (w + 1) * bin_width)
print(y1, x1, h, w, bin_height, bin_width, h_start, h_end, w_start, w_end)
# 边界检查,确保不越界
h_start = min(max(h_start, 0), height - 1)
h_end = min(max(h_end, 0), height)
w_start = min(max(w_start, 0), width - 1)
w_end = min(max(w_end, 0), width)
# 执行最大池化
if h_start < h_end and w_start < w_end:
region = feature_map[batch_idx, :, h_start:h_end, w_start:w_end]
if region.numel() > 0: # 确保区域非空
output[i, :, h, w] = torch.max(region.reshape(channels, -1), dim=1)[0]
else:
output[i, :, h, w] = 0 # 空区域填充0
else:
output[i, :, h, w] = 0 # 无效网格填充0
return output
# 示例用法
if __name__ == "__main__":
# 假设输入数据
batch_size, channels, height, width = 2, 3, 10, 10
feature_map = torch.randn(batch_size, channels, height, width) # 随机特征图
rois = torch.tensor([
[0, 10, 10, 30, 30], # batch_idx=0, (x1, y1, x2, y2)
[1, 20, 20, 40, 40], # batch_idx=1, (x1, y1, x2, y2)
], dtype=torch.float32)
# 初始化ROI Pooling层
roi_pool = RoIPooling(pooled_height=2, pooled_width=2, spatial_scale=0.25)
# 执行前向传播
output = roi_pool.forward(feature_map, rois)
print("Output shape:", output.shape) # 应为 (2, 3, 2, 2)
print("Output:", output)
def roi_align(input, rois, spatial_scale, pooled_height, pooled_width, sampling_ratio, aligned):
# 保存输入的原始数据类型,以便最后将输出转换回该类型
orig_dtype = input.dtype
# 可能对输入张量和 RoI 张量进行类型转换(例如转为浮点型)
input = maybe_cast(input) # 输入特征图,形状为 [N, C, H, W](批次大小,通道数,高度,宽度)
rois = maybe_cast(rois) # RoI 坐标,形状为 [K, 5](RoI 数量,每行格式为 [batch_idx, x1, y1, x2, y2])
# 获取输入特征图的高度和宽度
_, _, height, width = input.size()
# 创建池化高度和宽度的索引张量,用于后续计算采样点位置
ph = torch.arange(pooled_height, device=input.device) # [PH],池化高度的坐标 [0, 1, ..., pooled_height-1]
pw = torch.arange(pooled_width, device=input.device) # [PW],池化宽度的坐标 [0, 1, ..., pooled_width-1]
# 从 RoI 张量中提取批次索引和坐标,并根据 spatial_scale 缩放
roi_batch_ind = rois[:, 0].int() # [K],每个 RoI 所属的批次索引
offset = 0.5 if aligned else 0.0 # 如果 aligned=True,则偏移 0.5 以对齐像素中心,否则无偏移
roi_start_w = rois[:, 1] * spatial_scale - offset # [K],RoI 的起始宽度坐标(x1)
roi_start_h = rois[:, 2] * spatial_scale - offset # [K],RoI 的起始高度坐标(y1)
roi_end_w = rois[:, 3] * spatial_scale - offset # [K],RoI 的结束宽度坐标(x2)
roi_end_h = rois[:, 4] * spatial_scale - offset # [K],RoI 的结束高度坐标(y2)
# 计算每个 RoI 的宽度和高度
roi_width = roi_end_w - roi_start_w # [K],RoI 的宽度
roi_height = roi_end_h - roi_start_h # [K],RoI 的高度
if not aligned:
# 如果不对齐,则确保宽度和高度至少为 1,避免后续除零错误
roi_width = torch.clamp(roi_width, min=1.0) # [K]
roi_height = torch.clamp(roi_height, min=1.0) # [K]
# 计算每个池化单元格(bin)的高度和宽度
bin_size_h = roi_height / pooled_height # [K],每个高度单元的尺寸
bin_size_w = roi_width / pooled_width # [K],每个宽度单元的尺寸
# 判断是否使用精确采样(sampling_ratio > 0 表示指定采样点数量)
exact_sampling = sampling_ratio > 0
# 计算每个池化单元内的采样点数量(高度和宽度方向)
# 如果 exact_sampling=True,则直接使用 sampling_ratio;否则根据 RoI 大小自适应计算
roi_bin_grid_h = sampling_ratio if exact_sampling else torch.ceil(roi_height / pooled_height) # 标量或 [K]
roi_bin_grid_w = sampling_ratio if exact_sampling else torch.ceil(roi_width / pooled_width) # 标量或 [K]
if exact_sampling:
# 精确采样模式:固定采样点数量
count = max(roi_bin_grid_h * roi_bin_grid_w, 1) # 标量,总采样点数
iy = torch.arange(roi_bin_grid_h, device=input.device) # [IY],高度方向采样点索引
ix = torch.arange(roi_bin_grid_w, device=input.device) # [IX],宽度方向采样点索引
ymask = None # 无需掩码
xmask = None
else:
# 自适应采样模式:采样点数量取决于 RoI 大小
count = torch.clamp(roi_bin_grid_h * roi_bin_grid_w, min=1) # [K],每个 RoI 的采样点数
iy = torch.arange(height, device=input.device) # [IY],最大可能的高度采样点索引
ix = torch.arange(width, device=input.device) # [IX],最大可能的宽度采样点索引
# 创建掩码,标记有效的采样点(避免超出 RoI 大小的采样)
ymask = iy[None, :] < roi_bin_grid_h[:, None] # [K, IY],高度方向的有效性掩码
xmask = ix[None, :] < roi_bin_grid_w[:, None] # [K, IX],宽度方向的有效性掩码
# 辅助函数,将张量从 [K] 扩展为 [K, 1, 1],便于广播计算
def from_K(t):
return t[:, None, None]
# 计算采样点的 y 和 x 坐标
y = (
from_K(roi_start_h) # [K, 1, 1],RoI 的起始高度
+ ph[None, :, None] * from_K(bin_size_h) # [1, PH, 1] * [K, 1, 1],加上池化单元的偏移
+ (iy[None, None, :] + 0.5).to(input.dtype) * from_K(bin_size_h / roi_bin_grid_h) # 采样点在单元内的偏移
) # 结果形状:[K, PH, IY]
x = (
from_K(roi_start_w) # [K, 1, 1],RoI 的起始宽度
+ pw[None, :, None] * from_K(bin_size_w) # [1, PW, 1] * [K, 1, 1],加上池化单元的偏移
+ (ix[None, None, :] + 0.5).to(input.dtype) * from_K(bin_size_w / roi_bin_grid_w) # 采样点在单元内的偏移
) # 结果形状:[K, PW, IX]
# 使用双线性插值计算采样点的特征值
val = _bilinear_interpolate(input, roi_batch_ind, y, x, ymask, xmask) # [K, C, PH, PW, IY, IX]
# 如果是自适应采样,应用掩码以排除无效采样点
if not exact_sampling:
val = torch.where(ymask[:, None, None, None, :, None], val, 0) # [K, C, PH, PW, IY, IX]
val = torch.where(xmask[:, None, None, None, None, :], val, 0) # [K, C, PH, PW, IY, IX]
# 对采样点的值求和,去除 IY 和 IX 维度,得到池化结果
output = val.sum((-1, -2)) # [K, C, PH, PW]
# 归一化:除以采样点数量
if isinstance(count, torch.Tensor):
output /= count[:, None, None, None] # [K, 1, 1, 1]
else:
output /= count # 标量
# 将输出转换回原始数据类型并返回
output = output.to(orig_dtype)
return output
12-26 计算机视觉核心算法-NMS
NMS (Non-Maximum Suppression)
12-27 YOLO的整体架构
回顾Faster R-CNN

回顾LeNet5

回顾单目标检测-硬train一发

结论
- 卷积网络是可以直接做分类的-手写字识别
- 卷积网络是可以输出位置信息的-硬train一发
- 卷积网络是否同时输出分类和位置信息呢?当然也可以
YOLO整体架构

12-28 YOLO的输出
YOLO的输出

Anchor Box

输出

12-29 YOLO输出中位置信息的具体含义
YOLO输出中的dx,dy,dw,dh
dx,dy的含义

dw,dh的含义

YOLO输出中的(dx,dy,dw,dh)
- 刚开始训练时,dx,dy,dw,dh的值是"瞎”预测的,可以是任意值

小结
- dx,dy是预测的预测框中心点相对于其所在的小格子的偏移
- 它不依赖AnchorBox做计算,但每个AnchorBox都有dx,dy
- dw,dh是预测AnchorBox缩放多少倍才能得到真实框
- dw,dh是根据AnchorBox进行预测的
- 一开始预测的值不准,但通过损失函数会让它们越来越准
12-30 YOLO输出中AnchorBox与IoU的作用
每个小格子都要预测dx,dy,dw,dh吗?

IoU

关于PC的预测

小结
- YOLO神经网络会为每个小格子中的每个Anchor预测数值
- 生成AnchorBox时,会通过IoU过滤掉不必要的Anchor
- 因此即使含Anchor的预测值也将被丢弃不进行计算
- 关于P默认全为负样本,当生成AnchorBox时再更新
- 最终将正负样本交给损失函数更新P。的预测参数
YOLOv3



12-31 YOLOv1网络架构
YOLO-网络架构
YOLOv1-网络架构

如何理解channel
- 感知上,通道可以认为是我们观察同一事物的不同角度

卷积与池化
- 卷积与池化是卷积神经网路最常见的组合
- 最大池化是最常见的
回顾1x1卷积的作用
- 它可以对通道进行上采样或下采样
- 增加神经网络的深度,起到全连接的作用
- 增加非线性变换
1x1与3x3组合增加网络深度
- 深度学习的核心是深度,层级越深效果越好
- 通过1x1降采样,防止channel爆炸
- 通过3x3获得不同层级周围像素的表征
- 同时增加channel数,获得更多观察角度

12-32 YOLOv2网络架构
YOLOv2训练分为两个阶段
- 使用ImageNet数据集进行分类训练
- 使用VOC进行目标检测训练
YOLOv2-分类网络架构

- Darknet-19网络
- Darknet架构(API、库,C/C++)
- Convolutional(CBL)
- Conv2d
- BatchNormalize
- Leaky RELU
YOLOv2-识别网络架构

- 采用全新的Darknet网络架构
- 去掉全连接层,支持多尺寸训练
- 采用AnchorBox技术,识别更准确
- 支持简单的多尺寸识别
全连接层为什么不能进行多尺寸训练?

全卷积层为什么能进行多尺寸训练?

小结
- 卷积+池化
- CBL=卷积+批量数据归一化+LeakyRELU
- 1x1+3x3增加网络深度
- YOLOv2采用全新的网络架构(Darknet-19)
- 去掉全连接,使用全卷积
- 引入了AnchorBox
- 支持简单的多尺寸识别
12-33 YOLOv3网络架构
YOLOv3-网络架构(Darknet-53)


特征图金字塔(FPN)

concat

上采样+concat

为什么要使用上采样?
- 通道是我们观察同一事物的不同角度
- 特征图越小,观察的越粗,但受视野变大了
- 小特征图上采样与大特征图拼接在一起可以增大观察的范围
- 同时仍然保持对小目标的敏感度,这样识别准确率会更高
12-34 YOLOv4及其以后的网络架构
YOLOv4及其以后的网络架构
- 将网络分成三大块:主干网络、Neck、检测头
- 主干网络用于提取特征图
- Neck用于构造特征图金字塔
- 检测头用于目标的识别

SPP(Spatial Pyramid Pooling)

Mish激活函数
- f(x)=x-tanh(In(1+ex))
- 输出更加平滑
- 由于计算量大,性能略差
YOLOv5-网络架构

Bottleneck

YOLOv8-网络架构

12-35 YOLO损失函数
YOLOv3损失函数

边界框坐标损失

置信度标损失

分类损失

第13章 YOLO实战与应用
13-1 导学
YOLO实战
YOLO简史
- 2015年发布第一版YOLO,因速度快,精度高非广受欢迎
- 2016年YOLQ2采用新的网络架构,批量归一化,引入锚框
- 2018年YOLO3使用Darknet53,多锚框,passthrough
- 2020年YOLO4,引入Mosaic数据增强技术
- 2020年YOLO5,性能有了大幅提升
- 2022年YOLO6,由美团发布
- 2022年YOLO7增加了更多的任务,如姿势识别(YOLOv4)
- 2023年YQLO8,性能进一步提升,支持全方位的视觉任务
- 2024年YOLO9,支持可编程梯度信息等新技术
- 2024年YOLO10,由清华大学发布,在实时性方面大幅进步imooc
- 2024年YOLO11,在计算机视觉方面有了全方位的提升
- 2025年YOLO12,采用了(大语言模型的)自注意力机制
YOLO能做什么?
- 分类、目标检测
- 目标追踪、姿态识别、物体分割
- 既可以理解图片内容,也可以理解视频内容
- 除了可以运行在PC上,它还能运行在移动设备上
本章主要内容
- YOLO环境的搭建 目标追踪 YOLO目标检测 姿态估计 图像分类 模型转换 物体分割
使用YOLO的方式
- 以命令行的方式运行
- 通过编码方式运行YOLO
YOLO命令基本格式
yolo [TASK] MODE ARGS
TASK
- detect:用于目标检测
- segment:用于分割
- classify:用于图像分类
- pose:用于姿态评估
- obb:定向检测(可以确定物体的角度)
MODE
- train:训练模式
- val:评估模式
- predict:推理模式
- export:导出模式
- track:追踪模式
- benchmark:评估模型性能
13-2 命令行方式进行目标识别
安装最新版本的YOLO
- 从YOLOv3之后,YOLO各版本由不同的公司发布
- ultralytic发布的YOLOv5,YOLOv8,YOLOv11最知名
- 安装YOLO命令:pip install ultralytics
- 依赖>=python3.8,Pytorch>=1.8
YOLO模型的不同尺寸

YOLO目标识别命令
mkdir yolo11
yolo predict
model=...
source=...
device=[cuda | mps]
hide_conf =True#隐藏置信度
hide_labels =True#隐藏标签
yolo predict model=yolo11n.pt source=./bus.jpg
13-3 编程方式进行目标识别(一)
# 设置环境变量以避免潜在的库冲突
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
from ultralytics import YOLO
from torchvision.io import read_image
from utils import display_images
MODEL_PATH = 'yolo11n.pt'
IMAGE_PATH = 'bus.jpg'
CONFIDENCE_THRESHOLD = 0.5
print('Loading model...')
model = YOLO(MODEL_PATH)
print('Model loaded.')
# Load image
image = read_image(IMAGE_PATH)
display_images(image.permute(1, 2, 0))
print('Running inference...')
results = model(IMAGE_PATH)
print('Inference done.')
results[0].show()
#results是一个集合,对于每一张图片,推理后的结果都放在result中
#对于单张图片,它的检测结果都放在result[0]
for result in results:
print(result)
boxes = []
roi_images = []
for result in results:
#print(result.boxes)
xyxy = result.boxes.xyxy
confidences = result.boxes.conf
masks = confidences > CONFIDENCE_THRESHOLD
filtered_xyxy = xyxy[masks]
#boxes.append(xyxy)
for i, box in enumerate(filtered_xyxy):
x1,y1,x2,y2 = box[:4].int()
roi = image[:, y1:y2, x1:x2]
roi_images.append(roi.permute(1, 2, 0))
display_images(roi_images)
13-4 编程方式进行目标识别(二)
13-5 进行实时目标识别
「实战二」使用YOLO进行实时检测
# 禁止OpenMP库冲突警告
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
# 导入依赖库
import torch
import cv2
from ultralytics import YOLO
import time
from datetime import datetime
# ======================
# 配置参数
# ======================
MODEL_PATH = 'yolo11n.pt'
SAVE_DIR = 'detected'
RTSP_URL = 'rtsp://admin:VWEDcc123@@192.168.86.150:554/media/video'
SHOW_FPS = True
SHOW_CONSOLE_OUTPUT = True
TARGET_WIDTH = 1280
CONFIDENCE_THRESHOLD = 0.5
# 检测模式配置 (只能启用一种)
DETECT_BY_TIME = True # 按时间间隔检测
DETECT_INTERVAL_SEC = 2 # 每2秒检测一次
DETECT_BY_FRAMES = False # 按帧间隔检测
DETECT_INTERVAL_FRAMES = 30 # 每30帧检测一次
# ======================
# 工具函数
# ======================
def create_save_dir(directory):
"""创建保存目录"""
if not os.path.exists(directory):
os.makedirs(directory)
print(f"Created directory: {directory}")
def select_device():
"""自动选择可用设备"""
device = 'cpu'
if torch.cuda.is_available():
device = 'cuda'
elif torch.backends.mps.is_available():
device = 'mps'
print(f'DEVICE: {device}')
return device
def load_model(model_path):
"""加载YOLO模型"""
print('Loading model...')
model = YOLO(model_path)
print('Model loaded.')
return model
def open_video_stream(rtsp_url):
"""打开视频流"""
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
print("Failed to open RTSP stream.")
exit()
return cap
def get_target_resolution(orig_width, orig_height, target_width):
"""根据目标宽度计算目标高度(保持宽高比)"""
target_height = int(target_width * orig_height / orig_width)
print(f'Target processing size: {target_width}x{target_height}')
return target_height
def save_detected_object(frame, box, orig_size, save_dir, class_name):
"""保存检测到的对象图像"""
x1, y1, x2, y2 = box.xyxy[0].tolist()
orig_w, orig_h = orig_size
resized_w, resized_h = TARGET_WIDTH, get_target_resolution(orig_w, orig_h, TARGET_WIDTH)
# 坐标转换回原始尺寸
orig_x1 = int(x1 * orig_w / resized_w)
orig_y1 = int(y1 * orig_h / resized_h)
orig_x2 = int(x2 * orig_w / resized_w)
orig_y2 = int(y2 * orig_h / resized_h)
# 边界检查
orig_x1 = max(0, orig_x1)
orig_y1 = max(0, orig_y1)
orig_x2 = min(orig_w - 1, orig_x2)
orig_y2 = min(orig_h - 1, orig_y2)
# 创建类别目录
class_dir = os.path.join(save_dir, class_name)
create_save_dir(class_dir)
# 裁剪并保存对象图像
obj_img = frame[orig_y1:orig_y2, orig_x1:orig_x2]
if obj_img.size > 0:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
filename = f"{class_dir}/{class_name}_{timestamp}.jpg"
cv2.imwrite(filename, obj_img)
print(f"Saved detected {class_name} to: {filename}")
if SHOW_CONSOLE_OUTPUT:
confidence = float(box.conf)
print(f"Detected: {class_name} ({confidence:.2f}) at [{orig_x1}, {orig_y1}, {orig_x2}, {orig_y2}]")
def save_annotated_frame(annotated_frame, save_dir, class_name=None):
"""保存带有检测框的完整帧图像"""
if class_name:
save_dir = os.path.join(save_dir, class_name)
create_save_dir(save_dir)
if annotated_frame.size > 0:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
prefix = f"detected_{class_name}_" if class_name else "detected_frame_"
filename = f"{save_dir}/{prefix}{timestamp}.jpg"
cv2.imwrite(filename, annotated_frame)
print(f"Saved annotated frame to: {filename}")
def draw_fps_and_info(frame, prev_time, target_size, fps, detection_active):
"""在帧上绘制FPS和尺寸信息"""
cv2.putText(frame, f'FPS: {fps:.1f}', (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(frame, f'Size: {target_size[0]}x{target_size[1]}', (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 显示当前检测状态
status = "DETECTING" if detection_active else "MONITORING"
color = (0, 255, 0) if detection_active else (0, 0, 255)
cv2.putText(frame, f'Status: {status}', (10, 110),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# ======================
# 主程序入口
# ======================
if __name__ == "__main__":
# 初始化
create_save_dir(SAVE_DIR)
DEVICE = select_device()
model = load_model(MODEL_PATH)
cap = open_video_stream(RTSP_URL)
# 获取原始视频尺寸
orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f'Original stream size: {orig_width}x{orig_height}')
target_height = get_target_resolution(orig_width, orig_height, TARGET_WIDTH)
# 性能统计变量
frame_count = 0
start_time = time.time()
prev_time = start_time
last_detect_time = start_time
# 主循环
while True:
ret, frame = cap.read()
if not ret:
print('Failed to get frame or stream ended.')
break
frame_count += 1
current_time = time.time()
# 计算FPS
fps = 1 / (current_time - prev_time + 1e-5) # 防止除零错误
prev_time = current_time
# 缩放图像用于显示
resized_frame = cv2.resize(frame, (TARGET_WIDTH, target_height))
display_frame = resized_frame.copy()
# 检测逻辑
detection_active = False
# 按时间间隔检测
if DETECT_BY_TIME and (current_time - last_detect_time) >= DETECT_INTERVAL_SEC:
detection_active = True
last_detect_time = current_time
# 按帧间隔检测
elif DETECT_BY_FRAMES and frame_count % DETECT_INTERVAL_FRAMES == 0:
detection_active = True
# 执行检测
if detection_active:
# 模型推理
results = model(resized_frame, device=DEVICE, verbose=False)
# 绘制检测结果
display_frame = results[0].plot()
# 处理检测结果
for result in results:
boxes = result.boxes
if len(boxes) == 0:
continue
for box in boxes:
class_id = int(box.cls)
class_name = model.names[class_id]
confidence = float(box.conf)
if confidence > CONFIDENCE_THRESHOLD:
# 保存裁剪的对象图像
save_detected_object(frame, box, (orig_width, orig_height), SAVE_DIR, class_name)
# 保存带标注的完整帧
save_annotated_frame(display_frame, os.path.join(SAVE_DIR, "annotated_frames"), class_name)
# 显示 FPS 和状态信息
draw_fps_and_info(display_frame, current_time, (TARGET_WIDTH, target_height), fps, detection_active)
# 显示画面
cv2.imshow('RTSP Object Detection', display_frame)
# 按 Q 键退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 结束统计
end_time = time.time()
avg_fps = frame_count / (end_time - start_time)
print(f"\nAverage FPS: {avg_fps:.2f}")
print(f"Processed frames: {frame_count}")
# 释放资源
cap.release()
cv2.destroyAllWindows()
print('Done.')
13-6 使用YOLO进行目标追踪
基本原理

追踪器
- bytetrack
- BoTSort
命令
yolo track model=...
source=…..(视频/摄像头)
tracker=bytetrack.yaml
...
yolo track model=yolo11n. pt source=/Users/lichao/Downloads/video1. mp4 show=True classes=0,1,2 conf=0.5
注意事项
- 我们能不能通过ID来追踪物体? persist
- 当一个物体从视频中消失,再出现时它还能被追踪到吗?
「实战四」使用YOLO进行姿态评估

命令
yolo predict model=yolo11n-pose.pt source=...
姿态AIGym参数
姿态参数
gym = solutions. AlGym(model = yolo11n-pose.pt, view_img=False, line_width=2, pose_type="pushup" kpts=[6,8,10])

实操
# 禁止OpenMP库冲突警告
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
import cv2
from ultralytics import YOLO, solutions
#定义一些变量
MODEL_PATH="yolo11n-pose.pt"
VIDEO_PATH="第13章/fuwocheng.mp4"
#创建GYM对象
gym=solutions.AIGym(
model=MODEL_PATH,
pose_type="pushup",
kpts_to_check=[5,7,9],
view_img=False,
line_width=2
)
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
print("Error: Could not open video.")
exit()
while cap.isOpened:
success, frame = cap.read()
if not success:
break
results = gym.process(frame)
processed_frame = results.plot_im
cv2.imshow("Processed Frame", processed_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
print("Video processing completed.")
13-8 分割-分类与OBB
分割与目标检测的区别

命令
yolo predict model=yolo11n-seg.pt source=...
整体效果
- mediapipe
- SAM1/SAM2
「实战六」分类
yolo predict model=yolo11n-cls.pt source=…..
「实战七」obb
yolo predict model=yolo11n-obb.pt source=boats.jpg
13-9 export与benchmark
「实战八」export
命令
yolo export model=yolo11n.pt format=[onnx...]
yolo export model=yolo11n.pt format=onnx
常见的模型格式

OpenCV使用onnx的基本步骤
- ·调用cv2.dnn.readNetFromONNX加载模型
- 调用cv2.imread读入原始图片
- 调用cv2.dnn.blobFromimage将图片转成Blob
- 调用net.setlnput(blob)将数据传给模型
- 调用 net.forward()进行推理
「实战九」benchmark
其作用是评估和比较不同格式下 YOLO在特定硬件上的性能
命令
yolo benchmark model=yolo11n.pt device=0
第14章 YOLO高阶知识-训练与部署
14-1 导学
YOLO的训练与部署
Fine-tuning:在原来的模型上通过微调参数来完成某项具体的任务
训练最最重要的事儿

两个实战项目
- 火焰的检测(找数据、使用云服务进行训练)
- 车牌识别(自己收集数据、标注、本机训练)
本章主要内容
- 数据集的准备(收集、标注)
- 标注工具的选择与安装
- 如何标注数据
- 如何训练模型
- 部署模型并测试
- 车牌检测项目
14-2 火焰检测项目-获取数据集
火焰检测的基本步骤
- 获取数据集
- 训练
- 部署与测试
数据集的获取
- 自己采集数据,进行标注
- 网络搜索已知数据集(如COCO)
- 能从网上找到的尽量从网上找
网上获取数据集的方法
- Roboflow.com
- Kaggle.com
- 询问GPT或其它大语言模型
14-3 火焰检测项目-模型训练1
训练
!pip install roboflow
from roboflow import Roboflow
rf = Roboflow(api_key="LwJOcFmzQ7bmj4K5Yf7h")
project = rf.workspace("fire-test-w38ww").project("fire-dji3l")
version = project.version(4)
dataset = version.download("yolov11")
!pip install ultralytics
!nvidia-smi
import numpy as np
import torch
from torchvision.io import read_image
from ultralytics import YOLO
import matplotlib.pyplot as plt
import matplotlib.patches as patches
def display_images(image_list=None, title=None, bboxes_list=None):
if image_list is None:
image_list = []
if title is None:
title = ""
if bboxes_list is None:
bboxes_list = []
#(False, True)
if not any(isinstance(i, list) for i in image_list):
image_list = [image_list] #[img, img, img] => [[img, img, img]]
rows = len(image_list)
cols = max(len(row) if isinstance(row , list) else 1 for row in image_list)
plt.suptitle(title)
fig, ax = plt.subplots(rows, cols)
#确保ax是2D数组
#ax => [[ax]], [ax]=>[[ax]]
ax = np.atleast_2d(ax)
for i, row in enumerate(image_list):
if not isinstance(row, list):
row = [row]
for j, img in enumerate(row):
ax[i, j].imshow(img)
ax[i, j].axis("off")
bbox_index = i * cols + j
if bbox_index < len(bboxes_list):
for bbox in bboxes_list[bbox_index]:
x_min, y_min, x_max, y_max = bbox
width = x_max - x_min
height = y_max - y_min
rect = patches.Rectangle(
(x_min, y_min),
width,
height,
linewidth=2,
edgecolor="r",
facecolor="none"
)
ax[i, j].add_patch(rect)
for j in range(len(row), cols):
ax[i, j].axis("off")
#plt.tight_layout()
plt.show()
org_img_path = '/content/fire--4/train/images/0050_jpg.rf.735bdf9c97cead8499836ed82c003320.jpg'
img = read_image(org_img_path)
display_images(img.permute(1,2,0))
MODEL_PATH='yolo11n.pt'
print('Loading model....')
model = YOLO(MODEL_PATH)
print('Model loaded!')
print('Running inference...')
results = model(org_img_path,device=0)
print('Interence done!')
boxes= []
for result in results:
xyxy = result.boxes.xyxy
boxes.append(xyxy.cpu())
display_images(img.permute(1,2,0), bboxes_list=boxes)
train_results = model.train(data="/content/fire--4/data.yaml", epochs=100, imgsz=640, device=0)
14-4 火焰检测项目-模型训练2
火焰检测的基本步骤
训练
train_results = model.train(data="/content/fire--4/data.yaml", epochs=100, imgsz=640, device=0)
14-5 火焰检测项目-部署与测试
14-6 车牌识别项目-采集数据
这个项目我们全部在本机实现!
车牌识别的基本步骤
- 采集数据集
- 标注数据
- 训练模型
- 部署与测试
采集数据
- 使用手机拍摄
- 专业相机拍摄
- 摄像头拍摄

我用1个小时拍了100多张照片 :再手动将.HEIC转成JPG又花了半个小时
标注数据
- roboflow.com
- label studio(本地)
- labellmg(已不更新,建议使用label studio)
14-7 车牌识别项目-数据标注1
实战标注数据
label-studio的安装与部署
pip install label-studio
label-studio
14-8 车牌识别项目-数据标注2
Roboflow标注数据
14-9 车牌识别项目-模型训练
yolo detect train data=/mnt/d/YOLO11_custom/dataset_custom.yaml model=yololln.pt epochs=100 ingsz=640 device=0
yolo predict model=./runs/detect/train2/weights/best.pt source=/mnt/d/YOLO11_custom/IMG_7461.mp4 show=True
14-10 【实战】车牌识别
「实战」车牌识别
- 从车辆上识别出车牌区域
- 将识别出的区域截取出来
- 使用PaddleOCR识别其中的文字
PaddleOCR
- 百度出品
- 目前是我测试过最好用的OCR
pip install -U paddleocr
使用PaddleOCR
引入paddleocr,from paddleocr import PaddleOCR
创建PaddleOCR对象
文字识别
输出结果
**第15章 注意力机制
15-1 什么是注意力机制
注意力机制与自注意力机制
目前对人工智能影响巨大的算法非Transfomer莫属! 无论是自然语言还是视觉,当前研究的主要方向都是Transfomer!
一点历史
2017年以前,处理序列数据(自然语言文本、时间序列信号、音频等)、主流模型是RNN
- LSTM(Long Short-Term Memory)
- GRU(gated Recurrent Unit)
RNN/LSTM的核心思想
- 像人阅读一样,按顺序处理序列中的每一个元素(比如一个词)它维护一个“记忆”(称为隐藏状态),这个记忆在处理完当前元素后会被更新,并传递给下一步为处理下一个元素做准备
RNN/LSTM的两大挑战
- 难以捕捉长距离依赖关系
- 一篇很长的文章,要理解文末某个代词"它"、指代的是文章开头的某个概念,它需要一步一步传递,序列越长信息越容易“丢失”。像玩“传话游戏”,信息到最后已经面目全非
- 并行计算性差
- RNN的顺序处理机制决定了它必须完成上一步才能执行下一步,无法并行计算
Transfomer正是为解决上面两大痛点而生的!
Transfomer的核心是注意力机制
注意力机制允许输出序列中的某个元素 直接关注输入序列中任何其它位置的元素
- 解决了长距离依赖
- 解决了串行计算
刺激驱动的注意力机制

目标导向的注意力

注意力机制

什么是注意力机制?
注意力机制是一种动态地为输入信息分配注意力权重,从而让输出聚焦于最相关输入部分的机制
注意力机制的输出
对于每个目标token,都会得到一个输出向量,该向量中保存了目标token与源序列中每个token的上下文,其目标是为预测目标序列的下一个token作好准备
15-2 注意力机制的一些细节
注意力机制中的三个关键信息
- Query:告诉注意力机制,“我”对哪项最感兴趣
- Key:指明了有哪些可供查询的项(位置编码后)
- Value:指明了这些项实际包含的信息或内容
来看一个例子
- 输入一句话,如 this is a dog,其中每个词是一个「token」
- 每个token产生一个「Key,Val」作为输入
- 在机器翻译中,目标序列是“这是一只狗”
- 此时,“这”发出「query」,查询源序列中谁与它关系最大
- 得到一个向量,该向量保存了“这”与源序列中各token的关系
注意力机制

- 根据query的变化,动态调整对不同输入的关注度
- 突出重要信息,抑制不重要信息
- 从而让模型做出更好的决策的系统
CNN、FC与注意力机制核心区别

15-3 自注意力机制与注意力机制的区别
什么是自注意力机制?
- 在同一个输入序列中,每个输入产生自己的query,来查询它与其它输入之间的关系,从而找出谁与自己最近,或者说“我”更关注谁。
注意力机制与自注意力机制区别
- 自注意力机制的Query是每个输入自己提供的
- 注意力机制的Query是由外部(目标序列)提供的
- 比如机器翻译,在训练时有源序列和目标序列
- 找到目标序列中某个输入与源序列中哪个输入更密切用注意力机制
- 在同一个句子中,找每个词之间的关系用自注意力机制
举个例子

15-4 注意力机制中的注意力分数
注意力分数
注意力机制的计算公式

如何计算注意力分数呢?

注意力分数等于Qurey与Key的内积
注意力分数的作用
- 注意力分数用于判断查询者与各个输入的密切程度,分值越高密切度越高
举个例子

15-5 注意力机制中的缩放因子

注意力机制的计算公式

为什么注意力分数要除以/dk?
- 如果Q、K向量维度很大,它们的点积结果抖动就会很大


Softmax对输入的大小很敏感
- 如果得分很近,,如0.5,0.2,-0.1,Softmax分配较平滑(0.5,0.3,0.2)
- 如果得分抖动大,如25,3,-1,Softmax几乎把所有权重都给了最高分,其它为0
- 这种情况称为梯度消失,模型就无法进行训练了

- 假设Q和K中的元素是均值为0,方差为1的独立随机变量
- 那么每个qi・ki其方差为1
- 所以Q・K的方差为dk(也就是k的维度)
- 为了让Q・K的方差在1左右,要除以dk,做一下缩放
15-6 自注意力机制的具体操作过程
自注意力机制的计算公式

详细解释
- 创建Wq、Wk、Wv三个权重矩阵
- 这三个权重矩阵是自注意力机制要通过神经网络学习到的
- 对于任意输入a',分别让它同时乘以Wq、Wk、Wv
- 得到Q、K、V三个向量
- 然后让Q・KT得到注意力分数
- 对所有输入的注意力分数执行 softmax
- 求出各输入与其它输入之间的关注度α(α之和为1)
- 最后用α・V,得到最终的结果
自注意力机制的输入与输出

计算α的方法


将注意力分数转为权重

输出向量的计算

自注意力机制的具体实现
15-7 自注意力机制的矩阵化
输入矩阵化

Q、K、V的产生




注意力分数

最终结果

