从零开始搭建 gRPC 服务 Golang 篇(二) - qianlizeguo/doc GitHub Wiki
在从零开始搭建 gRPC 服务 - Golang 篇(一)中介绍了如何搭建 gRPC
环境并构建一个简单的 gRPC
服务,本文将介绍 gRPC
的 streaming
。
gRPC
基于标准的 HTTP/2 进行传输,可以方便的实现 streaming
功能。要在 gRPC
中使用 streaming
,只需要在 proto
中在请求或响应前加上 stream
即可。
客户端向服务器发送请求并获取流以读取消息序列;客户端从返回的流中读取,直到没有更多消息; gRPC 保证单个 RPC 调用中的消息排序。
在从零开始搭建 gRPC 服务 - Golang 篇(一)中的 helloworld.proto
中增加接口 LotsOfReplies
:
syntax = "proto3";
option go_package = "github.com/grpc/example/helloworld";
package helloworld;
// The greeting service definition.
service Greeter {
rpc LotsOfReplies (HelloRequest) returns (stream HelloReply){}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
$ protoc helloworld.proto --go_out=output
$ tree .
.
├── helloworld.proto
└── output
└── github.com
└── grpc
└── example
└── helloworld
└── helloworld.pb.go
5 directories, 2 files
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
此时生成的代码就已经包含了流的处理,在使用上需要注意:服务器端代码的实现要通过流的方式发送响应。
package main
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
"google.golang.org/grpc/reflection"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
for idx := 0; idx < 10; idx ++ {
stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s %d", in.Name, idx)})
}
return nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
如上代码所示,服务端在接收到请求后通过 stream 返回了 10 个响应。
package main
import (
"context"
"io"
"log"
"os"
"time"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
)
const (
address = "localhost:50051"
defaultName = "world"
)
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := c.LotsOfReplies(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
for {
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.LotsOfReplies() = _, %v", c, err)
}
log.Printf("Greeting: %s\n", reply.Message)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
客户端从 stream 中读取到若干响应,直到读到 EOF 结束。
打开两个会话窗口,在其中之一执行:
$ go run server.go
- 1
在另一个会话窗口运行:
$ go run client.go gRPC_stream
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 0
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 1
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 2
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 3
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 4
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 5
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 6
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 7
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 8
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
客户端再次使用提供的流写入一系列消息并将其发送到服务器;一旦客户端写完消息,它就等待服务器读取它们并返回它的响应; gRPC 保证在单个 RPC 调用中的消息排序。
改写 helloworld.proto
,增加 LotsOfGreetings
:
syntax = "proto3";
option go_package = "github.com/grpc/example/helloworld";
package helloworld;
// The greeting service definition.
service Greeter {
rpc LotsOfGreetings (stream HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
int32 index = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
"google.golang.org/grpc/reflection"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error {
var total int32
var name string
for {
greeting, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.HelloReply{
Message: fmt.Sprintf("Hello %s, total %d", name, total),
})
}
if err != nil {
return err
}
name = greeting.Name
total += greeting.Index
}
return nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
服务端通过 stream 接收到若干请求,直到读到 EOF 后再返回响应。
package main
import (
"context"
"log"
"os"
"time"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
)
const (
address = "localhost:50051"
defaultName = "world"
)
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := c.LotsOfGreetings(ctx)
if err != nil {
log.Fatalf("could not greet: %v", err)
}
for idx := 0; idx < 10; idx ++ {
if err := stream.Send(&pb.HelloRequest{
Name: name,
Index: int32(idx),
}); err != nil {
log.Fatalf("send err: %v", err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Greeting: %s\n", reply.Message)
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
客户端通过 stream 发起 10 个请求,然后关闭 stream 并接收响应。
打开两个会话窗口,在其中之一执行:
$ go run server.go
- 1
在另一个会话窗口运行:
$ go run client.go gRPC_stream
2018/12/23 22:06:43 Greeting: Hello gRPC_stream, total 45
- 1
- 2
双方使用读写流发送一系列消息,这两个流独立运行,因此客户端和服务器可以按照他们喜欢的顺序进行读写:例如服务器可以在写入响应之前等待接收所有客户端消息,或者它可以交替读取消息然后写入消息,或其他一些读写组合;gRPC 保证在单个 RPC 调用中的消息排序。
改写 helloworld.proto
,增加 BidiHello
:
syntax = "proto3";
option go_package = "github.com/grpc/example/helloworld";
package helloworld;
// The greeting service definition.
service Greeter {
rpc BidiHello(stream HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
int32 index = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"strings"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
"google.golang.org/grpc/reflection"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
message := strings.Replace(in.Name, "吗", "", -1)
message = strings.Replace(message, "?", "!", -1)
err = stream.Send(&pb.HelloReply{Message: message})
if err != nil {
return err
}
}
return nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
服务端从 stream 中读取到请求后立即返回。
package main
import (
"context"
"fmt"
"log"
"io"
"time"
"google.golang.org/grpc"
pb "./output/github.com/grpc/example/helloworld"
)
const (
address = "localhost:50051"
)
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
stream, err := c.BidiHello(ctx)
if err != nil {
log.Fatalf("%v.BidiHello(_) = _, %v", c, err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
fmt.Printf("AI: %s\n", in.Message)
}
}()
for {
request := &pb.HelloRequest{}
fmt.Scanln(&request.Name)
if request.Name == "quit" {
break
}
if err := stream.Send(request); err != nil {
log.Fatalf("Failed to send a req: %v", err)
}
}
stream.CloseSend()
<-waitc
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
客户端从标准输出接收输入,然后通过 stream 发送请求,另一个 goroutine
则不断从 stream 中接收响应。
打开两个会话窗口,在其中之一执行:
$ go run server.go
- 1
在另一个会话窗口运行:
$ go run client.go
在吗?
AI: 在!
你好
AI: 你好
能听懂汉语吗?
AI: 能听懂汉语!
真的吗?
AI: 真的!
quit
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
</div>