This PR can provides stream transport support for triple protocol, including server streaming and bidirectional/client streaming.


  • Add interface StreamHandler and implements
  • Provide and update related test.

Change log (Chinese) :


Stream 方式是一种异步的流式处理方式,可以在数据传输过程中逐个处理数据,避免一次性传输大量数据造成的性能问题。服务端Stream 是指服务端在处理请求时,将数据分成多个部分逐个返回给客户端的过程;客户端 Stream 是指客户端在请求服务器时,将请求参数分成多个部分逐个发送给服务器的过程。Stream 方式可以让我们在处理大量数据时更高效地使用资源,提高系统的性能和响应速度。



  • 接口希望发送大量数据,如图像、视频二进制等数据。在一次RPC请求或响应中传输大量数据可能导致性能或实现上的问题。而如果按照传统方式,多次发起普通RPC请求,则难以解决消息顺序和性能问题(想要保证有序则需要串行发送请求)
  • 数据流场景,数据本身没有确定边界,需要按照其发送顺序进行处理
  • 推送场景,希望在单次调用的上下文中处理多个消息


  • 提供消息边界,可以方便地对消息单独处理
  • 严格有序,发送端的顺序和接收端顺序一致
  • 全双工,发送不需要等待
  • 支持取消和超时


对于流式传输,我们可以将Stream分为四种类型 (借鉴于gRPC):







考虑到目前传输层的实现,此处可以考虑直接使用 gRPC 中的 StreamObserver,这样对于 Triple 协议的传输层实现来说比较方便(TripleClientInvoker 直接依赖 gRPC 进行传输),但缺点是可能导致其它未来可能支持流传输的协议同时耦合了gRPC的API。

更好的方案是使用自定义的 StreamObserver(此处先叫做StreamHandler),将其和gRPC解耦,在 Triple 传输层通过适配器转换为gRPC StreamObserver。此处使用自定义的StreamHandler。


public interface StreamHandler<T> {
    * 接收者通过实现该方法,定义单条信息的处理逻辑
    * 发送者则通过调用该方法发送一条信息
    void onMessage(T message);
    * 接收者通过该方法定义信息发送完成后的处理逻辑
    * 发送者通过调用该方法告知接收者所有信息发送完毕
    void onFinish();
    * 接收者通过该方法定义处理某条消息发生异常时的处理逻辑 (onMessage方法抛出异常时)
    * 发送者不应调用该方法
    void onException(Throwable throwable);


**1,标记请求类型 **

首先是识别方法调用类型,并为 SofaRequest 打上流传输的标记。

对于客户端来说,SofaRequest在客户端代理中创建。其中标记传输类型的字段在DefaultClientProxyInvoker中的 decorateRequest方法中设置(对于泛型调用)。

    protected void decorateRequest(SofaRequest request) {
        // 公共的设置

        // 缓存是为了加快速度
        request.setSerializeType(serializeType == null ? 0 : serializeType);

        if (!consumerConfig.isGeneric()) {
            // 找到调用类型, generic的时候类型在filter里进行判断

setInvokeType方法最终调用ConsumerConfig中的getMethodInvokeType,尝试从接口配置中获取已缓存的调用方式,否则使用默认值(一元调用)。可以将判断调用类型的操作添加在其中的 getMethodInvokeType 中,这样也可以将方法调用模式缓存,防止每次调用都要进行繁琐的判断操作。


  public String getMethodInvokeType(SofaRequest sofaRequest) {
        String methodName = sofaRequest.getMethodName();
        String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE,null);

        if(invokeType == null) {
            invokeType = getAndCacheCallType(sofaRequest);

        return invokeType;
     * Get and cache the call type of certain method
     * @param request RPC request
     * @return request call type
    public String getAndCacheCallType(SofaRequest request){
         Method method  = request.getMethod();
         String callType = MethodConfig.mapStreamType(
                 (String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE, getInvokeType())
         //Method level config
         return callType;
     * Gets the stream call type of certain method
     * @param method the method
     * @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value
    public static String mapStreamType(Method method, String defaultValue){
        Class<?>[] paramClasses = method.getParameterTypes();
        Class<?> returnClass = method.getReturnType();

        int paramLen = paramClasses.length;
        String callType;

        if(paramLen>0 && paramClasses[0].isAssignableFrom(StreamHandler.class) && returnClass.isAssignableFrom(StreamHandler.class)){

            if(paramLen > 1){
                throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional stream method parameters can be only one StreamHandler.");

            callType = RpcConstants.INVOKER_TYPE_BI_STREAMING;
        else if ( paramLen>1 && paramClasses[1].isAssignableFrom(StreamHandler.class) && returnClass.isAssignableFrom(StreamHandler.class)){
            callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING;
        else if (returnClass.isAssignableFrom(StreamHandler.class) && -> clazz.isAssignableFrom(StreamHandler.class))) {
            callType = RpcConstants.INVOKER_TYPE_CLIENT_STREAMING;
        else if (returnClass.isAssignableFrom(StreamHandler.class) || -> clazz.isAssignableFrom(StreamHandler.class))) {
            throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only appear at the specified location of parameter. Please check related docs.");
        //Other call types
        else {
            callType = defaultValue;

        return callType;


  • 如果方法参数中包含 StreamHandler,根据 StreamHandler 在方法参数中出现的情况,将SofaRequest 的 InvokeType 标记为 STREAM_CLIENT / STREAM_SERVER / STREAM_BI ,对应三种流传输方式。这三个常量可以添加到 RpcConstants 中。
  • 同时对方法中 StreamHandler 参数位置进行校验,若位置不符合定义中的用法直接抛出异常,并日志提示用户不符的原因。以上方法的考虑较简单,之后可添加更多的提示信息。
  • 缓存方法调用方式。

此处将调用方式缓存到了MethodConfig中,因为InterfaceConfig使用的是UnmodifiableMap,在其初始化完成后已不能再添加新的信息。由于 MethodConfig 本身没有保存 Method 引用,且不会默认创建,使得获取方法参数及返回值的具体类型有些困难,需要在实际请求时通过 SofaRequest 拿到具体的参数 Class,才可较简单的判断方法参数及返回值中StreamHandler的出现情况,判断请求类型。



 protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,sofaRequest request) throws SofaRpcException {
        try {
            // 同步调用
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
            // 单向调用
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
            // Callback调用
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
            // Future调用
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
            // 流式调用
            else if (RpcConstants.INVOKER_TYPE_STREAM_CLIENT.equals(invokeType)
                    || RpcConstants.INVOKER_TYPE_STREAM_BI.equals(invokeType)
                    || RpcConstants.INVOKER_TYPE_STREAM_SERVER.equals(invokeType)) { 
                response = transport.syncSend(request, Integer.MAX_VALUE);
            else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // 客户端其它异常
            throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);





SofaRPC 的 Triple 协议传输层实现目前直接依赖 gRPC 进行。

  • Triple 目前借助 gRPC API 完成传输操作,因此添加流传输支持较为简单,复用 gRPC 的流传输实现即可,无须处理下层的连接问题。在Triple客户端传输层(TripleClientInvoker)中需要添加的处理主要是将上层传入的StreamHandler适配为gRPC的StreamObserver,并根据方法参数中 StreamObserver 的存在情况,判断当前调用方式,选择对应的gRPC API即可。对于服务端,在IDL中添加流传输方法的定义,修改GenericsService实现新方法即可(泛型调用时)。


当 TripleClientInvoker 中 useGeneric 字段为 false 时,表示消费者调用的服务存在IDL及对应的Stub,可以直接通过生成的Stub进行调用。当 useGeneric 为 true 时,将在运行时动态指定调用的服务及方法,借助底层的泛型服务完成传输。

因此,Triple协议的修改分为两个方面:非泛型调用时,通过服务接口生成的stub进行流式调用;以及泛型调用时,通过预生成的GenericService stub发起流式调用。


对于泛型调用,Triple协议通过将已序列化完成的请求统一封装为 Request,通过预生成的gRPC stub完成传输过程。

因此需要修改transformer.proto, 添加流式调用方法的定义。 考虑客户端流和双向流使用可能使用相同的调用方法,此处没有单独定义客户端流的传输方法。

syntax = "proto3";
option java_multiple_files = true;
option java_package = "triple";
option java_outer_classname = "GenericProto";

service GenericService {
    rpc generic (Request) returns (Response) {}

    rpc genericBiStream (stream Request) returns (stream Response){}

    rpc genericServerStream(Request) returns (stream Response){}

message Request {
    string serializeType = 1;
    repeated bytes  args = 2;
    repeated string argTypes = 3;

message Response {
    string serializeType = 1;
    bytes  data = 2;
    string type = 3;


public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceImplBase {
    public StreamObserver<Request> genericBiStream(StreamObserver<Response> responseObserver) {
        Method serviceMethod = getBidirectionalStreamRequestMethod();
        SofaRequest sofaRequest = TracingContextKey.getKeySofaRequest().get(Context.current());

        if (serviceMethod == null) {
            throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " +
        String methodName = serviceMethod.getName();
        try {
            ResponseSerializeStreamHandler serverResponseHandler = new ResponseSerializeStreamHandler(responseObserver,

            setBidirectionalStreamRequestParams(sofaRequest, serviceMethod, serverResponseHandler);

            SofaResponse sofaResponse = invoker.invoke(sofaRequest);

            StreamHandler<Object> clientHandler = (StreamHandler<Object>) sofaResponse.getAppResponse();

            return new StreamObserver<Request>() {
                volatile Serializer serializer = null;

                volatile Class<?>[] argTypes   = null;

                public void onNext(Request request) {
                    Object message = getInvokeArgs(request, argTypes, serializer, false)[0];

                private void checkInitialize(Request request) {
                    if (serializer == null && argTypes == null) {
                        synchronized (this) {
                            if (serializer == null && argTypes == null) {
                                serializer = SerializerFactory.getSerializer(request.getSerializeType());
                                argTypes = getArgTypes(request, false);

                public void onError(Throwable t) {

                public void onCompleted() {
        } catch (Exception e) {
            LOGGER.error("Invoke " + methodName + " error:", e);
            throw new SofaRpcRuntimeException(e);
        } finally {







    public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
            throws Exception {

        MethodDescriptor.MethodType callType = mapCallType(sofaRequest);

            return stubCall(sofaRequest,timeout);
        } else if (callType.equals(MethodDescriptor.MethodType.UNARY)) {
            return unaryCall(sofaRequest, timeout);
        } else {
            return streamCall(sofaRequest, timeout, callType);


private SofaResponse streamCall(SofaRequest sofaRequest, int timeout, boolean useGeneric,MethodDescriptor.MethodType callType) {
         switch (callType){
             case BIDI_STREAMING:
                 return binaryCall(sofaRequest,timeout,useGeneric);
             case CLIENT_STREAMING:
                 return clientStreamCall(sofaRequest,timeout,useGeneric);
             case SERVER_STREAMING:
                 return serverStreamCall(sofaRequest,timeout,useGeneric);
                 throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Unknown stream call type:"+callType);


private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) {
        StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0];

        MethodDescriptor<Request, Response> methodDescriptor = getMethodDescriptor(sofaRequest);
        ClientCall<Request, Response> call = channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout));

        StreamObserver<Request> observer = ClientCalls.asyncBidiStreamingCall(
                new ClientStreamObserverAdapter(
        StreamHandler<Request> handler = new StreamHandler() {
            public void onMessage(Object message) {
                SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest);
                Object[] args = new Object[]{message};
                Request req = getRequest(request, serialization, serializer, 0);

            public void onFinish() {

            public void onException(Throwable throwable) {
        SofaResponse sofaResponse = new SofaResponse();
        return sofaResponse;


    private MethodDescriptor<Request,Response> getMethodDescriptor(SofaRequest sofaRequest) {
        String serviceName = sofaRequest.getInterfaceName();
        String methodName = sofaRequest.getMethodName();
        MethodDescriptor.Marshaller<?> requestMarshaller = ProtoUtils.marshaller(Request.getDefaultInstance());
        MethodDescriptor.Marshaller<?> responseMarshaller = ProtoUtils.marshaller(Response.getDefaultInstance());
        String fullMethodName = generateFullMethodName(serviceName, methodName);

        MethodDescriptor.Builder builder = MethodDescriptor
                .setRequestMarshaller((MethodDescriptor.Marshaller<Object>) requestMarshaller)
                .setResponseMarshaller((MethodDescriptor.Marshaller<Object>) responseMarshaller);

        MethodDescriptor.MethodType callType = SofaProtoUtils.mapGrpcCallType(sofaRequest.getInvokeType());


    public static MethodDescriptor.MethodType mapGrpcCallType(String callType){
        switch (callType){
            case INVOKER_TYPE_ONEWAY:
            case INVOKER_TYPE_FUTURE:
            case INVOKER_TYPE_CALLBACK:
            case INVOKER_TYPE_SYNC:
                return MethodDescriptor.MethodType.UNARY;
                return MethodDescriptor.MethodType.BIDI_STREAMING;
                return MethodDescriptor.MethodType.CLIENT_STREAMING;
                return MethodDescriptor.MethodType.SERVER_STREAMING;
                throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unsupported invoke type:" + callType);



对于服务端,泛型调用的入口是 GenericService 的实现,即 GenericSerivceImpl。

在为泛型服务的IDL中添加流式方法的定义并重新编译后,发现一元调用时(UNARY)不能正确的选择GenericServiceImple 的 generic 方法,而选择调用了 genericBiStream 方法。

断点调试可以发现,底层 ServerMethodDefinition 和 CallHandler 的绑定错误,使得方法选择了ID错误的CallHandler,从而调用了错误的方法。ServerMethodDefinition 在 ServerImpl.runInternal() 中通过 registry.lookupMethod(methodName)获取。实际调用的是 MutableHandlerRegistry,通过其中的 services 获取 ServerServiceDefinition。而 service 中的 ServerServiceDefinition 由 TripleServer.registerProcessor 放入,自此回到了 SofaRPC 自己的实现。

ServerServiceDefinition serviceDefinition = getServerServiceDefinition(providerConfig, uniqueIdInvoker);
            this.serviceInfo.put(providerConfig, serviceDefinition);


private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,ProviderConfig providerConfig) {
        ServerServiceDefinition templateDefinition = genericService.bindService();
        ServerCallHandler<Request, Response> templateHandler = (ServerCallHandler<Request, Response>) templateDefinition
            .getMethods().iterator().next().getServerCallHandler(); //这里只拿到了泛型服务中的第一个方法,没有适配新的流调用方法。
        List<MethodDescriptor<Request, Response>> methodDescriptor = getMethodDescriptor(providerConfig);
        List<ServerMethodDefinition<Request, Response>> methodDefs = getMethodDefinitions(templateHandler,//此处使用的 TemplateHandler 应随方法调用方式变化
        ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(getServiceDescriptor( //应该为将实际服务与泛型服务中的特定方法绑定起来
            templateDefinition, providerConfig, methodDescriptor));
        for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {


   private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
                                                        ProviderConfig providerConfig) {
        ServerServiceDefinition templateDefinition = genericService.bindService();
        List<MethodDescriptor<Request, Response>> methodDescriptor = getMethodDescriptor(providerConfig);//拿实际方法的 Descriptor
        List<ServerMethodDefinition<Request, Response>> methodDefs = mapMethodHandler(templateDefinition,methodDescriptor);
        ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(getServiceDescriptor( 
            templateDefinition, providerConfig, methodDescriptor));
        for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {
   private List<ServerMethodDefinition<Request,Response>>createMethodDefainition(ServerServiceDefinition geneticServiceDefinition, List<MethodDescriptor<Request, Response>> serviceMethods){
            Collection<ServerMethodDefinition<?, ?>> genericServiceMethods = geneticServiceDefinition.getMethods();
            List<ServerMethodDefinition<Request,Response>> serverMethodDefinitions = new ArrayList<>();
            //Map ture service method to certain generic service method.
            for (ServerMethodDefinition<?,?> genericMethods : genericServiceMethods){
                for(MethodDescriptor<Request,Response> methodDescriptor : serviceMethods){


                        ServerMethodDefinition<Request,Response> genericMeth = (ServerMethodDefinition<Request, Response>) genericMethods;

                                ServerMethodDefinition.create(methodDescriptor, genericMeth.getServerCallHandler())
            return serverMethodDefinitions;

这样 ServerMethodDefinition 就和 callHandler 正确的绑定起来了。


以上方法传入的 ProviderConfig 提供了实际服务接口的配置信息,包括这些方法的调用方式。但是调用方式默认被设置为UNARY。在上层需要附加一部分根据接口方法参数判断调用类型的逻辑。

private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
            MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
                    .setType(MethodDescriptor.MethodType.UNARY) //默认固定为一元调用
                    .setFullMethodName(generateFullMethodName(providerConfig.getInterfaceId(), name))
        return result;


    private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
        List<MethodDescriptor<Request, Response>> result = new ArrayList<>();
        Set<String> methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId());

        for (String name : methodNames) {
            MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(providerConfig.getMethodCallType(name));
            MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
                    .setFullMethodName(generateFullMethodName(providerConfig.getInterfaceId(), name))
        return result;


     * MethodCallType ( Map<method.getName(),callType> )
    protected transient volatile Map<String,String> methodCallType = null;
     * Cache the call type of interface methods
    protected void loadMethodCallType(Class<?> interfaceClass){
        Method[] methods =  interfaceClass.getDeclaredMethods();
        this.methodCallType = new ConcurrentHashMap<>();
        for(Method method :methods) {
            //此处调用的是全局处理中提到的 MethodConfig 的新方法,根据方法参数匹配调用方法

    public String getMethodCallType(String methodName) {
        return methodCallType.get(methodName);


*写在 AbstractInterfaceConfig 是考虑消费者是否能复用这部分逻辑

在 ProviderConfig 初始化时,设置服务引用时会尝试匹配服务方法的具体调用方式,并缓存:

     * Sets ref.
     * @param ref the ref
     * @return the ref
    public ProviderConfig<T> setRef(T ref) {
        this.ref = ref;
        return this;


对于双向流和客户端流,接口声明时的入参和返回值均仅为 StreamHandler。这导致无论是客户端请求和服务端响应都无法在调用开始就得知具体的消息类型。因此获取、缓存具体消息类型、序列化器的操作需要延迟在客户端/服务端第一次得到具体请求时再进行。


   StreamHandler<Request> handler = new StreamHandler() {
                    public void onMessage(Object message) {
                        SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest);
                        Object[] args = new Object[]{message};
                        Request req = getRequest(request,serialization,serializer);

                    public void onFinish() {

                    public void onException(Throwable throwable) {


public class ClientStreamObserverAdapter implements StreamObserver<triple.Response> {

    private final StreamHandler<Object> streamHandler;

    private final Serializer serializer;

    private volatile Class<?> returnType;

    public ClientStreamObserverAdapter(StreamHandler<Object> streamHandler, byte serializeType) {
        this.streamHandler = streamHandler;
        this.serializer = SerializerFactory.getSerializer(serializeType);

    public void onNext(triple.Response response) {
        byte[] responseDate = response.getData().toByteArray();
        Object appResponse = null;
        String returnTypeName = response.getType();
        if (responseDate != null && responseDate.length > 0) {
            if(returnType == null && !returnTypeName.isEmpty()) {
                try {
                    returnType = Class.forName(returnTypeName);
                }catch (ClassNotFoundException e){
                    throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE,"Can not find return type :"+returnType);
            appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);


    public void onError(Throwable t) {

    public void onCompleted() {


public class ResponseSerializeStreamHandler<T> implements StreamHandler<T> {

    private final StreamObserver<triple.Response> streamObserver;

    private Serializer serializer;

    private String serializeType;

    public ResponseSerializeStreamHandler(StreamObserver<triple.Response> streamObserver,String serializeType) {
        this.streamObserver = streamObserver;
        serializer = SerializerFactory.getSerializer(serializeType);
        this.serializeType = serializeType;

    public void onMessage(T message) {
        Response.Builder builder = Response.newBuilder();
        builder.setData(ByteString.copyFrom(serializer.encode(message, null).array()));


    public void onFinish() {

    public void onException(Throwable throwable) {


服务端处理客户端请求使用的StreamHandler,TripleClientInvoker 中的匿名实现,将请求反序列化为指定类型:

return new StreamObserver<Request>() {
                volatile Serializer serializer = null;

                volatile Class<?>[] argTypes = null;

                public void onNext(Request request) {
                    Object message = getInvokeArgs(request, argTypes, serializer)[0];

                private void checkInitialize(Request request){
                    if (serializer == null && argTypes == null) {
                        synchronized (this) {
                            if (serializer == null && argTypes == null) {
                                serializer = SerializerFactory.getSerializer(request.getSerializeType());
                                argTypes = getArgTypes(request);

                public void onError(Throwable t) {

                public void onCompleted() {

@sofastack-cla sofastack-cla bot added cla:no Need sign CLA First-time contributor First-time contributor size/XXL cla:yes CLA is ok and removed cla:no Need sign CLA labels Aug 25, 2023
@EvenLjj PTAL :)

codecov bot commented Aug 25, 2023

Codecov Report

Attention: Patch coverage is 83.66013% with 50 lines in your changes are missing coverage. Please review.

Project coverage is 72.19%. Comparing base (e67ea54) to head (df27427).
