Skip to content

API Testing — gRPC & WebSocket

gRPC Testing

Architecture

gRPC tests use generated stubs. Never call the raw channel directly from tests.

Test  →  ServiceStubWrapper  →  Generated Stub  →  gRPC Channel  →  Service

Stub Wrapper

import grpc
import logging
from generated import user_pb2, user_pb2_grpc

log = logging.getLogger(__name__)


class UserGrpcClient:
    def __init__(self, host: str, port: int) -> None:
        self._channel = grpc.insecure_channel(f"{host}:{port}")
        self._stub = user_pb2_grpc.UserServiceStub(self._channel)
        log.debug("gRPC channel opened: %s:%s", host, port)

    def create_user(self, email: str, role: str = "USER") -> user_pb2.UserResponse:
        request = user_pb2.CreateUserRequest(email=email, role=role)
        log.debug("gRPC CreateUser: %s", email)
        return self._stub.CreateUser(request)

    def get_user(self, user_id: str) -> user_pb2.UserResponse:
        request = user_pb2.GetUserRequest(id=user_id)
        return self._stub.GetUser(request)

    def close(self) -> None:
        self._channel.close()
        log.debug("gRPC channel closed")

gRPC Test Examples

import grpc
import pytest


def test_create_user_via_grpc(grpc_client):
    response = grpc_client.create_user(email="grpc-test@example.com")

    assert response.id != ""
    assert response.email == "grpc-test@example.com"


def test_get_nonexistent_user_raises_not_found(grpc_client):
    with pytest.raises(grpc.RpcError) as exc_info:
        grpc_client.get_user("nonexistent-id")

    assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND

Proto Contract Validation

Test that the response contains all expected fields — catches proto schema drift:

def test_user_response_has_required_fields(grpc_client):
    user = grpc_client.create_user(email="contract@example.com")

    assert user.HasField("id")
    assert user.HasField("email")
    assert user.HasField("created_at")
    assert user.role in ("USER", "ADMIN")

WebSocket Testing

WebSocket Client Wrapper

import asyncio
import json
import logging
import websockets

log = logging.getLogger(__name__)


class WebSocketClient:
    def __init__(self, url: str) -> None:
        self._url = url
        self._ws = None

    async def connect(self) -> None:
        self._ws = await websockets.connect(self._url)
        log.debug("WebSocket connected: %s", self._url)

    async def send(self, event: str, data: dict) -> None:
        message = json.dumps({"event": event, "data": data})
        log.debug("WS send: %s", message)
        await self._ws.send(message)

    async def receive(self, timeout: float = 5.0) -> dict:
        raw = await asyncio.wait_for(self._ws.recv(), timeout=timeout)
        message = json.loads(raw)
        log.debug("WS recv: %s", message)
        return message

    async def close(self) -> None:
        if self._ws:
            await self._ws.close()
            log.debug("WebSocket closed")

Event-Driven Test Examples

import pytest


@pytest.mark.asyncio
async def test_subscribe_receives_order_update(ws_client, api_client, verified_user):
    await ws_client.connect()
    await ws_client.send("subscribe", {"channel": f"orders:{verified_user['id']}"})

    confirm = await ws_client.receive(timeout=2.0)
    assert confirm["event"] == "subscribed"

    order = api_client.orders.create(user_id=verified_user["id"])

    update = await ws_client.receive(timeout=5.0)
    assert update["event"] == "order.created"
    assert update["data"]["order_id"] == order["id"]

    await ws_client.close()

Message Validation

def validate_ws_message(message: dict, expected_event: str) -> None:
    assert "event" in message, f"Missing 'event' key in: {message}"
    assert "data" in message, f"Missing 'data' key in: {message}"
    assert message["event"] == expected_event, (
        f"Expected event '{expected_event}', got '{message['event']}'"
    )

Connection Lifecycle Tests

@pytest.mark.asyncio
async def test_unauthorized_connection_rejected(ws_url):
    with pytest.raises(websockets.exceptions.InvalidStatusCode) as exc_info:
        async with websockets.connect(ws_url) as ws:
            pass

    assert exc_info.value.status_code == 401


@pytest.mark.asyncio
async def test_disconnect_cleans_up_subscription(ws_client, verified_user):
    await ws_client.connect()
    await ws_client.send("subscribe", {"channel": f"orders:{verified_user['id']}"})
    await ws_client.close()

    # Reconnect — should not receive stale events from closed session
    await ws_client.connect()
    messages = []
    try:
        msg = await ws_client.receive(timeout=1.0)
        messages.append(msg)
    except asyncio.TimeoutError:
        pass

    assert not any(m.get("event") == "subscribed" for m in messages)

gRPC vs WebSocket Testing Comparison

Aspect gRPC WebSocket
Protocol HTTP/2 + Protobuf WS frames + JSON/binary
Test style Synchronous RPC Async event-driven
Error model grpc.StatusCode Custom event + close code
Tooling Generated stubs websockets + asyncio
CI complexity Needs proto compilation Standard Python