串流通訊模式
具體用例請參考:[dubbo-samples-triple/pojo](https://github.com/apache/dubbo-samples/tree/master/3-extensions/protocol/dubbo-samples-triple/src/main/java /org/apache/dubbo/sample/tri/pojo);
開啟 Triple 的新功能 - 串流(stream)
Stream 是 Dubbo3 提供的一種新的呼叫類型,建議在以下場景中使用 stream
- 介面需要傳輸大量的數據,這些數據無法放置在一個 RPC 請求或響應中,需要分批傳輸。但是如果應用層無法解決傳統多次 RPC 方法的順序和性能問題,如果需要保證順序,則只能串行發送
- 在串流場景下,需要按照數據發送的順序進行處理,並且數據本身沒有明確的邊界
- 在推送場景下,在同一個呼叫的上下文中發送和處理多條消息
Stream 分為以下三種類型
- SERVER_STREAM(服務器端串流)
- CLIENT_STREAM(客戶端串流)
- BIDIRECTIONAL_STREAM(雙向串流)
由於
java
語言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的實現方式相同。
在 Dubbo3 中,串流介面以 StreamObserver
聲明和使用,使用者可以使用並實現此介面來發送和處理串流數據、異常和結束。
對於 Dubbo2 的使用者來說,可能會對 StreamObserver 感到陌生,它是 Dubbo3 定義的一種串流類型。Dubbo2 中沒有 Stream 類型,因此對遷移場景沒有影響。
串流語義保證
- 提供消息邊界,可以方便地對消息進行單獨處理
- 嚴格有序,發送端的順序與接收端的順序一致
- 全雙工,無需等待發送
- 支援取消和逾時
非 PB 序列化串流
- API
public interface IWrapperGreeter {
StreamObserver<String> sayHelloStream(StreamObserver<String> response);
void sayHelloServerStream(String request, StreamObserver<String> response);
}
Stream 方法的輸入參數和回傳值有嚴格的約定。為了防止寫錯造成的錯誤,Dubbo3 框架端會檢查參數,如果有錯誤會拋出異常。對於
BIDIRECTIONAL_STREAM
,需要注意的是,參數中的StreamObserver
是響應流,回傳值中的StreamObserver
是請求流。
- 實作類別
public class WrapGreeterImpl implements WrapGreeter {
//...
@Override
public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
return new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
response.onNext("hello,"+data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
response.onCompleted();
}
};
}
@Override
public void sayHelloServerStream(String request, StreamObserver<String> response) {
for (int i = 0; i < 10; i++) {
response.onNext("hello," + request);
}
response.onCompleted();
}
}
- 呼叫方法
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
throwable. printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
for (int i = 0; i < n; i++) {
request.onNext("stream request" + i);
}
request.onCompleted();
使用 Protobuf 序列化串流
對於 Protobuf
序列化方式,建議撰寫 IDL
並使用 compiler
插件編譯生成。生成的程式碼大致如下
public interface PbGreeter {
static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
static final boolean inited = PbGreeterDubbo.init();
//...
void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}
完整使用案例
撰寫 Java 介面
import org.apache.dubbo.common.stream.StreamObserver; import org.apache.dubbo.hello.HelloReply; import org.apache.dubbo.hello.HelloRequest; public interface IGreeter { /** * <pre> * Sends greeting by stream * </pre> */ StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver); }
撰寫實作類別
public class IStreamGreeterImpl implements IStreamGreeter { @Override public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) { return new StreamObserver<HelloRequest>() { private List<HelloReply> replyList = new ArrayList<>(); @Override public void onNext(HelloRequest helloRequest) { System.out.println("onNext receive request name:" + helloRequest.getName()); replyList.add(HelloReply.newBuilder() .setMessage("receive name:" + helloRequest.getName()) .build()); } @Override public void onError(Throwable cause) { System.out.println("onError"); replyObserver.onError(cause); } @Override public void onCompleted() { System.out.println("onComplete receive request size:" + replyList.size()); for (HelloReply reply : replyList) { replyObserver.onNext(reply); } replyObserver.onCompleted(); } }; } }
建立 Provider
public class StreamProvider { public static void main(String[] args) throws InterruptedException { ServiceConfig<IStreamGreeter> service = new ServiceConfig<>(); service.setInterface(IStreamGreeter.class); service.setRef(new IStreamGreeterImpl()); service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)); service.setApplication(new ApplicationConfig("stream-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service. export(); System.out.println("dubbo service started"); new CountDownLatch(1). await(); } }
建立 Consumer
public class StreamConsumer { public static void main(String[] args) throws InterruptedException, IOException { ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>(); ref. setInterface(IStreamGreeter. class); ref. setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref. setLazy(true); ref. setTimeout(100000); ref. setApplication(new ApplicationConfig("stream-consumer")); ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181")); final IStreamGreeter iStreamGreeter = ref. get(); System.out.println("dubbo ref started"); try { StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() { @Override public void onNext(HelloReply reply) { System.out.println("onNext"); System.out.println(reply.getMessage()); } @Override public void onError(Throwable throwable) { System.out.println("onError:" + throwable.getMessage()); } @Override public void onCompleted() { System.out.println("onCompleted"); } }); streamObserver.onNext(HelloRequest.newBuilder() .setName("tony") .build()); streamObserver.onNext(HelloRequest.newBuilder() .setName("nick") .build()); streamObserver.onCompleted(); } catch (Throwable t) { t. printStackTrace(); } System.in.read(); } }
執行 Provider 和 Consumer,可以看到請求正常回傳
onNext
收到名稱:tony
onNext
收到名稱:nick
onCompleted
常見問題
- 找不到 protobuf 類別
由於 Triple 協定底層需要依賴 protobuf 協定進行傳輸,即使定義的服務介面沒有使用 protobuf,也需要在環境中引入 protobuf 相關依賴。
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>