- The following example shows how to transcribe an audio stream using Transcribe's
StreamingRecognize request. The stream can come from a file on disk or arrive directly from a microphone in real time.
Streaming from an audio file
-
We support several file formats with headers, including WAV, MP3, and FLAC. For more details, please see the protocol buffer specification here.
-
The examples below use a WAV file as input to the streaming recognition. We will query the server for available models and use the first model to transcribe the speech.
import grpc
import cobaltspeech.transcribe.v5.transcribe_pb2_grpc as stub
import cobaltspeech.transcribe.v5.transcribe_pb2 as transcribe
serverAddress = "localhost:2727"
# Using a channel without TLS enabled.
channel = grpc.insecure_channel(serverAddress)
client = stub.TranscribeServiceStub(channel)
# Get server version.
versionResp = client.Version(transcribe.VersionRequest())
print(versionResp)
# Get list of models on the server.
modelResp = client.ListModels(transcribe.ListModelsRequest())
for model in modelResp.models:
print(model)
# Select a model ID from the list above. Going with the first model
# in this example.
modelID = modelResp.models[0].id
# Set the recognition config. We don't set the audio format and let the
# server auto-detect the format from the file header.
cfg = transcribe.RecognitionConfig(
model_id=modelID,
)
# Open audio file.
audio = open("test.wav", "rb")
# The first request to the server should only contain the
# recognition configuration. Subsequent requests should contain
# audio bytes. We can write a simple generator to do this.
def stream(cfg, audio, bufferSize=1024):
yield transcribe.StreamingRecognizeRequest(config=cfg)
data = audio.read(bufferSize)
while len(data) > 0:
yield transcribe.StreamingRecognizeRequest(
audio=transcribe.RecognitionAudio(data=data),
)
data = audio.read(bufferSize)
# We also define a callback function to execute for each response.
# The example below just prints the formatted transcript to stdout.
def processResponse(resp):
result = resp.result
hyp = result.alternatives[0] # 1-best hypothesis.
transcript = hyp.transcript_formatted # Formatted transcript.
start = hyp.start_time_ms / 1000.0 # Converting to seconds.
end = start + hyp.duration_ms / 1000.0 # Converting to seconds.
newLine = "\r" if result.is_partial else "\n\n" # Will not move to new line for partial results.
print(f"[{start:0.2f}:{end:0.2f}] {transcript}", end=newLine)
# Streaming requests to the server.
for resp in client.StreamingRecognize(stream(cfg, audio)):
    processResponse(resp)
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
transcribe "github.com/cobaltspeech/go-genproto/cobaltspeech/transcribe/v5"
)
func main() {
const (
serverAddress = "localhost:2727"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), // Using a channel without TLS enabled.
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
grpc.FailOnNonTempDialError(true),
}
conn, err := grpc.DialContext(ctx, serverAddress, opts...)
if err != nil {
fmt.Printf("failed to dial gRPC connection: %v\n", err)
os.Exit(1)
}
client := transcribe.NewTranscribeServiceClient(conn)
// Get server version.
versionResp, err := client.Version(ctx, &transcribe.VersionRequest{})
if err != nil {
fmt.Printf("failed to get server version: %v\n", err)
os.Exit(1)
}
fmt.Printf("%v\n", versionResp)
// Get list model of models on the server.
modelResp, err := client.ListModels(ctx, &transcribe.ListModelsRequest{})
if err != nil {
fmt.Printf("failed to get model list: %v\n", err)
os.Exit(1)
}
for _, m := range modelResp.Models {
fmt.Println(m)
}
fmt.Println()
// Selecting the first model.
cfg := &transcribe.RecognitionConfig{
ModelId: modelResp.Models[0].Id,
}
// Opening audio file.
audio, err := os.Open("test.wav")
if err != nil {
fmt.Printf("failed to open audio file: %v\n", err)
os.Exit(1)
}
defer audio.Close()
// Starting recognition.
err = StreamingRecognize(ctx, client, cfg, audio, printTranscript)
if err != nil {
fmt.Printf("failed to run streaming recognition: %v\n", err)
os.Exit(1)
}
}
// StreamingRecognize wraps the bidirectional streaming API for performing
// speech recognition. It sets up recognition using the given cfg.
//
// Data is read from the given audio reader into a buffer and streamed to cubic
// server. The default buffer size may be overridden using Options when creating
// the Client.
//
// As results are received from Transcribe server, they will be sent to the
// provided handlerFunc.
//
// If any error occurs while reading the audio or sending it to the server, this
// method will immediately exit, returning that error.
//
// This function returns only after all results have been passed to the
// resultHandler.
func StreamingRecognize(
ctx context.Context,
client transcribe.TranscribeServiceClient,
cfg *transcribe.RecognitionConfig,
audio io.Reader,
handlerFunc func(*transcribe.StreamingRecognizeResponse),
) error {
const (
streamingBufSize = 1024
)
// Creating stream.
stream, err := client.StreamingRecognize(ctx)
if err != nil {
return err
}
// There are two concurrent processes going on. We will create a new
// goroutine to read audio and stream it to the server. This goroutine
// will receive results from the stream. Errors could occur in both
// go routines. We therefore setup a channel, errCh, to hold these
// errors. Both go routines are designed to send up to one error, and
// return immediately. Therefore we use a buffered channel with a
// capacity of two.
errCh := make(chan error, 2)
// start streaming audio in a separate goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
if err := sendAudio(stream, cfg, audio, streamingBufSize); err != nil && !errors.Is(err, io.EOF) {
// if sendAudio encountered io.EOF, it's only a
// notification that the stream has closed. The actual
// status will be obtained in a subsequent Recv call, in
// the other goroutine below. We therefore only forward
// non-EOF errors.
errCh <- err
}
wg.Done()
}()
// Receive results from the stream.
for {
in, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
errCh <- err
break
}
handlerFunc(in)
}
wg.Wait()
select {
case err := <-errCh:
// There may be more than one error in the channel, but it is
// very likely they are related (e.g. connection reset causing
// both the send and recv to fail) and we therefore return the
// first error and discard the other.
return err
default:
return nil
}
}
// printTranscript is a callback function given to StreamingRecognize method to
// print results that are returned though the gRPC stream.
func printTranscript(resp *transcribe.StreamingRecognizeResponse) {
if resp.Error != nil {
fmt.Printf("\n[ERROR] server returned an error: %v\n", resp.Error)
return
}
hyp := resp.Result.Alternatives[0]
startTime := float32(hyp.StartTimeMs) / 1000.0
endTime := startTime + float32(hyp.DurationMs)/1000.0
if resp.Result.IsPartial {
fmt.Printf("\r[%0.2f:%0.2f] %s", startTime, endTime, hyp.TranscriptFormatted)
} else {
fmt.Printf("[%0.2f:%0.2f] %s\n\n", startTime, endTime, hyp.TranscriptFormatted)
}
}
// sendAudio sends audio to a stream.
func sendAudio(
stream transcribe.TranscribeService_StreamingRecognizeClient,
cfg *transcribe.RecognitionConfig,
audio io.Reader,
bufSize uint32,
) error {
// The first message needs to be a config message, and all subsequent
// messages must be audio messages.
// Send the recognition config
if err := stream.Send(&transcribe.StreamingRecognizeRequest{
Request: &transcribe.StreamingRecognizeRequest_Config{Config: cfg},
}); err != nil {
// if this failed, we don't need to CloseSend
return err
}
// Stream the audio.
buf := make([]byte, bufSize)
for {
n, err := audio.Read(buf)
if n > 0 {
if err2 := stream.Send(&transcribe.StreamingRecognizeRequest{
Request: &transcribe.StreamingRecognizeRequest_Audio{
Audio: &transcribe.RecognitionAudio{Data: buf[:n]},
},
}); err2 != nil {
// if we couldn't Send, the stream has
// encountered an error and we don't need to
// CloseSend.
return err2
}
}
if err != nil {
// err could be io.EOF, or some other error reading from
// audio. In any case, we need to CloseSend, send the
// appropriate error to errCh and return from the function
if err2 := stream.CloseSend(); err2 != nil {
return err2
}
if err != io.EOF {
return err
}
return nil
}
}
}
Streaming from microphone
-
Streaming audio from microphone input basically requires a reader interface that can provide audio samples recorded from a microphone; typically this requires interaction with system libraries. Another option is to use an external command line tool like
sox to record and pipe audio into the client. -
The examples below use the latter approach, using the
rec command provided with sox to record and stream the audio.
#!/usr/bin/env python3
# This example assumes sox is installed on the system and is available
# in the system's PATH variable. Instead of opening a regular file from
# disk, we open a subprocess that executes sox's rec command to record
# audio from the system's default microphone.
import grpc
import cobaltspeech.transcribe.v5.transcribe_pb2_grpc as stub
import cobaltspeech.transcribe.v5.transcribe_pb2 as transcribe
import subprocess
serverAddress = "localhost:2727"
# Using a channel without TLS enabled.
channel = grpc.insecure_channel(serverAddress)
client = stub.TranscribeServiceStub(channel)
# Get server version.
versionResp = client.Version(transcribe.VersionRequest())
print(versionResp)
# Get list of models on the server.
modelResp = client.ListModels(transcribe.ListModelsRequest())
for model in modelResp.models:
print(model)
# Select a model ID from the list above. Going with the first model
# in this example.
m = modelResp.models[0]
modelID = m.id
# Setting audio format to be raw 16-bit signed little endian audio samples
# recorded at the sample rate expected by the model.
cfg = transcribe.RecognitionConfig(
model_id=modelID,
audio_format_raw=transcribe.AudioFormatRAW(
encoding="AUDIO_ENCODING_SIGNED",
bit_depth=16,
byte_order="BYTE_ORDER_LITTLE_ENDIAN",
sample_rate=m.attributes.sample_rate,
channels=1,
)
)
# Open microphone stream using sox's rec command and record
# audio using the config specified above.
cmd = f"rec --no-show-progress -t raw -r {m.attributes.sample_rate} -e signed -b 16 -L -c 1 -"
mic = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
audio = mic.stdout
try:
_ = audio.read(1024) # Trying to read some bytes as sanity check.
except Exception as err:
print(f"[ERROR] failed to read audio from mic stream: {err}")
print("\n[INFO] recording from microphone ... Press ctrl + c to exit\n")
# The first request to the server should only contain the
# recognition configuration. Subsequent requests should contain
# audio bytes. We can write a simple generator to do this.
def stream(cfg, audio, bufferSize=1024):
yield transcribe.StreamingRecognizeRequest(config=cfg)
data = audio.read(bufferSize)
while len(data) > 0:
yield transcribe.StreamingRecognizeRequest(
audio=transcribe.RecognitionAudio(data=data),
)
data = audio.read(bufferSize)
# We also define a callback function to execute for each response.
# The example below just prints the formatted transcript to stdout.
def processResponse(resp):
result = resp.result
hyp = result.alternatives[0] # 1-best hypothesis.
transcript = hyp.transcript_formatted # Formatted transcript.
start = hyp.start_time_ms / 1000.0 # Converting to seconds.
end = start + hyp.duration_ms / 1000.0 # Converting to seconds.
newLine = "\r" if result.is_partial else "\n\n" # Will not move to new line for partial results.
print(f"[{start:0.2f}:{end:0.2f}] {transcript}", end=newLine)
# Streaming requests to the server.
try:
for resp in client.StreamingRecognize(stream(cfg, audio)):
processResponse(resp)
except KeyboardInterrupt:
# Stop streaming when ctrl + c pressed.
pass
except Exception as err:
print(f"[ERROR] failed to stream audio: {err}")
audio.close()
mic.kill()
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
transcribe "github.com/cobaltspeech/go-genproto/cobaltspeech/transcribe/v5"
)
func main() {
const (
serverAddress = "localhost:2727"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), // Using a channel without TLS enabled.
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
grpc.FailOnNonTempDialError(true),
}
conn, err := grpc.DialContext(ctx, serverAddress, opts...)
if err != nil {
fmt.Printf("failed to dial gRPC connection: %v\n", err)
os.Exit(1)
}
client := transcribe.NewTranscribeServiceClient(conn)
// Get server version.
versionResp, err := client.Version(ctx, &transcribe.VersionRequest{})
if err != nil {
fmt.Printf("failed to get server version: %v\n", err)
os.Exit(1)
}
fmt.Printf("%v\n", versionResp)
// Get list model of models on the server.
modelResp, err := client.ListModels(ctx, &transcribe.ListModelsRequest{})
if err != nil {
fmt.Printf("failed to get model list: %v\n", err)
os.Exit(1)
}
for _, m := range modelResp.Models {
fmt.Println(m)
}
fmt.Println()
// Selecting first model.
m := modelResp.Models[0]
// Setting audio format to be raw 16-bit signed little endian audio samples
// recorded at the sample rate expected by the model.
cfg := &transcribe.RecognitionConfig{
ModelId: m.Id,
AudioFormat: &transcribe.RecognitionConfig_AudioFormatRaw{
AudioFormatRaw: &transcribe.AudioFormatRAW{
Encoding: transcribe.AudioEncoding_AUDIO_ENCODING_SIGNED,
SampleRate: m.Attributes.SampleRate,
BitDepth: 16,
ByteOrder: transcribe.ByteOrder_BYTE_ORDER_LITTLE_ENDIAN,
Channels: 1,
},
},
}
// Open microphone stream using sox's rec command and record
// audio using the config specified above.
args := fmt.Sprintf("--no-show-progress -t raw -r %d -e signed -b 16 -L -c 1 -", m.Attributes.SampleRate)
cmd := exec.CommandContext(ctx, "rec", strings.Fields(args)...)
audio, err := cmd.StdoutPipe()
if err != nil {
fmt.Printf("failed to open microphone stream: %v\n", err)
os.Exit(1)
}
// Starting routines to record from microphone and stream to server
// using an errgroup.Group that returns if either one encounters an error.
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
fmt.Printf("\n[INFO] recording from microphone ... Press ctrl + c to exit\n")
if err := cmd.Run(); err != nil {
return fmt.Errorf("record from microphone: %w", err)
}
return nil
})
eg.Go(func() error { return StreamingRecognize(ctx, client, cfg, audio, printTranscript) })
// Also using a routine to monitor for interrupts.
eg.Go(func() error {
const maxInterrupts = 10
interrupt := make(chan os.Signal, maxInterrupts)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
<-interrupt
cancel()
return ctx.Err()
})
if err := eg.Wait(); err != nil && !errors.Is(err, ctx.Err()) {
fmt.Printf("failed to run streaming recognition: %v\n", err)
}
}
// StreamingRecognize wraps the bidirectional streaming API for performing
// speech recognition. It sets up recognition using the given cfg.
//
// Data is read from the given audio reader into a buffer and streamed to cubic
// server. The default buffer size may be overridden using Options when creating
// the Client.
//
// As results are received from Transcribe server, they will be sent to the
// provided handlerFunc.
//
// If any error occurs while reading the audio or sending it to the server, this
// method will immediately exit, returning that error.
//
// This function returns only after all results have been passed to the
// resultHandler.
func StreamingRecognize(
ctx context.Context,
client transcribe.TranscribeServiceClient,
cfg *transcribe.RecognitionConfig,
audio io.Reader,
handlerFunc func(*transcribe.StreamingRecognizeResponse),
) error {
const (
streamingBufSize = 1024
)
// Creating stream.
stream, err := client.StreamingRecognize(ctx)
if err != nil {
return err
}
// There are two concurrent processes going on. We will create a new
// goroutine to read audio and stream it to the server. This goroutine
// will receive results from the stream. Errors could occur in both
// go routines. We therefore setup a channel, errCh, to hold these
// errors. Both go routines are designed to send up to one error, and
// return immediately. Therefore we use a buffered channel with a
// capacity of two.
errCh := make(chan error, 2)
// start streaming audio in a separate goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
if err := sendAudio(stream, cfg, audio, streamingBufSize); err != nil && !errors.Is(err, io.EOF) {
// if sendAudio encountered io.EOF, it's only a
// notification that the stream has closed. The actual
// status will be obtained in a subsequent Recv call, in
// the other goroutine below. We therefore only forward
// non-EOF errors.
errCh <- err
}
wg.Done()
}()
// Receive results from the stream.
for {
in, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
errCh <- err
break
}
handlerFunc(in)
}
wg.Wait()
select {
case err := <-errCh:
// There may be more than one error in the channel, but it is
// very likely they are related (e.g. connection reset causing
// both the send and recv to fail) and we therefore return the
// first error and discard the other.
return err
default:
return nil
}
}
// printTranscript is a callback function given to StreamingRecognize method to
// print results that are returned though the gRPC stream.
func printTranscript(resp *transcribe.StreamingRecognizeResponse) {
if resp.Error != nil {
fmt.Printf("\n[ERROR] server returned an error: %v\n", resp.Error)
return
}
hyp := resp.Result.Alternatives[0]
startTime := float32(hyp.StartTimeMs) / 1000.0
endTime := startTime + float32(hyp.DurationMs)/1000.0
if resp.Result.IsPartial {
fmt.Printf("\r[%0.2f:%0.2f] %s", startTime, endTime, hyp.TranscriptFormatted)
} else {
fmt.Printf("[%0.2f:%0.2f] %s\n\n", startTime, endTime, hyp.TranscriptFormatted)
}
}
// sendAudio sends audio to a stream.
func sendAudio(
stream transcribe.TranscribeService_StreamingRecognizeClient,
cfg *transcribe.RecognitionConfig,
audio io.Reader,
bufSize uint32,
) error {
// The first message needs to be a config message, and all subsequent
// messages must be audio messages.
// Send the recognition config
if err := stream.Send(&transcribe.StreamingRecognizeRequest{
Request: &transcribe.StreamingRecognizeRequest_Config{Config: cfg},
}); err != nil {
// if this failed, we don't need to CloseSend
return err
}
// Stream the audio.
buf := make([]byte, bufSize)
for {
n, err := audio.Read(buf)
if n > 0 {
if err2 := stream.Send(&transcribe.StreamingRecognizeRequest{
Request: &transcribe.StreamingRecognizeRequest_Audio{
Audio: &transcribe.RecognitionAudio{Data: buf[:n]},
},
}); err2 != nil {
// if we couldn't Send, the stream has
// encountered an error and we don't need to
// CloseSend.
return err2
}
}
if err != nil {
// err could be io.EOF, or some other error reading from
// audio. In any case, we need to CloseSend, send the
// appropriate error to errCh and return from the function
if err2 := stream.CloseSend(); err2 != nil {
return err2
}
if err != io.EOF {
return err
}
return nil
}
}
}