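Reactive Programming
Feature Description
This feature is built on the Triple protocol and Project Reactor, and is supported in version `3.1.0` and later. Users only need to write an IDL file and specify the matching Generator for the protobuf plugin to generate and use Stub code that supports the Reactive API.
There are four call modes: OneToOne, OneToMany, ManyToOne, and ManyToMany. They correspond to unary calls, server streaming, client streaming, and bidirectional streaming, respectively. In the Reactor implementation, One maps to Mono and Many maps to Flux.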
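To make the One/Many mapping concrete, the following is a minimal sketch of the four method shapes using plain `String` payloads instead of generated protobuf messages. The class and method names are hypothetical, and the signatures of the actual generated stub may differ in detail. It assumes only that `reactor-core` is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical illustration only: not the generated Dubbo stub.
public class CallModeShapes {

    // OneToOne (unary): a single request yields a single reply.
    static Mono<String> oneToOne(Mono<String> request) {
        return request.map(name -> "hello " + name);
    }

    // OneToMany (server streaming): a single request yields a stream of replies.
    static Flux<String> oneToMany(Mono<String> request) {
        return request.flatMapMany(name -> Flux.range(1, 3).map(i -> name + "#" + i));
    }

    // ManyToOne (client streaming): a stream of requests yields a single reply.
    static Mono<String> manyToOne(Flux<String> requests) {
        return requests.collectList().map(names -> String.join(",", names));
    }

    // ManyToMany (bidirectional streaming): a stream of requests yields a stream of replies.
    static Flux<String> manyToMany(Flux<String> requests) {
        return requests.map(name -> name + " -> server get");
    }

    public static void main(String[] args) {
        System.out.println(oneToOne(Mono.just("a")).block());
        List<String> streamed = oneToMany(Mono.just("a")).collectList().block();
        System.out.println(streamed);
        System.out.println(manyToOne(Flux.just("a", "b")).block());
        System.out.println(manyToMany(Flux.just("a", "b")).collectList().block());
    }
}
```

The `block()` calls are there only to print results in a standalone program. Application code would normally stay non-blocking and subscribe instead.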
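Background
Reactive Streams provides a standard API for asynchronous stream processing. It lets applications be written in an event-driven style while protecting node stability through backpressure. The Triple protocol adds streaming support to the Dubbo framework at the communication protocol level. Business requirements such as large file transfer and push mechanisms can be built on top of it.
Combining Dubbo with a Reactive Stream Stub gives users the most convenient way to work with streams and improves asynchronous performance across the whole call chain.
Reference Example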
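The backpressure idea mentioned above can be illustrated without any framework, using the `java.util.concurrent.Flow` API that ships with JDK 9 and later. This is a sketch of the general Reactive Streams contract, not of how Dubbo or Triple implements it. The class name `BackpressureSketch` and the batch-based demand strategy are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration of demand-driven delivery (backpressure).
public class BackpressureSketch {

    // Subscriber that asks for items in fixed-size batches instead of an unbounded amount.
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.remainingInBatch = batchSize;
            subscription.request(batchSize); // signal initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                remainingInBatch = batchSize;
                subscription.request(batchSize); // ask for more only after the batch is handled
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..itemCount and returns what the subscriber received, in order.
    static List<Integer> run(int itemCount, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i); // blocks the producer while the subscriber's buffer is full
            }
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

The key point is that the subscriber, not the producer, decides how many items are in flight. That is what lets a slow consumer keep a fast producer from overwhelming it.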
[https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-triple-reactor](https://github.com/apache/dubbo-samples/tree/master/3-extensions/protocol/dubbo-samples-triple-reactor)
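Use Cases
The system must handle a large number of concurrent requests without overloading any server. Systems that serve many users with real-time data need to be sure they can absorb the load without crashing or slowing down.
How to Use
For Triple usage and configuration, see the guide on using Triple with IDL, and make sure the Dubbo version is >= 3.1.0.
Add the required dependencies
To use Reactor Triple, add the following additional dependencies.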
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
Configure the protobuf Maven plugin
Simply change mainClass to `org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator` and make sure `${compiler.version}` is >= 3.1.0.
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>dubbo</id>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compiler</artifactId>
<version>${compiler.version}</version>
<mainClass>org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
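Write and compile the IDL file
The IDL file is written exactly as it is for the native Triple protocol. After compilation, the generated code is placed in the `target/generated-sources/protobuf/java` directory by default.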
syntax = "proto3";
option java_multiple_files = true;
package org.apache.dubbo.samples.triple.reactor;
// The request message containing the user's name.
message GreeterRequest {
string name = 1;
}
// The response message containing the greetings
message GreeterReply {
string message = 1;
}
service GreeterService {
rpc greetOneToOne(GreeterRequest) returns (GreeterReply);
rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);
rpc greetManyToOne(stream GreeterRequest) returns (GreeterReply);
rpc greetManyToMany(stream GreeterRequest) returns (stream GreeterReply);
}
Usage
- Add the server-side interface implementation
package org.apache.dubbo.samples.triple.reactor.impl;
import org.apache.dubbo.samples.triple.reactor.DubboGreeterServiceTriple;
import org.apache.dubbo.samples.triple.reactor.GreeterReply;
import org.apache.dubbo.samples.triple.reactor.GreeterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class GreeterServiceImpl extends DubboGreeterServiceTriple.GreeterServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GreeterServiceImpl.class);
@Override
public Flux<GreeterReply> greetManyToMany(Flux<GreeterRequest> request) {
return request.doOnNext(req -> LOGGER.info("greetManyToMany get data: {}", req))
.map(req -> GreeterReply.newBuilder().setMessage(req.getName() + " -> server get").build())
.doOnNext(res -> LOGGER.info("greetManyToMany response data: {}", res));
}
}
- Add the server startup class
package org.apache.dubbo.samples.triple.reactor;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.samples.triple.reactor.impl.GreeterServiceImpl;
public class ReactorServer {
private static final int PORT = 50052;
public static void main(String[] args) {
ServiceConfig<GreeterService> reactorService = new ServiceConfig<>();
reactorService.setInterface(GreeterService.class);
reactorService.setRef(new GreeterServiceImpl());
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.protocol(new ProtocolConfig(CommonConstants.TRIPLE, PORT))
.service(reactorService)
.start();
}
}
- Add the client startup class and consumer code
package org.apache.dubbo.samples.triple.reactor;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
public class ReactorConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(ReactorConsumer.class);
private final GreeterService greeterService;
public ReactorConsumer() {
ReferenceConfig<GreeterService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface(GreeterService.class);
referenceConfig.setProtocol(CommonConstants.TRIPLE);
referenceConfig.setProxy(CommonConstants.NATIVE_STUB);
referenceConfig.setTimeout(10000);
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.reference(referenceConfig)
.start();
// Assign the field; a local variable here would shadow it and leave the final field uninitialized.
this.greeterService = referenceConfig.get();
}
public static void main(String[] args) throws IOException {
ReactorConsumer reactorConsumer = new ReactorConsumer();
reactorConsumer.consumeManyToMany();
System.in.read();
}
private void consumeManyToMany() {
greeterService.greetManyToMany(Flux.range(1, 10)
.map(num -> GreeterRequest.newBuilder().setName(String.valueOf(num)).build())
.doOnNext(req -> LOGGER.info("consumeManyToMany request data: {}", req)))
.subscribe(res -> LOGGER.info("consumeManyToMany get response: {}", res));
}
}
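The consumer above keeps the JVM alive with `System.in.read()` because `subscribe` returns immediately. As an alternative for tests or short-lived programs, the caller can block until the response stream finishes. The sketch below assumes `reactor-core` on the classpath and uses a hypothetical local function, `fakeGreetManyToMany`, as a stand-in for the remote `greetManyToMany` call so that it runs without a Dubbo server.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

// Hypothetical illustration: waits for a response stream to complete.
public class AwaitStreamSketch {

    // Stand-in for the remote call; a real consumer would invoke the generated stub instead.
    static Flux<String> fakeGreetManyToMany(Flux<String> requests) {
        return requests.map(name -> name + " -> server get");
    }

    // Sends `count` requests and blocks until the last response arrives or the timeout expires.
    static List<String> consumeAndWait(int count, Duration timeout) {
        List<String> responses = new CopyOnWriteArrayList<>();
        Flux<String> requests = Flux.range(1, count).map(String::valueOf);
        fakeGreetManyToMany(requests)
                .doOnNext(responses::add)
                .doOnError(err -> System.err.println("stream failed: " + err))
                .blockLast(timeout); // rethrows stream errors to the caller
        return responses;
    }

    public static void main(String[] args) {
        consumeAndWait(3, Duration.ofSeconds(5)).forEach(System.out::println);
    }
}
```

Blocking like this suits a demo or an integration test. In a service that is itself reactive, returning the `Flux` to the caller keeps the chain non-blocking.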
Start the server
Start the consumer