首页
聊一聊 gRPC 的四种通信模式

四种 rpc 服务接口定义方式

名称 请求(客户端) 响应(服务端)
简单 RPC 非 Stream 非 Stream
服务器端流式 RPC 非 Stream Stream
客户端流式 RPC Stream 非 Stream
双向流式 RPC Stream Stream

简单 PRC

客户端向服务器发送单个请求并获取单个响应,就像正常的函数调用一样。

服务端流式 RPC

客户端向服务器发送单个请求并获取一个数据流用来读取一系列消息。

客户端从返回的流中读取,直到没有更多消息为止。gRPC 保证单个 RPC 调用内的消息排序。

使用场景:

  1. 数据推送和实时更新:

当服务端有大量数据需要传输给客户端,并且这些数据可能是动态变化的,服务端流式 RPC 可以实现数据的实时推送和实时更新。例如,实时日志推送、实时监控数据更新等场景。

  1. 数据流处理:

当需要按照一定的流式处理方式处理客户端的请求,并逐步返回处理结果时,服务端流式 RPC 很有用。例如,数据流式处理、图像或视频流处理等场景。

  1. 大文件传输:

当需要下载大文件时,通过服务端流式 RPC 可以一边生成文件数据,一边传输给客户端,从而减少内存占用并提高效率。

  1. 批量数据处理:

当需要一次性处理大量数据,并将处理结果分批返回给客户端时,服务端流式 RPC 是一个有效的解决方案。例如,批量数据分析、批量数据导出等场景。

客户端流式 RPC

客户端写入一系列消息并将其发送到服务器,同样使用提供的流。客户端完成消息写入后,它会等待服务器读取消息并返回响应。gRPC 再次保证单个 RPC 调用内的消息排序。

使用场景:

  1. 大数据上传:

当客户端需要向服务端传输大量数据时,使用客户端流式 RPC 可以有效地减少网络开销和资源占用。客户端可以将数据分块发送给服务端,并在发送完所有数据后等待服务端的响应。

双向流式 RPC

双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序读取和写入:例如,服务器可以在写入响应之前等待接收所有客户端消息,或者可以交替读取消息然后写入消息,或其他一些读和写的组合。每个流中消息的顺序都会被保留。

使用场景:

  1. 即时聊天应用: 双向流式 RPC 可以用于实现即时聊天应用程序,客户端和服务端可以同时发送和接收消息,实现实时聊天功能。

  2. 实时数据处理: 当需要客户端和服务端之间进行实时数据传输和处理时,双向流式 RPC 提供了一种灵活且高效的通信方式。客户端和服务端可以同时发送和接收数据,并进行实时处理。

  3. 实时监控和通知: 双向流式 RPC 可以用于实现实时监控和通知系统,服务端可以向客户端推送监控数据或通知,客户端也可以向服务端发送请求并接收响应。

  4. 多人协作编辑器: 双向流式 RPC 可以用于实现多人协作编辑器,多个用户可以同时编辑同一文档,并实时同步编辑内容。

  5. 游戏开发: 在实时多人游戏开发中,双向流式 RPC 可以用于实现游戏客户端和服务端之间的实时通信,例如实时同步玩家位置、状态等信息。

// 简单 rpc
rpc SayHello(HelloRequest) returns (HelloResponse);
// 服务端流式 rpc
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
// 客户端流式 rpc
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
// 双向流式 rpc
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

示例代码

1. 服务端流式 RPC 实现

proto 文件定义

message LogStreamReq {
    int32 log_level = 1;
}

message LogStreamRes {
    int32 log_level = 1;
    string log_message = 2;
}

// 服务端流式 rpc,模拟一个服务端实时推送日志信息
service LogStreamService {
    rpc GetStreamLogs(LogStreamReq) returns (stream LogStreamRes);
}

服务端实现

  • 函数的参数和返回值与简单 RPC 不一样,req *pb.LogStreamReq, srv pb.LogStreamService_GetStreamLogsServer

  • 服务端通过流的方式进行返回,我们模拟了 10s 后返回所有的流数据,通过 return nil 或者 return io.EOF 表示服务端发送完毕

type LogStreamService struct {
	pb.UnimplementedLogStreamServiceServer
}

func (s *LogStreamService) GetStreamLogs(req *pb.LogStreamReq, srv pb.LogStreamService_GetStreamLogsServer) error {
	// 模拟 10s 发送完所有的数据
	timeout := time.After(5 * time.Second)
	for {
		select {
		case <-timeout:
			fmt.Println("finished...")
			return nil
		default:
			logMessage := generateLogMessage(req.LogLevel)
			if err := srv.Send(&pb.LogStreamRes{
				LogLevel:   req.LogLevel,
				LogMessage: logMessage,
			}); err != nil {
				return err
			}
		}
		// 模拟日志的间隔时间
		time.Sleep(1 * time.Second)
	}
}

func generateLogMessage(logLevel int32) string {
	return fmt.Sprintf("Log message with level %d", logLevel)
}

客户端实现

func main() {
	flag.Parse()
	// 1. 连接服务端
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 2. 实例化 grpc 客户端
	client := pb.NewLogStreamServiceClient(conn)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// 3. 发起流式 rpc 请求
	stream, err := client.GetStreamLogs(ctx, &pb.LogStreamReq{LogLevel: 1})
	if err != nil {
		log.Fatalf("could not get stream logs: %v", err)
	}
	// 4. 循环接收服务端推送的日志消息
	for {
		resp, err := stream.Recv()
		// 判断消息流是否已经结束
		if err == io.EOF {
			fmt.Println("接收完毕,退出...")
			break
		}
		if err != nil {
			log.Fatalf("Failed to receive log message: %v", err)
		}
		fmt.Println("Received log message:", resp.LogMessage)
	}
}

2. 客户端流式 RPC 实现

我们这里实现一个上传大文件的例子

proto 文件定义

message FileChunk {
    bytes data = 1;
}

message UploadStatus {
    bool success = 1;
}

service FileService {
    rpc UploadFile (stream FileChunk) returns (UploadStatus);
}

服务端实现

  • 使用for结合stream.Recv()来接收数据流

  • 同样用 io.EOF表示服务器已经接收完客户端的所有数据,使用SendAndClose告知客户端处理完毕

type FileService struct {
	pb.UnimplementedFileServiceServer
}

func (s *FileService) UploadFile(stream pb.FileService_UploadFileServer) error {
	filePath := "./file/movie.mkv"
	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	for {
		chunk, err := stream.Recv()
		if err == io.EOF {
			log.Println("File uploaded successfully")
			return stream.SendAndClose(&pb.UploadStatus{Success: true})
		}
		if err != nil {
			return err
		}
		_, err = file.Write(chunk.Data)
		if err != nil {
			return err
		}
	}

}

客户端实现

func main() {
	flag.Parse()
	// 1. 连接服务端
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 2. 实例化 grpc 客户端
	client := pb.NewFileServiceClient(conn)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 3. open file
	filePath := "../movie.mkv"
	file, err := os.Open(filePath)
	if err != nil {
		log.Fatalf("Failed to open file: %v", err)
	}
	defer file.Close()

	// 4. 发起流式 rpc 请求
	stream, err := client.UploadFile(ctx)
	if err != nil {
		log.Fatalf("Failed to upload file: %v", err)
	}

	// 5. 开始上传文件
	buffer := make([]byte, 1024)
	for {
		n, err := file.Read(buffer)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("Failed to read file: %v", err)
		}
		if err := stream.Send(&pb.FileChunk{Data: buffer[:n]}); err != nil {
			log.Fatalf("Failed to send chunk: %v", err)
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("Failed to receive response: %v", err)
	}
	if resp.Success {
		log.Println("File uploaded successfully")
	}
}

3. 双向流式 RPC 实现

我们实现一个聊天应用的例子

proto 文件定义

// 双向流 rpc 实现聊天应用
message Message {
    string sender = 1;
    string text = 2;
}

service ChatService {
    rpc Chat(stream Message) returns (stream Message);
}

服务端实现

服务端接收客户端的信息,并且处理后,发送给客户端

type ChatService struct {
	pb.UnimplementedChatServiceServer
}

func (s *ChatService) Chat(stream pb.ChatService_ChatServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		log.Printf("[%s]: %s", msg.Sender, msg.Text)
		stream.Send(&pb.Message{Sender: "Server", Text: msg.Text})
	}
}

客户端实现

客户端另开一个协程,用于处理接收服务端的信息

func main() {
	flag.Parse()
	// 1. 连接服务端
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 2. 实例化 grpc 客户端
	client := pb.NewChatServiceClient(conn)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 3. 调用远程服务器 Chat 函数
	stream, err := client.Chat(ctx)
	if err != nil {
		log.Fatalf("Failed to create chat stream: %v", err)
	}

	// 4. 开一个协程处理服务器返回的消息
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				log.Fatalf("Failed to received message: %v", err)
			}
			log.Printf("[%s]: %s", msg.Sender, msg.Text)
		}
	}()

	// 5. 开始发送消息
	log.Print("Enter your message: ")
	for {
		var text string
		if _, err := fmt.Scanln(&text); err != nil {
			log.Fatalf("Failed to read input: %v", err)
		}
		if err := stream.Send(&pb.Message{Sender: "Client", Text: text}); err != nil {
			log.Fatalf("Failed to send message : %v", err)
		}
	}
}

同步与异步 stub

stub 是 grpc 的客户端。

  • 同步 RPC 调用会一直阻塞,直到服务器发出响应为止,这最接近 RPC 所追求的过程调用的抽象。

  • 另一方面,网络本质上是异步的,在许多场景中,能够在不阻塞当前线程的情况下启动 RPC 非常有用。

上面的 1 和 2 两种接口的客户端有同步和异步两种写法;3 和 4 两种接口的客户端只有异步一种写法。

// 客户端同步调用
StreamingGreeterClient.sayHelloBlock(String)
// 客户端异步调用
StreamingGreeterClient.sayHelloAsync(String)
// 服务端
StreamingGreeterImpl.sayHello(HelloRequest, StreamObserver<HelloReply>)
// 客户端-同步
StreamingGreeterClient.sayHelloServerStreamBlock(String)
// 客户端-异步
StreamingGreeterClient.sayHelloServerStreamAsync(String)
// 服务端
StreamingGreeterImpl.sayHelloServerStream(HelloRequest, StreamObserver<HelloReply>)
// 客户端-异步
StreamingGreeterClient.sayHelloClientStreamAsync(String...)
// 服务端
StreamingGreeterImpl.sayHelloClientStream(StreamObserver<HelloReply>)
// 客户端-异步
StreamingGreeterClient.sayHelloBidirStreamAsync(String...)
// 服务端服务端
StreamingGreeterImpl.sayHelloBidirStream(StreamObserver<HelloReply>)

参考资料: