Microservices & gRPC
Go is the language of choice for building microservices due to its simplicity, performance, and excellent concurrency support. This chapter covers gRPC, Protocol Buffers, and essential microservices patterns.gRPC Fundamentals
gRPC is a high-performance RPC framework that uses HTTP/2 and Protocol Buffers.Why gRPC?
| Feature | gRPC | REST |
|---|---|---|
| Protocol | HTTP/2 | HTTP/1.1 |
| Payload | Binary (Protobuf) | Text (JSON/XML) |
| Streaming | Bidirectional | Limited |
| Code Generation | Yes | Optional |
| Type Safety | Strong | Weak |
| Performance | Faster | Slower |
Protocol Buffers
Protocol Buffers (protobuf) is Google’s language-neutral serialization format.Copy
// user.proto
syntax = "proto3";
package user;
option go_package = "github.com/myapp/proto/user";
// User message
message User {
int64 id = 1;
string name = 2;
string email = 3;
repeated string roles = 4;
UserStatus status = 5;
google.protobuf.Timestamp created_at = 6;
}
enum UserStatus {
USER_STATUS_UNSPECIFIED = 0;
USER_STATUS_ACTIVE = 1;
USER_STATUS_INACTIVE = 2;
USER_STATUS_BANNED = 3;
}
// Request/Response messages
message GetUserRequest {
int64 id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
int32 total_count = 3;
}
message CreateUserRequest {
string name = 1;
string email = 2;
string password = 3;
}
// Service definition
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
rpc CreateUser(CreateUserRequest) returns (User);
rpc UpdateUser(User) returns (User);
rpc DeleteUser(GetUserRequest) returns (google.protobuf.Empty);
// Streaming
rpc WatchUsers(ListUsersRequest) returns (stream User);
rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
}
Generating Go Code
Copy
# Install protoc compiler and plugins
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate code
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user.proto
gRPC Server
Implementing the Service
Copy
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "github.com/myapp/proto/user"
)
type userServer struct {
pb.UnimplementedUserServiceServer
users map[int64]*pb.User
}
func NewUserServer() *userServer {
return &userServer{
users: make(map[int64]*pb.User),
}
}
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
user, ok := s.users[req.Id]
if !ok {
return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
var users []*pb.User
for _, user := range s.users {
users = append(users, user)
if len(users) >= int(req.PageSize) {
break
}
}
return &pb.ListUsersResponse{
Users: users,
TotalCount: int32(len(s.users)),
}, nil
}
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
// Validate
if req.Name == "" {
return nil, status.Error(codes.InvalidArgument, "name is required")
}
if req.Email == "" {
return nil, status.Error(codes.InvalidArgument, "email is required")
}
// Create user
user := &pb.User{
Id: int64(len(s.users) + 1),
Name: req.Name,
Email: req.Email,
Status: pb.UserStatus_USER_STATUS_ACTIVE,
}
s.users[user.Id] = user
return user, nil
}
// Server streaming
func (s *userServer) WatchUsers(req *pb.ListUsersRequest, stream pb.UserService_WatchUsersServer) error {
// Send existing users
for _, user := range s.users {
if err := stream.Send(user); err != nil {
return err
}
}
// In a real app, watch for new users and stream them
// This is simplified - you'd use channels/pub-sub
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case <-ticker.C:
// Check for new users and stream
}
}
}
// Client streaming
func (s *userServer) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
var users []*pb.User
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.ListUsersResponse{
Users: users,
TotalCount: int32(len(users)),
})
}
if err != nil {
return err
}
user, err := s.CreateUser(stream.Context(), req)
if err != nil {
return err
}
users = append(users, user)
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
log.Println("gRPC server starting on :50051")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
gRPC Client
Copy
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/myapp/proto/user"
)
func main() {
// Connect to server
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create user
user, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: "John Doe",
Email: "[email protected]",
})
if err != nil {
log.Fatalf("CreateUser failed: %v", err)
}
log.Printf("Created user: %+v", user)
// Get user
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: user.Id})
if err != nil {
log.Fatalf("GetUser failed: %v", err)
}
log.Printf("Got user: %+v", resp.User)
// List users
listResp, err := client.ListUsers(ctx, &pb.ListUsersRequest{PageSize: 10})
if err != nil {
log.Fatalf("ListUsers failed: %v", err)
}
log.Printf("Found %d users", listResp.TotalCount)
// Server streaming
stream, err := client.WatchUsers(ctx, &pb.ListUsersRequest{})
if err != nil {
log.Fatalf("WatchUsers failed: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("stream error: %v", err)
}
log.Printf("Received user: %+v", user)
}
}
gRPC Interceptors (Middleware)
Server Interceptors
Copy
// Logging interceptor
func loggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("Method: %s, Duration: %s, Error: %v",
info.FullMethod,
time.Since(start),
err,
)
return resp, err
}
// Recovery interceptor
func recoveryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
err = status.Errorf(codes.Internal, "panic: %v", r)
log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
}
}()
return handler(ctx, req)
}
// Auth interceptor
func authInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
claims, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// Add claims to context
ctx = context.WithValue(ctx, userClaimsKey{}, claims)
return handler(ctx, req)
}
// Using interceptors
func main() {
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
recoveryInterceptor,
loggingInterceptor,
authInterceptor,
),
grpc.ChainStreamInterceptor(
// Stream interceptors
),
)
}
Client Interceptors
Copy
func clientLoggingInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("Method: %s, Duration: %s, Error: %v", method, time.Since(start), err)
return err
}
func clientAuthInterceptor(token string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", token)
return invoker(ctx, method, req, reply, cc, opts...)
}
}
// Using client interceptors
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(
clientLoggingInterceptor,
clientAuthInterceptor("my-token"),
),
)
Service Discovery
Using Consul
Copy
import (
"github.com/hashicorp/consul/api"
)
type ServiceRegistry struct {
client *api.Client
}
func NewServiceRegistry(addr string) (*ServiceRegistry, error) {
config := api.DefaultConfig()
config.Address = addr
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &ServiceRegistry{client: client}, nil
}
func (r *ServiceRegistry) Register(name, address string, port int) error {
registration := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%s-%s-%d", name, address, port),
Name: name,
Address: address,
Port: port,
Check: &api.AgentServiceCheck{
GRPC: fmt.Sprintf("%s:%d", address, port),
Interval: "10s",
DeregisterCriticalServiceAfter: "1m",
},
}
return r.client.Agent().ServiceRegister(registration)
}
func (r *ServiceRegistry) Discover(name string) ([]*api.ServiceEntry, error) {
entries, _, err := r.client.Health().Service(name, "", true, nil)
return entries, err
}
func (r *ServiceRegistry) Deregister(id string) error {
return r.client.Agent().ServiceDeregister(id)
}
Load Balancing with gRPC
Copy
import (
"google.golang.org/grpc/resolver"
)
// Custom resolver for service discovery
type consulResolver struct {
registry *ServiceRegistry
service string
cc resolver.ClientConn
}
func (r *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {
entries, err := r.registry.Discover(r.service)
if err != nil {
return
}
var addrs []resolver.Address
for _, entry := range entries {
addr := fmt.Sprintf("%s:%d", entry.Service.Address, entry.Service.Port)
addrs = append(addrs, resolver.Address{Addr: addr})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
// Using round-robin load balancing
conn, err := grpc.Dial("consul:///user-service",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
Health Checks
Copy
import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
func main() {
grpcServer := grpc.NewServer()
// Register your services
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
// Register health service
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
// Set service health status
healthServer.SetServingStatus("user.UserService", grpc_health_v1.HealthCheckResponse_SERVING)
// Update health based on dependencies
go func() {
for {
if dbHealthy() {
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
} else {
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
}
time.Sleep(10 * time.Second)
}
}()
grpcServer.Serve(lis)
}
Circuit Breaker
Copy
import "github.com/sony/gobreaker"
type UserClient struct {
client pb.UserServiceClient
cb *gobreaker.CircuitBreaker
}
func NewUserClient(conn *grpc.ClientConn) *UserClient {
settings := gobreaker.Settings{
Name: "UserService",
MaxRequests: 3, // Max requests in half-open state
Interval: 10 * time.Second, // Cyclic period of closed state
Timeout: 30 * time.Second, // Timeout in open state
ReadyToTrip: func(counts gobreaker.Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= 3 && failureRatio >= 0.6
},
OnStateChange: func(name string, from, to gobreaker.State) {
log.Printf("Circuit breaker %s: %s -> %s", name, from, to)
},
}
return &UserClient{
client: pb.NewUserServiceClient(conn),
cb: gobreaker.NewCircuitBreaker(settings),
}
}
func (c *UserClient) GetUser(ctx context.Context, id int64) (*pb.User, error) {
result, err := c.cb.Execute(func() (interface{}, error) {
resp, err := c.client.GetUser(ctx, &pb.GetUserRequest{Id: id})
if err != nil {
return nil, err
}
return resp.User, nil
})
if err != nil {
return nil, err
}
return result.(*pb.User), nil
}
Event-Driven Architecture
Publishing Events
Copy
type Event struct {
ID string
Type string
Payload json.RawMessage
Timestamp time.Time
}
type EventPublisher interface {
Publish(ctx context.Context, topic string, event *Event) error
}
// NATS implementation
type NATSPublisher struct {
conn *nats.Conn
}
func (p *NATSPublisher) Publish(ctx context.Context, topic string, event *Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
return p.conn.Publish(topic, data)
}
// Usage in service
func (s *OrderService) CreateOrder(ctx context.Context, order *Order) error {
// Create order in DB
if err := s.repo.Create(ctx, order); err != nil {
return err
}
// Publish event
event := &Event{
ID: uuid.New().String(),
Type: "order.created",
Payload: json.RawMessage(mustMarshal(order)),
Timestamp: time.Now(),
}
return s.publisher.Publish(ctx, "orders", event)
}
Consuming Events
Copy
type EventHandler func(ctx context.Context, event *Event) error
type EventConsumer struct {
conn *nats.Conn
handlers map[string]EventHandler
}
func (c *EventConsumer) Subscribe(topic string, handler EventHandler) error {
_, err := c.conn.Subscribe(topic, func(msg *nats.Msg) {
var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("Failed to unmarshal event: %v", err)
return
}
ctx := context.Background()
if err := handler(ctx, &event); err != nil {
log.Printf("Handler error: %v", err)
}
})
return err
}
// Handler
func handleOrderCreated(ctx context.Context, event *Event) error {
var order Order
if err := json.Unmarshal(event.Payload, &order); err != nil {
return err
}
// Process order (send confirmation email, update inventory, etc.)
return nil
}
Distributed Tracing
Copy
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
func initTracer() (*trace.TracerProvider, error) {
exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint("http://localhost:14268/api/traces"),
))
if err != nil {
return nil, err
}
tp := trace.NewTracerProvider(
trace.WithBatcher(exporter),
trace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("user-service"),
)),
)
otel.SetTracerProvider(tp)
return tp, nil
}
func main() {
tp, err := initTracer()
if err != nil {
log.Fatal(err)
}
defer tp.Shutdown(context.Background())
// gRPC server with tracing
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
// gRPC client with tracing
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
}
Interview Questions
What's the difference between gRPC and REST?
What's the difference between gRPC and REST?
- Protocol: gRPC uses HTTP/2, REST typically HTTP/1.1
- Payload: gRPC uses binary Protobuf, REST uses text (JSON/XML)
- Streaming: gRPC has native bidirectional streaming
- Type Safety: gRPC has strong contracts, REST relies on documentation
- Performance: gRPC is faster due to binary serialization and HTTP/2
What are the four types of gRPC methods?
What are the four types of gRPC methods?
- Unary: Single request, single response
- Server streaming: Single request, stream of responses
- Client streaming: Stream of requests, single response
- Bidirectional streaming: Stream in both directions
How do you handle errors in gRPC?
How do you handle errors in gRPC?
Use
google.golang.org/grpc/status package with standard codes:codes.NotFoundfor missing resourcescodes.InvalidArgumentfor bad inputcodes.Unauthenticatedfor auth failurescodes.PermissionDeniedfor authorization failurescodes.Internalfor server errors
What patterns do you use for service-to-service communication?
What patterns do you use for service-to-service communication?
- Circuit Breaker: Prevent cascading failures
- Retry with backoff: Handle transient failures
- Timeout: Prevent hanging requests
- Load balancing: Distribute traffic
- Service discovery: Dynamic endpoint resolution
- Health checks: Monitor service availability
Summary
| Component | Purpose |
|---|---|
| Protocol Buffers | Define service contracts and messages |
| gRPC Server | Implement service methods |
| gRPC Client | Call remote services |
| Interceptors | Add cross-cutting concerns |
| Service Discovery | Find service instances |
| Circuit Breaker | Handle failures gracefully |
| Health Checks | Monitor service health |
| Distributed Tracing | Track requests across services |