串流通訊

串流的實現原理

`Triple` 協議的串流模式

  • 從協定層面來看,`Triple` 建構在 `HTTP2` 的基礎之上,因此它直接擁有 `HTTP2` 的所有能力,所以具備分割 `stream` 和全雙工的能力。

  • 在框架層面,提供 `StreamObserver` 作為串流介面給使用者,提供輸入和輸出參數的串流處理。框架在發送和接收串流資料時進行對應的介面呼叫,以確保串流生命週期的完整性。

啟用 Triple 的新功能

串流 (Stream)

串流是 Dubbo3 提供的一種新的呼叫類型,建議在以下場景使用串流

  • 介面需要傳送大量的資料,這些資料無法放置在一個 RPC 請求或回應中,需要分批傳送。但如果應用層無法解決傳統多次 RPC 方法中的順序和效能問題,如果需要保證順序,則只能串列傳送
  • 在串流場景下,需要按照資料傳送的順序進行處理,且資料本身沒有明確的邊界
  • 在推播場景下,在同一個呼叫的上下文中傳送和處理多條訊息

串流分為以下三種類型

  • SERVER_STREAM(伺服器端串流) SERVER_STREAM
  • CLIENT_STREAM(客戶端串流) CLIENT_STREAM
  • BIDIRECTIONAL_STREAM(雙向串流) BIDIRECTIONAL_STREAM

由於 java 語言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的實現方式相同。

在 Dubbo3 中,串流介面被宣告並使用為 SteamObserver,使用者可以使用並實現此介面來發送和處理串流數據、異常和結束。

對於 Dubbo2 使用者來說,他們可能不熟悉 StreamObserver,它是 Dubbo3 定義的一種串流類型。Dubbo2 中沒有 Stream 類型,因此它對遷移場景沒有影響。

串流語義保證

  • 提供訊息邊界,可以輕鬆地單獨處理訊息
  • 嚴格排序,發送方的順序與接收方的順序一致
  • 全雙工,無需等待發送
  • 支援取消和逾時

非 PB 序列化的串流

API

public interface IWrapperGreeter {

    StreamObserver<String> sayHelloStream(StreamObserver<String> response);

    void sayHelloServerStream(String request, StreamObserver<String> response);
}

Stream 方法的方法輸入參數和返回值有嚴格的約定。為了防止因編寫錯誤引起的問題,Dubbo3 框架側會檢查參數,如果出現錯誤則拋出異常。對於 BIDIRECTIONAL_STREAM,需要注意的是參數中的 StreamObserver 是響應流,返回值中的 StreamObserver 是請求流。

實現類

public class WrapGreeterImpl implements WrapGreeter {

    //...

    @Override
    public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                System.out.println(data);
                response.onNext("hello,"+data);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable. printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
                response.onCompleted();
            }
        };
    }

    @Override
    public void sayHelloServerStream(String request, StreamObserver<String> response) {
        for (int i = 0; i < 10; i++) {
            response.onNext("hello," + request);
        }
        response.onCompleted();
    }
}

呼叫方法

delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
    @Override
    public void onNext(String data) {
        System.out.println(data);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable. printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
});


StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
    @Override
    public void onNext(String data) {
        System.out.println(data);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable. printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
});
for (int i = 0; i < n; i++) {
    request.onNext("stream request" + i);
}
request.onCompleted();

使用 Protobuf 序列化的串流

對於 Protobuf 序列化方法,建議編寫 IDL 並使用 compiler 插件進行編譯和生成。生成的代碼大致如下

public interface PbGreeter {

    static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
    static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";

    static final boolean inited = PbGreeterDubbo.init();
    
    //...

    void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);

    org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}

上次修改時間:2023 年 1 月 2 日:增強英文文件 (#1798) (95a9f4f6c1c)