串流通訊
串流的實現原理
`Triple` 協議的串流模式
從協定層面來看,`Triple` 建構在 `HTTP2` 的基礎之上,因此它直接擁有 `HTTP2` 的所有能力,所以具備分割 `stream` 和全雙工的能力。
在框架層面,提供 `StreamObserver` 作為串流介面給使用者,提供輸入和輸出參數的串流處理。框架在發送和接收串流資料時進行對應的介面呼叫,以確保串流生命週期的完整性。
啟用 Triple 的新功能
串流 (Stream)
串流是 Dubbo3 提供的一種新的呼叫類型,建議在以下場景使用串流
- 介面需要傳送大量的資料,這些資料無法放置在一個 RPC 請求或回應中,需要分批傳送。但如果應用層無法解決傳統多次 RPC 方法中的順序和效能問題,如果需要保證順序,則只能串列傳送
- 在串流場景下,需要按照資料傳送的順序進行處理,且資料本身沒有明確的邊界
- 在推播場景下,在同一個呼叫的上下文中傳送和處理多條訊息
串流分為以下三種類型
- SERVER_STREAM(伺服器端串流)
- CLIENT_STREAM(客戶端串流)
- BIDIRECTIONAL_STREAM(雙向串流)
由於
java
語言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的實現方式相同。
在 Dubbo3 中,串流介面被宣告並使用為 SteamObserver
,使用者可以使用並實現此介面來發送和處理串流數據、異常和結束。
對於 Dubbo2 使用者來說,他們可能不熟悉 StreamObserver,它是 Dubbo3 定義的一種串流類型。Dubbo2 中沒有 Stream 類型,因此它對遷移場景沒有影響。
串流語義保證
- 提供訊息邊界,可以輕鬆地單獨處理訊息
- 嚴格排序,發送方的順序與接收方的順序一致
- 全雙工,無需等待發送
- 支援取消和逾時
非 PB 序列化的串流
API
public interface IWrapperGreeter {
StreamObserver<String> sayHelloStream(StreamObserver<String> response);
void sayHelloServerStream(String request, StreamObserver<String> response);
}
Stream 方法的方法輸入參數和返回值有嚴格的約定。為了防止因編寫錯誤引起的問題,Dubbo3 框架側會檢查參數,如果出現錯誤則拋出異常。對於
BIDIRECTIONAL_STREAM
,需要注意的是參數中的StreamObserver
是響應流,返回值中的StreamObserver
是請求流。
實現類
public class WrapGreeterImpl implements WrapGreeter {
//...
@Override
public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
return new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
response.onNext("hello,"+data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
response.onCompleted();
}
};
}
@Override
public void sayHelloServerStream(String request, StreamObserver<String> response) {
for (int i = 0; i < 10; i++) {
response.onNext("hello," + request);
}
response.onCompleted();
}
}
呼叫方法
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
for (int i = 0; i < n; i++) {
request.onNext("stream request" + i);
}
request.onCompleted();
使用 Protobuf 序列化的串流
對於 Protobuf
序列化方法,建議編寫 IDL
並使用 compiler
插件進行編譯和生成。生成的代碼大致如下
public interface PbGreeter {
static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
static final boolean inited = PbGreeterDubbo.init();
//...
void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}
上次修改時間:2023 年 1 月 2 日:增強英文文件 (#1798) (95a9f4f6c1c)