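Reactive Programming

Use the Reactive API to work with Triple streaming calls

Feature Description

This feature is built on the Triple protocol and Project Reactor and is supported in version `3.1.0` and later. You only need to write an IDL file and specify the matching Generator for the protobuf plugin to generate and use Stub code that supports the Reactive API.

There are four call modes: OneToOne, OneToMany, ManyToOne, and ManyToMany. They correspond to unary calls, server-side streaming, client-side streaming, and bidirectional streaming, respectively. In the Reactor implementation, One maps to Mono and Many maps to Flux.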
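The mapping between call modes and Reactor types can be sketched as a small lookup. The class below is hypothetical and not part of Dubbo. It only models the rule "One maps to Mono, Many maps to Flux" and prints the method signature that rule implies. The signature shape is taken from the `greetManyToMany` method in the server implementation on this page; the shapes for the other three modes are assumed by analogy and should be checked against the generated stub.

```java
/** Illustrative model of the four call modes; hypothetical, not part of Dubbo. */
class TripleCallModes {

    enum Mode {
        ONE_TO_ONE(false, false),    // unary call
        ONE_TO_MANY(false, true),    // server-side streaming
        MANY_TO_ONE(true, false),    // client-side streaming
        MANY_TO_MANY(true, true);    // bidirectional streaming

        private final boolean clientStreaming;
        private final boolean serverStreaming;

        Mode(boolean clientStreaming, boolean serverStreaming) {
            this.clientStreaming = clientStreaming;
            this.serverStreaming = serverStreaming;
        }

        /** Picks the mode from whether each side of the rpc is declared with "stream". */
        static Mode of(boolean clientStreaming, boolean serverStreaming) {
            for (Mode mode : values()) {
                if (mode.clientStreaming == clientStreaming
                        && mode.serverStreaming == serverStreaming) {
                    return mode;
                }
            }
            throw new AssertionError("all four combinations are covered");
        }

        /** "One" maps to Mono and "Many" maps to Flux, for request and response alike. */
        String signature(String method, String request, String reply) {
            String in = clientStreaming ? "Flux" : "Mono";
            String out = serverStreaming ? "Flux" : "Mono";
            return out + "<" + reply + "> " + method + "(" + in + "<" + request + "> request)";
        }
    }

    public static void main(String[] args) {
        // Models: rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);
        System.out.println(
                Mode.of(false, true).signature("greetOneToMany", "GreeterRequest", "GreeterReply"));
    }
}
```

Running `main` prints `Flux<GreeterReply> greetOneToMany(Mono<GreeterRequest> request)`: a single request in, a stream of replies out.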

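Background

Reactive Streams provides a standard API for asynchronous stream processing. It lets applications be written in an event-driven style while relying on backpressure to keep nodes stable. At the protocol level, Triple adds streaming support to the Dubbo framework. On that foundation, higher-level business requirements such as large-file transfer and push mechanisms can be implemented.

Combining Dubbo with a Reactive Stream Stub gives users a convenient way to work with streams and improves asynchronous performance across the whole call chain.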
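To make the backpressure idea concrete, here is a minimal sketch that uses the JDK's `java.util.concurrent.Flow` API rather than Reactor, so it runs without extra dependencies. `Flow` mirrors the Reactive Streams interfaces; the class and method names below are illustrative assumptions and have nothing to do with Dubbo's own implementation. The subscriber asks for one item at a time, so the publisher can never deliver faster than the subscriber processes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {

    /** Requests exactly one item at a time, so the publisher cannot run ahead of it. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);   // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);        // "process" the item
            subscription.request(1);   // only now signal demand for the next one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received, in order. */
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);   // blocks if the subscriber's buffer is full
            }
        }                              // close() completes the stream
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

Running `main` prints `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`. Demand flows from the subscriber back to the publisher through `request(n)`, and that is the signal that lets a slow consumer protect itself.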

Reference Example

[https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-triple-reactor](https://github.com/apache/dubbo-samples/tree/master/3-extensions/protocol/dubbo-samples-triple-reactor)

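Use Cases

Systems that must handle a large number of concurrent requests without overloading any server, and systems with many users that serve real-time data and need to handle the load without crashing or slowing down.

How to Use

For Triple usage and configuration, see the Triple usage guide in the IDL section, and make sure the Dubbo version is >= 3.1.0.

Add the required dependencies

To use Reactor Triple, add the following additional dependencies.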
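If you want to enforce the version requirement at startup, a check along the following lines is one option. This is only a sketch: the class name is hypothetical, and the version string is passed in as a parameter because how you obtain it (for example from build metadata) depends on your project. It uses the JDK's `ModuleDescriptor.Version`, which compares numeric components numerically, so `3.10.0` is correctly treated as newer than `3.9.0`.

```java
import java.lang.module.ModuleDescriptor.Version;

/** Hypothetical startup guard; not part of Dubbo. */
class DubboVersionCheck {

    // Minimum version stated on this page for Reactive stub support.
    static final Version MINIMUM = Version.parse("3.1.0");

    /** Returns true when the given version string is 3.1.0 or newer. */
    static boolean supportsReactiveStub(String dubboVersion) {
        return Version.parse(dubboVersion).compareTo(MINIMUM) >= 0;
    }

    public static void main(String[] args) {
        // The version is taken from the command line here purely for illustration.
        String dubboVersion = args.length > 0 ? args[0] : "3.1.0";
        if (!supportsReactiveStub(dubboVersion)) {
            throw new IllegalStateException(
                    "Reactive Triple stubs need Dubbo >= " + MINIMUM + ", found " + dubboVersion);
        }
        System.out.println("Dubbo " + dubboVersion + " supports Reactive Triple stubs");
    }
}
```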

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

Configure the protobuf Maven plugin

Just change mainClass to org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator and make sure ${compiler.version} >= 3.1.0.

<build>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
                </pluginArtifact>
                <protocPlugins>
                    <protocPlugin>
                        <id>dubbo</id>
                        <groupId>org.apache.dubbo</groupId>
                        <artifactId>dubbo-compiler</artifactId>
                        <version>${compiler.version}</version>
                        <mainClass>org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator</mainClass>
                    </protocPlugin>
                </protocPlugins>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

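Write and compile the IDL file

The IDL file is written exactly as it is for native Triple. After compilation, the generated code is placed in the target/generated-sources/protobuf/java directory by default.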

syntax = "proto3";

option java_multiple_files = true;

package org.apache.dubbo.samples.triple.reactor;

// The request message containing the user's name.
message GreeterRequest {
  string name = 1;
}

// The response message containing the greetings
message GreeterReply {
  string message = 1;
}

service GreeterService {

  rpc greetOneToOne(GreeterRequest) returns (GreeterReply);

  rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);

  rpc greetManyToOne(stream GreeterRequest) returns (GreeterReply);

  rpc greetManyToMany(stream GreeterRequest) returns (stream GreeterReply);
}

Usage

  1. Add the server-side interface implementation
package org.apache.dubbo.samples.triple.reactor.impl;

import org.apache.dubbo.samples.triple.reactor.DubboGreeterServiceTriple;
import org.apache.dubbo.samples.triple.reactor.GreeterReply;
import org.apache.dubbo.samples.triple.reactor.GreeterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class GreeterServiceImpl extends DubboGreeterServiceTriple.GreeterServiceImplBase {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(GreeterServiceImpl.class);

    @Override
    public Flux<GreeterReply> greetManyToMany(Flux<GreeterRequest> request) {
        return request.doOnNext(req -> LOGGER.info("greetManyToMany get data: {}", req))
                .map(req -> GreeterReply.newBuilder().setMessage(req.getName() + " -> server get").build())
                .doOnNext(res -> LOGGER.info("greetManyToMany response data: {}", res));
    }
}
  2. Add the server startup class
package org.apache.dubbo.samples.triple.reactor;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.samples.triple.reactor.impl.GreeterServiceImpl;

public class ReactorServer {

    private static final int PORT = 50052;

    public static void main(String[] args) {
        ServiceConfig<GreeterService> reactorService = new ServiceConfig<>();
        reactorService.setInterface(GreeterService.class);
        reactorService.setRef(new GreeterServiceImpl());

        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                .protocol(new ProtocolConfig(CommonConstants.TRIPLE, PORT))
                .service(reactorService)
                .start();
    }
}
  3. Add the client startup class and consumer program
package org.apache.dubbo.samples.triple.reactor;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;

public class ReactorConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorConsumer.class);

    private final GreeterService greeterService;

    public ReactorConsumer() {
        ReferenceConfig<GreeterService> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setInterface(GreeterService.class);
        referenceConfig.setProtocol(CommonConstants.TRIPLE);
        referenceConfig.setProxy(CommonConstants.NATIVE_STUB);
        referenceConfig.setTimeout(10000);

        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                .reference(referenceConfig)
                .start();
        // Assign the field; declaring a local variable here would leave the final field unset.
        this.greeterService = referenceConfig.get();
    }
    
    public static void main(String[] args) throws IOException {
        ReactorConsumer reactorConsumer = new ReactorConsumer();
        reactorConsumer.consumeManyToMany();
        System.in.read();
    }
    
    private void consumeManyToMany() {
        greeterService.greetManyToMany(Flux.range(1, 10)
                    .map(num->
                        GreeterRequest.newBuilder().setName(String.valueOf(num)).build())
                    .doOnNext(req -> LOGGER.info("consumeManyToMany request data: {}", req)))
                .subscribe(res -> LOGGER.info("consumeManyToMany get response: {}", res));
    }
}
  4. Start the server

  5. Start the consumer


Last modified March 1, 2023: Move SDK docs (#2337) (ceab0ea115a)