This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Streaming Recognition

Describes how to stream audio to Transcribe server.
  • The following example shows how to transcribe an audio stream using Transcribe’s StreamingRecognize request. The stream can come from a file on disk or directly from a microphone in real time.

Streaming from an audio file

  • We support several file formats with headers, including WAV, MP3, and FLAC. For more details, please see the protocol buffer specification here.

  • The examples below use a WAV file as input to the streaming recognition. We will query the server for available models and use the first model to transcribe the speech.

import grpc
import cobaltspeech.transcribe.v5.transcribe_pb2_grpc as stub
import cobaltspeech.transcribe.v5.transcribe_pb2 as transcribe

# Address of the running Transcribe server (host:port).
serverAddress = "localhost:2727"

# Using a channel without TLS enabled.
channel = grpc.insecure_channel(serverAddress)
client = stub.TranscribeServiceStub(channel)

# Get server version.
versionResp = client.Version(transcribe.VersionRequest())
print(versionResp)

# Get list of models on the server.
modelResp = client.ListModels(transcribe.ListModelsRequest())
for model in modelResp.models:
    print(model)

# Select a model ID from the list above. Going with the first model
# in this example.
# NOTE(review): this raises IndexError if the server reports no models.
modelID = modelResp.models[0].id

# Set the recognition config. We don't set the audio format and let the
# server auto-detect the format from the file header.
cfg = transcribe.RecognitionConfig(
    model_id=modelID,
)

# Open audio file.
# NOTE(review): the handle is never closed in this example; real code
# should prefer a `with open(...)` context manager.
audio = open("test.wav", "rb")

# The server expects the recognition config as the very first message on
# the stream, followed only by messages carrying raw audio bytes. This
# generator produces requests in exactly that order, reading the audio
# source in bufferSize-byte chunks until it is exhausted.
def stream(cfg, audio, bufferSize=1024):
    yield transcribe.StreamingRecognizeRequest(config=cfg)

    while True:
        chunk = audio.read(bufferSize)
        if not chunk:
            break
        yield transcribe.StreamingRecognizeRequest(
          audio=transcribe.RecognitionAudio(data=chunk),
        )

# Callback invoked for every streaming response. Prints the formatted
# 1-best transcript with its time span; partial results are redrawn in
# place via a carriage return instead of starting a new line.
def processResponse(resp):
    best = resp.result.alternatives[0]          # 1-best hypothesis.
    begin = best.start_time_ms / 1000.0         # Segment start, in seconds.
    finish = begin + best.duration_ms / 1000.0  # Segment end, in seconds.
    ending = "\r" if resp.result.is_partial else "\n\n"
    print(f"[{begin:0.2f}:{finish:0.2f}] {best.transcript_formatted}", end=ending)

# Streaming requests to the server. StreamingRecognize consumes the
# request generator and yields responses as results become available.
for resp in client.StreamingRecognize(stream(cfg, audio)):
    processResponse(resp)
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	transcribe "github.com/cobaltspeech/go-genproto/cobaltspeech/transcribe/v5"
)

// main connects to a Transcribe server without TLS, prints the server
// version and available models, and streams a WAV file from disk for
// recognition, printing transcripts as they arrive.
func main() {
	const (
		serverAddress = "localhost:2727"
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()), // Using a channel without TLS enabled.
		grpc.WithBlock(),
		grpc.WithReturnConnectionError(),
		grpc.FailOnNonTempDialError(true),
	}

	conn, err := grpc.DialContext(ctx, serverAddress, opts...)
	if err != nil {
		fmt.Printf("failed to dial gRPC connection: %v\n", err)
		os.Exit(1)
	}
	// Close the client connection when main returns; the original
	// example leaked it.
	defer conn.Close()

	client := transcribe.NewTranscribeServiceClient(conn)

	// Get server version.
	versionResp, err := client.Version(ctx, &transcribe.VersionRequest{})
	if err != nil {
		fmt.Printf("failed to get server version: %v\n", err)
		os.Exit(1)
	}

	fmt.Printf("%v\n", versionResp)

	// Get the list of models available on the server.
	modelResp, err := client.ListModels(ctx, &transcribe.ListModelsRequest{})
	if err != nil {
		fmt.Printf("failed to get model list: %v\n", err)
		os.Exit(1)
	}

	for _, m := range modelResp.Models {
		fmt.Println(m)
	}
	fmt.Println()

	// Guard against a server reporting no models before indexing
	// Models[0], which would otherwise panic.
	if len(modelResp.Models) == 0 {
		fmt.Println("no models available on the server")
		os.Exit(1)
	}

	// Selecting the first model.
	cfg := &transcribe.RecognitionConfig{
		ModelId: modelResp.Models[0].Id,
	}

	// Opening audio file.
	audio, err := os.Open("test.wav")
	if err != nil {
		fmt.Printf("failed to open audio file: %v\n", err)
		os.Exit(1)
	}

	defer audio.Close()

	// Starting recognition.
	err = StreamingRecognize(ctx, client, cfg, audio, printTranscript)
	if err != nil {
		fmt.Printf("failed to run streaming recognition: %v\n", err)
		os.Exit(1)
	}
}

// StreamingRecognize wraps the bidirectional streaming API for performing
// speech recognition. It sets up recognition using the given cfg.
//
// Data is read from the given audio reader into a buffer and streamed to
// the Transcribe server in chunks of streamingBufSize bytes.
//
// As results are received from the Transcribe server, they will be sent to
// the provided handlerFunc.
//
// If any error occurs while reading the audio or sending it to the server, this
// method will immediately exit, returning that error.
//
// This function returns only after all results have been passed to the
// handlerFunc.
func StreamingRecognize(
	ctx context.Context,
	client transcribe.TranscribeServiceClient,
	cfg *transcribe.RecognitionConfig,
	audio io.Reader,
	handlerFunc func(*transcribe.StreamingRecognizeResponse),
) error {
	const (
		// Number of audio bytes sent per streaming message.
		streamingBufSize = 1024
	)

	// Creating stream.
	stream, err := client.StreamingRecognize(ctx)
	if err != nil {
		return err
	}

	// There are two concurrent processes going on: a new goroutine reads
	// audio and streams it to the server, while the current goroutine
	// receives results from the stream.  Errors could occur in both
	// goroutines.  We therefore setup a channel, errCh, to hold these
	// errors. Both goroutines are designed to send up to one error, and
	// return immediately. Therefore we use a buffered channel with a
	// capacity of two.
	errCh := make(chan error, 2)

	// start streaming audio in a separate goroutine
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		if err := sendAudio(stream, cfg, audio, streamingBufSize); err != nil && !errors.Is(err, io.EOF) {
			// if sendAudio encountered io.EOF, it's only a
			// notification that the stream has closed.  The actual
			// status will be obtained in a subsequent Recv call, in
			// the other goroutine below.  We therefore only forward
			// non-EOF errors.
			errCh <- err
		}

		wg.Done()
	}()

	// Receive results from the stream until the server closes it.
	for {
		in, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}

		if err != nil {
			errCh <- err
			break
		}

		handlerFunc(in)
	}

	// Wait for the sender goroutine so any error it produced is in errCh.
	wg.Wait()

	select {
	case err := <-errCh:
		// There may be more than one error in the channel, but it is
		// very likely they are related (e.g. connection reset causing
		// both the send and recv to fail) and we therefore return the
		// first error and discard the other.
		return err
	default:
		return nil
	}
}

// printTranscript is a callback function given to the StreamingRecognize
// method to print results that are returned through the gRPC stream.
// Partial results are redrawn in place with a carriage return; final
// results end with a blank line.
func printTranscript(resp *transcribe.StreamingRecognizeResponse) {
	if resp.Error != nil {
		fmt.Printf("\n[ERROR] server returned an error: %v\n", resp.Error)
		return
	}

	// Guard against responses carrying no result or no hypotheses;
	// indexing Alternatives[0] unconditionally would panic on such a
	// message.
	if resp.Result == nil || len(resp.Result.Alternatives) == 0 {
		return
	}

	hyp := resp.Result.Alternatives[0] // 1-best hypothesis.
	startTime := float32(hyp.StartTimeMs) / 1000.0
	endTime := startTime + float32(hyp.DurationMs)/1000.0

	if resp.Result.IsPartial {
		fmt.Printf("\r[%0.2f:%0.2f] %s", startTime, endTime, hyp.TranscriptFormatted)
	} else {
		fmt.Printf("[%0.2f:%0.2f] %s\n\n", startTime, endTime, hyp.TranscriptFormatted)
	}
}

// sendAudio sends cfg as the first message on stream, then reads audio in
// bufSize-byte chunks and sends each chunk until the reader is exhausted.
// It returns nil on a clean EOF and the underlying error otherwise.
func sendAudio(
	stream transcribe.TranscribeService_StreamingRecognizeClient,
	cfg *transcribe.RecognitionConfig,
	audio io.Reader,
	bufSize uint32,
) error {
	// The first message needs to be a config message, and all subsequent
	// messages must be audio messages.

	// Send the recognition config.
	if err := stream.Send(&transcribe.StreamingRecognizeRequest{
		Request: &transcribe.StreamingRecognizeRequest_Config{Config: cfg},
	}); err != nil {
		// if this failed, we don't need to CloseSend
		return err
	}

	// Stream the audio.
	buf := make([]byte, bufSize)
	for {
		n, err := audio.Read(buf)
		if n > 0 {
			// Per io.Reader semantics, process the n bytes read
			// before considering err.
			if err2 := stream.Send(&transcribe.StreamingRecognizeRequest{
				Request: &transcribe.StreamingRecognizeRequest_Audio{
					Audio: &transcribe.RecognitionAudio{Data: buf[:n]},
				},
			}); err2 != nil {
				// if we couldn't Send, the stream has
				// encountered an error and we don't need to
				// CloseSend.
				return err2
			}
		}

		if err != nil {
			// err could be io.EOF, or some other error reading
			// from audio. In either case we must CloseSend before
			// returning.
			if err2 := stream.CloseSend(); err2 != nil {
				return err2
			}
			// errors.Is handles wrapped EOFs and matches the
			// style used elsewhere in this file.
			if !errors.Is(err, io.EOF) {
				return err
			}
			return nil
		}
	}
}

Streaming from microphone

  • Streaming audio from microphone input requires a reader interface that can provide audio samples recorded from a microphone; typically this requires interaction with system libraries. Another option is to use an external command-line tool like sox to record and pipe audio into the client.

  • The examples below use the latter approach by using the rec command provided with sox to record and stream the audio.

#!/usr/bin/env python3

# This example assumes sox is installed on the system and is available
# in the system's PATH variable. Instead of opening a regular file from
# disk, we open a subprocess that executes sox's rec command to record
# audio from the system's default microphone.

import grpc
import cobaltspeech.transcribe.v5.transcribe_pb2_grpc as stub
import cobaltspeech.transcribe.v5.transcribe_pb2 as transcribe
import subprocess

# Address of the running Transcribe server (host:port).
serverAddress = "localhost:2727"

# Using a channel without TLS enabled.
channel = grpc.insecure_channel(serverAddress)
client = stub.TranscribeServiceStub(channel)

# Get server version.
versionResp = client.Version(transcribe.VersionRequest())
print(versionResp)

# Get list of models on the server.
modelResp = client.ListModels(transcribe.ListModelsRequest())
for model in modelResp.models:
    print(model)

# Select a model ID from the list above. Going with the first model
# in this example.
# NOTE(review): this raises IndexError if the server reports no models.
m = modelResp.models[0]
modelID = m.id

# Setting audio format to be raw 16-bit signed little endian audio samples
# recorded at the sample rate expected by the model.
# NOTE(review): the argument lines below mix tab and space indentation;
# legal inside parentheses, but consider normalizing to spaces.
cfg = transcribe.RecognitionConfig(
    model_id=modelID,
	audio_format_raw=transcribe.AudioFormatRAW(
      encoding="AUDIO_ENCODING_SIGNED",
      bit_depth=16,
      byte_order="BYTE_ORDER_LITTLE_ENDIAN",
      sample_rate=m.attributes.sample_rate,
      channels=1,
    )
)

# Open microphone stream using sox's rec command and record
# audio using the config specified above.
# NOTE(review): requires sox's `rec` executable on PATH; Popen raises
# FileNotFoundError otherwise.
cmd = f"rec --no-show-progress -t raw -r {m.attributes.sample_rate} -e signed -b 16 -L -c 1 -"
mic = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
audio = mic.stdout

# Read a few bytes up front as a sanity check that recording works.
try:
  _ = audio.read(1024) # Trying to read some bytes as sanity check.
except Exception as err:
  	print(f"[ERROR] failed to read audio from mic stream: {err}")

print("\n[INFO] recording from microphone ... Press ctrl + c to exit\n")

# The first request on the stream must carry only the recognition config;
# every following request carries a chunk of raw audio bytes. This
# generator yields requests in that order until the reader is exhausted.
def stream(cfg, audio, bufferSize=1024):
    yield transcribe.StreamingRecognizeRequest(config=cfg)

    for chunk in iter(lambda: audio.read(bufferSize), b""):
        yield transcribe.StreamingRecognizeRequest(
          audio=transcribe.RecognitionAudio(data=chunk),
        )

# Print each response's best transcript with its time range. Partial
# results overwrite the current terminal line; final results are
# followed by a blank line.
def processResponse(resp):
    top = resp.result.alternatives[0]   # 1-best hypothesis.
    t0 = top.start_time_ms / 1000.0     # Start of segment, seconds.
    t1 = t0 + top.duration_ms / 1000.0  # End of segment, seconds.
    tail = "\r" if resp.result.is_partial else "\n\n"
    print(f"[{t0:0.2f}:{t1:0.2f}] {top.transcript_formatted}", end=tail)

# Streaming requests to the server until interrupted or the stream fails.
try:
  for resp in client.StreamingRecognize(stream(cfg, audio)):
      processResponse(resp)
except KeyboardInterrupt:
	# Stop streaming when ctrl + c pressed.
	pass
except Exception as err:
	print(f"[ERROR] failed to stream audio: {err}")

# Release the pipe and stop the recorder subprocess.
audio.close()
mic.kill()
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"golang.org/x/sync/errgroup"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	transcribe "github.com/cobaltspeech/go-genproto/cobaltspeech/transcribe/v5"
)

// main connects to a Transcribe server without TLS, prints the server
// version and available models, then records raw audio from the default
// microphone via sox's `rec` command and streams it for recognition
// until interrupted.
func main() {
	const (
		serverAddress = "localhost:2727"
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()), // Using a channel without TLS enabled.
		grpc.WithBlock(),
		grpc.WithReturnConnectionError(),
		grpc.FailOnNonTempDialError(true),
	}

	conn, err := grpc.DialContext(ctx, serverAddress, opts...)
	if err != nil {
		fmt.Printf("failed to dial gRPC connection: %v\n", err)
		os.Exit(1)
	}
	// Close the client connection when main returns; the original
	// example leaked it.
	defer conn.Close()

	client := transcribe.NewTranscribeServiceClient(conn)

	// Get server version.
	versionResp, err := client.Version(ctx, &transcribe.VersionRequest{})
	if err != nil {
		fmt.Printf("failed to get server version: %v\n", err)
		os.Exit(1)
	}

	fmt.Printf("%v\n", versionResp)

	// Get the list of models available on the server.
	modelResp, err := client.ListModels(ctx, &transcribe.ListModelsRequest{})
	if err != nil {
		fmt.Printf("failed to get model list: %v\n", err)
		os.Exit(1)
	}

	for _, m := range modelResp.Models {
		fmt.Println(m)
	}
	fmt.Println()

	// Guard against a server reporting no models before indexing
	// Models[0], which would otherwise panic.
	if len(modelResp.Models) == 0 {
		fmt.Println("no models available on the server")
		os.Exit(1)
	}

	// Selecting first model.
	m := modelResp.Models[0]

	// Setting audio format to be raw 16-bit signed little endian audio samples
	// recorded at the sample rate expected by the model.
	cfg := &transcribe.RecognitionConfig{
		ModelId: m.Id,
		AudioFormat: &transcribe.RecognitionConfig_AudioFormatRaw{
			AudioFormatRaw: &transcribe.AudioFormatRAW{
				Encoding:   transcribe.AudioEncoding_AUDIO_ENCODING_SIGNED,
				SampleRate: m.Attributes.SampleRate,
				BitDepth:   16,
				ByteOrder:  transcribe.ByteOrder_BYTE_ORDER_LITTLE_ENDIAN,
				Channels:   1,
			},
		},
	}

	// Open microphone stream using sox's rec command and record
	// audio using the config specified above.
	args := fmt.Sprintf("--no-show-progress -t raw -r %d -e signed -b 16 -L -c 1 -", m.Attributes.SampleRate)
	cmd := exec.CommandContext(ctx, "rec", strings.Fields(args)...)

	audio, err := cmd.StdoutPipe()
	if err != nil {
		fmt.Printf("failed to open microphone stream: %v\n", err)
		os.Exit(1)
	}

	// Starting routines to record from microphone and stream to server
	// using an errgroup.Group that returns if either one encounters an error.
	eg, ctx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		fmt.Printf("\n[INFO] recording from microphone ... Press ctrl + c to exit\n")

		if err := cmd.Run(); err != nil {
			return fmt.Errorf("record from microphone: %w", err)
		}

		return nil
	})

	eg.Go(func() error { return StreamingRecognize(ctx, client, cfg, audio, printTranscript) })

	// Also using a routine to monitor for interrupts. Unlike the
	// original, this goroutine also returns when ctx is cancelled;
	// otherwise eg.Wait would block forever on <-interrupt whenever the
	// other goroutines finished or failed without a signal arriving.
	eg.Go(func() error {
		interrupt := make(chan os.Signal, 1)
		signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
		defer signal.Stop(interrupt)

		select {
		case <-interrupt:
			cancel()
		case <-ctx.Done():
		}

		return ctx.Err()
	})

	if err := eg.Wait(); err != nil && !errors.Is(err, ctx.Err()) {
		fmt.Printf("failed to run streaming recognition: %v\n", err)
	}
}

// StreamingRecognize wraps the bidirectional streaming API for performing
// speech recognition. It sets up recognition using the given cfg.
//
// Data is read from the given audio reader into a buffer and streamed to
// the Transcribe server in chunks of streamingBufSize bytes.
//
// As results are received from the Transcribe server, they will be sent to
// the provided handlerFunc.
//
// If any error occurs while reading the audio or sending it to the server, this
// method will immediately exit, returning that error.
//
// This function returns only after all results have been passed to the
// handlerFunc.
func StreamingRecognize(
	ctx context.Context,
	client transcribe.TranscribeServiceClient,
	cfg *transcribe.RecognitionConfig,
	audio io.Reader,
	handlerFunc func(*transcribe.StreamingRecognizeResponse),
) error {
	const (
		// Number of audio bytes sent per streaming message.
		streamingBufSize = 1024
	)

	// Creating stream.
	stream, err := client.StreamingRecognize(ctx)
	if err != nil {
		return err
	}

	// There are two concurrent processes going on: a new goroutine reads
	// audio and streams it to the server, while the current goroutine
	// receives results from the stream.  Errors could occur in both
	// goroutines.  We therefore setup a channel, errCh, to hold these
	// errors. Both goroutines are designed to send up to one error, and
	// return immediately. Therefore we use a buffered channel with a
	// capacity of two.
	errCh := make(chan error, 2)

	// start streaming audio in a separate goroutine
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		if err := sendAudio(stream, cfg, audio, streamingBufSize); err != nil && !errors.Is(err, io.EOF) {
			// if sendAudio encountered io.EOF, it's only a
			// notification that the stream has closed.  The actual
			// status will be obtained in a subsequent Recv call, in
			// the other goroutine below.  We therefore only forward
			// non-EOF errors.
			errCh <- err
		}

		wg.Done()
	}()

	// Receive results from the stream until the server closes it.
	for {
		in, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}

		if err != nil {
			errCh <- err
			break
		}

		handlerFunc(in)
	}

	// Wait for the sender goroutine so any error it produced is in errCh.
	wg.Wait()

	select {
	case err := <-errCh:
		// There may be more than one error in the channel, but it is
		// very likely they are related (e.g. connection reset causing
		// both the send and recv to fail) and we therefore return the
		// first error and discard the other.
		return err
	default:
		return nil
	}
}

// printTranscript is a callback function given to the StreamingRecognize
// method to print results that are returned through the gRPC stream.
// Partial results are redrawn in place with a carriage return; final
// results end with a blank line.
func printTranscript(resp *transcribe.StreamingRecognizeResponse) {
	if resp.Error != nil {
		fmt.Printf("\n[ERROR] server returned an error: %v\n", resp.Error)
		return
	}

	// Guard against responses carrying no result or no hypotheses;
	// indexing Alternatives[0] unconditionally would panic on such a
	// message.
	if resp.Result == nil || len(resp.Result.Alternatives) == 0 {
		return
	}

	hyp := resp.Result.Alternatives[0] // 1-best hypothesis.
	startTime := float32(hyp.StartTimeMs) / 1000.0
	endTime := startTime + float32(hyp.DurationMs)/1000.0

	if resp.Result.IsPartial {
		fmt.Printf("\r[%0.2f:%0.2f] %s", startTime, endTime, hyp.TranscriptFormatted)
	} else {
		fmt.Printf("[%0.2f:%0.2f] %s\n\n", startTime, endTime, hyp.TranscriptFormatted)
	}
}

// sendAudio sends cfg as the first message on stream, then reads audio in
// bufSize-byte chunks and sends each chunk until the reader is exhausted.
// It returns nil on a clean EOF and the underlying error otherwise.
func sendAudio(
	stream transcribe.TranscribeService_StreamingRecognizeClient,
	cfg *transcribe.RecognitionConfig,
	audio io.Reader,
	bufSize uint32,
) error {
	// The first message needs to be a config message, and all subsequent
	// messages must be audio messages.

	// Send the recognition config.
	if err := stream.Send(&transcribe.StreamingRecognizeRequest{
		Request: &transcribe.StreamingRecognizeRequest_Config{Config: cfg},
	}); err != nil {
		// if this failed, we don't need to CloseSend
		return err
	}

	// Stream the audio.
	buf := make([]byte, bufSize)
	for {
		n, err := audio.Read(buf)
		if n > 0 {
			// Per io.Reader semantics, process the n bytes read
			// before considering err.
			if err2 := stream.Send(&transcribe.StreamingRecognizeRequest{
				Request: &transcribe.StreamingRecognizeRequest_Audio{
					Audio: &transcribe.RecognitionAudio{Data: buf[:n]},
				},
			}); err2 != nil {
				// if we couldn't Send, the stream has
				// encountered an error and we don't need to
				// CloseSend.
				return err2
			}
		}

		if err != nil {
			// err could be io.EOF, or some other error reading
			// from audio. In either case we must CloseSend before
			// returning.
			if err2 := stream.CloseSend(); err2 != nil {
				return err2
			}
			// errors.Is handles wrapped EOFs and matches the
			// style used elsewhere in this file.
			if !errors.Is(err, io.EOF) {
				return err
			}
			return nil
		}
	}
}