OrangePi上运行Stable Diffusion

Stable Diffusion是目前主流的AI绘图模型,但一般要求有英伟达的独立显卡,难以在OrangePi这种arm架构的设备上运行。我尝试了在我的OrangePi5Plus上运行了huggingface上的sd-1.5模型,经过到onnx再到rknn的模型转化,最后还做了一个webUI。下面给出流程,方便参考。

提醒:该方案最后精度实际上不高,没有太大的实用价值,因此只用于学习参考。

材料准备

主机阶段

在x86Linux主机上进行,最好预留至少90GB的硬盘空间,保证至少16GB的运行内存(不然可能被kill进程).
orange_pi_sd_project/
├── scripts/ # 存放所有转换脚本
│ ├── 01_merge_models.py # 模型合并脚本
│ ├── 02_convert_to_onnx.py # ONNX转换脚本
│ └── 03_convert_to_rknn.py # RKNN转换脚本
├── working/ # 工作目录(存放中间文件)
│ ├── merged_models/ # 合并后的模型
│ ├── onnx_models/ # ONNX格式模型
│ └── rknn_models/ # 最终RKNN模型
|── calib_data/ # 校准数据集

模型下载

从huggingface上下载sd-v1.5原始模型和LCM-LoRA适配器。

合并模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from diffusers import StableDiffusionPipeline
import torch
import os
LOCAL_SD15_PATH = "./models/stable-diffusion-v1-5" # SD1.5原始模型目录
LOCAL_LCM_LORA_PATH = "./models/lcm-lora-sdv1-5" # LCM-LoRA适配器目录
MERGED_PATH = "./lcm_sd15_merged" # 合并后模型保存路径
def validate_model_directories():
"""只检查必要的核心文件"""
# SD1.5 必须的文件
sd_required = [
"model_index.json",
"unet/config.json",
"unet/diffusion_pytorch_model.bin",
"vae/config.json",
"text_encoder/config.json"
]
# LCM-LoRA 只需要权重文件
lcm_required = [
"unet/diffusion_pytorch_model.safetensors"
]
missing_files = []
# 验证 SD1.5
for file in sd_required:
path = os.path.join(LOCAL_SD15_PATH, file)
if not os.path.exists(path):
missing_files.append(path)
# 验证 LCM-LoRA
for file in lcm_required:
path = os.path.join(LOCAL_LCM_LORA_PATH, file)
if not os.path.exists(path):
missing_files.append(path)
if missing_files:
print("❌ 错误: 以下必需文件缺失:")
for file in missing_files:
print(f" - {file}")
print("请确保模型已正确下载")
exit(1)
else:
print("✅ 所有必需模型文件存在")
print("🔍 验证本地模型文件...")
validate_model_directories()
print("⏳ 加载Stable Diffusion 1.5基础模型...")
try:
pipe = StableDiffusionPipeline.from_pretrained(
LOCAL_SD15_PATH,
torch_dtype=torch.float32,
safety_checker=None
)
print("✅ SD1.5模型加载成功")
except Exception as e:
print(f"❌ SD1.5模型加载失败: {str(e)}")
exit(1)
print("⏳ 加载并应用LCM-LoRA适配器...")
try:
# 直接加载LCM-LoRA权重到管道
pipe.load_lora_weights(LOCAL_LCM_LORA_PATH)
# 重要:融合LoRA权重到主模型
pipe.fuse_lora()
print("✅ LCM-LoRA适配器加载成功")
# 设置LCM调度器
from diffusers import LCMScheduler
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
print("✅ LCM调度器设置完成")
except Exception as e:
print(f"❌ LCM-LoRA加载失败: {str(e)}")
exit(1)
print("⏳ 保存合并后的模型...")
try:
# 仅保存必要的组件
pipe.save_pretrained(
MERGED_PATH,
safe_serialization=True
)
print(f"✅ 合并模型已保存到: {MERGED_PATH}")
except Exception as e:
print(f"❌ 模型保存失败: {str(e)}")
exit(1)

这是01_merge_models.py,不赘述

转化到ONNX

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import torch
import os
import sys
from pathlib import Path
from diffusers import StableDiffusionPipeline
import onnx
import time
import warnings
import logging
import numpy as np
warnings.filterwarnings("ignore", category=torch.jit.TracerWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
logging.getLogger("transformers").setLevel(logging.ERROR)
logging.getLogger("diffusers").setLevel(logging.ERROR)
model_path = "../working/merged_models" # 合并后模型路径
onnx_path = "../working/onnx_models" # ONNX模型输出路径
device = "cuda" if torch.cuda.is_available() else "cpu"
Path(onnx_path).mkdir(parents=True, exist_ok=True)
print("🔍 验证模型目录...")
if not os.path.exists(model_path):
print(f"❌ 错误: 模型目录不存在 - {model_path}")
print("请先运行模型合并脚本")
sys.exit(1)
print(f"✅ 模型目录验证成功: {model_path}")
print("⏳ 加载合并后的模型管道...")
start_time = time.time()
try:
# 使用更安全的方式加载模型
pipe = StableDiffusionPipeline.from_pretrained(
model_path,
torch_dtype=torch.float16,
use_safetensors=True,
safety_checker=None,
feature_extractor=None,
requires_safety_checker=False
).to(device)
# 设置模型为评估模式
pipe.text_encoder.eval()
pipe.unet.eval()
pipe.vae.eval()
load_time = time.time() - start_time
print(f"✅ 模型加载成功! 耗时: {load_time:.1f}秒")
except Exception as e:
print(f"❌ 模型加载失败: {str(e)}")
sys.exit(1)
def safe_onnx_export(model, args, f, **kwargs):
"""处理TracerWarning的安全导出函数"""
print(f" 开始导出模型...")
try:
# 临时禁用警告
with warnings.catch_warnings():
warnings.simplefilter("ignore")
torch.onnx.export(model, args, f, **kwargs)
return True
except Exception as e:
print(f" 导出过程中出错: {str(e)}")
return False
print("\n" + "="*50)
print("🔄 开始转换Text Encoder到ONNX格式...")
try:
text_encoder = pipe.text_encoder
text_encoder.eval()
# 准备输入
input_ids = torch.ones((1, 77), dtype=torch.long, device=device)
# 导出ONNX
encoder_path = f"{onnx_path}/text_encoder.onnx"
if not safe_onnx_export(
text_encoder,
(input_ids,),
encoder_path,
input_names=["input_ids"],
output_names=["text_embeddings"],
dynamic_axes={
"input_ids": {0: "batch"},
"text_embeddings": {0: "batch"}
},
opset_version=14
):
raise RuntimeError("Text Encoder导出失败")
# 验证输出
print(f"✅ Text Encoder转换成功! 保存到: {encoder_path}")
print(f" - 模型大小: {os.path.getsize(encoder_path)/1024/1024:.1f} MB")
except Exception as e:
print(f"❌ Text Encoder转换失败: {str(e)}")
sys.exit(1)
print("\n" + "="*50)
print("🔄 开始转换UNet到ONNX格式...")
try:
unet = pipe.unet
unet.eval()
# 准备输入 - 使用更小的batch size避免内存问题
sample = torch.randn(1, 4, 64, 64).to(device, torch.float16)
timestep = torch.tensor([1]).to(device, torch.float16)
encoder_hidden_states = torch.randn(1, 77, 768).to(device, torch.float16)
# 导出ONNX
unet_path = f"{onnx_path}/unet.onnx"
if not safe_onnx_export(
unet,
(sample, timestep, encoder_hidden_states),
unet_path,
input_names=["sample", "timestep", "encoder_hidden_states"],
output_names=["noise_pred"],
dynamic_axes={
"sample": {0: "batch"},
"encoder_hidden_states": {0: "batch"},
"noise_pred": {0: "batch"}
},
opset_version=14
):
raise RuntimeError("UNet导出失败")
# 验证输出
print(f"✅ UNet转换成功! 保存到: {unet_path}")
print(f" - 模型大小: {os.path.getsize(unet_path)/1024/1024:.1f} MB")
except Exception as e:
print(f"❌ UNet转换失败: {str(e)}")
sys.exit(1)
print("\n" + "="*50)
print("🔄 开始转换VAE解码器到ONNX格式...")
try:
vae = pipe.vae
vae.eval()
# 准备输入 - 修正通道问题
# 注意:VAE解码器实际期望输入为 [batch, 4, height, width]
latent = torch.randn(1, 4, 64, 64).to(device, torch.float16)
# 创建VAE解码器的包装类,解决通道问题
class VAEDecoderWrapper(torch.nn.Module):
def __init__(self, vae):
super().__init__()
self.decoder = vae.decoder
self.post_quant_conv = vae.post_quant_conv
def forward(self, latent):
# 正确的处理流程
latent = self.post_quant_conv(latent)
return self.decoder(latent)
# 使用包装后的模型
vae_decoder_wrapper = VAEDecoderWrapper(vae).to(device).eval()
print("✅ 已创建VAE解码器包装器解决通道问题")
# 导出ONNX
vae_path = f"{onnx_path}/vae_decoder.onnx"
if not safe_onnx_export(
vae_decoder_wrapper,
latent,
vae_path,
input_names=["latent"],
output_names=["image"],
opset_version=14
):
raise RuntimeError("VAE解码器导出失败")
# 验证输出
print(f"✅ VAE解码器转换成功! 保存到: {vae_path}")
print(f" - 模型大小: {os.path.getsize(vae_path)/1024/1024:.1f} MB")
# 额外验证输出形状
with torch.no_grad():
output = vae_decoder_wrapper(latent)
print(f" - 输出形状: {output.shape} (应为 [1, 3, 512, 512])")
except Exception as e:
print(f"❌ VAE解码器转换失败: {str(e)}")
# 提供详细错误信息
import traceback
traceback.print_exc()
sys.exit(1)
print("\n" + "="*50)
print("🛠️ 开始优化ONNX模型...")
def optimize_onnx_model(input_path, output_path):
"""优化ONNX模型并转换为FP16"""
try:
# 简化模型
print(f" 正在简化模型: {os.path.basename(input_path)}")
model = onnx.load(input_path)
# 使用ONNX Simplifier
try:
import onnxsim
model_simp, check = onnxsim.simplify(model)
if not check:
print(" 警告: 模型简化验证失败,但将继续使用简化模型")
simplified_model = model_simp
except ImportError:
print(" 警告: onnxsim未安装,跳过简化步骤")
simplified_model = model
# 保存简化后的模型
simplified_path = input_path.replace(".onnx", "_sim.onnx")
onnx.save(simplified_model, simplified_path)
# 转换为FP16
print(f" 正在转换为FP16: {os.path.basename(input_path)}")
try:
from onnxconverter_common import float16
fp16_model = float16.convert_float_to_float16(simplified_model, keep_io_types=True)
except ImportError:
print(" 警告: onnxconverter-common未安装,跳过FP16转换")
fp16_model = simplified_model
# 保存优化后的模型
onnx.save(fp16_model, output_path)
# 验证优化后的模型
try:
onnx.checker.check_model(output_path)
print(" ✅ 模型验证通过")
return True
except onnx.checker.ValidationError as e:
print(f" ⚠️ 模型验证警告: {str(e)}")
return True # 即使有警告也继续
except Exception as e:
print(f" 优化失败: {str(e)}")
return False
success = True
for model_name in ["text_encoder", "unet", "vae_decoder"]:
input_path = f"{onnx_path}/{model_name}.onnx"
output_path = f"{onnx_path}/{model_name}_fp16.onnx"
print(f"\n🔧 优化 {model_name}...")
if optimize_onnx_model(input_path, output_path):
print(f"✅ {model_name}优化成功! 保存到: {output_path}")
print(f" - 优化后大小: {os.path.getsize(output_path)/1024/1024:.1f} MB")
else:
print(f"❌ {model_name}优化失败")
success = False
print("\n" + "="*50)
total_time = time.time() - start_time
if success:
print(f"🎉 所有模型转换完成! 总耗时: {total_time/60:.1f}分钟")
print("下一步: 运行RKNN转换脚本")
else:
print(f"⚠️ 转换完成但有错误! 总耗时: {total_time/60:.1f}分钟")
print("建议: 检查错误信息并尝试重新运行")
print("="*50)

这是02_convert_to_onnx.py脚本,有注释和充足的输出提示,不赘述

转化到RKNN

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import os
import numpy as np
from rknn.api import RKNN
import time
import logging
import sys
import shutil
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('rknn_conversion.log')
])
logger = logging.getLogger("RKNN_Converter")
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ONNX_MODEL_DIR = os.path.abspath(os.path.join(BASE_DIR, "./onnx_models"))
RKNN_MODEL_DIR = os.path.abspath(os.path.join(BASE_DIR, "./rknn_models"))
CALIB_DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "./calib_data"))
TARGET_PLATFORM = "rk3588"
os.makedirs(RKNN_MODEL_DIR, exist_ok=True)
os.makedirs(CALIB_DATA_DIR, exist_ok=True)
SUPPORTED_QUANT_TYPES = {
"rk3588": ["asymmetric_quantized-8", "dynamic_fixed_point-8", "float16"]
}
MODEL_CONFIGS = {
"text_encoder": {
"input_nodes": ["input_ids"],
"input_shapes": [[1, 77]],
"calib_data": {
"input_ids": os.path.join(CALIB_DATA_DIR, "text_encoder_input.npy")
},
"quantized_dtype": "asymmetric_quantized-8", # RK3588支持的量化类型
"optimization_level": 3,
"quantized_method": "channel" # 量化方法
},
"unet": {
"input_nodes": ["sample", "timestep", "encoder_hidden_states"],
"input_shapes": [[1, 4, 64, 64], [1], [1, 77, 768]],
"calib_data": {
"sample": os.path.join(CALIB_DATA_DIR, "unet_sample.npy"),
"encoder_hidden_states": os.path.join(CALIB_DATA_DIR, "unet_emb.npy")
},
"quantized_dtype": "asymmetric_quantized-8", # RK3588支持的量化类型
"optimization_level": 2, # 降低优化级别避免内存问题
"quantized_method": "layer" # 量化方法
},
"vae_decoder": {
"input_nodes": ["latent"],
"input_shapes": [[1, 4, 64, 64]],
"calib_data": {
"latent": os.path.join(CALIB_DATA_DIR, "unet_sample.npy") # 复用UNet的样本
},
"quantized_dtype": "asymmetric_quantized-8", # RK3588支持的量化类型
"optimization_level": 3,
"quantized_method": "channel" # 量化方法
}
}
def check_model_exists(model_name):
"""检查RKNN模型是否已存在"""
rknn_path = os.path.join(RKNN_MODEL_DIR, f"{model_name}.rknn")
if os.path.exists(rknn_path):
logger.info(f"✅ 检测到已存在的RKNN模型: {model_name}.rknn")
logger.info(f" 大小: {os.path.getsize(rknn_path)/1024/1024:.1f} MB")
return True
return False
def create_calib_dataset(model_name):
"""创建校准数据集并返回文件路径"""
# 创建特定于模型的校准目录
model_calib_dir = os.path.join(CALIB_DATA_DIR, f"{model_name}_calib")
if os.path.exists(model_calib_dir):
shutil.rmtree(model_calib_dir) # 清理旧数据
os.makedirs(model_calib_dir, exist_ok=True)
# 生成校准样本 (减少到3个以加快处理)
num_samples = 3
logger.info(f" 生成 {num_samples} 个校准样本...")
# 为每个输入节点创建单独的数据文件
input_files = []
for node_name in MODEL_CONFIGS[model_name]["input_nodes"]:
# 找到对应的输入形状
shape = None
for j, node in enumerate(MODEL_CONFIGS[model_name]["input_nodes"]):
if node == node_name:
shape = MODEL_CONFIGS[model_name]["input_shapes"][j]
break
if shape:
# 为每个输入节点创建数据文件
node_files = []
for i in range(num_samples):
# 创建随机数据
data_path = os.path.join(model_calib_dir, f"{node_name}_{i}.npy")
# 对于时间步长特殊处理
if node_name == "timestep":
data = np.array([i], dtype=np.float32) # 简单递增
else:
data = np.random.rand(*shape).astype(np.float32)
np.save(data_path, data)
node_files.append(data_path)
input_files.append(node_files)
# 创建数据集文件
calib_file_path = os.path.join(model_calib_dir, "calib_dataset.txt")
with open(calib_file_path, "w") as f:
# 写入每个样本的数据
for i in range(num_samples):
sample_line = []
for node_files in input_files:
sample_line.append(node_files[i])
f.write(" ".join(sample_line) + "\n")
logger.info(f" 创建校准数据集文件: {calib_file_path}")
return calib_file_path
def convert_model(model_name):
"""转换单个模型到RKNN格式"""
logger.info(f"🚀 开始转换 {model_name.upper()} 模型...")
# 检查是否已完成
if check_model_exists(model_name):
return True
# 初始化RKNN对象
rknn = RKNN(verbose=True)
try:
# 步骤1: 配置参数
logger.info(" 设置配置参数...")
# 使用RK3588支持的量化类型
quant_type = MODEL_CONFIGS[model_name].get("quantized_dtype", "asymmetric_quantized-8")
# 验证量化类型是否受支持
if quant_type not in SUPPORTED_QUANT_TYPES[TARGET_PLATFORM]:
logger.warning(f" 量化类型 {quant_type} 不被 {TARGET_PLATFORM} 支持,使用默认值")
quant_type = "asymmetric_quantized-8"
config_params = {
'target_platform': TARGET_PLATFORM,
'quantized_dtype': quant_type,
'optimization_level': MODEL_CONFIGS[model_name].get("optimization_level", 3),
'quantized_method': MODEL_CONFIGS[model_name].get("quantized_method", "channel"),
}
# 添加可选参数 - 只使用布尔值
if model_name == "unet":
config_params['output_optimize'] = True # 只保留output_optimize参数
# 已移除sparse_infer参数,因为rk3588不支持
logger.info(f" 量化配置: {config_params['quantized_dtype']}")
logger.info(f" 优化级别: {config_params['optimization_level']}")
logger.info(f" 量化方法: {config_params['quantized_method']}")
# 检查配置参数是否有效
logger.info(" 验证配置参数...")
ret = rknn.config(**config_params)
if ret != 0:
logger.error(f"❌ 配置失败! 错误码: {ret}")
# 尝试使用更安全的配置
logger.warning("⚠️ 尝试使用默认配置...")
safe_config = {
'target_platform': TARGET_PLATFORM,
'quantized_dtype': "asymmetric_quantized-8",
'optimization_level': 1,
}
ret = rknn.config(**safe_config)
if ret != 0:
logger.error("❌ 安全配置也失败,终止转换")
return False
# 步骤2: 加载ONNX模型
onnx_path = os.path.join(ONNX_MODEL_DIR, f"{model_name}_fp16.onnx")
if not os.path.exists(onnx_path):
logger.error(f"❌ ONNX模型文件不存在: {onnx_path}")
return False
logger.info(f" 加载ONNX模型: {os.path.basename(onnx_path)}")
logger.info(f" 输入节点: {MODEL_CONFIGS[model_name]['input_nodes']}")
logger.info(f" 输入形状: {MODEL_CONFIGS[model_name]['input_shapes']}")
ret = rknn.load_onnx(
model=onnx_path,
inputs=MODEL_CONFIGS[model_name]["input_nodes"],
input_size_list=MODEL_CONFIGS[model_name]["input_shapes"]
)
if ret != 0:
logger.error(f"❌ 加载ONNX模型失败! 错误码: {ret}")
return False
# 步骤3: 构建模型
logger.info(" 构建RKNN模型...")
# 创建校准数据集文件
calib_file_path = create_calib_dataset(model_name)
# 验证校准文件
if not os.path.exists(calib_file_path):
logger.error(f"❌ 校准文件不存在: {calib_file_path}")
return False
# 打印校准文件内容用于调试
logger.info(f" 校准文件内容预览:")
with open(calib_file_path, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines[:min(2, len(lines))]): # 只显示前2行
logger.info(f" 样本 {i+1}: {line.strip()}")
start_time = time.time()
# 构建参数 - 只保留基本参数
build_params = {
'do_quantization': True,
'dataset': calib_file_path,
}
# 对于UNet模型添加特殊提示
if model_name == "unet":
logger.info(" UNet检测到 - 使用最小构建参数")
logger.info(" ⚠️ 注意: UNet转换可能需要较长时间和较大内存")
ret = rknn.build(**build_params)
build_time = time.time() - start_time
if ret != 0:
logger.error(f"❌ 构建失败! 错误码: {ret}")
logger.error(" 可能原因: 内存不足或不支持的算子")
# 对于UNet尝试更低优化级别
if model_name == "unet":
logger.warning("⚠️ 尝试使用优化级别0重新构建UNet...")
rknn.config(optimization_level=0)
start_time = time.time()
ret = rknn.build(do_quantization=True, dataset=calib_file_path)
build_time = time.time() - start_time
if ret != 0:
logger.error("❌ 优化级别0也失败,终止转换")
return False
logger.info(f" 构建成功! 耗时: {build_time:.1f}秒")
logger.info(f" 构建成功! 耗时: {build_time:.1f}秒")
# 步骤4: 导出RKNN模型
rknn_path = os.path.join(RKNN_MODEL_DIR, f"{model_name}.rknn")
logger.info(f" 导出RKNN模型到: {rknn_path}")
ret = rknn.export_rknn(rknn_path)
if ret != 0:
logger.error(f"❌ 导出失败! 错误码: {ret}")
return False
# 验证模型大小
if os.path.exists(rknn_path):
size_mb = os.path.getsize(rknn_path) / (1024 * 1024)
logger.info(f"✅ {model_name.upper()} 转换成功! 大小: {size_mb:.1f} MB")
# 验证模型是否可以加载
logger.info(" 验证模型加载...")
rknn_lite = RKNN()
ret = rknn_lite.load_rknn(rknn_path)
if ret != 0:
logger.warning("⚠️ 模型加载验证失败")
else:
logger.info("✅ 模型加载验证成功")
rknn_lite.release()
return True
else:
logger.error("❌ 导出文件未生成")
return False
except Exception as e:
logger.error(f"❌ 转换过程中发生异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return False
finally:
# 释放资源
rknn.release()
def main():
logger.info("=" * 60)
logger.info(f"📡 开始RKNN模型转换 - 目标平台: {TARGET_PLATFORM}")
logger.info(f"🔧 当前工作目录: {os.getcwd()}")
logger.info(f"🔧 项目根目录: {BASE_DIR}")
logger.info(f"🔧 ONNX模型目录: {ONNX_MODEL_DIR}")
logger.info(f"💾 RKNN输出目录: {RKNN_MODEL_DIR}")
logger.info(f"📊 校准数据目录: {CALIB_DATA_DIR}")
logger.info(f"📊 支持的量化类型: {SUPPORTED_QUANT_TYPES[TARGET_PLATFORM]}")
logger.info("=" * 60)
# 检查环境
if not os.path.exists(ONNX_MODEL_DIR):
logger.error("❌ 错误: ONNX模型目录不存在")
logger.error("请先完成ONNX转换")
return
# 确保校准数据目录存在
os.makedirs(CALIB_DATA_DIR, exist_ok=True)
# 转换顺序 - 先转换小型模型测试环境
models_to_convert = ["text_encoder", "vae_decoder", "unet"]
success = True
for model_name in models_to_convert:
if not convert_model(model_name):
logger.error(f"❌ {model_name.upper()} 转换失败,终止流程")
success = False
break
# 结果报告
logger.info("=" * 60)
if success:
logger.info("🎉 所有模型转换完成!")
logger.info(f"RKNN模型已保存到: {RKNN_MODEL_DIR}")
# 显示最终模型大小
total_size = 0
for model_name in models_to_convert:
model_path = os.path.join(RKNN_MODEL_DIR, f"{model_name}.rknn")
if os.path.exists(model_path):
size = os.path.getsize(model_path) / (1024 * 1024)
total_size += size
logger.info(f" - {model_name}.rknn: {size:.1f} MB")
logger.info(f"💾 总大小: {total_size:.1f} MB")
else:
logger.error("💥 转换过程出错,请检查日志文件")
logger.info("=" * 60)
logger.info("下一步: 将RKNN模型复制到Orange Pi进行推理")
logger.info("=" * 60)
if __name__ == "__main__":
start_time = time.time()
main()
logger.info(f"总耗时: {(time.time() - start_time)/60:.1f} 分钟")

这是03_convert_to_rknn.py,不赘述

至此,我们就得到了rknn的模型。 之后就只要把模型导到OrangePi上运行即可。

OrangePi阶段

webUI相对灵活,样式风格多,不给出方法,仅给出推理脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy as np
from rknnlite.api import RKNNLite
from PIL import Image
import time
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("SD-RKNN")
MODELS = {
"text_encoder": "rknn_models/text_encoder.rknn",
"unet": "rknn_models/unet.rknn",
"vae_decoder": "rknn_models/vae_decoder.rknn"
}
class StableDiffusionRKNN:
def __init__(self):
self.models = {}
self.rknn_instances = {}
self.load_models()
def load_models(self):
"""加载所有RKNN模型"""
logger.info("="*50)
logger.info("🚀 正在加载Stable Diffusion模型")
start_time = time.time()
# 加载顺序:文本编码器 -> UNet -> VAE解码器
self.load_model("text_encoder")
self.load_model("unet")
self.load_model("vae_decoder")
logger.info(f"✅ 所有模型加载完成! 耗时: {time.time()-start_time:.1f}秒")
logger.info("="*50)
def load_model(self, name):
"""加载单个模型"""
logger.info(f"🔧 加载 {name} 模型...")
start_time = time.time()
rknn = RKNNLite()
path = MODELS[name]
# 1. 加载RKNN模型
ret = rknn.load_rknn(path)
if ret != 0:
raise Exception(f"加载 {name} 模型失败: 错误码 {ret}")
# 2. 初始化运行时 (使用NPU核心0)
ret = rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0)
if ret != 0:
raise Exception(f"初始化 {name} 运行时失败: 错误码 {ret}")
self.rknn_instances[name] = rknn
logger.info(f"✅ {name} 加载成功! 耗时: {time.time()-start_time:.1f}秒")
def text_encoder(self, input_ids):
"""文本编码器推理"""
logger.info("📝 运行文本编码器...")
start = time.time()
# 执行推理
outputs = self.rknn_instances["text_encoder"].inference(inputs=[input_ids])
# 返回文本嵌入
text_embeddings = outputs[0]
logger.info(f"✅ 文本编码完成! 耗时: {time.time()-start:.2f}秒")
return text_embeddings
def unet(self, latent, timestep, text_embeddings):
"""UNet推理"""
logger.info("🎨 运行UNet...")
start = time.time()
# 执行推理
outputs = self.rknn_instances["unet"].inference(
inputs=[latent, timestep, text_embeddings]
)
# 返回噪声预测
noise_pred = outputs[0]
logger.info(f"✅ UNet完成! 耗时: {time.time()-start:.2f}秒")
return noise_pred
def vae_decoder(self, latent):
"""VAE解码器推理"""
logger.info("🖼️ 运行VAE解码器...")
start = time.time()
# 执行推理
outputs = self.rknn_instances["vae_decoder"].inference(inputs=[latent])
# 返回图像
image = outputs[0]
logger.info(f"✅ VAE解码完成! 耗时: {time.time()-start:.2f}秒")
return image
def generate_image(self, prompt, steps=20, guidance_scale=7.5, seed=42):
"""生成图像主函数"""
logger.info(f"\n🎨 开始生成图像: '{prompt}'")
logger.info(f"⚙️ 参数: {steps}步, 引导比例: {guidance_scale}, 种子: {seed}")
# 设置随机种子 (确保可重复性)
np.random.seed(seed)
# 1. 文本编码 (简化处理)
# 实际应用中应使用分词器将prompt转换为input_ids
input_ids = np.ones((1, 77), dtype=np.int64)
text_embeddings = self.text_encoder(input_ids)
# 2. 准备初始潜在变量
latents = np.random.randn(1, 4, 64, 64).astype(np.float32)
# 3. 扩散过程
for step in range(steps):
logger.info(f"⏳ 扩散步骤 {step+1}/{steps}")
# 时间步长 (简化处理)
t = np.array([step / steps * 1000], dtype=np.float32)
# UNet预测噪声
noise_pred = self.unet(latents, t, text_embeddings)
# 更新潜在变量 (简化处理)
latents = latents - 0.1 * noise_pred
# 4. VAE解码生成图像
image = self.vae_decoder(latents)
# 5. 后处理并保存图像
# 将输出转换为0-255范围的图像
image = image.transpose(0, 2, 3, 1)[0] # 转换维度 (BCHW -> BHWC)
image = ((image + 1) * 127.5).clip(0, 255).astype(np.uint8)
# 创建PIL图像
img = Image.fromarray(image)
return img
def release(self):
"""释放所有模型资源"""
logger.info("\n🔌 释放模型资源...")
for name, rknn in self.rknn_instances.items():
rknn.release()
logger.info(f"✅ {name} 已释放")
sd_instance = None
def get_sd_instance():
"""获取Stable Diffusion实例 (单例)"""
global sd_instance
if sd_instance is None:
sd_instance = StableDiffusionRKNN()
return sd_instance
if __name__ == "__main__":
# 测试生成
sd = get_sd_instance()
try:
user_input = input("type the prompt")
image = sd.generate_image(user_input, steps=15)
image.save("test_output.png")
print("✅ 测试图像已保存: test_output.png")
finally:
sd.release()

运行它就可得到结果了

方法多有不足,本人在人工智能方面没有什么基础因此希望读者多多包涵。


OrangePi上运行Stable Diffusion
http://example.com/2025/06/28/OrangePi上运行Stable-Diffusion/
作者
edbaeadaab
发布于
2025年6月28日
许可协议