Skip to main content

Streaming

Connect supports several types of streaming RPCs. Streaming is exciting it's fundamentally different from the web's typical request-response model, and in the right circumstances it can be very efficient. If you've been writing the same pagination or polling code for years, streaming may look like the answer to all your problems.

Temper your enthusiasm. Streaming also comes with many drawbacks:

  • It requires excellent HTTP libraries. At the very least, the client and server must be able to stream HTTP/1.1 request and response bodies. For bidirectional streaming, both parties must support HTTP/2. Long-lived streams are much more likely to encounter bugs and edge cases in HTTP/2 flow control.
  • It requires excellent proxies. Every proxy between the server and client including those run by cloud providers must support HTTP/2.
  • It weakens the protections offered to your unary handlers, since streaming typically requires proxies to be configured with much longer timeouts.
  • It requires complex tools. Streaming RPC protocols are much more involved than unary protocols, so cURL and your browser's network inspector are useless.

In general, streaming ties your application more closely to your networking infrastructure and makes your application inaccessible to less-sophisticated clients. You can minimize these downsides by keeping streams short-lived.

Also, if your http.Server has the ReadTimeout or WriteTimeout field configured, it applies to the entire operation duration, even for streaming calls. See the FAQ for more information.

All that said, connect-go fully supports all three types of streaming. All streaming subtypes work with the gRPC, gRPC-Web, and Connect protocols.

Streaming variants

In client streaming, the client sends multiple messages. Once the server receives all the messages, it responds with a single message. In Protobuf schemas, client streaming methods look like this:

service GreetService {
rpc Greet(stream GreetRequest) returns (GreetResponse) {}
}

In Go, client streaming RPCs use the ClientStream and ClientStreamForClient types.

In server streaming, the client sends a single message and the server responds with multiple messages. In Protobuf schemas, server streaming methods look like this:

service GreetService {
rpc Greet(GreetRequest) returns (stream GreetResponse) {}
}

In Go, server streaming RPCs use the ServerStream and ServerStreamForClient types.

In bidirectional streaming (often called bidi), the client and server may both send multiple messages. Often, the exchange is structured like a conversation: the client sends a message, the server responds, the client sends another message, and so on. Keep in mind that this always requires end-to-end HTTP/2 support (regardless of RPC protocol)! net/http clients and servers support HTTP/2 by default if you're using TLS, but they need some special configuration to support HTTP/2 without TLS. In Protobuf schemas, bidi streaming methods look like this:

service GreetService {
rpc Greet(stream GreetRequest) returns (stream GreetResponse) {}
}

In Go, bidi streaming RPCs use the BidiStream and BidiStreamForClient types.

HTTP representation

In all three protocols, streaming responses always have an HTTP status of 200 OK. This may seem unusual, but it's unavoidable: the server may encounter an error after sending a few messages, when the HTTP status has already been sent to the client. Rather than relying on the HTTP status, streaming handlers encode any errors in HTTP trailers or at the end of the response body (depending on the protocol).

The body of streaming requests and responses envelopes your schema-defined messages with a few bytes of protocol-specific binary framing data. Because of the interspersed framing data, the payloads are no longer valid Protobuf or JSON: instead, they use protocol-specific Content-Types like application/connect+proto, application/grpc+json, or application/grpc-web+proto.

Headers and trailers

As in unary RPC, headers are plain HTTP headers, with the same ASCII-only restrictions and binary header support.

Each protocol sends response trailers differently: they may be sent as HTTP trailers, a block of HTTP-formatted data at the end of the response body, or a blob of JSON at the end of the body. Regardless of the wire encoding, all three protocols give trailers the same semantics and restrictions as headers.

Each of connect-go's stream types either exposes headers and trailers on a Request or Response (just like unary RPCs), or the stream itself has methods to access metadata. These APIs work identically regardless of the protocol in use.

Interceptors

Streaming interceptors are naturally more complex than unary interceptors. Rather than using UnaryInterceptorFunc, streaming interceptors must implement the full Interceptor interface. This may require implementing a StreamingClientConn or StreamingHandlerConn wrapper.

An example

Let's start by amending the GreetService we defined in Getting Started to make the Greet method use client streaming:

syntax = "proto3";

package greet.v1;

option go_package = "example/gen/greet/v1;greetv1";

message GreetRequest {
string name = 1;
}

message GreetResponse {
string greeting = 1;
}

service GreetService {
rpc Greet(stream GreetRequest) returns (GreetResponse) {}
}

After running buf generate to update our generated code, we can amend our handler implementation in cmd/server/main.go:

package main

import (
"context"
"errors"
"fmt"
"log"
"net/http"
"strings"

greetv1 "example/gen/greet/v1"
"example/gen/greet/v1/greetv1connect"

"connectrpc.com/connect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)

type GreetServer struct{}

func (s *GreetServer) Greet(
ctx context.Context,
stream *connect.ClientStream[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
log.Println("Request headers: ", stream.RequestHeader())
var greeting strings.Builder
for stream.Receive() {
g := fmt.Sprintf("Hello, %s!\n", stream.Msg().Name)
if _, err := greeting.WriteString(g); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}
if err := stream.Err(); err != nil {
return nil, connect.NewError(connect.CodeUnknown, err)
}
res := connect.NewResponse(&greetv1.GreetResponse{
Greeting: greeting.String(),
})
res.Header().Set("Greet-Version", "v1")
return res, nil
}

func main() {
greeter := &GreetServer{}
mux := http.NewServeMux()
path, handler := greetv1connect.NewGreetServiceHandler(greeter)
mux.Handle(path, handler)
http.ListenAndServe(
"localhost:8080",
// Use h2c so we can serve HTTP/2 without TLS.
h2c.NewHandler(mux, &http2.Server{}),
)
}

Now that we've implemented our new client streaming RPC, we'll also need to update our simple authentication interceptor. To support streaming, we must implement the full Interceptor interface:

const tokenHeader = "Acme-Token"

var errNoToken = errors.New("no token provided")

type authInterceptor struct {}

func NewAuthInterceptor() *authInterceptor {
return &authInterceptor{}
}

func (i *authInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
// Same as previous UnaryInterceptorFunc.
return connect.UnaryFunc(func(
ctx context.Context,
req connect.AnyRequest,
) (connect.AnyResponse, error) {
if req.Spec().IsClient {
// Send a token with client requests.
req.Header().Set(tokenHeader, "sample")
} else if req.Header().Get(tokenHeader) == "" {
// Check token in handlers.
return nil, connect.NewError(connect.CodeUnauthenticated, errNoToken)
}
return next(ctx, req)
})
}

func (*authInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return connect.StreamingClientFunc(func(
ctx context.Context,
spec connect.Spec,
) connect.StreamingClientConn {
conn := next(ctx, spec)
conn.RequestHeader().Set(tokenHeader, "sample")
return conn
})
}

func (i *authInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return connect.StreamingHandlerFunc(func(
ctx context.Context,
conn connect.StreamingHandlerConn,
) error {
if conn.RequestHeader().Get(tokenHeader) == "" {
return connect.NewError(connect.CodeUnauthenticated, errNoToken)
}
return next(ctx, conn)
})
}

We apply our interceptor just as we did before, using WithInterceptors.