Skip to main content

Microservices & gRPC

Go is the language of choice for building microservices due to its simplicity, performance, and excellent concurrency support. This chapter covers gRPC, Protocol Buffers, and essential microservices patterns.

gRPC Fundamentals

gRPC is a high-performance RPC framework that uses HTTP/2 and Protocol Buffers.

Why gRPC?

FeaturegRPCREST
ProtocolHTTP/2HTTP/1.1
PayloadBinary (Protobuf)Text (JSON/XML)
StreamingBidirectionalLimited
Code GenerationYesOptional
Type SafetyStrongWeak
PerformanceFasterSlower

Protocol Buffers

Protocol Buffers (protobuf) is Google’s language-neutral serialization format.
// user.proto
syntax = "proto3";

package user;
option go_package = "github.com/myapp/proto/user";

// User message
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
  repeated string roles = 4;
  UserStatus status = 5;
  google.protobuf.Timestamp created_at = 6;
}

enum UserStatus {
  USER_STATUS_UNSPECIFIED = 0;
  USER_STATUS_ACTIVE = 1;
  USER_STATUS_INACTIVE = 2;
  USER_STATUS_BANNED = 3;
}

// Request/Response messages
message GetUserRequest {
  int64 id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
  int32 total_count = 3;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  string password = 3;
}

// Service definition
service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(User) returns (User);
  rpc DeleteUser(GetUserRequest) returns (google.protobuf.Empty);
  
  // Streaming
  rpc WatchUsers(ListUsersRequest) returns (stream User);
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
}

Generating Go Code

# Install protoc compiler and plugins
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate code
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       proto/user.proto

gRPC Server

Implementing the Service

package main

import (
    "context"
    "log"
    "net"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    
    pb "github.com/myapp/proto/user"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    users map[int64]*pb.User
}

func NewUserServer() *userServer {
    return &userServer{
        users: make(map[int64]*pb.User),
    }
}

func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, ok := s.users[req.Id]
    if !ok {
        return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
    }
    
    return &pb.GetUserResponse{User: user}, nil
}

func (s *userServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
    var users []*pb.User
    for _, user := range s.users {
        users = append(users, user)
        if len(users) >= int(req.PageSize) {
            break
        }
    }
    
    return &pb.ListUsersResponse{
        Users:      users,
        TotalCount: int32(len(s.users)),
    }, nil
}

func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    // Validate
    if req.Name == "" {
        return nil, status.Error(codes.InvalidArgument, "name is required")
    }
    if req.Email == "" {
        return nil, status.Error(codes.InvalidArgument, "email is required")
    }
    
    // Create user
    user := &pb.User{
        Id:     int64(len(s.users) + 1),
        Name:   req.Name,
        Email:  req.Email,
        Status: pb.UserStatus_USER_STATUS_ACTIVE,
    }
    s.users[user.Id] = user
    
    return user, nil
}

// Server streaming
func (s *userServer) WatchUsers(req *pb.ListUsersRequest, stream pb.UserService_WatchUsersServer) error {
    // Send existing users
    for _, user := range s.users {
        if err := stream.Send(user); err != nil {
            return err
        }
    }
    
    // In a real app, watch for new users and stream them
    // This is simplified - you'd use channels/pub-sub
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-ticker.C:
            // Check for new users and stream
        }
    }
}

// Client streaming
func (s *userServer) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
    var users []*pb.User
    
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.ListUsersResponse{
                Users:      users,
                TotalCount: int32(len(users)),
            })
        }
        if err != nil {
            return err
        }
        
        user, err := s.CreateUser(stream.Context(), req)
        if err != nil {
            return err
        }
        users = append(users, user)
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    
    grpcServer := grpc.NewServer()
    pb.RegisterUserServiceServer(grpcServer, NewUserServer())
    
    log.Println("gRPC server starting on :50051")
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

gRPC Client

package main

import (
    "context"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    
    pb "github.com/myapp/proto/user"
)

func main() {
    // Connect to server
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("failed to connect: %v", err)
    }
    defer conn.Close()
    
    client := pb.NewUserServiceClient(conn)
    
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // Create user
    user, err := client.CreateUser(ctx, &pb.CreateUserRequest{
        Name:  "John Doe",
        Email: "[email protected]",
    })
    if err != nil {
        log.Fatalf("CreateUser failed: %v", err)
    }
    log.Printf("Created user: %+v", user)
    
    // Get user
    resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: user.Id})
    if err != nil {
        log.Fatalf("GetUser failed: %v", err)
    }
    log.Printf("Got user: %+v", resp.User)
    
    // List users
    listResp, err := client.ListUsers(ctx, &pb.ListUsersRequest{PageSize: 10})
    if err != nil {
        log.Fatalf("ListUsers failed: %v", err)
    }
    log.Printf("Found %d users", listResp.TotalCount)
    
    // Server streaming
    stream, err := client.WatchUsers(ctx, &pb.ListUsersRequest{})
    if err != nil {
        log.Fatalf("WatchUsers failed: %v", err)
    }
    
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("stream error: %v", err)
        }
        log.Printf("Received user: %+v", user)
    }
}

gRPC Interceptors (Middleware)

Server Interceptors

// Logging interceptor
func loggingInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    start := time.Now()
    
    resp, err := handler(ctx, req)
    
    log.Printf("Method: %s, Duration: %s, Error: %v",
        info.FullMethod,
        time.Since(start),
        err,
    )
    
    return resp, err
}

// Recovery interceptor
func recoveryInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (resp interface{}, err error) {
    defer func() {
        if r := recover(); r != nil {
            err = status.Errorf(codes.Internal, "panic: %v", r)
            log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
        }
    }()
    
    return handler(ctx, req)
}

// Auth interceptor
func authInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing token")
    }
    
    claims, err := validateToken(tokens[0])
    if err != nil {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }
    
    // Add claims to context
    ctx = context.WithValue(ctx, userClaimsKey{}, claims)
    
    return handler(ctx, req)
}

// Using interceptors
func main() {
    grpcServer := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            recoveryInterceptor,
            loggingInterceptor,
            authInterceptor,
        ),
        grpc.ChainStreamInterceptor(
            // Stream interceptors
        ),
    )
}

Client Interceptors

func clientLoggingInterceptor(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    log.Printf("Method: %s, Duration: %s, Error: %v", method, time.Since(start), err)
    return err
}

func clientAuthInterceptor(token string) grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        ctx = metadata.AppendToOutgoingContext(ctx, "authorization", token)
        return invoker(ctx, method, req, reply, cc, opts...)
    }
}

// Using client interceptors
conn, err := grpc.Dial("localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(
        clientLoggingInterceptor,
        clientAuthInterceptor("my-token"),
    ),
)

Service Discovery

Using Consul

import (
    "github.com/hashicorp/consul/api"
)

type ServiceRegistry struct {
    client *api.Client
}

func NewServiceRegistry(addr string) (*ServiceRegistry, error) {
    config := api.DefaultConfig()
    config.Address = addr
    
    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }
    
    return &ServiceRegistry{client: client}, nil
}

func (r *ServiceRegistry) Register(name, address string, port int) error {
    registration := &api.AgentServiceRegistration{
        ID:      fmt.Sprintf("%s-%s-%d", name, address, port),
        Name:    name,
        Address: address,
        Port:    port,
        Check: &api.AgentServiceCheck{
            GRPC:                           fmt.Sprintf("%s:%d", address, port),
            Interval:                       "10s",
            DeregisterCriticalServiceAfter: "1m",
        },
    }
    
    return r.client.Agent().ServiceRegister(registration)
}

func (r *ServiceRegistry) Discover(name string) ([]*api.ServiceEntry, error) {
    entries, _, err := r.client.Health().Service(name, "", true, nil)
    return entries, err
}

func (r *ServiceRegistry) Deregister(id string) error {
    return r.client.Agent().ServiceDeregister(id)
}

Load Balancing with gRPC

import (
    "google.golang.org/grpc/resolver"
)

// Custom resolver for service discovery
type consulResolver struct {
    registry *ServiceRegistry
    service  string
    cc       resolver.ClientConn
}

func (r *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {
    entries, err := r.registry.Discover(r.service)
    if err != nil {
        return
    }
    
    var addrs []resolver.Address
    for _, entry := range entries {
        addr := fmt.Sprintf("%s:%d", entry.Service.Address, entry.Service.Port)
        addrs = append(addrs, resolver.Address{Addr: addr})
    }
    
    r.cc.UpdateState(resolver.State{Addresses: addrs})
}

// Using round-robin load balancing
conn, err := grpc.Dial("consul:///user-service",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)

Health Checks

import (
    "google.golang.org/grpc/health"
    "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
    grpcServer := grpc.NewServer()
    
    // Register your services
    pb.RegisterUserServiceServer(grpcServer, NewUserServer())
    
    // Register health service
    healthServer := health.NewServer()
    grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
    
    // Set service health status
    healthServer.SetServingStatus("user.UserService", grpc_health_v1.HealthCheckResponse_SERVING)
    
    // Update health based on dependencies
    go func() {
        for {
            if dbHealthy() {
                healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
            } else {
                healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
            }
            time.Sleep(10 * time.Second)
        }
    }()
    
    grpcServer.Serve(lis)
}

Circuit Breaker

import "github.com/sony/gobreaker"

type UserClient struct {
    client pb.UserServiceClient
    cb     *gobreaker.CircuitBreaker
}

func NewUserClient(conn *grpc.ClientConn) *UserClient {
    settings := gobreaker.Settings{
        Name:        "UserService",
        MaxRequests: 3,                      // Max requests in half-open state
        Interval:    10 * time.Second,       // Cyclic period of closed state
        Timeout:     30 * time.Second,       // Timeout in open state
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
            return counts.Requests >= 3 && failureRatio >= 0.6
        },
        OnStateChange: func(name string, from, to gobreaker.State) {
            log.Printf("Circuit breaker %s: %s -> %s", name, from, to)
        },
    }
    
    return &UserClient{
        client: pb.NewUserServiceClient(conn),
        cb:     gobreaker.NewCircuitBreaker(settings),
    }
}

func (c *UserClient) GetUser(ctx context.Context, id int64) (*pb.User, error) {
    result, err := c.cb.Execute(func() (interface{}, error) {
        resp, err := c.client.GetUser(ctx, &pb.GetUserRequest{Id: id})
        if err != nil {
            return nil, err
        }
        return resp.User, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.(*pb.User), nil
}

Event-Driven Architecture

Publishing Events

type Event struct {
    ID        string
    Type      string
    Payload   json.RawMessage
    Timestamp time.Time
}

type EventPublisher interface {
    Publish(ctx context.Context, topic string, event *Event) error
}

// NATS implementation
type NATSPublisher struct {
    conn *nats.Conn
}

func (p *NATSPublisher) Publish(ctx context.Context, topic string, event *Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    return p.conn.Publish(topic, data)
}

// Usage in service
func (s *OrderService) CreateOrder(ctx context.Context, order *Order) error {
    // Create order in DB
    if err := s.repo.Create(ctx, order); err != nil {
        return err
    }
    
    // Publish event
    event := &Event{
        ID:        uuid.New().String(),
        Type:      "order.created",
        Payload:   json.RawMessage(mustMarshal(order)),
        Timestamp: time.Now(),
    }
    
    return s.publisher.Publish(ctx, "orders", event)
}

Consuming Events

type EventHandler func(ctx context.Context, event *Event) error

type EventConsumer struct {
    conn     *nats.Conn
    handlers map[string]EventHandler
}

func (c *EventConsumer) Subscribe(topic string, handler EventHandler) error {
    _, err := c.conn.Subscribe(topic, func(msg *nats.Msg) {
        var event Event
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            log.Printf("Failed to unmarshal event: %v", err)
            return
        }
        
        ctx := context.Background()
        if err := handler(ctx, &event); err != nil {
            log.Printf("Handler error: %v", err)
        }
    })
    return err
}

// Handler
func handleOrderCreated(ctx context.Context, event *Event) error {
    var order Order
    if err := json.Unmarshal(event.Payload, &order); err != nil {
        return err
    }
    
    // Process order (send confirmation email, update inventory, etc.)
    return nil
}

Distributed Tracing

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)

func initTracer() (*trace.TracerProvider, error) {
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://localhost:14268/api/traces"),
    ))
    if err != nil {
        return nil, err
    }
    
    tp := trace.NewTracerProvider(
        trace.WithBatcher(exporter),
        trace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("user-service"),
        )),
    )
    
    otel.SetTracerProvider(tp)
    return tp, nil
}

func main() {
    tp, err := initTracer()
    if err != nil {
        log.Fatal(err)
    }
    defer tp.Shutdown(context.Background())
    
    // gRPC server with tracing
    grpcServer := grpc.NewServer(
        grpc.StatsHandler(otelgrpc.NewServerHandler()),
    )
    
    // gRPC client with tracing
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
    )
}

Interview Questions

  • Protocol: gRPC uses HTTP/2, REST typically HTTP/1.1
  • Payload: gRPC uses binary Protobuf, REST uses text (JSON/XML)
  • Streaming: gRPC has native bidirectional streaming
  • Type Safety: gRPC has strong contracts, REST relies on documentation
  • Performance: gRPC is faster due to binary serialization and HTTP/2
  1. Unary: Single request, single response
  2. Server streaming: Single request, stream of responses
  3. Client streaming: Stream of requests, single response
  4. Bidirectional streaming: Stream in both directions
Use google.golang.org/grpc/status package with standard codes:
  • codes.NotFound for missing resources
  • codes.InvalidArgument for bad input
  • codes.Unauthenticated for auth failures
  • codes.PermissionDenied for authorization failures
  • codes.Internal for server errors
  • Circuit Breaker: Prevent cascading failures
  • Retry with backoff: Handle transient failures
  • Timeout: Prevent hanging requests
  • Load balancing: Distribute traffic
  • Service discovery: Dynamic endpoint resolution
  • Health checks: Monitor service availability

Summary

ComponentPurpose
Protocol BuffersDefine service contracts and messages
gRPC ServerImplement service methods
gRPC ClientCall remote services
InterceptorsAdd cross-cutting concerns
Service DiscoveryFind service instances
Circuit BreakerHandle failures gracefully
Health ChecksMonitor service health
Distributed TracingTrack requests across services