AI分析时间不固定,且需要尽量确保所有生成的图片都能被上传,即使某些图片的分析时间较长或偶尔失败。为避免因等待或重试导致漏掉图片,我们需要设计一个更健壮的机制,确保每个图片的处理流程(生成、移动、监控、上传)都能独立完成,且不会因超时或失败而丢失任务。以下是优化方案,重点解决“尽量都上传”和避免漏掉图片的问题,并结合并行计算技术,提供详细的C#实现。

问题分析

  1. AI分析时间不确定

    • 之前的方案使用固定超时(30秒)和最大重试次数(3次),可能导致某些图片因AI分析延迟而被跳过。
    • 解决方案:移除固定超时,改为持久化任务状态,确保每个图片任务最终完成。
  2. 避免漏掉图片

    • 使用一个持久化任务队列(例如基于文件的记录或数据库),记录每个图片的处理状态(生成、移动、监控、上传)。
    • 如果监控失败,任务重新进入队列稍后重试,而不是直接丢弃。
    • 使用独立的任务跟踪,确保每个图片有唯一的处理流程。
  3. 并行处理优化

    • 继续使用多线程并行处理,结合 SemaphoreSlim 控制并发。
    • 使用 ConcurrentBag 或数据库存储未完成的任务状态,定期检查并重试。
  4. 上传保证

    • 确保每个图片在AI分析完成后都能上传,即使需要多次重试。
    • 使用异步上传和重试机制,处理网络或服务器问题。

优化方案

  1. 持久化任务状态

    • 使用简单的JSON文件或轻量级数据库(如SQLite)记录每个图片任务的状态(文件名、生成时间、当前状态等)。
    • 状态包括:Generated(已生成)、Moved(已移动到AI分析目录)、Analyzed(AI分析完成)、Uploaded(已上传)。
    • 程序启动时加载未完成任务,优先处理。
  2. 无限重试机制

    • 移除固定超时和最大重试次数,改为持续监控,直到AI分析完成。
    • 如果监控失败(例如文件系统错误),将任务标记为待重试,稍后重新进入队列。
  3. 并行处理

    • 使用 Parallel.ForEachAsync 并行处理任务队列,结合 SemaphoreSlim 控制并发。
    • 每个任务独立运行,互不干扰,主线程持续生成图片。
  4. 异步上传

    • 使用异步 HttpClient(或模拟上传)确保上传不阻塞其他任务。
    • 添加上传重试机制,处理网络或服务器故障。
  5. 日志和监控

    • 记录每个任务的状态变化和错误,便于调试和跟踪。
    • 可选:添加监控仪表盘,显示队列长度、任务状态等。

实现步骤

  1. 任务状态管理

    • 使用JSON文件(tasks.json)记录任务状态,包含文件名、状态和时间戳。
    • 程序启动时加载未完成任务(状态非 Uploaded)。
  2. 队列和并发

    • 使用 ConcurrentQueue 存储待处理文件名。
    • 使用 ConcurrentBag 存储任务状态,线程安全。
    • SemaphoreSlim 限制最大并发任务数(例如10)。
  3. 无限重试

    • 监控AI分析输出时,不设置超时,持续等待直到文件出现。
    • 如果发生错误(例如文件系统异常),任务重新进入队列。
  4. 目录结构(与之前一致):

    • 原始图片生成目录:C:\GeneratedImages
    • AI分析输入目录:C:\AIAnalysisInput
    • AI分析输出目录:C:\AIAnalysisOutput
    • 上传后存档目录:C:\UploadedImages
    • 任务状态和日志:C:\Logs\tasks.jsonC:\Logs\log.txt

完整代码

以下是优化后的C#程序代码,包含持久化任务状态、无限重试和并行处理:

using System;
using System.Collections.Concurrent;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class TaskState
{
    public string FileName { get; set; }
    public string Status { get; set; } // Generated, Moved, Analyzed, Uploaded
    public DateTime CreatedAt { get; set; }
    public DateTime LastUpdatedAt { get; set; }
}

class Program
{
    // 定义目录路径
    private static readonly string GeneratedImagesPath = @"C:\GeneratedImages";
    private static readonly string AIAnalysisInputPath = @"C:\AIAnalysisInput";
    private static readonly string AIAnalysisOutputPath = @"C:\AIAnalysisOutput";
    private static readonly string UploadedImagesPath = @"C:\UploadedImages";
    private static readonly string LogPath = @"C:\Logs\log.txt";
    private static readonly string TaskStatePath = @"C:\Logs\tasks.json";

    // 线程安全队列和任务状态
    private static readonly ConcurrentQueue<string> ImageQueue = new ConcurrentQueue<string>();
    private static readonly ConcurrentBag<TaskState> TaskStates = new ConcurrentBag<TaskState>();

    // 控制并发任务数量
    private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(10, 10); // 最大10个并发任务
    private static readonly int MaxQueueSize = 1000; // 队列最大长度
    private static readonly int BatchSize = 5; // 每批生成图片数量

    static async Task Main(string[] args)
    {
        // 确保所有目录存在
        CreateDirectories();

        // 加载未完成任务
        LoadTaskStates();

        // 启动后台任务处理队列
        Task processTask = Task.Run(() => ProcessQueueAsync());

        // 主线程无限循环生成图片
        int imageCounter = 1;
        while (true)
        {
            // 检查队列大小,防止过载
            if (ImageQueue.Count >= MaxQueueSize)
            {
                Log($"队列已满({ImageQueue.Count}),等待处理...");
                await Task.Delay(1000);
                continue;
            }

            // 批量生成图片
            var batch = new List<string>();
            for (int i = 0; i < BatchSize; i++)
            {
                string fileName = $"image_{DateTime.Now:yyyyMMddHHmmssfff}_{imageCounter++}.png";
                batch.Add(fileName);
            }

            // 并行生成图片
            Parallel.ForEach(batch, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, fileName =>
            {
                string generatedImagePath = Path.Combine(GeneratedImagesPath, fileName);
                GenerateImage(generatedImagePath);
                Log($"已生成图片: {fileName}");
                ImageQueue.Enqueue(fileName);
                UpdateTaskState(fileName, "Generated");
                Log($"已将图片加入队列: {fileName}");
            });

            // 控制生成速度
            await Task.Delay(1000); // 每批次间隔1秒
        }
    }

    // 创建必要的目录
    private static void CreateDirectories()
    {
        Directory.CreateDirectory(GeneratedImagesPath);
        Directory.CreateDirectory(AIAnalysisInputPath);
        Directory.CreateDirectory(AIAnalysisOutputPath);
        Directory.CreateDirectory(UploadedImagesPath);
        Directory.CreateDirectory(Path.GetDirectoryName(LogPath));
    }

    // 加载未完成任务
    private static void LoadTaskStates()
    {
        try
        {
            if (File.Exists(TaskStatePath))
            {
                string json = File.ReadAllText(TaskStatePath);
                var tasks = JsonSerializer.Deserialize<List<TaskState>>(json);
                foreach (var task in tasks.Where(t => t.Status != "Uploaded"))
                {
                    TaskStates.Add(task);
                    if (task.Status == "Generated" || task.Status == "Moved")
                    {
                        ImageQueue.Enqueue(task.FileName);
                        Log($"恢复未完成任务: {task.FileName} ({task.Status})");
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Log($"加载任务状态失败: {ex.Message}");
        }
    }

    // 更新任务状态并保存
    private static void UpdateTaskState(string fileName, string status)
    {
        var task = TaskStates.FirstOrDefault(t => t.FileName == fileName);
        if (task == null)
        {
            task = new TaskState { FileName = fileName, Status = status, CreatedAt = DateTime.Now, LastUpdatedAt = DateTime.Now };
            TaskStates.Add(task);
        }
        else
        {
            task.Status = status;
            task.LastUpdatedAt = DateTime.Now;
        }

        try
        {
            var json = JsonSerializer.Serialize(TaskStates.ToList(), new JsonSerializerOptions { WriteIndented = true });
            File.WriteAllText(TaskStatePath, json);
        }
        catch (Exception ex)
        {
            Log($"保存任务状态失败: {ex.Message}");
        }
    }

    // 生成一张简单的图片
    private static void GenerateImage(string filePath)
    {
        try
        {
            using (Bitmap bitmap = new Bitmap(200, 200))
            {
                using (Graphics g = Graphics.FromImage(bitmap))
                {
                    Random rand = new Random();
                    g.Clear(Color.FromArgb(rand.Next(256), rand.Next(256), rand.Next(256)));
                    g.DrawString(Path.GetFileName(filePath), new Font("Arial", 12), Brushes.White, 10, 10);
                }
                bitmap.Save(filePath, System.Drawing.Imaging.ImageFormat.Png);
            }
        }
        catch (Exception ex)
        {
            Log($"生成图片 {filePath} 失败: {ex.Message}");
        }
    }

    // 处理队列中的图片
    private static async Task ProcessQueueAsync()
    {
        while (true)
        {
            // 批量取出队列中的图片
            var batch = new List<string>();
            while (batch.Count < BatchSize && ImageQueue.TryDequeue(out string fileName))
            {
                batch.Add(fileName);
            }

            if (batch.Any())
            {
                // 并行处理批次中的图片
                await Parallel.ForEachAsync(batch, new ParallelOptions { MaxDegreeOfParallelism = Semaphore.CurrentCount }, async (fileName, ct) =>
                {
                    await Semaphore.WaitAsync(ct);
                    try
                    {
                        await ProcessImageAsync(fileName);
                    }
                    finally
                    {
                        Semaphore.Release();
                    }
                });
            }

            // 避免过快轮询
            await Task.Delay(100);
        }
    }

    // 处理单个图片:移动、监控、上传
    private static async Task ProcessImageAsync(string fileName)
    {
        try
        {
            string generatedImagePath = Path.Combine(GeneratedImagesPath, fileName);
            string aiInputPath = Path.Combine(AIAnalysisInputPath, fileName);

            // 1. 移动图片到AI分析输入目录
            if (TaskStates.FirstOrDefault(t => t.FileName == fileName)?.Status == "Generated")
            {
                if (File.Exists(generatedImagePath))
                {
                    using (var sourceStream = new FileStream(generatedImagePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
                    using (var destStream = new FileStream(aiInputPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
                    {
                        await sourceStream.CopyToAsync(destStream);
                    }
                    File.Delete(generatedImagePath);
                    UpdateTaskState(fileName, "Moved");
                    Log($"已移动图片到AI分析目录: {aiInputPath}");
                }
                else
                {
                    Log($"图片不存在: {generatedImagePath}");
                    UpdateTaskState(fileName, "Failed");
                    return;
                }
            }

            // 2. 等待AI分析后的图片并上传
            await WaitForAnalyzedImage(fileName);
        }
        catch (Exception ex)
        {
            Log($"处理图片 {fileName} 时出错: {ex.Message}");
            ImageQueue.Enqueue(fileName); // 出错时重新入队
        }
    }

    // 等待AI分析后的图片并上传
    private static async Task WaitForAnalyzedImage(string fileName)
    {
        string analyzedImagePath = Path.Combine(AIAnalysisOutputPath, fileName);
        Log($"正在等待AI分析结果: {fileName}");

        while (TaskStates.FirstOrDefault(t => t.FileName == fileName)?.Status == "Moved")
        {
            try
            {
                var tcs = new TaskCompletionSource<bool>();
                using (FileSystemWatcher watcher = new FileSystemWatcher
                {
                    Path = AIAnalysisOutputPath,
                    Filter = fileName,
                    EnableRaisingEvents = true
                })
                {
                    watcher.Created += (sender, e) =>
                    {
                        if (e.Name == fileName)
                        {
                            tcs.TrySetResult(true);
                        }
                    };

                    // 等待文件出现,无固定超时
                    await tcs.Task;
                    UpdateTaskState(fileName, "Analyzed");
                    Log($"检测到AI分析后的图片: {analyzedImagePath}");

                    // 3. 上传图片
                    await UploadImage(analyzedImagePath, fileName);
                    UpdateTaskState(fileName, "Uploaded");
                    return; // 成功上传,退出
                }
            }
            catch (Exception ex)
            {
                Log($"监控图片 {fileName} 时出错: {ex.Message}");
                await Task.Delay(5000); // 出错后等待5秒重试
            }
        }
    }

    // 模拟上传图片(可以替换为实际的上传逻辑)
    private static async Task UploadImage(string filePath, string fileName)
    {
        int maxRetries = 3;
        int retryCount = 0;

        while (retryCount < maxRetries)
        {
            try
            {
                // 模拟上传延迟
                await Task.Delay(1000);
                Log($"正在上传图片: {fileName}");

                // 将图片移动到上传后的存档目录
                string uploadedPath = Path.Combine(UploadedImagesPath, fileName);
                using (var sourceStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
                using (var destStream = new FileStream(uploadedPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
                {
                    await sourceStream.CopyToAsync(destStream);
                }
                File.Delete(filePath);
                Log($"图片上传并存档到: {uploadedPath}");
                return;
            }
            catch (Exception ex)
            {
                retryCount++;
                Log($"上传图片 {fileName} 失败(尝试 {retryCount}/{maxRetries}): {ex.Message}");
                if (retryCount < maxRetries)
                {
                    await Task.Delay(5000); // 等待5秒后重试
                }
            }
        }

        Log($"上传图片 {fileName} 失败,超过最大重试次数");
        ImageQueue.Enqueue(fileName); // 上传失败重新入队
    }

    // 记录日志到文件和控制台
    private static void Log(string message)
    {
        string logMessage = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff} - {message}";
        Console.WriteLine(logMessage);
        try
        {
            File.AppendAllText(LogPath, logMessage + Environment.NewLine);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"写入日志失败: {ex.Message}");
        }
    }
}

优化点说明

  1. 持久化任务状态

    • 使用 TaskState 类记录每个图片的状态,存储在 C:\Logs\tasks.json
    • 程序启动时加载未完成任务(状态非 Uploaded),加入队列继续处理。
    • 每次状态更新(生成、移动、分析完成、上传)都保存到JSON文件。
  2. 无限重试机制

    • 移除 WaitForAnalyzedImage 的固定超时,持续监控直到AI分析完成。
    • 如果监控失败(例如文件系统错误),任务重新入队,稍后重试。
    • 上传失败时,最多重试3次,失败后重新入队,确保不丢失任务。
  3. 并行处理

    • 使用 Parallel.ForEach 并行生成图片,每批次 BatchSize = 5
    • 使用 Parallel.ForEachAsync 并行处理队列中的任务,SemaphoreSlim 限制最大并发数为10。
    • 异步IO(FileStream.CopyToAsync)用于文件移动和上传,减少阻塞。
  4. 队列管理

    • 使用 ConcurrentQueue 存储待处理文件名,线程安全。
    • 队列大小限制为1000(MaxQueueSize),防止内存溢出。
    • 失败任务重新入队,确保不丢失。
  5. 日志和错误处理

    • 每个操作(生成、移动、监控、上传)记录详细日志,包含时间戳。
    • 异常捕获和重新入队机制确保任务不丢失。
    • 任务状态持久化到JSON文件,支持程序重启后恢复。

使用说明

  1. 环境要求

    • 需要 .NET 6 或更高版本(Parallel.ForEachAsync 需要 .NET 6+)。
    • 安装 System.Drawing.CommonSystem.Text.Json 包:
      dotnet add package System.Drawing.Common
      dotnet add package System.Text.Json
      
  2. 配置目录

    • 修改目录路径(GeneratedImagesPath 等)为实际路径。
    • 确保AI分析软件读取 AIAnalysisInputPath 的图片,输出同名图片到 AIAnalysisOutputPath
    • 任务状态文件(TaskStatePath)和日志文件(LogPath)需要写权限。
  3. 运行程序

    • 程序启动时加载未完成任务(tasks.json 中状态非 Uploaded 的任务)。
    • 主线程每1秒生成一批(5张)图片,加入队列。
    • 后台任务并行处理队列中的图片,最多10个并发任务。
    • 每个任务异步移动图片、持续监控AI分析结果并上传。
    • 日志记录所有操作,任务状态保存到 C:\Logs\tasks.json
  4. 性能调整

    • 批次大小:调整 BatchSize(当前为5),根据AI分析速度优化。
    • 并发度:修改 SemaphoreSlim 的最大并发数(当前为10),例如:
      private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2, Environment.ProcessorCount * 2);
      
    • 生成速度:调整主线程的 Task.Delay(1000),例如500ms。
    • 队列大小:调整 MaxQueueSize(当前为1000)。
    • 上传重试:修改 maxRetries(当前为3)或重试间隔(Task.Delay(5000))。

示例运行流程

假设目录配置如上,程序运行后:

  1. 加载 C:\Logs\tasks.json,恢复未完成任务(例如上次未上传的图片)。
  2. 主线程生成一批图片(例如 image_202509171705001_1.png_5.png),保存到 C:\GeneratedImages,加入队列,更新任务状态为 Generated
  3. 后台任务从队列取出最多5个文件名,异步移动到 C:\AIAnalysisInput,更新状态为 Moved
  4. 每个任务持续监控 C:\AIAnalysisOutput 的同名图片,成功后更新状态为 Analyzed
  5. 异步上传(移动到 C:\UploadedImages),更新状态为 Uploaded
  6. 失败任务(移动、监控、上传)重新入队,稍后重试。
  7. 日志记录每个步骤,任务状态保存到 tasks.json

进一步优化建议

  1. 使用数据库

    • 替换JSON文件为SQLite数据库,存储任务状态,支持更高效的查询和更新:
      dotnet add package Microsoft.Data.Sqlite
      
      private static void UpdateTaskState(string fileName, string status)
      {
          using (var connection = new SqliteConnection("Data Source=tasks.db"))
          {
              connection.Open();
              var command = connection.CreateCommand();
              command.CommandText = "INSERT OR REPLACE INTO Tasks (FileName, Status, CreatedAt, LastUpdatedAt) VALUES ($fileName, $status, $createdAt, $lastUpdatedAt)";
              command.Parameters.AddWithValue("$fileName", fileName);
              command.Parameters.AddWithValue("$status", status);
              command.Parameters.AddWithValue("$createdAt", TaskStates.FirstOrDefault(t => t.FileName == fileName)?.CreatedAt ?? DateTime.Now);
              command.Parameters.AddWithValue("$lastUpdatedAt", DateTime.Now);
              command.ExecuteNonQuery();
          }
      }
      
  2. 动态并发控制

    • 根据系统负载动态调整并发度:
      var cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");
      int maxConcurrency = cpuCounter.NextValue() > 80 ? 5 : Environment.ProcessorCount * 2;
      
  3. 实际上传API

    • 替换 UploadImage 为异步HTTP上传:
      private static async Task UploadImage(string filePath, string fileName)
      {
          int maxRetries = 3;
          int retryCount = 0;
      
          while (retryCount < maxRetries)
          {
              try
              {
                  using (var client = new HttpClient())
                  {
                      using (var content = new MultipartFormDataContent())
                      {
                          var fileBytes = await File.ReadAllBytesAsync(filePath);
                          content.Add(new ByteArrayContent(fileBytes), "file", fileName);
                          var response = await client.PostAsync("https://your-api-endpoint/upload", content);
                          response.EnsureSuccessStatusCode();
                          Log($"图片上传成功: {fileName}");
                      }
                  }
                  string uploadedPath = Path.Combine(UploadedImagesPath, fileName);
                  await using (var sourceStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
                  await using (var destStream = new FileStream(uploadedPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
                  {
                      await sourceStream.CopyToAsync(destStream);
                  }
                  File.Delete(filePath);
                  Log($"图片存档到: {uploadedPath}");
                  return;
              }
              catch (Exception ex)
              {
                  retryCount++;
                  Log($"上传图片 {fileName} 失败(尝试 {retryCount}/{maxRetries}): {ex.Message}");
                  if (retryCount < maxRetries)
                  {
                      await Task.Delay(5000);
                  }
              }
          }
          Log($"上传图片 {fileName} 失败,重新入队");
          ImageQueue.Enqueue(fileName);
      }
      
  4. 性能监控

    • 添加性能指标,记录每批次处理时间和队列状态:
      Log($"处理批次({batch.Count}张)耗时: {(DateTime.Now - startTime).TotalSeconds:F2}秒,队列剩余: {ImageQueue.Count},未完成任务: {TaskStates.Count(t => t.Status != "Uploaded")}");
      
  5. 退出机制

    • 添加退出条件,例如检测文件或用户输入:
      if (File.Exists(@"C:\StopSignal.txt"))
      {
          Log("检测到停止信号,退出程序...");
          break;
      }
      

注意事项

  1. AI分析软件:确保AI软件稳定输出同名图片到 AIAnalysisOutputPath
  2. 文件系统性能
    • 并行生成和移动图片可能导致IO瓶颈,建议根据磁盘性能调整 BatchSize 和并发度。
    • 使用异步IO减少阻塞。
  3. 任务状态文件
    • tasks.json 可能随任务量增大而变大,建议定期清理已上传任务或使用数据库。
  4. 网络上传
    • 实际上传API需考虑网络延迟和重试策略。
    • 可添加上传队列,限制并发上传请求。

总结

这个方案通过持久化任务状态、无限重试机制和并行处理,确保AI分析时间不确定时也能尽量上传所有图片。任务状态保存在JSON文件中,支持程序重启后恢复;失败任务重新入队,保证不丢失;Parallel.ForEachAsyncSemaphoreSlim 提供高效并行处理。如果需要进一步优化(例如数据库存储、特定上传API或性能监控),请提供更多细节,我可以继续完善代码!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐