AI分析时间不固定,且需要尽量确保所有生成的图片都能被上传,即使某些图片的分析时间较长或偶尔失败
这个方案通过持久化任务状态、无限重试机制和并行处理,确保AI分析时间不确定时也能尽量上传所有图片。任务状态保存在JSON文件中,支持程序重启后恢复;失败任务重新入队,保证不丢失;和提供高效并行处理。如果需要进一步优化(例如数据库存储、特定上传API或性能监控),请提供更多细节,我可以继续完善代码!
AI分析时间不固定,且需要尽量确保所有生成的图片都能被上传,即使某些图片的分析时间较长或偶尔失败。为避免因等待或重试导致漏掉图片,我们需要设计一个更健壮的机制,确保每个图片的处理流程(生成、移动、监控、上传)都能独立完成,且不会因超时或失败而丢失任务。以下是优化方案,重点解决“尽量都上传”和避免漏掉图片的问题,并结合并行计算技术,提供详细的C#实现。
问题分析
-
AI分析时间不确定:
- 之前的方案使用固定超时(30秒)和最大重试次数(3次),可能导致某些图片因AI分析延迟而被跳过。
- 解决方案:移除固定超时,改为持久化任务状态,确保每个图片任务最终完成。
-
避免漏掉图片:
- 使用一个持久化任务队列(例如基于文件的记录或数据库),记录每个图片的处理状态(生成、移动、监控、上传)。
- 如果监控失败,任务重新进入队列稍后重试,而不是直接丢弃。
- 使用独立的任务跟踪,确保每个图片有唯一的处理流程。
-
并行处理优化:
- 继续使用多线程并行处理,结合
SemaphoreSlim
控制并发。 - 使用
ConcurrentBag
或数据库存储未完成的任务状态,定期检查并重试。
- 继续使用多线程并行处理,结合
-
上传保证:
- 确保每个图片在AI分析完成后都能上传,即使需要多次重试。
- 使用异步上传和重试机制,处理网络或服务器问题。
优化方案
-
持久化任务状态:
- 使用简单的JSON文件或轻量级数据库(如SQLite)记录每个图片任务的状态(文件名、生成时间、当前状态等)。
- 状态包括:
Generated
(已生成)、Moved
(已移动到AI分析目录)、Analyzed
(AI分析完成)、Uploaded
(已上传)。 - 程序启动时加载未完成任务,优先处理。
-
无限重试机制:
- 移除固定超时和最大重试次数,改为持续监控,直到AI分析完成。
- 如果监控失败(例如文件系统错误),将任务标记为待重试,稍后重新进入队列。
-
并行处理:
- 使用
Parallel.ForEachAsync
并行处理任务队列,结合SemaphoreSlim
控制并发。 - 每个任务独立运行,互不干扰,主线程持续生成图片。
- 使用
-
异步上传:
- 使用异步
HttpClient
(或模拟上传)确保上传不阻塞其他任务。 - 添加上传重试机制,处理网络或服务器故障。
- 使用异步
-
日志和监控:
- 记录每个任务的状态变化和错误,便于调试和跟踪。
- 可选:添加监控仪表盘,显示队列长度、任务状态等。
实现步骤
-
任务状态管理:
- 使用JSON文件(
tasks.json
)记录任务状态,包含文件名、状态和时间戳。 - 程序启动时加载未完成任务(状态非
Uploaded
)。
- 使用JSON文件(
-
队列和并发:
- 使用
ConcurrentQueue
存储待处理文件名。 - 使用
ConcurrentBag
存储任务状态,线程安全。 SemaphoreSlim
限制最大并发任务数(例如10)。
- 使用
-
无限重试:
- 监控AI分析输出时,不设置超时,持续等待直到文件出现。
- 如果发生错误(例如文件系统异常),任务重新进入队列。
-
目录结构(与之前一致):
- 原始图片生成目录:
C:\GeneratedImages
- AI分析输入目录:
C:\AIAnalysisInput
- AI分析输出目录:
C:\AIAnalysisOutput
- 上传后存档目录:
C:\UploadedImages
- 任务状态和日志:
C:\Logs\tasks.json
和C:\Logs\log.txt
- 原始图片生成目录:
完整代码
以下是优化后的C#程序代码,包含持久化任务状态、无限重试和并行处理:
using System;
using System.Collections.Concurrent;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
public class TaskState
{
public string FileName { get; set; }
public string Status { get; set; } // Generated, Moved, Analyzed, Uploaded
public DateTime CreatedAt { get; set; }
public DateTime LastUpdatedAt { get; set; }
}
class Program
{
// 定义目录路径
private static readonly string GeneratedImagesPath = @"C:\GeneratedImages";
private static readonly string AIAnalysisInputPath = @"C:\AIAnalysisInput";
private static readonly string AIAnalysisOutputPath = @"C:\AIAnalysisOutput";
private static readonly string UploadedImagesPath = @"C:\UploadedImages";
private static readonly string LogPath = @"C:\Logs\log.txt";
private static readonly string TaskStatePath = @"C:\Logs\tasks.json";
// 线程安全队列和任务状态
private static readonly ConcurrentQueue<string> ImageQueue = new ConcurrentQueue<string>();
private static readonly ConcurrentBag<TaskState> TaskStates = new ConcurrentBag<TaskState>();
// 控制并发任务数量
private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(10, 10); // 最大10个并发任务
private static readonly int MaxQueueSize = 1000; // 队列最大长度
private static readonly int BatchSize = 5; // 每批生成图片数量
static async Task Main(string[] args)
{
// 确保所有目录存在
CreateDirectories();
// 加载未完成任务
LoadTaskStates();
// 启动后台任务处理队列
Task processTask = Task.Run(() => ProcessQueueAsync());
// 主线程无限循环生成图片
int imageCounter = 1;
while (true)
{
// 检查队列大小,防止过载
if (ImageQueue.Count >= MaxQueueSize)
{
Log($"队列已满({ImageQueue.Count}),等待处理...");
await Task.Delay(1000);
continue;
}
// 批量生成图片
var batch = new List<string>();
for (int i = 0; i < BatchSize; i++)
{
string fileName = $"image_{DateTime.Now:yyyyMMddHHmmssfff}_{imageCounter++}.png";
batch.Add(fileName);
}
// 并行生成图片
Parallel.ForEach(batch, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, fileName =>
{
string generatedImagePath = Path.Combine(GeneratedImagesPath, fileName);
GenerateImage(generatedImagePath);
Log($"已生成图片: {fileName}");
ImageQueue.Enqueue(fileName);
UpdateTaskState(fileName, "Generated");
Log($"已将图片加入队列: {fileName}");
});
// 控制生成速度
await Task.Delay(1000); // 每批次间隔1秒
}
}
// 创建必要的目录
private static void CreateDirectories()
{
Directory.CreateDirectory(GeneratedImagesPath);
Directory.CreateDirectory(AIAnalysisInputPath);
Directory.CreateDirectory(AIAnalysisOutputPath);
Directory.CreateDirectory(UploadedImagesPath);
Directory.CreateDirectory(Path.GetDirectoryName(LogPath));
}
// 加载未完成任务
private static void LoadTaskStates()
{
try
{
if (File.Exists(TaskStatePath))
{
string json = File.ReadAllText(TaskStatePath);
var tasks = JsonSerializer.Deserialize<List<TaskState>>(json);
foreach (var task in tasks.Where(t => t.Status != "Uploaded"))
{
TaskStates.Add(task);
if (task.Status == "Generated" || task.Status == "Moved")
{
ImageQueue.Enqueue(task.FileName);
Log($"恢复未完成任务: {task.FileName} ({task.Status})");
}
}
}
}
catch (Exception ex)
{
Log($"加载任务状态失败: {ex.Message}");
}
}
// 更新任务状态并保存
private static void UpdateTaskState(string fileName, string status)
{
var task = TaskStates.FirstOrDefault(t => t.FileName == fileName);
if (task == null)
{
task = new TaskState { FileName = fileName, Status = status, CreatedAt = DateTime.Now, LastUpdatedAt = DateTime.Now };
TaskStates.Add(task);
}
else
{
task.Status = status;
task.LastUpdatedAt = DateTime.Now;
}
try
{
var json = JsonSerializer.Serialize(TaskStates.ToList(), new JsonSerializerOptions { WriteIndented = true });
File.WriteAllText(TaskStatePath, json);
}
catch (Exception ex)
{
Log($"保存任务状态失败: {ex.Message}");
}
}
// 生成一张简单的图片
private static void GenerateImage(string filePath)
{
try
{
using (Bitmap bitmap = new Bitmap(200, 200))
{
using (Graphics g = Graphics.FromImage(bitmap))
{
Random rand = new Random();
g.Clear(Color.FromArgb(rand.Next(256), rand.Next(256), rand.Next(256)));
g.DrawString(Path.GetFileName(filePath), new Font("Arial", 12), Brushes.White, 10, 10);
}
bitmap.Save(filePath, System.Drawing.Imaging.ImageFormat.Png);
}
}
catch (Exception ex)
{
Log($"生成图片 {filePath} 失败: {ex.Message}");
}
}
// 处理队列中的图片
private static async Task ProcessQueueAsync()
{
while (true)
{
// 批量取出队列中的图片
var batch = new List<string>();
while (batch.Count < BatchSize && ImageQueue.TryDequeue(out string fileName))
{
batch.Add(fileName);
}
if (batch.Any())
{
// 并行处理批次中的图片
await Parallel.ForEachAsync(batch, new ParallelOptions { MaxDegreeOfParallelism = Semaphore.CurrentCount }, async (fileName, ct) =>
{
await Semaphore.WaitAsync(ct);
try
{
await ProcessImageAsync(fileName);
}
finally
{
Semaphore.Release();
}
});
}
// 避免过快轮询
await Task.Delay(100);
}
}
// 处理单个图片:移动、监控、上传
private static async Task ProcessImageAsync(string fileName)
{
try
{
string generatedImagePath = Path.Combine(GeneratedImagesPath, fileName);
string aiInputPath = Path.Combine(AIAnalysisInputPath, fileName);
// 1. 移动图片到AI分析输入目录
if (TaskStates.FirstOrDefault(t => t.FileName == fileName)?.Status == "Generated")
{
if (File.Exists(generatedImagePath))
{
using (var sourceStream = new FileStream(generatedImagePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
using (var destStream = new FileStream(aiInputPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
{
await sourceStream.CopyToAsync(destStream);
}
File.Delete(generatedImagePath);
UpdateTaskState(fileName, "Moved");
Log($"已移动图片到AI分析目录: {aiInputPath}");
}
else
{
Log($"图片不存在: {generatedImagePath}");
UpdateTaskState(fileName, "Failed");
return;
}
}
// 2. 等待AI分析后的图片并上传
await WaitForAnalyzedImage(fileName);
}
catch (Exception ex)
{
Log($"处理图片 {fileName} 时出错: {ex.Message}");
ImageQueue.Enqueue(fileName); // 出错时重新入队
}
}
// 等待AI分析后的图片并上传
private static async Task WaitForAnalyzedImage(string fileName)
{
string analyzedImagePath = Path.Combine(AIAnalysisOutputPath, fileName);
Log($"正在等待AI分析结果: {fileName}");
while (TaskStates.FirstOrDefault(t => t.FileName == fileName)?.Status == "Moved")
{
try
{
var tcs = new TaskCompletionSource<bool>();
using (FileSystemWatcher watcher = new FileSystemWatcher
{
Path = AIAnalysisOutputPath,
Filter = fileName,
EnableRaisingEvents = true
})
{
watcher.Created += (sender, e) =>
{
if (e.Name == fileName)
{
tcs.TrySetResult(true);
}
};
// 等待文件出现,无固定超时
await tcs.Task;
UpdateTaskState(fileName, "Analyzed");
Log($"检测到AI分析后的图片: {analyzedImagePath}");
// 3. 上传图片
await UploadImage(analyzedImagePath, fileName);
UpdateTaskState(fileName, "Uploaded");
return; // 成功上传,退出
}
}
catch (Exception ex)
{
Log($"监控图片 {fileName} 时出错: {ex.Message}");
await Task.Delay(5000); // 出错后等待5秒重试
}
}
}
// 模拟上传图片(可以替换为实际的上传逻辑)
private static async Task UploadImage(string filePath, string fileName)
{
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries)
{
try
{
// 模拟上传延迟
await Task.Delay(1000);
Log($"正在上传图片: {fileName}");
// 将图片移动到上传后的存档目录
string uploadedPath = Path.Combine(UploadedImagesPath, fileName);
using (var sourceStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
using (var destStream = new FileStream(uploadedPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
{
await sourceStream.CopyToAsync(destStream);
}
File.Delete(filePath);
Log($"图片上传并存档到: {uploadedPath}");
return;
}
catch (Exception ex)
{
retryCount++;
Log($"上传图片 {fileName} 失败(尝试 {retryCount}/{maxRetries}): {ex.Message}");
if (retryCount < maxRetries)
{
await Task.Delay(5000); // 等待5秒后重试
}
}
}
Log($"上传图片 {fileName} 失败,超过最大重试次数");
ImageQueue.Enqueue(fileName); // 上传失败重新入队
}
// 记录日志到文件和控制台
private static void Log(string message)
{
string logMessage = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff} - {message}";
Console.WriteLine(logMessage);
try
{
File.AppendAllText(LogPath, logMessage + Environment.NewLine);
}
catch (Exception ex)
{
Console.WriteLine($"写入日志失败: {ex.Message}");
}
}
}
优化点说明
-
持久化任务状态:
- 使用
TaskState
类记录每个图片的状态,存储在C:\Logs\tasks.json
。 - 程序启动时加载未完成任务(状态非
Uploaded
),加入队列继续处理。 - 每次状态更新(生成、移动、分析完成、上传)都保存到JSON文件。
- 使用
-
无限重试机制:
- 移除
WaitForAnalyzedImage
的固定超时,持续监控直到AI分析完成。 - 如果监控失败(例如文件系统错误),任务重新入队,稍后重试。
- 上传失败时,最多重试3次,失败后重新入队,确保不丢失任务。
- 移除
-
并行处理:
- 使用
Parallel.ForEach
并行生成图片,每批次BatchSize = 5
。 - 使用
Parallel.ForEachAsync
并行处理队列中的任务,SemaphoreSlim
限制最大并发数为10。 - 异步IO(
FileStream.CopyToAsync
)用于文件移动和上传,减少阻塞。
- 使用
-
队列管理:
- 使用
ConcurrentQueue
存储待处理文件名,线程安全。 - 队列大小限制为1000(
MaxQueueSize
),防止内存溢出。 - 失败任务重新入队,确保不丢失。
- 使用
-
日志和错误处理:
- 每个操作(生成、移动、监控、上传)记录详细日志,包含时间戳。
- 异常捕获和重新入队机制确保任务不丢失。
- 任务状态持久化到JSON文件,支持程序重启后恢复。
使用说明
-
环境要求:
- 需要 .NET 6 或更高版本(
Parallel.ForEachAsync
需要 .NET 6+)。 - 安装
System.Drawing.Common
和System.Text.Json
包:dotnet add package System.Drawing.Common dotnet add package System.Text.Json
- 需要 .NET 6 或更高版本(
-
配置目录:
- 修改目录路径(
GeneratedImagesPath
等)为实际路径。 - 确保AI分析软件读取
AIAnalysisInputPath
的图片,输出同名图片到AIAnalysisOutputPath
。 - 任务状态文件(
TaskStatePath
)和日志文件(LogPath
)需要写权限。
- 修改目录路径(
-
运行程序:
- 程序启动时加载未完成任务(
tasks.json
中状态非Uploaded
的任务)。 - 主线程每1秒生成一批(5张)图片,加入队列。
- 后台任务并行处理队列中的图片,最多10个并发任务。
- 每个任务异步移动图片、持续监控AI分析结果并上传。
- 日志记录所有操作,任务状态保存到
C:\Logs\tasks.json
。
- 程序启动时加载未完成任务(
-
性能调整:
- 批次大小:调整
BatchSize
(当前为5),根据AI分析速度优化。 - 并发度:修改
SemaphoreSlim
的最大并发数(当前为10),例如:private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2, Environment.ProcessorCount * 2);
- 生成速度:调整主线程的
Task.Delay(1000)
,例如500ms。 - 队列大小:调整
MaxQueueSize
(当前为1000)。 - 上传重试:修改
maxRetries
(当前为3)或重试间隔(Task.Delay(5000)
)。
- 批次大小:调整
示例运行流程
假设目录配置如上,程序运行后:
- 加载
C:\Logs\tasks.json
,恢复未完成任务(例如上次未上传的图片)。 - 主线程生成一批图片(例如
image_202509171705001_1.png
到_5.png
),保存到C:\GeneratedImages
,加入队列,更新任务状态为Generated
。 - 后台任务从队列取出最多5个文件名,异步移动到
C:\AIAnalysisInput
,更新状态为Moved
。 - 每个任务持续监控
C:\AIAnalysisOutput
的同名图片,成功后更新状态为Analyzed
。 - 异步上传(移动到
C:\UploadedImages
),更新状态为Uploaded
。 - 失败任务(移动、监控、上传)重新入队,稍后重试。
- 日志记录每个步骤,任务状态保存到
tasks.json
。
进一步优化建议
-
使用数据库:
- 替换JSON文件为SQLite数据库,存储任务状态,支持更高效的查询和更新:
dotnet add package Microsoft.Data.Sqlite
private static void UpdateTaskState(string fileName, string status) { using (var connection = new SqliteConnection("Data Source=tasks.db")) { connection.Open(); var command = connection.CreateCommand(); command.CommandText = "INSERT OR REPLACE INTO Tasks (FileName, Status, CreatedAt, LastUpdatedAt) VALUES ($fileName, $status, $createdAt, $lastUpdatedAt)"; command.Parameters.AddWithValue("$fileName", fileName); command.Parameters.AddWithValue("$status", status); command.Parameters.AddWithValue("$createdAt", TaskStates.FirstOrDefault(t => t.FileName == fileName)?.CreatedAt ?? DateTime.Now); command.Parameters.AddWithValue("$lastUpdatedAt", DateTime.Now); command.ExecuteNonQuery(); } }
- 替换JSON文件为SQLite数据库,存储任务状态,支持更高效的查询和更新:
-
动态并发控制:
- 根据系统负载动态调整并发度:
var cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total"); int maxConcurrency = cpuCounter.NextValue() > 80 ? 5 : Environment.ProcessorCount * 2;
- 根据系统负载动态调整并发度:
-
实际上传API:
- 替换
UploadImage
为异步HTTP上传:private static async Task UploadImage(string filePath, string fileName) { int maxRetries = 3; int retryCount = 0; while (retryCount < maxRetries) { try { using (var client = new HttpClient()) { using (var content = new MultipartFormDataContent()) { var fileBytes = await File.ReadAllBytesAsync(filePath); content.Add(new ByteArrayContent(fileBytes), "file", fileName); var response = await client.PostAsync("https://your-api-endpoint/upload", content); response.EnsureSuccessStatusCode(); Log($"图片上传成功: {fileName}"); } } string uploadedPath = Path.Combine(UploadedImagesPath, fileName); await using (var sourceStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)) await using (var destStream = new FileStream(uploadedPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true)) { await sourceStream.CopyToAsync(destStream); } File.Delete(filePath); Log($"图片存档到: {uploadedPath}"); return; } catch (Exception ex) { retryCount++; Log($"上传图片 {fileName} 失败(尝试 {retryCount}/{maxRetries}): {ex.Message}"); if (retryCount < maxRetries) { await Task.Delay(5000); } } } Log($"上传图片 {fileName} 失败,重新入队"); ImageQueue.Enqueue(fileName); }
- 替换
-
性能监控:
- 添加性能指标,记录每批次处理时间和队列状态:
Log($"处理批次({batch.Count}张)耗时: {(DateTime.Now - startTime).TotalSeconds:F2}秒,队列剩余: {ImageQueue.Count},未完成任务: {TaskStates.Count(t => t.Status != "Uploaded")}");
- 添加性能指标,记录每批次处理时间和队列状态:
-
退出机制:
- 添加退出条件,例如检测文件或用户输入:
if (File.Exists(@"C:\StopSignal.txt")) { Log("检测到停止信号,退出程序..."); break; }
- 添加退出条件,例如检测文件或用户输入:
注意事项
- AI分析软件:确保AI软件稳定输出同名图片到
AIAnalysisOutputPath
。 - 文件系统性能:
- 并行生成和移动图片可能导致IO瓶颈,建议根据磁盘性能调整
BatchSize
和并发度。 - 使用异步IO减少阻塞。
- 并行生成和移动图片可能导致IO瓶颈,建议根据磁盘性能调整
- 任务状态文件:
tasks.json
可能随任务量增大而变大,建议定期清理已上传任务或使用数据库。
- 网络上传:
- 实际上传API需考虑网络延迟和重试策略。
- 可添加上传队列,限制并发上传请求。
总结
这个方案通过持久化任务状态、无限重试机制和并行处理,确保AI分析时间不确定时也能尽量上传所有图片。任务状态保存在JSON文件中,支持程序重启后恢复;失败任务重新入队,保证不丢失;Parallel.ForEachAsync
和 SemaphoreSlim
提供高效并行处理。如果需要进一步优化(例如数据库存储、特定上传API或性能监控),请提供更多细节,我可以继续完善代码!
更多推荐
所有评论(0)