gRPC Streaming: Bidirectional Communication Patterns

Client and server streaming for high-throughput microservices

Welcome to TopperBlog! 👋

Traditional request-response APIs struggle with real-time data requirements, large payload transfers, and continuous data flows. When building microservices that need to process streaming telemetry, deliver live updates, or handle bulk data transfers, the unary RPC model creates unnecessary overhead and latency. gRPC streaming patterns solve these problems by enabling efficient, bidirectional communication between services.

The Problem with Unary RPCs

Unary RPCs follow a simple request-response model: the client sends one request, the server sends one response. This works well for simple CRUD operations but breaks down in several scenarios:

High-frequency updates: Polling for changes creates network overhead and increases latency. A monitoring dashboard polling metrics every second generates unnecessary traffic and delays data delivery.

Large dataset transfers: Sending gigabytes of data in a single response consumes memory on both client and server, potentially causing timeouts or out-of-memory errors.

Real-time bidirectional communication: Chat applications, collaborative editing tools, and live gaming require simultaneous sending and receiving of messages, which unary RPCs cannot efficiently support.

Backpressure handling: A unary RPC delivers its response as a single message, so when a client cannot process data as fast as the server produces it, there is no way to pace delivery message by message.

Understanding gRPC Streaming Patterns

gRPC provides three streaming patterns beyond unary RPCs:

Server streaming: The client sends one request, and the server responds with a stream of messages. Ideal for delivering real-time updates, paginated results, or large file downloads.

Client streaming: The client sends a stream of messages, and the server responds with a single message. Perfect for uploading large files, batch processing, or aggregating data from multiple sources.

Bidirectional streaming: Both client and server send streams of messages independently. Enables real-time chat, live collaboration, and complex request-response patterns.
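One way to keep the four call shapes straight is to write them as function types. The sketch below is a simplified model and not the grpc-js API: the type aliases and the `countTo`, `sum`, `double`, and `collect` helpers are hypothetical names, and async iterables stand in for network streams. The bidirectional example answers each request in turn, which is only one of many possible interleavings.

```typescript
// Hypothetical type aliases modelling the four RPC shapes; these are not grpc-js types.
type Unary<Req, Res> = (request: Req) => Promise<Res>;
type ServerStreaming<Req, Res> = (request: Req) => AsyncIterable<Res>;
type ClientStreaming<Req, Res> = (requests: AsyncIterable<Req>) => Promise<Res>;
type BidiStreaming<Req, Res> = (requests: AsyncIterable<Req>) => AsyncIterable<Res>;

// Unary: one request in, one response out.
const square: Unary<number, number> = async (value) => value * value;

// Server streaming: one request in, many responses out.
const countTo: ServerStreaming<number, number> = async function* (limit) {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
};

// Client streaming: many requests in, one response out.
const sum: ClientStreaming<number, number> = async (requests) => {
  let total = 0;
  for await (const value of requests) {
    total += value;
  }
  return total;
};

// Bidirectional streaming: here, one response per request as each arrives.
const double: BidiStreaming<number, number> = async function* (requests) {
  for await (const value of requests) {
    yield value * 2;
  }
};

// Drain an async iterable into an array (helper for demonstration only).
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
  }
  return items;
}
```

Composing the pieces, `collect(double(countTo(3)))` resolves to the doubled values of 1 through 3, and `sum(countTo(4))` resolves to their total.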

Implementing Server Streaming

Server streaming allows a single client request to trigger multiple server responses over time. Here's a practical example of streaming log entries:

syntax = "proto3";

service LogService {
  rpc StreamLogs(LogRequest) returns (stream LogEntry) {}
}

message LogRequest {
  string service_name = 1;
  string level = 2;
  int64 since_timestamp = 3;
}

message LogEntry {
  int64 timestamp = 1;
  string level = 2;
  string message = 3;
  map<string, string> metadata = 4;
}

Server implementation in TypeScript:

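import { ServerWritableStream } from '@grpc/grpc-js';

// Message shapes mirroring the .proto definitions above.
// Assumes the proto is loaded with keepCase: true and longs: Number.
interface LogRequest {
  service_name: string;
  level: string;
  since_timestamp: number;
}

interface LogEntry {
  timestamp: number;
  level: string;
  message: string;
  metadata: Record<string, string>;
}

class LogServiceImpl {
  async streamLogs(
    call: ServerWritableStream<LogRequest, LogEntry>
  ): Promise<void> {
    const { service_name, level, since_timestamp } = call.request;

    // Simulate streaming logs from a data source; polling stops once the client cancels
    const logStream = this.getLogStream(
      service_name,
      level,
      since_timestamp,
      () => call.cancelled
    );

    for await (const log of logStream) {
      // Check if client cancelled the stream
      if (call.cancelled) {
        console.log('Client cancelled stream');
        break;
      }

      // write() returns false once the internal buffer is full
      const canContinue = call.write({
        timestamp: log.timestamp,
        level: log.level,
        message: log.message,
        metadata: log.metadata
      });

      // Backpressure: wait for 'drain' (or cancellation) before writing more
      if (!canContinue) {
        await new Promise<void>(resolve => {
          call.once('drain', () => resolve());
          call.once('cancelled', () => resolve());
        });
      }
    }

    call.end();
  }

  private async *getLogStream(
    serviceName: string,
    level: string,
    sinceTimestamp: number,
    isCancelled: () => boolean
  ): AsyncGenerator<LogEntry> {
    // Implementation would connect to actual log source
    // This is a simplified example that polls once per second
    let cursor = sinceTimestamp;
    while (!isCancelled()) {
      const logs = await this.fetchLogs(serviceName, level, cursor);
      for (const log of logs) {
        yield log;
        // Advance the cursor so the next poll does not re-send this entry
        cursor = Math.max(cursor, log.timestamp);
      }
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }

  // Placeholder: replace with a query against your log store.
  // Assumed to return entries strictly newer than `since`.
  private async fetchLogs(
    serviceName: string,
    level: string,
    since: number
  ): Promise<LogEntry[]> {
    return [];
  }
}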
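With Node-style streams, a common way to apply backpressure is to look at the boolean returned by `write()` and pause until `drain` fires when it is `false`. The sketch below isolates that step behind a small helper. `DrainableWriter`, `writeWithBackpressure`, and `FakeStream` are hypothetical names introduced for illustration; grpc-js call objects are Node Writable streams and should fit the `DrainableWriter` shape, but that is an assumption worth checking against your version.

```typescript
// Minimal structural type: anything with Node-style write() and once('drain').
interface DrainableWriter<T> {
  write(message: T): boolean;
  once(event: 'drain', listener: () => void): unknown;
}

// Resolves immediately if write() reported spare capacity,
// otherwise resolves when the stream emits 'drain'.
function writeWithBackpressure<T>(
  stream: DrainableWriter<T>,
  message: T
): Promise<void> {
  if (stream.write(message)) {
    return Promise.resolve();
  }
  return new Promise<void>((resolve) => stream.once('drain', () => resolve()));
}

// Hypothetical in-memory stand-in for a call object, used only to exercise the helper.
class FakeStream implements DrainableWriter<string> {
  written: string[] = [];
  drainListeners: Array<() => void> = [];

  constructor(private readonly highWaterMark: number) {}

  // Like a Node Writable: the message is always buffered, and the return
  // value reports whether the buffer is still below the high-water mark.
  write(message: string): boolean {
    this.written.push(message);
    return this.written.length < this.highWaterMark;
  }

  once(_event: 'drain', listener: () => void): this {
    this.drainListeners.push(listener);
    return this;
  }

  // Simulate the consumer catching up: empty the buffer and notify waiters.
  drain(): void {
    this.written = [];
    const listeners = this.drainListeners;
    this.drainListeners = [];
    listeners.forEach((listener) => listener());
  }
}
```

Inside a streaming handler, the loop body would then reduce to `await writeWithBackpressure(call, entry)`. A production version would also stop waiting if the call is cancelled.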

Client implementation:

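import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

interface LogEntry {
  timestamp: number;
  level: string;
  message: string;
  metadata: Record<string, string>;
}

function consumeLogStream(): void {
  // keepCase preserves the snake_case field names from the .proto file;
  // longs: Number decodes int64 fields as JS numbers (fine for millisecond timestamps)
  const packageDefinition = protoLoader.loadSync('log_service.proto', {
    keepCase: true,
    longs: Number
  });
  const proto = grpc.loadPackageDefinition(packageDefinition);
  const LogService = proto.LogService as grpc.ServiceClientConstructor;

  const client = new LogService(
    'localhost:50051',
    grpc.credentials.createInsecure()
  );

  // Request ERROR logs from the last hour
  const call = client.streamLogs({
    service_name: 'api-gateway',
    level: 'ERROR',
    since_timestamp: Date.now() - 3600000
  });

  call.on('data', (logEntry: LogEntry) => {
    console.log(`[${logEntry.level}] ${logEntry.message}`);
  });

  call.on('error', (error: grpc.ServiceError) => {
    console.error('Stream error:', error.message);
  });

  call.on('end', () => {
    console.log('Stream ended');
    // Release the underlying channel once the stream is finished
    client.close();
  });
}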

Implementing Client Streaming

Client streaming enables clients to send multiple messages while the server processes them and returns a single response. This pattern works well for file uploads or metric aggregation:

service MetricsService {
  rpc RecordMetrics(stream MetricPoint) returns (MetricsSummary) {}
}

message MetricPoint {
  string metric_name = 1;
  double value = 2;
  int64 timestamp = 3;
  map<string, string> tags = 4;
}

message MetricsSummary {
  int32 points_received = 1;
  map<string, double> aggregates = 2;
}

Server implementation:

import * as grpc from '@grpc/grpc-js';

// Message shapes mirroring the .proto definitions above
interface MetricPoint {
  metric_name: string;
  value: number;
  timestamp: number;
  tags: Record<string, string>;
}

interface MetricsSummary {
  points_received: number;
  aggregates: Record<string, number>;
}

class MetricsServiceImpl {
  recordMetrics(
    call: grpc.ServerReadableStream<MetricPoint, MetricsSummary>,
    callback: grpc.sendUnaryData<MetricsSummary>
  ): void {
    const aggregates = new Map<string, number[]>();
    let pointsReceived = 0;

    // Collect each incoming point under its metric name
    call.on('data', (point: MetricPoint) => {
      pointsReceived++;

      if (!aggregates.has(point.metric_name)) {
        aggregates.set(point.metric_name, []);
      }
      aggregates.get(point.metric_name)!.push(point.value);
    });

    // The client has finished sending: reply with the single summary message
    call.on('end', () => {
      const summary: MetricsSummary = {
        points_received: pointsReceived,
        aggregates: {}
      };

      // Calculate averages
      for (const [metric, values] of aggregates) {
        const avg = values.reduce((a, b) => a + b, 0) / values.length;
        summary.aggregates[metric] = avg;
      }

      callback(null, summary);
    });

    call.on('error', (error) => {
      console.error('Client stream error:', error);
      callback(error, null);
    });
  }
}
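The handler above keeps every received value in memory until the stream ends, which can grow without bound on a long client stream. If only averages are needed, a running sum and count per metric is enough. The sketch below shows that alternative. `MetricAccumulator` and `RunningStat` are hypothetical names, and the returned object simply mirrors the `MetricsSummary` message fields.

```typescript
// Running totals for one metric: enough to compute an average later.
interface RunningStat {
  count: number;
  sum: number;
}

// Hypothetical accumulator that uses constant memory per metric name.
class MetricAccumulator {
  private stats = new Map<string, RunningStat>();
  private points = 0;

  // Fold one data point into the running totals.
  add(metricName: string, value: number): void {
    const stat = this.stats.get(metricName) ?? { count: 0, sum: 0 };
    stat.count += 1;
    stat.sum += value;
    this.stats.set(metricName, stat);
    this.points += 1;
  }

  // Build an object shaped like the MetricsSummary message.
  summary(): { points_received: number; aggregates: Record<string, number> } {
    const aggregates: Record<string, number> = {};
    this.stats.forEach((stat, name) => {
      aggregates[name] = stat.sum / stat.count;
    });
    return { points_received: this.points, aggregates };
  }
}
```

In the streaming handler, the `data` listener would call `add(point.metric_name, point.value)` and the `end` listener would pass `summary()` to the callback.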

Implementing Bidirectional Streaming

Bidirectional streaming enables full-duplex communication where both sides can send messages independently:

service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}

message ChatMessage {
  string user_id = 1;
  string room_id = 2;
  string content = 3;
  int64 timestamp = 4;
}

Server implementation with room-based message routing:

import { ServerDuplexStream } from '@grpc/grpc-js';

// Message shape mirroring the .proto definition above
interface ChatMessage {
  user_id: string;
  room_id: string;
  content: string;
  timestamp: number;
}

type ChatStream = ServerDuplexStream<ChatMessage, ChatMessage>;

class ChatServiceImpl {
  private rooms = new Map<string, Set<ChatStream>>();

  chat(call: ChatStream): void {
    let currentRoom: string | null = null;

    // Remove this stream from its room and drop the room once it is empty
    const leaveRoom = (): void => {
      const room = currentRoom;
      if (room === null) return;
      const members = this.rooms.get(room);
      members?.delete(call);
      if (members && members.size === 0) {
        this.rooms.delete(room);
      }
      currentRoom = null;
    };

    call.on('data', (message: ChatMessage) => {
      const roomId = message.room_id;

      // Handle room changes
      if (currentRoom !== roomId) {
        leaveRoom();
        currentRoom = roomId;
        if (!this.rooms.has(roomId)) {
          this.rooms.set(roomId, new Set());
        }
        this.rooms.get(roomId)!.add(call);
      }

      // Broadcast message to all other clients in the room.
      // Note: this ignores write()'s return value, so slow consumers are not throttled.
      const roomClients = this.rooms.get(roomId);
      if (roomClients) {
        for (const client of roomClients) {
          if (client !== call && client.writable) {
            client.write(message);
          }
        }
      }
    });

    call.on('end', () => {
      leaveRoom();
      call.end();
    });

    // Cancellation does not always surface as 'end', so clean up here too
    call.on('cancelled', leaveRoom);

    call.on('error', (error) => {
      console.error('Bidirectional stream error:', error);
      leaveRoom();
    });
  }
}
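Room bookkeeping of this kind can be pulled out of the handler into a small registry, which makes it testable without a network and gives one place to delete rooms once they are empty. The following is a sketch under that assumption. `RoomRegistry` is a hypothetical class, generic over whatever stream type the server uses.

```typescript
// Hypothetical registry mapping room ids to the set of connected streams.
class RoomRegistry<S> {
  private rooms = new Map<string, Set<S>>();

  // Add a stream to a room, creating the room on first use.
  join(roomId: string, stream: S): void {
    let members = this.rooms.get(roomId);
    if (!members) {
      members = new Set<S>();
      this.rooms.set(roomId, members);
    }
    members.add(stream);
  }

  // Remove a stream and delete the room once it has no members left,
  // so abandoned rooms do not accumulate in memory.
  leave(roomId: string, stream: S): void {
    const members = this.rooms.get(roomId);
    if (!members) return;
    members.delete(stream);
    if (members.size === 0) {
      this.rooms.delete(roomId);
    }
  }

  // Everyone in the room except the sender: the broadcast targets.
  peers(roomId: string, sender: S): S[] {
    const result: S[] = [];
    this.rooms.get(roomId)?.forEach((member) => {
      if (member !== sender) result.push(member);
    });
    return result;
  }

  roomCount(): number {
    return this.rooms.size;
  }
}
```

A chat handler would call `join` on the first message for a room, iterate `peers` to broadcast, and call `leave` from its `end`, `cancelled`, and `error` listeners.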

Common Pitfalls

Ignoring backpressure: Writing to a stream faster than the peer can consume it buffers messages in memory. Check the boolean returned by write(), and when it is false, wait for the drain event before writing more.

Missing error handling: Streams can fail at any point. Implement comprehensive error handlers on both client and server to prevent resource leaks.

Not handling cancellation: Clients may cancel streams unexpectedly. Check call.cancelled regularly in server streaming to avoid wasted processing.

Unbounded streams: Streams without termination conditions consume resources indefinitely. Implement timeouts, message limits, or explicit termination signals.

Blocking operations in stream handlers: Synchronous blocking operations in stream callbacks prevent other streams from processing. Use async/await and non-blocking I/O.

Inadequate connection management: Failing to clean up stream resources on disconnection leads to memory leaks. Always remove references to closed streams.
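For the unbounded-streams pitfall, one simple guard is a per-stream budget that caps both the number of messages and the lifetime of the stream. The sketch below is one possible shape and not a grpc-js feature. `StreamBudget` is a hypothetical class, and the clock is injectable so the logic can be tested without waiting.

```typescript
type ExhaustedReason = 'message-limit' | 'deadline' | null;

// Hypothetical per-stream budget: a message cap plus an absolute deadline.
class StreamBudget {
  private sent = 0;

  constructor(
    private readonly maxMessages: number,
    private readonly deadlineEpochMs: number,
    private readonly now: () => number = () => Date.now()
  ) {}

  // Why the budget is spent, or null if sending may continue.
  exhaustedReason(): ExhaustedReason {
    if (this.now() >= this.deadlineEpochMs) return 'deadline';
    if (this.sent >= this.maxMessages) return 'message-limit';
    return null;
  }

  // Call before each write; returns false once the stream should be ended.
  tryConsume(): boolean {
    if (this.exhaustedReason() !== null) return false;
    this.sent += 1;
    return true;
  }
}
```

A server-streaming loop could call `tryConsume()` before each write and end the call when it returns `false`, optionally reporting `exhaustedReason()` to the client in trailing metadata or a final message.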

Best Practices Checklist

  • [ ] Implement proper backpressure handling by checking the return value of write() and waiting for drain events
  • [ ] Add comprehensive error handling for all stream events (data, error, end)
  • [ ] Set appropriate deadlines/timeouts for streams to prevent indefinite connections
  • [ ] Monitor stream cancellation and clean up resources accordingly
  • [ ] Reuse client channels across streams, and pool channels only when a single connection's concurrent-stream limit becomes a bottleneck
  • [ ] Implement health checks to detect and recover from connection failures
  • [ ] Add metrics and logging for stream lifecycle events
  • [ ] Test stream behavior under network failures and high load
  • [ ] Document expected message ordering and delivery guarantees
  • [ ] Implement authentication and authorization for streaming endpoints
  • [ ] Use message batching where appropriate to reduce overhead
  • [ ] Consider using interceptors for cross-cutting concerns like logging and tracing
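The batching item in the checklist can be sketched as a small size-based buffer that hands full batches to a send function, so many small records travel as one stream message. `MessageBatcher` and its `send` callback are hypothetical names. A real implementation would usually add a time-based flush as well, so a partly filled batch does not wait indefinitely.

```typescript
// Hypothetical size-based batcher: collects items and emits them in groups.
class MessageBatcher<T> {
  private pending: T[] = [];

  constructor(
    private readonly maxBatchSize: number,
    private readonly send: (batch: T[]) => void
  ) {
    if (!Number.isInteger(maxBatchSize) || maxBatchSize < 1) {
      throw new RangeError('maxBatchSize must be a positive integer');
    }
  }

  // Queue one item; flush automatically when the batch is full.
  add(item: T): void {
    this.pending.push(item);
    if (this.pending.length >= this.maxBatchSize) {
      this.flush();
    }
  }

  // Send whatever is queued; call this before ending the stream.
  flush(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.send(batch);
  }
}
```

With a client stream, `send` would wrap the batch in a single message and write it to the call. Note that the matching message type would need a repeated field, which the example protos in this article do not define.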

FAQ

When should I use server streaming vs polling? Use server streaming when you need real-time updates with minimal latency. Server streaming eliminates polling overhead and delivers updates as soon as they are available. Polling is acceptable for infrequent updates (for example, intervals longer than 30 seconds) or when clients need to control update timing.

How do I handle stream reconnection after network failures? Implement exponential backoff retry logic on the client side. Include sequence numbers or timestamps in messages to enable resumption from the last successfully processed message. Consider using persistent storage to track stream state.
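The two ingredients of that answer, a backoff schedule and a resume position, can be sketched independently of any gRPC call. The names below (`backoffDelayMs`, `ResumeCursor`) and the default values of 500 ms base and 30 s cap are assumptions chosen for illustration. The delay uses "full jitter", meaning a random value between zero and the exponential ceiling, which is one common variant.

```typescript
// Exponential backoff with full jitter.
// attempt is 0 for the first retry; random is injectable for testing.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 500,
  capMs: number = 30000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}

// Tracks the last processed sequence number so a reconnect can ask the
// server to resume after it, and so replayed duplicates are skipped.
class ResumeCursor {
  private lastSequence: number;

  constructor(startAfter: number = 0) {
    this.lastSequence = startAfter;
  }

  // Returns true if the message is new and should be processed.
  accept(sequence: number): boolean {
    if (sequence <= this.lastSequence) return false;
    this.lastSequence = sequence;
    return true;
  }

  // Value to send in the next request after reconnecting.
  resumeAfter(): number {
    return this.lastSequence;
  }
}
```

On each stream error, the client would wait `backoffDelayMs(attempt)` milliseconds, reopen the stream with `resumeAfter()` in the request, and reset `attempt` to zero once messages flow again. This assumes the server-side messages carry a monotonically increasing sequence number, which the example protos in this article do not define.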

What's the maximum message size for streaming? By default, gRPC limits each received message to 4 MB, and the limit is configurable (in grpc-js, through the grpc.max_receive_message_length channel option). For larger payloads, chunk data into smaller messages. Streaming doesn't remove the per-message limit, but it places no limit on the total size of a stream.
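Chunking a large payload for a stream can be as simple as slicing a byte array into fixed-size views. The sketch below assumes the payload is already in memory as a `Uint8Array`. `chunkBytes` is a hypothetical helper, and the chunk size should be chosen to stay comfortably below the configured message limit once message framing and other fields are included.

```typescript
// Split a payload into chunks of at most maxChunkBytes each.
// subarray() creates views over the same buffer, so no bytes are copied.
function chunkBytes(payload: Uint8Array, maxChunkBytes: number): Uint8Array[] {
  if (!Number.isInteger(maxChunkBytes) || maxChunkBytes < 1) {
    throw new RangeError('maxChunkBytes must be a positive integer');
  }
  const chunks: Uint8Array[] = [];
  for (let offset = 0; offset < payload.length; offset += maxChunkBytes) {
    // subarray clamps the end index, so the last chunk may be shorter.
    chunks.push(payload.subarray(offset, offset + maxChunkBytes));
  }
  return chunks;
}
```

A client-streaming upload would then write one message per chunk and end the call after the last one. For files too large to hold in memory, reading from a file stream in fixed-size pieces follows the same idea.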

Can I use streaming with HTTP/1.1? gRPC streaming requires HTTP/2 for multiplexing and flow control. HTTP/1.1 doesn't support the necessary features. Ensure your infrastructure supports HTTP/2 before implementing streaming patterns.

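How do I implement authentication for long-lived streams? Use token-based authentication with refresh mechanisms. Send the token as call metadata when the stream opens; request metadata travels with the start of the call, so it cannot be replaced mid-stream. To refresh credentials on a long-lived stream, either carry the renewed token in an application-level message or let the stream end at token expiry and reconnect with a fresh token. Client interceptors can attach the current token automatically each time a call starts.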

What's the performance overhead of streaming vs unary RPCs? Streaming has lower per-message overhead once the connection is established. Initial connection setup costs are similar, but streaming amortizes this cost over many messages. For single request-response operations, unary RPCs are more efficient.

How do I test streaming implementations? Create mock streams that simulate various scenarios: slow consumers, network interruptions, and cancellations. Use tools like grpcurl for manual testing and implement integration tests that verify stream behavior under different conditions. Monitor resource usage during load testing to identify leaks.