I often need to write unit tests for gRPC services, so it is worth writing down all the boilerplate here.
The approach is to use bufconn to mock the dialer, so the client and the server talk over an in-memory connection.
import (
	"context"
	"net"
	"testing"
	"time"

	"golang.org/x/sync/semaphore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"

	srvpb "your-service-package/proto/srvpb"
)
// TestConcurrencyLimit is a skeleton for testing whether a client-side
// concurrency limiter interceptor is working properly. If the client tries to
// make more than the allowed number of concurrent requests, the server shall
// return an error.
//
// This is an example where just 'mocking the client side interface' is not
// enough to cover the logic: the requests have to travel through a real gRPC
// client and server, here connected by a bufconn listener.
func TestConcurrencyLimit(t *testing.T) {
	ctx := context.Background()
	serverAllowedConcurrency := int64(10)

	// newTestHandler must return an impl of the gRPC service server side interface.
	h := newTestHandler(serverAllowedConcurrency)
	srv := grpc.NewServer()
	srvpb.RegisterYourServiceServer(srv, h)

	// Dialer setup. Assume we reuse the dialer across test cases.
	listener := bufconn.Listen(1024)

	// Run the server in the background, but keep its result so the test can
	// wait for the goroutine to finish and report an unexpected serve error.
	serveErr := make(chan error, 1)
	go func() { serveErr <- srv.Serve(listener) }()
	defer func() {
		srv.GracefulStop()
		// ErrServerStopped only means GracefulStop ran before Serve was
		// scheduled; that is not a test failure.
		if err := <-serveErr; err != nil && err != grpc.ErrServerStopped {
			t.Errorf("serving on bufconn listener: %v", err)
		}
	}()

	dialer := func(ctx context.Context, addr string) (net.Conn, error) {
		// You can check the `addr` here if you want to verify the client is using the correct address.
		// E.g. (for my use case in #company-s)
		// assert.Equal(t, "fr-shard-metadata-qa.<region>.<env>.<domain>.stripe.net:8081", addr)
		// Ofc you can also simulate 'dial timeout' by hanging here.
		return listener.Dial()
	}

	t.Run("Case 1", func(t *testing.T) {
		// NOTE(review): `l` is not defined in this snippet; it is presumably the
		// client-side limiter under test, whose DialOptions install the
		// interceptor. Confirm against the surrounding code.
		conn, err := grpc.DialContext(ctx, "bufconn",
			append(l.DialOptions(),
				// IMPORTANT: use the dialer
				grpc.WithContextDialer(dialer),
				// For testing, you may need to explicitly set the
				// transport credentials to 'insecure'.
				grpc.WithTransportCredentials(insecure.NewCredentials()),
				// You can add more options for testing. E.g. interceptors, etc.
			)...,
		)
		// Stop immediately on a dial error: conn is nil in that case and the
		// deferred Close below would panic.
		if err != nil {
			t.Fatalf("dialing bufconn: %v", err)
		}
		defer conn.Close()

		client := srvpb.NewYourServiceClient(conn)
		// Now, you can call RPCs through `client`.
		_ = client // Placeholder so the skeleton compiles; remove once RPCs are issued.
	})
}
// testHandler is a test implementation of the YourService interface. It has
// a semaphore to limit the number of concurrent requests. Once the semaphore is
// exhausted, it will return a ResourceExhausted error.
type testHandler struct {
srv.UnimplementedYourServiceServer
slots *semaphore.Weighted
}
// newTestHandler builds a testHandler whose semaphore has a total capacity of
// slots.
func newTestHandler(slots int64) *testHandler {
	handler := new(testHandler)
	handler.slots = semaphore.NewWeighted(slots)
	return handler
}
// SayHello serves a request only if a semaphore slot is free. It never waits
// for a slot: when none is available it fails right away with a
// codes.ResourceExhausted status. On success it holds the slot for about
// 100ms to simulate a slow request, then returns an empty response.
func (h *testHandler) SayHello(ctx context.Context, in *srvpb.SayHelloRequest) (*srvpb.SayHelloResponse, error) {
	// TryAcquire does not block, so an exhausted semaphore is reported
	// immediately instead of queueing the request.
	if !h.slots.TryAcquire(1) {
		return nil, status.Error(codes.ResourceExhausted, "slots exhausted")
	}
	defer h.slots.Release(1)

	// Sleep for a short time to simulate a slow request.
	time.Sleep(100 * time.Millisecond)
	return &srvpb.SayHelloResponse{}, nil
}
// For this specific example, a `reset` method on the handler is recommended
// so the same server can be reused across test cases.