串流通訊模式

具體用例請參考:[dubbo-samples-triple/pojo](https://github.com/apache/dubbo-samples/tree/master/3-extensions/protocol/dubbo-samples-triple/src/main/java /org/apache/dubbo/sample/tri/pojo);

開啟 Triple 的新功能 - 串流(stream)

Stream 是 Dubbo3 提供的一種新的呼叫類型,建議在以下場景中使用 stream

  • 介面需要傳輸大量的數據,這些數據無法放置在一個 RPC 請求或響應中,需要分批傳輸。但是如果應用層無法解決傳統多次 RPC 方法的順序和性能問題,如果需要保證順序,則只能串行發送
  • 在串流場景下,需要按照數據發送的順序進行處理,並且數據本身沒有明確的邊界
  • 在推送場景下,在同一個呼叫的上下文中發送和處理多條消息

Stream 分為以下三種類型

  • SERVER_STREAM(服務器端串流) SERVER_STREAM
  • CLIENT_STREAM(客戶端串流) CLIENT_STREAM
  • BIDIRECTIONAL_STREAM(雙向串流) BIDIRECTIONAL_STREAM

由於 java 語言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的實現方式相同。

在 Dubbo3 中,串流介面以 StreamObserver 聲明和使用,使用者可以使用並實現此介面來發送和處理串流數據、異常和結束。

對於 Dubbo2 的使用者來說,可能會對 StreamObserver 感到陌生,它是 Dubbo3 定義的一種串流類型。Dubbo2 中沒有 Stream 類型,因此對遷移場景沒有影響。

串流語義保證

  • 提供消息邊界,可以方便地對消息進行單獨處理
  • 嚴格有序,發送端的順序與接收端的順序一致
  • 全雙工,無需等待發送
  • 支援取消和逾時

非 PB 序列化串流

  1. API
public interface IWrapperGreeter {

     StreamObserver<String> sayHelloStream(StreamObserver<String> response);

     void sayHelloServerStream(String request, StreamObserver<String> response);
}

Stream 方法的輸入參數和回傳值有嚴格的約定。為了防止寫錯造成的錯誤,Dubbo3 框架端會檢查參數,如果有錯誤會拋出異常。對於 BIDIRECTIONAL_STREAM,需要注意的是,參數中的 StreamObserver 是響應流,回傳值中的 StreamObserver 是請求流。

  1. 實作類別
public class WrapGreeterImpl implements WrapGreeter {

     //...

     @Override
     public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
         return new StreamObserver<String>() {
             @Override
             public void onNext(String data) {
                 System.out.println(data);
                 response.onNext("hello,"+data);
             }

             @Override
             public void onError(Throwable throwable) {
                 throwable. printStackTrace();
             }

             @Override
             public void onCompleted() {
                 System.out.println("onCompleted");
                 response.onCompleted();
             }
         };
     }

     @Override
     public void sayHelloServerStream(String request, StreamObserver<String> response) {
         for (int i = 0; i < 10; i++) {
             response.onNext("hello," + request);
         }
         response.onCompleted();
     }
}
  1. 呼叫方法
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
     @Override
     public void onNext(String data) {
         System.out.println(data);
     }

     @Override
     public void onError(Throwable throwable) {
         throwable. printStackTrace();
     }

     @Override
     public void onCompleted() {
         System.out.println("onCompleted");
     }
});


StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
     @Override
     public void onNext(String data) {
         System.out.println(data);
     }

     @Override
     public void onError(Throwable throwable) {
         throwable. printStackTrace();
     }

     @Override
     public void onCompleted() {
         System.out.println("onCompleted");
     }
});
for (int i = 0; i < n; i++) {
     request.onNext("stream request" + i);
}
request.onCompleted();

使用 Protobuf 序列化串流

對於 Protobuf 序列化方式,建議撰寫 IDL 並使用 compiler 插件編譯生成。生成的程式碼大致如下

public interface PbGreeter {

     static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
     static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";

     static final boolean inited = PbGreeterDubbo.init();
    
     //...

     void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);

     org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}

完整使用案例

  1. 撰寫 Java 介面

    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public interface IGreeter {
        /**
         * <pre>
         * Sends greeting by stream
         * </pre>
         */
        StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    
    }
    
  2. 撰寫實作類別

    public class IStreamGreeterImpl implements IStreamGreeter {
    
        @Override
        public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
    
            return new StreamObserver<HelloRequest>() {
                private List<HelloReply> replyList = new ArrayList<>();
    
                @Override
                public void onNext(HelloRequest helloRequest) {
                    System.out.println("onNext receive request name:" + helloRequest.getName());
                   replyList.add(HelloReply.newBuilder()
                       .setMessage("receive name:" + helloRequest.getName())
                       .build());
               }
    
               @Override
               public void onError(Throwable cause) {
                   System.out.println("onError");
                   replyObserver.onError(cause);
               }
    
               @Override
               public void onCompleted() {
                   System.out.println("onComplete receive request size:" + replyList.size());
                   for (HelloReply reply : replyList) {
                       replyObserver.onNext(reply);
                   }
                   replyObserver.onCompleted();
               }
           };
       }
    }
    
  3. 建立 Provider

    public class StreamProvider {
        public static void main(String[] args) throws InterruptedException {
            ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
            service.setInterface(IStreamGreeter.class);
            service.setRef(new IStreamGreeterImpl());
            service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
            service.setApplication(new ApplicationConfig("stream-provider"));
            service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
            service. export();
            System.out.println("dubbo service started");
            new CountDownLatch(1). await();
        }
    }
    
  4. 建立 Consumer

    public class StreamConsumer {
        public static void main(String[] args) throws InterruptedException, IOException {
            ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
            ref. setInterface(IStreamGreeter. class);
            ref. setCheck(false);
            ref.setProtocol(CommonConstants.TRIPLE);
            ref. setLazy(true);
            ref. setTimeout(100000);
            ref. setApplication(new ApplicationConfig("stream-consumer"));
            ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
            final IStreamGreeter iStreamGreeter = ref. get();
    
            System.out.println("dubbo ref started");
            try {
    
                StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
                    @Override
                    public void onNext(HelloReply reply) {
                        System.out.println("onNext");
                        System.out.println(reply.getMessage());
                    }
    
                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println("onError:" + throwable.getMessage());
                    }
    
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                });
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("tony")
                    .build());
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("nick")
                    .build());
    
                streamObserver.onCompleted();
            } catch (Throwable t) {
                t. printStackTrace();
            }
            System.in.read();
        }
    }
    
  5. 執行 Provider 和 Consumer,可以看到請求正常回傳

    onNext
    收到名稱:tony
    onNext
    收到名稱:nick
    onCompleted

常見問題

  1. 找不到 protobuf 類別

由於 Triple 協定底層需要依賴 protobuf 協定進行傳輸,即使定義的服務介面沒有使用 protobuf,也需要在環境中引入 protobuf 相關依賴。

         <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.19.4</version>
</dependency>

最後修改日期:2023 年 1 月 2 日:增強英文文件 (#1798) (95a9f4f6c1c)