gRPC 구현 - Client Streaming RPC #2

이번 글에서는 gRPC 의 Client Streaming RPC 예제를 구현 해보도록 한다. Client Streaming RPC 는 다량의 데이터를 스트리밍 방식으로 전송한다는 점에서 지난 글에서 다뤘던 Unary RPC 와 다르다. (아래 그림에서 왼쪽 아래)

gRPC Learning (Part 1) - Yang(Bruce) Li - Medium

Client Streaming RPC의 구현

시작하기

Client Streaming RPC는 큰 데이터를 서버로 전송하고 단항으로 응답을 받는 서비스에 적합한 모델이다. 예를 들어 큰 이미지들을 전송해야 한다던지, 다량의 데이터를 서버로 전송해야 하는 상황에 적합하다. 다음은 이번 예제의 디렉토리 구조이다.

// 입장 클라이언트 path
$ mkdir -p cmd/stream-client/client
// 서버 path
$ mkdir -p cmd/stream-client/server
// gRPC 라이브러리 path
$ mkdir -p core/stream-client/

core/stream-client/stream-client.proto

어떤 예제를 작성할까 고민하다가 지난 글(gRPC 구현 - Unary RPC #1)에 구현했던 protobuf 명세를 수정해서 사용하기로 했다.
이전 명세와 가장 크게 다른 부분은 rpc 선언부에 인자값으로 Guest 메시지를 stream 을 받겠다고 명시한 부분과 Guest 메시지에 bytes 데이터 포멧으로 이미지를 받도록 명시한 부분이다.

syntax = "proto3";

package stream_client;

// Room 서비스를 생성
service Room {
    // Guest메시지를 스트림으로 전달하겠다고 정의함
    rpc Entry (stream Guest) returns (Message);
}

message Message {
    string message = 1;
}

message Guest {
    string name = 1;
    int32 age = 2;
    // 아바타이미지 데이터 필드 추가
    bytes avatar = 3;
}

make go code

새로운 코드를 만들어 보자

protoc --proto_path=core/stream-client --go_out=plugins=grpc:cor/stream-client/ stream-client.proto

cmd/stream-client/server/main.go

이전에 작성해봤던 서버코드와 크게 다르지 않다. protoc에서 생성된 인터페이스를 찾아서 ~바인딩할 구조체~ (room) 를 구현하면 된다.
서버코드에서 눈여겨 봐야할 부분은 stream 을 수신 받는 부분과 종료하는 부분이며 주석처리한 부분을 살펴보자.

package main

import (
    "fmt"
    "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "io"
    pb "learn-grpc/core/stream-client"
    "net"
    "os"
)

type room struct{}
    // protoc 로 생성한 stream-client.pb.go 파일내 정의된 interface 를 참조하여 room struct 를 구현하면 된다.
func (t *room) Entry(stream pb.Room_EntryServer) error {
    // [] 스트리밍 종료시까지 아바타 이미지를 저장한다.
    for {
        // [] 임시 파일 생성
        file, e := os.Create("temp")
        if e != nil {
            logrus.Error(e)
            return e
        }
        // [] 스트림 데이터 수신
        res, e := stream.Recv()

        if e == io.EOF {
			  // 스트리밍 종료 처리
            logrus.Info("receive done")
            logrus.Info(" ")
            file.Close()
            os.Remove("temp")
            break
        }
        if e != nil {
            logrus.Error(e)
            file.Close()
            os.Remove("temp")
            return nil
        }
		  // 아바타 이미지 저장
        file.Write(res.Avatar)
        file.Close()
        // 파일명 변경
        os.Rename("temp", fmt.Sprintf("%s.png", res.Name))

        logrus.Info("receive ", res.Name)
    }
    // 스트리밍 종료시에 단항 메시지 전송
    return stream.SendAndClose(&pb.Message{
        Message: "success",
    })
}

func main() {
    l, e := net.Listen("tcp", ":8080")
    if e != nil {
        logrus.Error(e)
        return
    }
    srv := grpc.NewServer()

    wsrv := &room{}
    pb.RegisterRoomServer(srv, wsrv)

    logrus.Info(fmt.Sprintf("gRPC Server (%s)", l.Addr().String()))

    if e := srv.Serve(l); e != nil {
        logrus.Error(e)
    }
}

proton 에서 작성한 stream-client.pb.go 에서 살펴봐야 하는 부분

room 구조체를 작성할때 stream-client.pb.go 의 RoomServer 인터페이스를 따라 작성하면 된다.

cmd/stream-client/client/main.go

클라이언트에서는 아바타 이미지를 가지고 있는 가상의 사용자를 여럿 만들어서 0.5초 간격으로 서버로 전송시키는 코드를 작성했다. 눈여겨 봐야 할 부분은 스트림 데이터를 반복 전송하고 종료 후 메시지를 받아오는 부분이다.

package main

import (
    "context"
    "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "io"
    pb "learn-grpc/core/stream-client"
    "os"
    "time"
)

func load(path string) ([]byte, error) {
    file, e := os.Open(path)
    if e != nil {
        logrus.Error(e)
        return nil, e
    }
    defer file.Close()

    done := make([]byte, 0)
    buf := make([]byte, 2048)
    for {
        _, e := file.Read(buf)
        if e == io.EOF {
            break
        }
        if e != nil {
            return nil, e
        }
        done = append(done, buf...)
    }
    return done, nil
}

func main() {
    conn, e := grpc.Dial("localhost:8080", grpc.WithInsecure())
    if e != nil {
        logrus.Error(e)
        return
    }
    defer conn.Close()

    // gRPC 클라이언트 접속
    c := pb.NewRoomClient(conn)
    stream, e := c.Entry(context.Background())
    if e != nil {
        logrus.Error(e)
        return
    }

    for _, name := range []string{
        "karl-1", "sienna-1",
        "karl-2", "sienna-2",
        "karl-3", "sienna-3",
    } {
        // [] 가상의 사용자 블럭
        member := pb.Guest{Name: name, Age: 10}

        // load 는 특정위치의 파일을 읽어 []byte 로 리턴한다.
        sp, e := load("./cmd/stream-client/client/avatar.png")
        if e != nil {
            logrus.Error(e)
            return
        }
        member.Avatar = sp

        // [] 스트리밍 발송
        stream.Send(&member)
        logrus.Info("send")
        time.Sleep(500 * time.Millisecond)
    }
    //
    //
    res, e := stream.CloseAndRecv()
    if e != nil {
        logrus.Error(e)
        return
    }
    logrus.Info("Final response", res)
}

구현 결과

실행 결과는 구현한 바 그대로 가상의 사용자 이름으로 이미지 파일이 생성 되었고, 서버 단에서도 이를 올바르게 처리 했다.