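Previous articles in this series:

Spring AI Alibaba Graph Source Code Walkthrough Series: Core Bootstrap Classes

Spring AI Alibaba Graph Source Code Walkthrough Series: Actions

Spring AI Alibaba Graph: Interrupts! Human Feedback Steps In and the Flow Runs to Completion Smoothly

Spring AI Alibaba Graph: Assigning MCP to Specific Nodes

Spring AI Alibaba Graph: Node Streaming Pass-Through Example

Spring AI Alibaba Graph: Parallel Nodes, a Quick Start

Spring AI Alibaba Graph: Quick Start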

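A paid option (69.9 CNY) gives access to an online preview of the Feishu cloud document version, plus member Q&A group support for the https://github.com/GTyingzi/spring-ai-tutorial tutorial project.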

This installment covers the stream mechanism of Spring AI Alibaba Graph.

  • Project source code: https://github.com/GTyingzi/spring-ai-alibaba/tree/main/spring-ai-alibaba-graph-core

Streams

AsyncGeneratorOperators

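An interface with no fields of its own; it defines a set of operators through one abstract method (next()) and several default methods.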

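Methods:

  • AsyncGenerator.Data<E> next(): fetches the next data element (abstract)
  • Executor executor(): returns the executor used for the async steps; the default, Runnable::run, runs them directly on the current thread
  • <U> AsyncGenerator<U> map(Function<E, U> mapFunction): maps each element to a new value, producing a new generator
  • <U> AsyncGenerator<U> flatMap(Function<E, CompletableFuture<U>> mapFunction): maps each element to a CompletableFuture and flattens the result
  • AsyncGenerator<E> filter(Predicate<E> predicate): keeps only the elements that satisfy the predicate (each element is resolved with join() in order to test it)
  • CompletableFuture<Object> forEachAsync(Consumer<E> consumer): applies the consumer to every element asynchronously, descending into embedded generators
  • <R extends List<E>> CompletableFuture<R> collectAsync(R result, BiConsumer<R, E> consumer): collects the elements into the given list asynchronously
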
package com.alibaba.cloud.ai.graph.async;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.concurrent.CompletableFuture.completedFuture;

public interface AsyncGeneratorOperators<E> {

    AsyncGenerator.Data<E> next();

    default Executor executor() {
       return Runnable::run;
    }

    /**
     * Maps the elements of this generator to a new asynchronous generator.
     * @param mapFunction the function to map elements to a new asynchronous counterpart
     * @param <U> the type of elements in the new generator
     * @return a generator with mapped elements
     */
    default <U> AsyncGenerator<U> map(Function<E, U> mapFunction) {
       return () -> {
          final AsyncGenerator.Data<E> next = next();
          if (next.isDone()) {
             return AsyncGenerator.Data.done(next.resultValue);
          }
          return AsyncGenerator.Data.of(next.data.thenApplyAsync(mapFunction, executor()));
       };
    }

    /**
     * Maps the elements of this generator to a new asynchronous generator, and flattens
     * the resulting nested generators.
     * @param mapFunction the function to map elements to a new asynchronous counterpart
     * @param <U> the type of elements in the new generator
     * @return a generator with mapped and flattened elements
     */
    default <U> AsyncGenerator<U> flatMap(Function<E, CompletableFuture<U>> mapFunction) {
       return () -> {
          final AsyncGenerator.Data<E> next = next();
          if (next.isDone()) {
             return AsyncGenerator.Data.done(next.resultValue);
          }
          return AsyncGenerator.Data.of(next.data.thenComposeAsync(mapFunction, executor()));
       };
    }

    /**
     * Filters the elements of this generator based on the given predicate. Only elements
     * that satisfy the predicate will be included in the resulting generator.
     * @param predicate the predicate to test elements against
     * @return a generator with elements that satisfy the predicate
     */
    default AsyncGenerator<E> filter(Predicate<E> predicate) {
       return () -> {
          AsyncGenerator.Data<E> next = next();
          while (!next.isDone()) {

             final E value = next.data.join();

             if (predicate.test(value)) {
                return next;
             }
             next = next();
          }
          return AsyncGenerator.Data.done(next.resultValue);
       };
    }

    /**
     * Asynchronously iterates over the elements of the AsyncGenerator and applies the
     * given consumer to each element.
     * @param consumer the consumer function to be applied to each element
     * @return a CompletableFuture representing the completion of the iteration process.
     */
    default CompletableFuture<Object> forEachAsync(Consumer<E> consumer) {
       CompletableFuture<Object> future = completedFuture(null);
       for (AsyncGenerator.Data<E> next = next(); !next.isDone(); next = next()) {
          final AsyncGenerator.Data<E> finalNext = next;
          if (finalNext.embed != null) {
             future = future.thenCompose(v -> finalNext.embed.generator.async(executor()).forEachAsync(consumer));
          }
          else {
             future = future
                .thenCompose(v -> finalNext.data.thenAcceptAsync(consumer, executor()).thenApply(x -> null));
          }
       }
       return future;
    }

    /**
     * Collects elements from the AsyncGenerator asynchronously into a list.
     * @param <R> the type of the result list
     * @param result the result list to collect elements into
     * @param consumer the consumer function for processing elements
     * @return a CompletableFuture representing the completion of the collection process
     */
    default <R extends List<E>> CompletableFuture<R> collectAsync(R result, BiConsumer<R, E> consumer) {
       CompletableFuture<R> future = completedFuture(result);
       for (AsyncGenerator.Data<E> next = next(); !next.isDone(); next = next()) {
          final AsyncGenerator.Data<E> finalNext = next;
          future = future.thenCompose(res -> finalNext.data.thenApplyAsync(v -> {
             consumer.accept(res, v);
             return res;
          }, executor()));
       }
       return future;
    }

}
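To make the pull model behind these operators concrete, here is a minimal, self-contained sketch. Item and MiniGenerator are hypothetical, simplified stand-ins for AsyncGenerator.Data and AsyncGenerator (no executor, so thenApply is used instead of thenApplyAsync, and no embedded generators); they only mirror how map and filter wrap next() and pass the done marker's result value through unchanged.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for AsyncGenerator.Data:
// a pending element (data != null) or the done marker (data == null) carrying a result value.
final class Item<E> {
    final CompletableFuture<E> data;
    final Object resultValue;

    private Item(CompletableFuture<E> data, Object resultValue) {
        this.data = data;
        this.resultValue = resultValue;
    }

    static <E> Item<E> ofFuture(CompletableFuture<E> future) { return new Item<>(future, null); }
    static <E> Item<E> ofValue(E value) { return ofFuture(CompletableFuture.completedFuture(value)); }
    static <E> Item<E> done(Object resultValue) { return new Item<>(null, resultValue); }
    boolean isDone() { return data == null; }
}

// Hypothetical pull-based generator: each next() call yields exactly one Item.
@FunctionalInterface
interface MiniGenerator<E> {
    Item<E> next();

    // Same shape as AsyncGeneratorOperators.map: wrap next(), transform lazily, forward the done marker.
    default <U> MiniGenerator<U> map(Function<E, U> mapFunction) {
        return () -> {
            Item<E> next = next();
            if (next.isDone()) {
                return Item.done(next.resultValue);
            }
            return Item.ofFuture(next.data.thenApply(mapFunction));
        };
    }

    // Same shape as filter: resolve each element with join() and skip the ones that fail the test.
    default MiniGenerator<E> filter(Predicate<E> predicate) {
        return () -> {
            Item<E> next = next();
            while (!next.isDone()) {
                if (predicate.test(next.data.join())) {
                    return next;
                }
                next = next();
            }
            return Item.done(next.resultValue);
        };
    }

    // Drains the generator on the calling thread.
    default List<E> toList() {
        List<E> out = new ArrayList<>();
        for (Item<E> next = next(); !next.isDone(); next = next()) {
            out.add(next.data.join());
        }
        return out;
    }

    // Builds a generator over an iterator; the done marker carries the given result value.
    static <E> MiniGenerator<E> fromIterator(Iterator<E> iterator, Object resultValue) {
        return () -> {
            if (!iterator.hasNext()) {
                return Item.done(resultValue);
            }
            return Item.ofValue(iterator.next());
        };
    }
}
```

For example, MiniGenerator.fromIterator(List.of(1, 2, 3, 4).iterator(), "finished").filter(x -> x % 2 == 0).map(x -> "n" + x).toList() yields [n2, n4]; nothing is pulled from the source until toList() starts calling next().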

AsyncGenerator

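The core asynchronous generator interface. It extends Iterable<E> and AsyncGeneratorOperators<E>, and it is the foundation for all asynchronous processing and streaming output in the framework.

Nested interfaces and classes:

  • Interfaces:

    • HasResultValue: declares resultValue(); marks a generator (or iterator) whose result value can be retrieved
    • EmbedCompletionHandler: declares accept(Object t); callback invoked when an embedded generator completes
  • Classes:

    • WithResult: a decorator over AsyncGenerator that records the result value carried by the final done element, so it can be retrieved afterwards
      • fields: delegate, resultValue
    • WithEmbed: a decorator over AsyncGenerator that supports nesting (embedding) generators
      • fields: generatorsStack (stack of generators), returnValueStack (stack of return values)
    • Embed: an embedded generator
      • fields: generator (the embedded generator), onCompletion (completion callback)
    • Data: a single element emitted by an AsyncGenerator
      • fields: data (a CompletableFuture holding the value), embed (an embedded generator) and resultValue (the result value); a Data with neither data nor embed is the done marker (isDone())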
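The Data/Embed/WithEmbed relationship is easiest to see on a stripped-down model. The sketch below is hypothetical (Step, Gen and Flattening are not framework classes): an element either carries a value, embeds another generator, or is the done marker, and a stack flattens the embedding while reporting the embedded generator's result to a callback. Unlike WithEmbed, it loops instead of recursing, does not reject deeper nesting, and does not invoke a callback when the outermost generator finishes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of AsyncGenerator.Data: a value, an embedded generator, or the done marker.
final class Step<E> {
    final E value;
    final Gen<E> embed;
    final Object result;
    final boolean done;

    private Step(E value, Gen<E> embed, Object result, boolean done) {
        this.value = value;
        this.embed = embed;
        this.result = result;
        this.done = done;
    }

    static <E> Step<E> of(E value) { return new Step<>(value, null, null, false); }
    static <E> Step<E> embed(Gen<E> generator) { return new Step<>(null, generator, null, false); }
    static <E> Step<E> done(Object result) { return new Step<>(null, null, result, true); }
}

// Hypothetical pull-based generator.
@FunctionalInterface
interface Gen<E> {
    Step<E> next();

    static <E> Gen<E> of(List<Step<E>> steps) {
        Iterator<Step<E>> it = steps.iterator();
        return () -> {
            if (it.hasNext()) {
                return it.next();
            }
            return Step.done(null);
        };
    }
}

// Flattens embedded generators with a stack, in the spirit of WithEmbed.
final class Flattening<E> implements Gen<E> {
    private final Deque<Gen<E>> stack = new ArrayDeque<>();
    private final Consumer<Object> onEmbeddedDone;   // stand-in for EmbedCompletionHandler

    Flattening(Gen<E> root, Consumer<Object> onEmbeddedDone) {
        stack.push(root);
        this.onEmbeddedDone = onEmbeddedDone;
    }

    @Override
    public Step<E> next() {
        while (true) {
            Step<E> step = stack.peek().next();
            if (step.done) {
                if (stack.size() == 1) {
                    return step;                  // the outermost generator is finished
                }
                stack.pop();                      // the embedded generator is finished: resume its parent
                onEmbeddedDone.accept(step.result);
                continue;
            }
            if (step.embed != null) {
                stack.push(step.embed);           // descend into the embedded generator
                continue;
            }
            return step;
        }
    }
}
```

An outer generator emitting a, embed(inner), d, where inner emits b, c, is flattened to a, b, c, d; the callback receives the inner done marker's result, and the final done marker still carries the outer result.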

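Exposed methods

Static methods:
  • resultValue(AsyncGenerator<?> generator): returns the generator's result value if it implements HasResultValue, otherwise Optional.empty()
  • resultValue(Iterator<?> iterator): same, for an iterator
  • empty(): returns an empty AsyncGenerator
  • map(Iterator<E> iterator, Function<E, CompletableFuture<U>> mapFunction): creates a generator that maps each element of the iterator to a CompletableFuture
  • map(Collection<E> collection, Function<E, CompletableFuture<U>> mapFunction): same, for a collection; a null or empty collection yields empty()
  • collect(Iterator<E> iterator, BiConsumer<E, Consumer<CompletableFuture<U>>> consumer): eagerly walks the iterator, lets the consumer add futures to an accumulator, then emits them one by one
  • collect(Collection<E> collection, BiConsumer<E, Consumer<CompletableFuture<U>>> consumer): same, for a collection
Instance methods:
  • next(): returns the next Data element (abstract)
  • async(Executor executor): returns an AsyncGeneratorOperators view whose operators run on the given executor
  • toCompletableFuture(): drains the generator and completes with the done element's result value
  • stream(): returns a sequential, ordered Stream; each CompletableFuture is resolved before it is handed to the stream
  • iterator(): returns an InternalIterator that resolves each element with join()
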
package com.alibaba.cloud.ai.graph.async;

import com.alibaba.cloud.ai.graph.async.internal.UnmodifiableDeque;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Optional.ofNullable;
import static java.util.concurrent.CompletableFuture.completedFuture;

/**
 * An asynchronous generator interface that allows generating asynchronous elements.
 *
 * @param <E> the type of elements. The generator will emit {@link CompletableFuture
 * CompletableFutures&lt;E&gt;} elements
 */
public interface AsyncGenerator<E> extends Iterable<E>, AsyncGeneratorOperators<E> {

    interface HasResultValue {

       Optional<Object> resultValue();

    }

    static Optional<Object> resultValue(AsyncGenerator<?> generator) {
       if (generator instanceof HasResultValue withResult) {
          return withResult.resultValue();
       }
       return Optional.empty();
    }

    static Optional<Object> resultValue(Iterator<?> iterator) {
       if (iterator instanceof HasResultValue withResult) {
          return withResult.resultValue();
       }
       return Optional.empty();
    }

    /**
     * An asynchronous generator decorator that allows retrieving the result value of the
     * asynchronous operation, if any.
     *
     * @param <E> the type of elements in the generator
     */
    class WithResult<E> implements AsyncGenerator<E>, HasResultValue {

       protected final AsyncGenerator<E> delegate;

       private Object resultValue;

       public WithResult(AsyncGenerator<E> delegate) {
          this.delegate = delegate;
       }

       public AsyncGenerator<E> delegate() {
          return delegate;
       }

       /**
        * Retrieves the result value of the generator, if any.
        * @return an {@link Optional} containing the result value if present, or an empty
        * Optional if not
        */
       public Optional<Object> resultValue() {
          return ofNullable(resultValue);
       };

       @Override
       public final Data<E> next() {
          final Data<E> result = delegate.next();
          if (result.isDone()) {
             resultValue = result.resultValue;
          }
          return result;
       }

    }

    /**
     * An asynchronous generator decorator that allows to generators composition embedding
     * other generators.
     *
     * @param <E> the type of elements in the generator
     */
    class WithEmbed<E> implements AsyncGenerator<E>, HasResultValue {

       protected final Deque<Embed<E>> generatorsStack = new ArrayDeque<>(2);

       private final Deque<Data<E>> returnValueStack = new ArrayDeque<>(2);

       public WithEmbed(AsyncGenerator<E> delegate, EmbedCompletionHandler onGeneratorDoneWithResult) {
          generatorsStack.push(new Embed<>(delegate, onGeneratorDoneWithResult));
       }

       public WithEmbed(AsyncGenerator<E> delegate) {
          this(delegate, null);
       }

       public Deque<Data<E>> resultValues() {
          return new UnmodifiableDeque<>(returnValueStack);
       }

       public Optional<Object> resultValue() {
          return ofNullable(returnValueStack.peek()).map(r -> r.resultValue);
       }

       private void clearPreviousReturnsValuesIfAny() {
          // Check if the return values are which ones from previous run
          if (returnValueStack.size() > 1 && returnValueStack.size() == generatorsStack.size()) {
             returnValueStack.clear();
          }
       }

       // private AsyncGenerator.WithResult<E> toGeneratorWithResult( AsyncGenerator<E>
       // generator ) {
       // return ( generator instanceof WithResult ) ?
       // (AsyncGenerator.WithResult<E>) generator :
       // new WithResult<>(generator);
       // }

       protected boolean isLastGenerator() {
          return generatorsStack.size() == 1;
       }

       @Override
       public Data<E> next() {
          if (generatorsStack.isEmpty()) { // GUARD
             throw new IllegalStateException("no generator found!");
          }

          final Embed<E> embed = generatorsStack.peek();
          final Data<E> result = embed.generator.next();

          if (result.isDone()) {
             clearPreviousReturnsValuesIfAny();
             returnValueStack.push(result);
             if (embed.onCompletion != null /* && result.resultValue != null */ ) {
                try {
                   embed.onCompletion.accept(result.resultValue);
                }
                catch (Exception e) {
                   return Data.error(e);
                }
             }
             if (isLastGenerator()) {
                return result;
             }
             generatorsStack.pop();
             return next();
          }
          if (result.embed != null) {
             if (generatorsStack.size() >= 2) {
                return Data.error(new UnsupportedOperationException(
                      "Currently recursive nested generators are not supported!"));
             }
             generatorsStack.push(result.embed);
             return next();
          }

          return result;
       }

    }

    @FunctionalInterface
    interface EmbedCompletionHandler {

       void accept(Object t) throws Exception;

    }

    class Embed<E> {

       final AsyncGenerator<E> generator;

       final EmbedCompletionHandler onCompletion;

       public Embed(AsyncGenerator<E> generator, EmbedCompletionHandler onCompletion) {
          Objects.requireNonNull(generator, "generator cannot be null");
          this.generator = generator;
          this.onCompletion = onCompletion;
       }

       public AsyncGenerator<E> getGenerator() {
          return generator;
       }

    }

    /**
     * Represents a data element in the AsyncGenerator.
     *
     * @param <E> the type of the data element
     */
    class Data<E> {

       final CompletableFuture<E> data;

       final Embed<E> embed;

       final Object resultValue;

       public Data(CompletableFuture<E> data, Embed<E> embed, Object resultValue) {
          this.data = data;
          this.embed = embed;
          this.resultValue = resultValue;
       }

       public CompletableFuture<E> getData() {
          return data;
       }

       public Embed<E> getEmbed() {
          return embed;
       }

       public Optional<Object> resultValue() {
          return resultValue == null ? Optional.empty() : Optional.of(resultValue);
       }

       public boolean isDone() {
          return data == null && embed == null;
       }

       public boolean isError() {
          return data != null && data.isCompletedExceptionally();
       }

       public static <E> Data<E> of(CompletableFuture<E> data) {
          return new Data<>(data, null, null);
       }

       public static <E> Data<E> of(E data) {
          return new Data<>(completedFuture(data), null, null);
       }

       public static <E> Data<E> composeWith(AsyncGenerator<E> generator, EmbedCompletionHandler onCompletion) {
          return new Data<>(null, new Embed<>(generator, onCompletion), null);
       }

       public static <E> Data<E> done() {
          return new Data<>(null, null, null);
       }

       public static <E> Data<E> done(Object resultValue) {
          return new Data<>(null, null, resultValue);
       }

       public static <E> Data<E> error(Throwable exception) {
          CompletableFuture<E> future = new CompletableFuture<>();
          future.completeExceptionally(exception);
          return Data.of(future);
       }

    }

    default AsyncGeneratorOperators<E> async(Executor executor) {
       return new AsyncGeneratorOperators<E>() {
          @Override
          public Data<E> next() {
             return AsyncGenerator.this.next();
          }

          @Override
          public Executor executor() {
             return executor;
          }
       };
    }

    /**
     * Retrieves the next asynchronous element.
     * @return the next element from the generator
     */
    Data<E> next();

    /**
     * Converts the AsyncGenerator to a CompletableFuture.
     * @return a CompletableFuture representing the completion of the AsyncGenerator
     */
    default CompletableFuture<Object> toCompletableFuture() {
       final Data<E> next = next();
       if (next.isDone()) {
          return completedFuture(next.resultValue);
       }
       return next.data.thenCompose(v -> toCompletableFuture());
    }

    /**
     * Returns a sequential Stream with the elements of this AsyncGenerator. Each
     * CompletableFuture is resolved and then make available to the stream.
     * @return a Stream of elements from the AsyncGenerator
     */
    default Stream<E> stream() {
       return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false);
    }

    /**
     * Returns an iterator over the elements of this AsyncGenerator. Each call to `next`
     * retrieves the next "resolved" asynchronous element from the generator.
     * @return an iterator over the elements of this AsyncGenerator
     */
    default Iterator<E> iterator() {
       return new InternalIterator<E>(this);
    }

    /**
     * Returns an empty AsyncGenerator.
     * @param <E> the type of elements
     * @return an empty AsyncGenerator
     */
    static <E> AsyncGenerator<E> empty() {
       return Data::done;
    }

    /**
     * create a generator, mapping each element to an asynchronous counterpart.
     * @param <E> the type of elements in the collection
     * @param <U> the type of elements in the CompletableFuture
     * @param iterator the elements iterator
     * @param mapFunction the function to map elements to {@link CompletableFuture}
     * @return an AsyncGenerator instance with mapped elements
     */
    static <E, U> AsyncGenerator<U> map(Iterator<E> iterator, Function<E, CompletableFuture<U>> mapFunction) {
       return () -> {
          if (!iterator.hasNext()) {
             return Data.done();
          }
          return Data.of(mapFunction.apply(iterator.next()));
       };
    }

    /**
     * Collects asynchronous elements from an iterator.
     * @param <E> the type of elements in the iterator
     * @param <U> the type of elements in the CompletableFuture
     * @param iterator the iterator containing elements to collect
     * @param consumer the function to consume elements and add them to the accumulator
     * @return an AsyncGenerator instance with collected elements
     */
    static <E, U> AsyncGenerator<U> collect(Iterator<E> iterator,
          BiConsumer<E, Consumer<CompletableFuture<U>>> consumer) {
       final List<CompletableFuture<U>> accumulator = new ArrayList<>();

       final Consumer<CompletableFuture<U>> addElement = accumulator::add;
       while (iterator.hasNext()) {
          consumer.accept(iterator.next(), addElement);
       }

       final Iterator<CompletableFuture<U>> it = accumulator.iterator();
       return () -> {
          if (!it.hasNext()) {
             return Data.done();
          }
          return Data.of(it.next());
       };
    }

    /**
     * create a generator, mapping each element to an asynchronous counterpart.
     * @param <E> the type of elements in the collection
     * @param <U> the type of elements in the CompletableFuture
     * @param collection the collection of elements to map
     * @param mapFunction the function to map elements to CompletableFuture
     * @return an AsyncGenerator instance with mapped elements
     */
    static <E, U> AsyncGenerator<U> map(Collection<E> collection, Function<E, CompletableFuture<U>> mapFunction) {
       if (collection == null || collection.isEmpty()) {
          return empty();
       }
       return map(collection.iterator(), mapFunction);
    }

    /**
     * Collects asynchronous elements from a collection.
     * @param <E> the type of elements in the iterator
     * @param <U> the type of elements in the CompletableFuture
     * @param collection the iterator containing elements to collect
     * @param consumer the function to consume elements and add them to the accumulator
     * @return an AsyncGenerator instance with collected elements
     */
    static <E, U> AsyncGenerator<U> collect(Collection<E> collection,
          BiConsumer<E, Consumer<CompletableFuture<U>>> consumer) {
       if (collection == null || collection.isEmpty()) {
          return empty();
       }
       return collect(collection.iterator(), consumer);
    }

}

class InternalIterator<E> implements Iterator<E>, AsyncGenerator.HasResultValue {

    private final AsyncGenerator<E> delegate;

    final AtomicReference<AsyncGenerator.Data<E>> currentFetchedData;

    public InternalIterator(AsyncGenerator<E> delegate) {
       this.delegate = delegate;
       currentFetchedData = new AtomicReference<>(delegate.next());
    }

    @Override
    public boolean hasNext() {
       final var value = currentFetchedData.get();
       return value != null && !value.isDone();
    }

    @Override
    public E next() {
       var next = currentFetchedData.get();

       if (next == null || next.isDone()) {
          throw new IllegalStateException("no more elements into iterator");
       }

       if (!next.isError()) {
          currentFetchedData.set(delegate.next());
       }

       return next.data.join();
    }

    @Override
    public Optional<Object> resultValue() {
       if (delegate instanceof AsyncGenerator.HasResultValue withResult) {
          return withResult.resultValue();
       }
       return Optional.empty();
    }

};
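stream() and iterator() turn the pull-based generator into ordinary blocking Java iteration. The following is a minimal sketch of the same idea with a hypothetical PullIterator (not a framework class) whose source returns null instead of a done marker: it prefetches one element ahead as InternalIterator does, and builds the Stream with the same Spliterators/StreamSupport calls as stream(). Unlike InternalIterator, it throws NoSuchElementException (the java.util.Iterator convention) rather than IllegalStateException, and it does not stop prefetching after an errored element.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical adapter in the spirit of InternalIterator: the source returns one
// future per call, or null once it is exhausted (standing in for the done marker).
final class PullIterator<E> implements Iterator<E> {
    private final Supplier<CompletableFuture<E>> source;
    private CompletableFuture<E> current;      // prefetched element, like currentFetchedData

    PullIterator(Supplier<CompletableFuture<E>> source) {
        this.source = source;
        this.current = source.get();           // the first element is fetched eagerly
    }

    @Override
    public boolean hasNext() {
        return current != null;
    }

    @Override
    public E next() {
        if (current == null) {
            throw new NoSuchElementException("no more elements");
        }
        CompletableFuture<E> fetched = current;
        current = source.get();                // request the following element first
        return fetched.join();                 // then block until this one is resolved
    }

    // Same construction as AsyncGenerator.stream(): an ORDERED, sequential Stream over the iterator.
    static <E> Stream<E> stream(Supplier<CompletableFuture<E>> source) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(new PullIterator<>(source), Spliterator.ORDERED), false);
    }

    static List<Integer> demo() {
        Iterator<Integer> numbers = List.of(1, 2, 3).iterator();
        Supplier<CompletableFuture<Integer>> source = () -> {
            if (!numbers.hasNext()) {
                return null;
            }
            int n = numbers.next();
            return CompletableFuture.supplyAsync(() -> n * 10);  // each element resolves on another thread
        };
        return stream(source).collect(Collectors.toList());
    }
}
```

demo() returns [10, 20, 30]: even though each element is computed on a pool thread, the Stream sees them in order because next() joins them one at a time.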

FlowGenerator

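Provides two-way conversion between Flow.Publisher and AsyncGenerator, so the framework integrates well with the reactive-programming ecosystem.

Methods:
  • fromPublisher(P publisher, Supplier<R> mapResult):
    1. takes a Publisher as the data source and a Supplier as the result-mapping function
    2. creates an AsyncGenerator from the Flow.Publisher, buffering items in a LinkedBlockingQueue
    3. returns a GeneratorSubscriber instance, which implements the AsyncGenerator interface
  • fromPublisher(P publisher): overload of the method above without the Supplier (passes null)
  • toPublisher(AsyncGenerator<T> generator): converts an AsyncGenerator into a Flow.Publisher (a GeneratorPublisher)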
package com.alibaba.cloud.ai.graph.async;

import com.alibaba.cloud.ai.graph.async.internal.reactive.GeneratorPublisher;
import com.alibaba.cloud.ai.graph.async.internal.reactive.GeneratorSubscriber;

import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/**
 * Provides methods for converting between {@link FlowGenerator} and various
 * {@link Flow.Publisher} types.
 *
 * @since 3.0.0
 */
public interface FlowGenerator {

    /**
     * Creates an {@code AsyncGenerator} from a {@code Flow.Publisher}.
     * @param <T> the type of item emitted by the publisher
     * @param <P> the type of the publisher
     * @param publisher the publisher to subscribe to for retrieving items asynchronously
     * @param mapResult function that will set generator's result
     * @return an {@code AsyncGenerator} that emits items from the publisher
     */
    @SuppressWarnings("unchecked")
    static <T, P extends Flow.Publisher<T>, R> AsyncGenerator<T> fromPublisher(P publisher, Supplier<R> mapResult) {
       var queue = new LinkedBlockingQueue<AsyncGenerator.Data<T>>();
       return new GeneratorSubscriber<>(publisher, (Supplier<Object>) mapResult, queue);
    }

    /**
     * Creates an {@code AsyncGenerator} from a {@code Flow.Publisher}.
     * @param <T> the type of item emitted by the publisher
     * @param <P> the type of the publisher
     * @param publisher the publisher to subscribe to for retrieving items asynchronously
     * @return an {@code AsyncGenerator} that emits items from the publisher
     */
    static <T, P extends Flow.Publisher<T>> AsyncGenerator<T> fromPublisher(P publisher) {
       return fromPublisher(publisher, null);
    }

    /**
     * Converts an {@code AsyncGenerator} into a {@code Flow.Publisher}.
     * @param <T> the type of elements emitted by the publisher
     * @param generator the async generator to convert
     * @return a flow publisher
     */
    static <T> Flow.Publisher<T> toPublisher(AsyncGenerator<T> generator) {
       return new GeneratorPublisher<>(generator);
    }

}

AsyncGeneratorQueue

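Builds an asynchronous generator on top of a blocking queue, so data can be produced and consumed on different threads. The Consumer<Q> passed to of(...) receives the queue and fills it asynchronously on the given Executor, while callers pull the elements through next(). Data is thus produced and consumed incrementally, which suits streaming scenarios.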

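Inner class Generator

  • Data<E> isEnd: the done element once it has been dequeued; it is null while the generator is still running, and afterwards next() keeps returning it
  • BlockingQueue<Data<E>> queue: blocking queue holding the generated elements

Generator.next() loops on queue.poll() until an element is available, i.e. it busy-waits rather than blocking on take().

Exposed static methods

  • of(Q queue, Consumer<Q> consumer): creates an AsyncGenerator
    1. uses ForkJoinPool.commonPool() as the default executor
    2. runs the consumer asynchronously on that executor
  • of(Q queue, Consumer<Q> consumer, Executor executor): creates an AsyncGenerator with an explicitly supplied executor, for more flexible configuration; if the consumer throws, an errored Data is queued, and a done marker is always queued in the finally block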
package com.alibaba.cloud.ai.graph.async;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import static java.util.concurrent.ForkJoinPool.commonPool;

/**
 * Represents a queue-based asynchronous generator.
 */
public class AsyncGeneratorQueue {

    /**
     * Inner class to generate asynchronous elements from the queue.
     *
     * @param <E> the type of elements in the queue
     */
    public static class Generator<E> implements AsyncGenerator<E> {

       Data<E> isEnd = null;

       final BlockingQueue<Data<E>> queue;

       /**
        * Constructs a Generator with the specified queue.
        * @param queue the blocking queue to generate elements from
        */
       public Generator(BlockingQueue<Data<E>> queue) {
          this.queue = queue;
       }

       public BlockingQueue<Data<E>> queue() {
          return queue;
       }

       /**
        * Retrieves the next element from the queue asynchronously.
        * @return the next element from the queue
        */
       @Override
       public Data<E> next() {
          while (isEnd == null) {
             Data<E> value = queue.poll();
             if (value != null) {
                if (value.isDone()) {
                   isEnd = value;
                }
                return value;
             }
          }
          return isEnd;
       }

    }

    /**
     * Creates an AsyncGenerator from the provided blocking queue and consumer.
     * @param <E> the type of elements in the queue
     * @param <Q> the type of blocking queue
     * @param queue the blocking queue to generate elements from
     * @param consumer the consumer for processing elements from the queue
     * @return an AsyncGenerator instance
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q queue,
          Consumer<Q> consumer) {
       return of(queue, consumer, commonPool());
    }

    /**
     * Creates an AsyncGenerator from the provided queue, executor, and consumer.
     * @param <E> the type of elements in the queue
     * @param <Q> the type of blocking queue
     * @param queue the blocking queue to generate elements from
     * @param consumer the consumer for processing elements from the queue
     * @param executor the executor for asynchronous processing
     * @return an AsyncGenerator instance
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q queue,
          Consumer<Q> consumer, Executor executor) {
       Objects.requireNonNull(queue);
       Objects.requireNonNull(executor);
       Objects.requireNonNull(consumer);

       executor.execute(() -> {
          try {
             consumer.accept(queue);
          }
          catch (Throwable ex) {
             CompletableFuture<E> error = new CompletableFuture<>();
             error.completeExceptionally(ex);
             queue.add(AsyncGenerator.Data.of(error));
          }
          finally {
             queue.add(AsyncGenerator.Data.done());
          }

       });

       return new Generator<>(queue);
    }

}
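The producer/consumer split can be reproduced with the JDK alone. In this hypothetical sketch (QueueGen and Entry are not framework types), the producer runs on an executor and writes entries into a LinkedBlockingQueue, an errored entry is queued if it throws, and an end marker is always appended in finally, as in of(queue, consumer, executor). One deliberate difference: next() blocks on take() instead of looping on poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified version of the AsyncGeneratorQueue pattern.
final class QueueGen<E> {
    // Queue entry: a future element, or the end marker when future == null.
    record Entry<E>(CompletableFuture<E> future) {
        boolean isDone() {
            return future == null;
        }
    }

    private final BlockingQueue<Entry<E>> queue;
    private Entry<E> end;   // remembered end marker, like Generator.isEnd

    private QueueGen(BlockingQueue<Entry<E>> queue) {
        this.queue = queue;
    }

    // Mirrors AsyncGeneratorQueue.of: run the producer on the executor,
    // queue an errored entry if it throws, and always queue the end marker.
    static <E> QueueGen<E> of(Consumer<BlockingQueue<Entry<E>>> producer, Executor executor) {
        BlockingQueue<Entry<E>> queue = new LinkedBlockingQueue<>();
        executor.execute(() -> {
            try {
                producer.accept(queue);
            } catch (Throwable ex) {
                queue.add(new Entry<E>(CompletableFuture.<E>failedFuture(ex)));
            } finally {
                queue.add(new Entry<E>(null));
            }
        });
        return new QueueGen<>(queue);
    }

    // Blocks on take() instead of spinning on poll(); after the end marker, keeps returning it.
    Entry<E> next() {
        if (end != null) {
            return end;
        }
        try {
            Entry<E> entry = queue.take();
            if (entry.isDone()) {
                end = entry;
            }
            return entry;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return new Entry<E>(CompletableFuture.<E>failedFuture(ie));
        }
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        QueueGen<String> gen = QueueGen.of(q -> {
            q.add(new Entry<>(CompletableFuture.completedFuture("chunk-1")));
            q.add(new Entry<>(CompletableFuture.completedFuture("chunk-2")));
        }, pool);
        List<String> out = new ArrayList<>();
        for (Entry<String> e = gen.next(); !e.isDone(); e = gen.next()) {
            out.add(e.future().join());
        }
        pool.shutdown();
        return out;
    }
}
```

demo() returns [chunk-1, chunk-2]. If the producer throws instead, the consumer first receives an entry whose future completed exceptionally and then the end marker, so a pulling loop always terminates.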

GeneratorPublisher

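A class implementing the Flow.Publisher interface. It turns an AsyncGenerator into a reactive-streams Publisher, publishing the data produced by the AsyncGenerator to its subscribers.

  • AsyncGenerator<? extends T> delegate: internal field holding the wrapped asynchronous generator
Method:
  • subscribe(Flow.Subscriber<? super T> subscriber): implements Flow.Publisher.subscribe
    - subscribes the given subscriber to this publisher
    - creates a Subscription and passes it to the subscriber
    - feeds the subscriber with the data from the async generator

How it works

GeneratorPublisher works as follows:

  1. Initialization:
  • an AsyncGenerator instance is passed in through the constructor
  • it is stored in the delegate field
  2. Subscription:
  • when a subscriber subscribes, its onSubscribe method is called
  • it is handed a Subscription implementation; in this version request(n) is a no-op and cancel() throws UnsupportedOperationException, so demand is not actually controlled (no backpressure)
  3. Publishing:
  • the delegate generator's forEachAsync method walks all elements
  • each element is published through the subscriber's onNext method
  • once the iteration finishes, the subscriber's onComplete method is called; because subscribe() ends with join(), the calling thread blocks until all of this is done
  4. Error handling:
  • if an exception occurs while the data is being processed, the subscriber's onError method is called
  • the exception is passed on to the subscriber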
package com.alibaba.cloud.ai.graph.async.internal.reactive;

import com.alibaba.cloud.ai.graph.async.AsyncGenerator;

import java.util.concurrent.Flow;

/**
 * A {@code GeneratorPublisher} is a {@link Flow.Publisher} that generates items from an
 * asynchronous generator.
 *
 * @param <T> the type of items to be published
 */
public class GeneratorPublisher<T> implements Flow.Publisher<T> {

    private final AsyncGenerator<? extends T> delegate;

    /**
     * Constructs a new <code>GeneratorPublisher</code> with the specified async
     * generator.
     * @param delegate The async generator to be used by this publisher.
     */
    public GeneratorPublisher(AsyncGenerator<? extends T> delegate) {
       this.delegate = delegate;
    }

    /**
     * Subscribes the provided {@code Flow.Subscriber} to this signal. The subscriber
     * receives initial subscription, handles asynchronous data flow, and manages any
     * errors or completion signals.
     * @param subscriber The subscriber to which the signal will be delivered.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
       subscriber.onSubscribe(new Flow.Subscription() {
          /**
           * Requests more elements from the upstream Publisher.
           *
           * <p>
           * The Publisher calls this method to indicate that it wants more items. The
           * parameter {@code n} specifies the number of additional items requested.
           * @param n the number of items to request, a count greater than zero
           */
          @Override
          public void request(long n) {
          }

          /**
           * Cancels the operation.
           * @throws UnsupportedOperationException if the method is not yet implemented.
           */
          @Override
          public void cancel() {
             throw new UnsupportedOperationException("cancel is not implemented yet!");
          }
       });

       delegate.forEachAsync(subscriber::onNext).thenAccept(value -> {
          subscriber.onComplete();
       }).exceptionally(ex -> {
          subscriber.onError(ex);
          return null;
       }).join();
    }

}
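Because request(n) is ignored above, GeneratorPublisher pushes every element as fast as forEachAsync produces it. For comparison, here is a hypothetical, single-threaded publisher (DemandPublisher is not part of the framework) that honours demand and cancellation as the Flow.Subscription documentation describes: request(n) adds to outstanding demand, a non-positive n is reported through onError with an IllegalArgumentException, and cancel() stops further signals. The wrapped generator is replaced by a plain Iterator to keep the sketch self-contained, and exceptions thrown by the source are not converted to onError here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical variant of GeneratorPublisher that honours request(n) and cancel().
// Single-threaded by design: all signals are emitted on the thread that calls request().
final class DemandPublisher<T> implements Flow.Publisher<T> {
    private final Iterator<T> source;   // stand-in for the wrapped AsyncGenerator

    DemandPublisher(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;         // outstanding requested items
            private boolean emitting;    // guards against re-entrant request() calls from onNext
            private boolean finished;    // completed, failed or cancelled

            @Override
            public void request(long n) {
                if (finished) {
                    return;
                }
                if (n <= 0) {
                    finished = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive: " + n));
                    return;
                }
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;  // saturate on overflow
                if (emitting) {
                    return;              // the loop already running below will use the new demand
                }
                emitting = true;
                try {
                    while (!finished) {
                        if (!source.hasNext()) {
                            finished = true;
                            subscriber.onComplete();
                        } else if (demand > 0) {
                            demand--;
                            subscriber.onNext(source.next());
                        } else {
                            break;       // wait for the next request(n)
                        }
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                finished = true;         // stop emitting; later requests are ignored
            }
        });
    }
}
```

A subscriber that requests 2 items, requests 1 more after the second, and cancels after the third receives exactly 1, 2, 3 and never sees onComplete; a subscriber that requests Long.MAX_VALUE receives everything followed by onComplete.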

GeneratorSubscriber

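A class implementing the Flow.Subscriber interface, so that an external reactive-streams Publisher can be consumed inside the framework as an AsyncGenerator.

  • AsyncGeneratorQueue.Generator<T> delegate: the queue-backed async generator it delegates to
  • Supplier<Object> mapResult: result-mapping function, used in onComplete to supply the generator's final result value
Methods:
  • mapResult(): returns the result-mapping function wrapped in an Optional
  • onSubscribe(Flow.Subscription subscription): implements Flow.Subscriber.onSubscribe
    - called when the subscription is established
    - calls subscription.request(Long.MAX_VALUE) to request an unbounded number of items
  • onNext(T item): implements Flow.Subscriber.onNext
    - called when a new item is received
    - adds the received item to the delegate's queue
  • onError(Throwable error): implements Flow.Subscriber.onError
    - called when an error occurs
    - wraps the error in an errored Data and adds it to the delegate's queue
  • onComplete(): implements Flow.Subscriber.onComplete
    - called when the stream completes
    - adds a done marker, carrying the result value, to the queue
  • next(): implements AsyncGenerator.next
    - fetches the next item from the delegate generator
    - returns a Data object representing the asynchronous data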
package com.alibaba.cloud.ai.graph.async.internal.reactive;

import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.async.AsyncGeneratorQueue;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

/**
 * Represents a subscriber for generating asynchronous data streams.
 *
 * <p>
 * This class implements the {@link Flow.Subscriber} and {@link AsyncGenerator} interfaces
 * to handle data flow and produce asynchronous data. It is designed to subscribe to a
 * publisher, process incoming items, and manage error and completion signals.
 * </p>
 *
 * @param <T> The type of elements produced by this generator.
 */
public class GeneratorSubscriber<T> implements Flow.Subscriber<T>, AsyncGenerator<T> {

    private final AsyncGeneratorQueue.Generator<T> delegate;

    private final Supplier<Object> mapResult;

    public Optional<Supplier<Object>> mapResult() {
       return Optional.ofNullable(mapResult);
    }

    /**
     * Constructs a new instance of {@code GeneratorSubscriber}.
     * @param <P> the type of the publisher, which must extend {@link Flow.Publisher}
     * @param mapResult function that will set generator's result
     * @param publisher the source publisher that will push data to this subscriber
     * @param queue the blocking queue used for storing asynchronous generator data
     */
    public <P extends Flow.Publisher<T>> GeneratorSubscriber(P publisher, Supplier<Object> mapResult,
          BlockingQueue<Data<T>> queue) {
       this.delegate = new AsyncGeneratorQueue.Generator<>(queue);
       this.mapResult = mapResult;
       publisher.subscribe(this);
    }

    /**
     * Constructs a new instance of {@code GeneratorSubscriber}.
     * @param <P> the type of the publisher, which must extend {@link Flow.Publisher}
     * @param publisher the source publisher that will push data to this subscriber
     * @param queue the blocking queue used for storing asynchronous generator data
     */
    public <P extends Flow.Publisher<T>> GeneratorSubscriber(P publisher, BlockingQueue<Data<T>> queue) {
       this(publisher, null, queue);
    }

    /**
     * Handles the subscription event from the upstream {@link Flow.Publisher}.
     * <p>
     * This method is called once the subscription to the source publisher has been
     * established. It immediately requests an unbounded number of items, so the
     * publisher can push without waiting for further demand signals.
     * @param subscription the subscription used to signal demand to the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
       subscription.request(Long.MAX_VALUE);
    }

    /**
     * Passes the received item to the delegated queue as an {@link Data} object.
     * @param item The item to be processed and queued.
     */
    @Override
    public void onNext(T item) {
       delegate.queue().add(Data.of(item));
    }

    /**
     * Handles an error by queuing it in the delegate's queue with an errored data.
     * @param error The Throwable that represents the error to be handled.
     */
    @Override
    public void onError(Throwable error) {
       delegate.queue().add(Data.error(error));
    }

    /**
     * This method is called when the asynchronous operation is completed successfully. It
     * notifies the delegate that no more data will be provided by adding a done marker to
     * the queue.
     */
    @Override
    public void onComplete() {
       delegate.queue().add(Data.done(mapResult().map(Supplier::get).orElse(null)));
    }

    /**
     * Returns the next {@code Data<T>} object from this iteration.
     * @return the next element in the iteration, or null if there is no such element
     */
    @Override
    public Data<T> next() {
       return delegate.next();
    }

}
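GeneratorSubscriber is essentially a push-to-pull bridge. The publisher pushes onNext/onError/onComplete signals into a BlockingQueue, and next() pulls them back out in order. Below is a minimal JDK-only sketch of that pattern, assuming java.util.concurrent.SubmissionPublisher as the source. Item and QueueBridge are hypothetical stand-ins for AsyncGenerator.Data and GeneratorSubscriber, not library types. In this sketch next() simply blocks on take(). The real class instead delegates to AsyncGeneratorQueue.Generator, whose internals this article does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;

// Hypothetical stand-in for AsyncGenerator.Data: a value, an error, or a done marker with a result
record Item<T>(T value, Throwable error, boolean done, Object result) {
    static <U> Item<U> of(U value) { return new Item<>(value, null, false, null); }
    static <U> Item<U> error(Throwable t) { return new Item<>(null, t, false, null); }
    static <U> Item<U> done(Object result) { return new Item<>(null, null, true, result); }
    boolean terminal() { return done || error != null; }
}

// Hypothetical push-to-pull bridge modelled on GeneratorSubscriber
class QueueBridge<T> implements Flow.Subscriber<T> {
    private final BlockingQueue<Item<T>> queue = new LinkedBlockingQueue<>();
    private final Supplier<Object> mapResult; // may be null, like GeneratorSubscriber.mapResult

    QueueBridge(Flow.Publisher<T> publisher, Supplier<Object> mapResult) {
        this.mapResult = mapResult;
        publisher.subscribe(this); // same pattern as GeneratorSubscriber's constructor
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); } // unbounded demand
    @Override public void onNext(T item) { queue.add(Item.of(item)); }
    @Override public void onError(Throwable t) { queue.add(Item.error(t)); }

    @Override public void onComplete() {
        // The done marker carries the lazily computed final result, or null when no mapper was given
        queue.add(Item.done(mapResult == null ? null : mapResult.get()));
    }

    // Pull side: blocks until the publisher has pushed the next signal
    Item<T> next() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the next item", e);
        }
    }

    // Pushes the values through a SubmissionPublisher, then pulls items and the final result back
    static List<Object> roundTrip(List<Integer> values) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        QueueBridge<Integer> bridge = new QueueBridge<>(publisher,
                () -> "sum=" + values.stream().mapToInt(Integer::intValue).sum());
        values.forEach(publisher::submit);
        publisher.close(); // onComplete is delivered after the buffered items
        List<Object> pulled = new ArrayList<>();
        Item<Integer> item = bridge.next();
        while (!item.terminal()) {
            pulled.add(item.value());
            item = bridge.next();
        }
        pulled.add(item.result()); // final result taken from the done marker
        return pulled;
    }
}
```

As in GeneratorSubscriber, the final result travels only on the done marker. This is why mapResult is a Supplier: it is evaluated when onComplete fires, after every item has been delivered.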

AsyncGeneratorUtils

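Utility class for merging multiple asynchronous generators and processing their output.

  • Generator merging: combines several asynchronous generators into one unified generator, so the output of concurrently running branches can be consumed through a single generator
  • Result merging: merges the results of the individual generators according to the configured key strategies
  • Round-robin scheduling: polls the active generators in turn, giving each one a chance to emit
  • Concurrency safety: relies on concurrency utilities such as StampedLock and CopyOnWriteArrayList to stay thread-safe
  • State management: tracks the active generators and the results they produced, so that completion and error states are handled correctly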

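AsyncGeneratorUtils is a utility class with only static methods and no instance fields. The anonymous AsyncGenerator it creates inside createMergedGenerator, however, holds the following fields:

  • StampedLock lock: guards the merged generator's shared state. Besides read/write locking, StampedLock supports optimistic reads, which keeps the frequent "any active generators left?" check cheap
  • AtomicInteger pollCounter: atomic counter used for round-robin selection of the next active generator, so that each generator gets a fair turn
  • Map<String, Object> mergedResult: accumulates the final results of all generators into one merged result
  • List<AsyncGenerator<T>> activeGenerators: the generators that have not finished yet, stored in a CopyOnWriteArrayList for thread safety
  • Map<AsyncGenerator<T>, Map<String, Object>> generatorResults: intended to hold each generator's intermediate result. In the code below, entries are only ever removed and never added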
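mergedResult is built by folding each finished generator's result in with OverAllState.updateState(mergedResult, mapResult, keyStrategyMap), whose body this article does not show. The sketch below illustrates the general idea of per-key merge strategies under explicit assumptions. Each strategy is modelled as a BinaryOperator that combines the old and new value for a key. REPLACE (the newer value wins) and APPEND (values are concatenated into a list) are hypothetical strategies, and falling back to REPLACE for keys without a strategy is also an assumption. None of this is confirmed library behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical per-key merge, loosely modelled on how mergedResult is combined using keyStrategyMap
final class KeyMergeSketch {
    // Assumed strategy: the newer value overwrites the older one
    static final BinaryOperator<Object> REPLACE = (oldValue, newValue) -> newValue;

    // Assumed strategy: list values are concatenated and scalars are appended as single elements
    static final BinaryOperator<Object> APPEND = (oldValue, newValue) -> {
        List<Object> merged = new ArrayList<>();
        addAll(merged, oldValue);
        addAll(merged, newValue);
        return merged;
    };

    private static void addAll(List<Object> target, Object value) {
        if (value instanceof List<?> list) {
            target.addAll(list);
        } else if (value != null) {
            target.add(value);
        }
    }

    // Returns a new map and leaves both inputs untouched
    static Map<String, Object> merge(Map<String, Object> current, Map<String, Object> update,
            Map<String, BinaryOperator<Object>> strategies) {
        Map<String, Object> result = new HashMap<>(current);
        update.forEach((key, newValue) -> {
            BinaryOperator<Object> strategy = strategies.getOrDefault(key, REPLACE); // assumed default
            result.put(key, strategy.apply(result.get(key), newValue));
        });
        return result;
    }
}
```

Because merge returns a fresh map instead of mutating in place, the caller reassigns its accumulator. This mirrors the assignment mergedResult = OverAllState.updateState(...) in handleCompletedGenerator.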

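Public static methods

Method | Description
createAppropriateGenerator(List<Map.Entry<String, Object>> generatorEntries, List<AsyncGenerator<T>> asyncNodeGenerators, Map<String, KeyStrategy> keyStrategyMap) | If there is exactly one generator entry, returns it directly (asyncNodeGenerators is not consulted in this branch); otherwise collects the generators from the entries, appends asyncNodeGenerators, and delegates to createMergedGenerator
createMergedGenerator(List<AsyncGenerator<T>> generators, Map<String, KeyStrategy> keyStrategyMap) | Creates a merged generator that polls the given generators round-robin and merges their final results according to keyStrategyMap
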
package com.alibaba.cloud.ai.graph.streaming;

import com.alibaba.cloud.ai.graph.KeyStrategy;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import java.util.HashMap;

/**
 * Utility class for handling asynchronous generator merging and output processing
 */
public class AsyncGeneratorUtils {

    private static final Logger log = LoggerFactory.getLogger(AsyncGeneratorUtils.class);

    /**
     * Creates an appropriate generator based on the number of generator entries
     * @param generatorEntries list of generator entries
     * @param <T> output type
     * @return single generator or merged generator
     */
    @SuppressWarnings("unchecked")
    public static <T> AsyncGenerator<T> createAppropriateGenerator(List<Map.Entry<String, Object>> generatorEntries,
          List<AsyncGenerator<T>> asyncNodeGenerators, Map<String, KeyStrategy> keyStrategyMap) {
       if (generatorEntries.size() == 1) {
          // Only one generator, return it directly
          return (AsyncGenerator<T>) generatorEntries.get(0).getValue();
       }

       // Multiple generators, create a merged generator
       List<AsyncGenerator<T>> generators = generatorEntries.stream()
          .map(entry -> (AsyncGenerator<T>) entry.getValue())
          .collect(Collectors.toList());
       generators.addAll(asyncNodeGenerators);
       return createMergedGenerator(generators, keyStrategyMap);
    }

    /**
     * Creates a merged generator that combines outputs from multiple generators
     * @param generators list of generators to merge
     * @param <T> output type
     * @return merged generator
     */
    public static <T> AsyncGenerator<T> createMergedGenerator(List<AsyncGenerator<T>> generators,
          Map<String, KeyStrategy> keyStrategyMap) {
       return new AsyncGenerator<>() {
          // Switch to StampedLock to simplify lock management
          private final StampedLock lock = new StampedLock();

          private AtomicInteger pollCounter = new AtomicInteger(0);

          private Map<String, Object> mergedResult = new HashMap<>();

          private final List<AsyncGenerator<T>> activeGenerators = new CopyOnWriteArrayList<>(generators);

          private final Map<AsyncGenerator<T>, Map<String, Object>> generatorResults = new HashMap<>();

          @Override
          public Data<T> next() {
             while (true) {
                // Read optimistically and check quickly
                long stamp = lock.tryOptimisticRead();
                boolean empty = activeGenerators.isEmpty();
                if (!lock.validate(stamp)) {
                   stamp = lock.readLock();
                   try {
                      empty = activeGenerators.isEmpty();
                   }
                   finally {
                      lock.unlockRead(stamp);
                   }
                }
                if (empty) {
                   return Data.done(mergedResult);
                }

                // Fine-grained lock control
                final int currentIdx;
                AsyncGenerator<T> current;
                long writeStamp = lock.writeLock();
                try {
                   final int size = activeGenerators.size();
                   if (size == 0)
                      return Data.done(mergedResult);

                   currentIdx = pollCounter.updateAndGet(i -> (i + 1) % size);
                   current = activeGenerators.get(currentIdx);
                }
                finally {
                   lock.unlockWrite(writeStamp);
                }

                // Execute the generator 'next()' in the unlocked state
                Data<T> data = current.next();

                writeStamp = lock.writeLock();
                try {
                   // Double checks prevent status changes
                   if (!activeGenerators.contains(current)) {
                      continue;
                   }

                   if (data.isDone() || data.isError()) {
                      handleCompletedGenerator(current, data);
                      if (activeGenerators.isEmpty()) {
                         return Data.done(mergedResult);
                      }
                      continue;
                   }

                   handleCompletedGenerator(current, data);
                   return data;
                }
                finally {
                   lock.unlockWrite(writeStamp);
                }
             }

          }

          /**
           * Helper method to handle completed or errored generators
           */
          private void handleCompletedGenerator(AsyncGenerator<T> generator, Data<T> data) {
             // Remove generator if done or error
             if (data.isDone() || data.isError()) {
                activeGenerators.remove(generator);
             }

             // Process result if exists
             data.resultValue().ifPresent(result -> {
                if (result instanceof Map) {
                   @SuppressWarnings("unchecked")
                   Map<String, Object> mapResult = (Map<String, Object>) result;
                   mergedResult = OverAllState.updateState(mergedResult, mapResult, keyStrategyMap);
                }
             });

             // Remove from generator results if present
             generatorResults.remove(generator);
          }
       };
    }

}
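The core of createMergedGenerator is round-robin polling. Each call to next() advances pollCounter modulo the number of still-active generators and asks that generator for one element. A generator that reports done is dropped from the rotation. The single-threaded sketch below keeps only this scheduling logic and models each generator as a plain Iterator. It leaves out the StampedLock handling, the error path and the result merging. The class name RoundRobinMerge is hypothetical. As in the original, the counter is incremented before use, so with two or more generators the first poll lands on index 1 rather than 0.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical single-threaded sketch of the round-robin loop in createMergedGenerator
final class RoundRobinMerge<T> {
    private final List<Iterator<T>> active;
    private int pollCounter = 0;

    RoundRobinMerge(List<Iterator<T>> generators) {
        this.active = new ArrayList<>(generators);
    }

    // Returns the next element from the rotation, or empty once every source is exhausted
    Optional<T> next() {
        while (!active.isEmpty()) {
            // Same update rule as pollCounter.updateAndGet(i -> (i + 1) % size)
            pollCounter = (pollCounter + 1) % active.size();
            Iterator<T> current = active.get(pollCounter);
            if (current.hasNext()) {
                return Optional.of(current.next());
            }
            // Exhausted source: drop it and poll again, like handleCompletedGenerator followed by continue
            active.remove(pollCounter);
        }
        return Optional.empty();
    }

    // Collects every element in the order the rotation produces them
    static <E> List<E> drain(List<Iterator<E>> generators) {
        RoundRobinMerge<E> merge = new RoundRobinMerge<>(generators);
        List<E> out = new ArrayList<>();
        for (Optional<E> item = merge.next(); item.isPresent(); item = merge.next()) {
            out.add(item.get());
        }
        return out;
    }
}
```

For the sources [a1, a2], [b1] and [c1, c2, c3], the sketch yields b1, c1, a1, a2, c2, c3. Removing an exhausted source shifts the indices of the remaining ones, so the rotation is fair overall but not strictly interleaved. Unlike this sketch, the library version calls current.next() outside the lock. It then takes the write lock again and re-checks that the generator is still active before using the result.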

NodeOutput

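Represents the output of a graph node after it executes: the node identifier plus the resulting state.

  • String node: identifier of the node; final, cannot be changed
  • OverAllState state: the state associated with the node; final, cannot be changed
  • boolean subGraph: whether this node is a sub-graph node; set through the fluent setSubGraph(boolean)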
package com.alibaba.cloud.ai.graph;

import static java.lang.String.format;

/**
 * Represents the output of a node in a graph.
 *
 */
public class NodeOutput {

    public static NodeOutput of(String node, OverAllState state) {
       return new NodeOutput(node, state);
    }

    /**
     * The identifier of the node.
     */
    private final String node;

    /**
     * The state associated with the node.
     */
    private final OverAllState state;

    private boolean subGraph = false;

    public boolean isSubGraph() {
       return subGraph;
    }

    public NodeOutput setSubGraph(boolean subGraph) {
       this.subGraph = subGraph;
       return this;
    }

    public String node() {
       return node;
    }

    public OverAllState state() {
       return state;
    }

    protected NodeOutput(String node, OverAllState state) {
       this.node = node;
       this.state = state;
    }

    @Override
    public String toString() {
       return format("NodeOutput{node=%s, state=%s}", node(), state());
    }

}

StreamingOutput

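A NodeOutput subclass for scenarios where results are returned incrementally, such as streaming responses from a large language model.

  • String chunk: a single fragment of streamed text; null when the output was constructed from a ChatResponse
  • ChatResponse chatResponse: the ChatResponse of the streamed item, which exposes the full response (metadata, finish reason, tool calls) rather than just the text; null when the output was constructed from a text chunk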
package com.alibaba.cloud.ai.graph.streaming;

import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import org.springframework.ai.chat.model.ChatResponse;

import static java.lang.String.format;

public class StreamingOutput extends NodeOutput {

    private final String chunk; // null when constructed from a ChatResponse

    private final ChatResponse chatResponse;

    public StreamingOutput(ChatResponse chatResponse, String node, OverAllState state) {
       super(node, state);
       this.chatResponse = chatResponse;
       this.chunk = null;
    }

    public StreamingOutput(String chunk, String node, OverAllState state) {
       super(node, state);
       this.chunk = chunk;
       this.chatResponse = null;
    }

    public String chunk() {
       return chunk;
    }

    public ChatResponse chatResponse() {
       return chatResponse;
    }

    @Override
    public String toString() {
       if (node() == null) {
          return format("StreamingOutput{chunk=%s}", chunk());
       }
       return format("StreamingOutput{node=%s, state=%s, chunk=%s}", node(), state(), chunk());
    }

}
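On the consumer side, a stream of NodeOutput values can mix regular node outputs with StreamingOutput chunks. Within a StreamingOutput, exactly one of chunk and chatResponse is non-null, depending on which constructor was used. The sketch below shows one way to dispatch on this with instanceof pattern matching. To stay self-contained, it uses hypothetical minimal stand-ins (Output, Chunk) instead of the real NodeOutput and StreamingOutput, and it models the chat response as its plain text. It is not the library's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for NodeOutput: node identifier plus a simplified state
class Output {
    final String node;
    final String state;
    Output(String node, String state) { this.node = node; this.state = state; }
}

// Hypothetical stand-in for StreamingOutput: holds either a text chunk or a response, never both
class Chunk extends Output {
    final String chunk;    // set by the text factory, like StreamingOutput(String, ...)
    final String response; // set by the response factory; stands in for ChatResponse's text
    private Chunk(String chunk, String response, String node) {
        super(node, null);
        this.chunk = chunk;
        this.response = response;
    }
    static Chunk ofText(String text, String node) { return new Chunk(text, null, node); }
    static Chunk ofResponse(String response, String node) { return new Chunk(null, response, node); }
}

final class OutputDispatch {
    // Concatenates streamed text and records finished nodes in arrival order
    static List<String> render(List<Output> outputs) {
        List<String> lines = new ArrayList<>();
        StringBuilder text = new StringBuilder();
        for (Output out : outputs) {
            if (out instanceof Chunk c) {
                // Whichever field is set carries the text of this streamed item
                text.append(c.chunk != null ? c.chunk : c.response);
            } else {
                lines.add("node finished: " + out.node);
            }
        }
        lines.add("streamed text: " + text);
        return lines;
    }
}
```

The subclass check has to come first because a streaming chunk is also a node output. Checking the base type first would swallow every chunk.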

StreamingChatGenerator

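Generator interface for streaming chat responses. It merges many partial responses into one complete chat response. Most of the work is done by the nested Builder class, whose fields are:

Field | Type | Description
mapResult | Function<ChatResponse, Map<String, Object>> | Converts the final merged ChatResponse into the generator's result map; required, since building fails with a NullPointerException when it is missing
startingNode | String | Node identifier attached to every emitted StreamingOutput
startingState | OverAllState | State attached to every emitted StreamingOutput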

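Core public methods

Method | Description
build(Flux<ChatResponse> flux) | Builds and returns an AsyncGenerator that processes chat responses; partial responses are merged, and mapResult maps the merged response to the final result; each emitted StreamingOutput carries the response text as its chunk
buildWithChatResponse(Flux<ChatResponse> flux) | Builds and returns an AsyncGenerator that processes chat responses; partial responses are merged, and mapResult maps the merged response to the final result; each emitted StreamingOutput wraps the full ChatResponse, so metadata, finishReason and tool calls stay accessible
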
package com.alibaba.cloud.ai.graph.streaming;

import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.async.FlowGenerator;
import org.reactivestreams.FlowAdapters;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

/**
 * A generator interface for streaming chat responses in a reactive manner. It provides a
 * fluent API to configure and build a streaming generator that processes chat responses
 * and produces output based on the streamed data.
 */
public interface StreamingChatGenerator {

    /**
     * Builder class for creating instances of {@link AsyncGenerator} that process chat
     * responses. This builder allows setting mapping logic, starting node, and initial
     * state before building.
     */
    class Builder {

       private Function<ChatResponse, Map<String, Object>> mapResult;

       private String startingNode;

       private OverAllState startingState;

       /**
        * Sets the mapping function that converts a ChatResponse into a Map result.
        * @param mapResult a function to transform the final chat response into a result
        * map
        * @return the builder instance for method chaining
        */
       public Builder mapResult(Function<ChatResponse, Map<String, Object>> mapResult) {
          this.mapResult = mapResult;
          return this;
       }

       /**
        * Sets the starting node for the streaming process.
        * @param node the identifier of the starting node in the flow
        * @return the builder instance for method chaining
        */
       public Builder startingNode(String node) {
          this.startingNode = node;
          return this;
       }

       /**
        * Sets the initial state for the streaming process.
        * @param state the overall state to start with
        * @return the builder instance for method chaining
        */
       public Builder startingState(OverAllState state) {
          this.startingState = state;
          return this;
       }

       /**
        * Builds and returns an instance of AsyncGenerator that processes chat responses.
        * The generator merges partial responses and maps them to final output.
        * @param flux a Flux stream of ChatResponse objects
        * @return an AsyncGenerator that produces NodeOutput instances
        */
       public AsyncGenerator<? extends NodeOutput> build(Flux<ChatResponse> flux) {
          return buildInternal(flux,
                chatResponse -> new StreamingOutput(chatResponse.getResult().getOutput().getText(), startingNode,
                      startingState));
       }

       /**
        * Builds and returns an instance of AsyncGenerator that processes chat responses,
        * wrapping each ChatResponse into a StreamingOutput object. This method allows
        * downstream consumers to access the full ChatResponse (not just the text chunk)
        * for each streamed result, enabling richer output handling such as extracting
        * metadata, finishReason, or tool call information.
        * @param flux a Flux stream of ChatResponse objects
        * @return an AsyncGenerator that produces StreamingOutput instances containing
        * the full ChatResponse
        */
       public AsyncGenerator<? extends NodeOutput> buildWithChatResponse(Flux<ChatResponse> flux) {
          return buildInternal(flux, chatResponse -> new StreamingOutput(chatResponse, startingNode, startingState));
       }

       private AsyncGenerator<? extends NodeOutput> buildInternal(Flux<ChatResponse> flux,
             Function<ChatResponse, StreamingOutput> outputMapper) {
          Objects.requireNonNull(flux, "flux cannot be null");
          Objects.requireNonNull(mapResult, "mapResult cannot be null");

          var result = new AtomicReference<ChatResponse>(null);

          Consumer<ChatResponse> mergeMessage = (response) -> {
             result.updateAndGet(lastResponse -> {

                if (lastResponse == null) {
                   return response;
                }

                final var currentMessage = response.getResult().getOutput();

                if (currentMessage.hasToolCalls()) {
                   return response;
                }

                final var lastMessageText = requireNonNull(lastResponse.getResult().getOutput().getText(),
                      "lastResponse text cannot be null");

                final var currentMessageText = currentMessage.getText();

                var newMessage = new AssistantMessage(
                      currentMessageText != null ? lastMessageText.concat(currentMessageText) : lastMessageText,
                      currentMessage.getMetadata(), currentMessage.getToolCalls(), currentMessage.getMedia());

                var newGeneration = new Generation(newMessage, response.getResult().getMetadata());
                return new ChatResponse(List.of(newGeneration), response.getMetadata());

             });
          };

          var processedFlux = flux
             .filter(response -> response.getResult() != null && response.getResult().getOutput() != null)
             .doOnNext(mergeMessage)
             .map(outputMapper);

          return FlowGenerator.fromPublisher(FlowAdapters.toFlowPublisher(processedFlux), () -> {
             ChatResponse finalResult = result.get();
             System.out.println("StreamingChatGenerator: mapResult called, finalResult: "
                   + (finalResult != null ? "not null" : "null"));
             if (finalResult == null) {
                // No response was received at all: return an empty result
                return Map.of();
             }
             return mapResult.apply(finalResult);
          });
       }

    }

    /**
     * Returns a new instance of the StreamingChatGenerator builder.
     * @return a new builder instance
     */
    static Builder builder() {
       return new Builder();
    }

}
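The mergeMessage consumer in buildInternal turns many partial ChatResponse objects into the single response that is handed to mapResult. The first response is kept as-is. A response that carries tool calls replaces the accumulator. Otherwise, the new text is appended to the accumulated text. The sketch below reproduces only this accumulation rule using AtomicReference.updateAndGet. Part is a hypothetical stand-in for ChatResponse/AssistantMessage that keeps only the text and a tool-call flag, so metadata and media handling are deliberately left out.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for ChatResponse: only the text and whether the chunk carries tool calls
record Part(String text, boolean hasToolCalls) {}

final class ChunkMerge {
    // Applies the same accumulation rules as mergeMessage and returns the final merged part
    static Part mergeAll(List<Part> stream) {
        AtomicReference<Part> result = new AtomicReference<>(null);
        Consumer<Part> mergeMessage = response -> result.updateAndGet(last -> {
            if (last == null) {
                return response; // first chunk is kept as-is
            }
            if (response.hasToolCalls()) {
                return response; // a tool-call chunk replaces the accumulator
            }
            String lastText = Objects.requireNonNull(last.text(), "last text cannot be null");
            String merged = response.text() != null ? lastText.concat(response.text()) : lastText;
            return new Part(merged, false);
        });
        stream.forEach(mergeMessage);
        return result.get(); // null when the stream was empty
    }
}
```

With an empty stream the accumulator stays null. The original handles this case by returning Map.of() instead of calling mapResult.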

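Learning and Discussion Group

Hi, I'm Yingzi (影子). I have worked at 🐻, a new-energy company, and 老铁, and I am a Committer in the Spring AI Alibaba open-source community. I have started a discussion group, because alone you go fast but together you go far. Follow the WeChat official account to get my personal WeChat, and add the note "交流" (discussion) when adding me to join the group. I also maintain a long-running set of Feishu cloud-doc notes with systematic interview material on backend and big-data topics, which you can get for free by sending me a private message.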
