Speech-to-Text with Whisper
Basic Transcription
from openai import OpenAI


def transcribe_audio(audio_path: str) -> str:
    """Transcribe an audio file using Whisper."""
    client = OpenAI()
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file
        )
    return transcript.text


def transcribe_with_timestamps(audio_path: str) -> dict:
    """Transcribe with word- and segment-level timestamps."""
    client = OpenAI()
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            response_format="verbose_json",
            timestamp_granularities=["word", "segment"]
        )
    return {
        "text": transcript.text,
        "segments": transcript.segments,
        "words": transcript.words
    }
# Usage
result = transcribe_with_timestamps("meeting.mp3")
print(f"Full transcript: {result['text'][:200]}...")
print("\nSegments:")
for segment in result['segments'][:3]:
    # Segments are returned as typed objects, so use attribute access
    print(f"  [{segment.start:.1f}s - {segment.end:.1f}s] {segment.text}")
Multi-Language Support
from openai import OpenAI


class MultiLanguageTranscriber:
    """Transcribe audio in multiple languages."""

    def __init__(self):
        self.client = OpenAI()

    def transcribe(
        self,
        audio_path: str,
        language: str | None = None,
        translate: bool = False
    ) -> dict:
        """Transcribe audio, or translate it to English."""
        with open(audio_path, "rb") as audio_file:
            if translate:
                # Translate to English
                result = self.client.audio.translations.create(
                    model="whisper-1",
                    file=audio_file
                )
            else:
                # Transcribe in the original language
                kwargs = {"model": "whisper-1", "file": audio_file}
                if language:
                    kwargs["language"] = language
                result = self.client.audio.transcriptions.create(**kwargs)
        return {"text": result.text, "translated": translate}

    def detect_and_transcribe(self, audio_path: str) -> dict:
        """Detect the spoken language and transcribe."""
        # Request a verbose response to get the detected language and duration
        with open(audio_path, "rb") as audio_file:
            result = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json"
            )
        return {
            "text": result.text,
            "language": result.language,
            "duration": result.duration
        }
# Usage
transcriber = MultiLanguageTranscriber()
# Transcribe in specific language
result = transcriber.transcribe("spanish_audio.mp3", language="es")
print(f"Spanish: {result['text']}")
# Translate to English
result = transcriber.transcribe("french_audio.mp3", translate=True)
print(f"Translated: {result['text']}")
Audio Processing Pipeline
from openai import OpenAI
from pathlib import Path
from dataclasses import dataclass
import subprocess
import tempfile
import os


@dataclass
class AudioChunk:
    """A chunk of processed audio."""
    path: str
    start_time: float
    duration: float


class AudioProcessor:
    """Process audio files for transcription."""

    SUPPORTED_FORMATS = [".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"]
    MAX_FILE_SIZE = 25 * 1024 * 1024  # 25 MB API limit

    def __init__(self):
        self.client = OpenAI()

    def validate_file(self, path: str) -> bool:
        """Check whether a file can be transcribed directly.

        Returns False if the file is valid but too large and needs chunking.
        """
        file_path = Path(path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        if file_path.suffix.lower() not in self.SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format: {file_path.suffix}")
        if file_path.stat().st_size > self.MAX_FILE_SIZE:
            return False  # Needs chunking
        return True

    def get_duration(self, path: str) -> float:
        """Get audio duration in seconds using ffprobe."""
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())

    def split_audio(
        self,
        path: str,
        chunk_duration: float = 600  # 10 minutes
    ) -> list[AudioChunk]:
        """Split audio into chunks using ffmpeg."""
        total_duration = self.get_duration(path)
        chunks = []
        temp_dir = tempfile.mkdtemp()

        current_time = 0.0
        chunk_num = 0
        while current_time < total_duration:
            chunk_path = os.path.join(temp_dir, f"chunk_{chunk_num}.mp3")
            duration = min(chunk_duration, total_duration - current_time)

            # Use ffmpeg to extract the chunk
            cmd = [
                "ffmpeg", "-y",
                "-i", path,
                "-ss", str(current_time),
                "-t", str(duration),
                "-acodec", "libmp3lame",
                "-q:a", "2",
                chunk_path
            ]
            subprocess.run(cmd, capture_output=True, check=True)

            chunks.append(AudioChunk(
                path=chunk_path,
                start_time=current_time,
                duration=duration
            ))
            current_time += duration
            chunk_num += 1

        return chunks

    def transcribe_long_audio(
        self,
        path: str,
        chunk_duration: float = 600
    ) -> str:
        """Transcribe long audio files, chunking only when necessary."""
        if self.validate_file(path):
            # File is under the size limit, transcribe directly
            with open(path, "rb") as f:
                result = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=f
                )
            return result.text

        # Split into chunks and transcribe each one
        chunks = self.split_audio(path, chunk_duration)
        transcripts = []
        for chunk in chunks:
            with open(chunk.path, "rb") as f:
                result = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=f
                )
            transcripts.append(result.text)
            # Clean up the chunk file
            os.remove(chunk.path)

        return " ".join(transcripts)
# Usage
processor = AudioProcessor()
# Transcribe a long podcast episode
transcript = processor.transcribe_long_audio("podcast_episode.mp3")
print(f"Transcript ({len(transcript)} chars): {transcript[:500]}...")
Text-to-Speech
OpenAI TTS
from openai import OpenAI


def text_to_speech(
    text: str,
    output_path: str,
    voice: str = "alloy",
    model: str = "tts-1"
) -> str:
    """Convert text to speech and save it to a file."""
    client = OpenAI()

    # Available voices: alloy, echo, fable, onyx, nova, shimmer
    response = client.audio.speech.create(
        model=model,  # tts-1 (fast) or tts-1-hd (higher quality)
        voice=voice,
        input=text
    )
    # Note: newer SDK versions prefer client.audio.speech.with_streaming_response
    response.stream_to_file(output_path)
    return output_path


def text_to_speech_streaming(text: str, voice: str = "alloy"):
    """Stream TTS audio as raw bytes."""
    client = OpenAI()

    response = client.audio.speech.create(
        model="tts-1",
        voice=voice,
        input=text,
        response_format="opus"  # Compact format, good for streaming
    )
    # Iterate over audio chunks
    for chunk in response.iter_bytes():
        yield chunk


# Usage
# Generate a speech file
text_to_speech(
    "Hello! This is a test of the text-to-speech system.",
    "output.mp3",
    voice="nova"
)

# Streaming usage
for audio_chunk in text_to_speech_streaming("Streaming audio test"):
    # Process or play the audio chunk
    pass
Voice Selection System
from openai import OpenAI
from dataclasses import dataclass
from enum import Enum


class VoiceStyle(Enum):
    PROFESSIONAL = "professional"
    CASUAL = "casual"
    ENERGETIC = "energetic"
    CALM = "calm"
    NARRATIVE = "narrative"


@dataclass
class VoiceProfile:
    """Profile for a TTS voice."""
    name: str
    openai_voice: str
    style: VoiceStyle
    description: str


class VoiceSelector:
    """Select an appropriate voice for the content."""

    VOICES = [
        VoiceProfile("Professional", "onyx", VoiceStyle.PROFESSIONAL,
                     "Deep, authoritative voice for business content"),
        VoiceProfile("Friendly", "nova", VoiceStyle.CASUAL,
                     "Warm, approachable voice for casual content"),
        VoiceProfile("Dynamic", "echo", VoiceStyle.ENERGETIC,
                     "Energetic voice for marketing and presentations"),
        VoiceProfile("Soothing", "shimmer", VoiceStyle.CALM,
                     "Calm, gentle voice for meditation and relaxation"),
        VoiceProfile("Storyteller", "fable", VoiceStyle.NARRATIVE,
                     "Expressive voice for stories and narratives"),
        VoiceProfile("Neutral", "alloy", VoiceStyle.CASUAL,
                     "Balanced, versatile voice for general use"),
    ]

    def __init__(self):
        self.client = OpenAI()

    def select_voice(
        self,
        content_type: str | None = None,
        style: VoiceStyle | None = None
    ) -> VoiceProfile:
        """Select a voice based on content type or explicit style."""
        if style:
            matches = [v for v in self.VOICES if v.style == style]
            return matches[0] if matches else self.VOICES[0]

        # Auto-select based on content type
        content_voice_map = {
            "business": VoiceStyle.PROFESSIONAL,
            "tutorial": VoiceStyle.CASUAL,
            "marketing": VoiceStyle.ENERGETIC,
            "meditation": VoiceStyle.CALM,
            "story": VoiceStyle.NARRATIVE,
        }
        style = content_voice_map.get(content_type, VoiceStyle.CASUAL)
        return self.select_voice(style=style)

    def generate_speech(
        self,
        text: str,
        output_path: str,
        content_type: str | None = None,
        style: VoiceStyle | None = None,
        hd_quality: bool = False
    ) -> dict:
        """Generate speech with an auto-selected voice."""
        voice = self.select_voice(content_type, style)

        response = self.client.audio.speech.create(
            model="tts-1-hd" if hd_quality else "tts-1",
            voice=voice.openai_voice,
            input=text
        )
        response.stream_to_file(output_path)

        return {
            "path": output_path,
            "voice": voice.name,
            "openai_voice": voice.openai_voice,
            "style": voice.style.value
        }
# Usage
selector = VoiceSelector()
# Generate business presentation audio
result = selector.generate_speech(
    "Welcome to our quarterly earnings report...",
    "presentation.mp3",
    content_type="business",
    hd_quality=True
)
print(f"Generated with voice: {result['voice']}")

# Generate meditation audio
result = selector.generate_speech(
    "Take a deep breath and relax...",
    "meditation.mp3",
    style=VoiceStyle.CALM
)
print(f"Generated with voice: {result['voice']}")
Real-Time Transcription
WebSocket-Based Transcription
import os
import struct
import tempfile
import wave

from openai import OpenAI


class RealtimeTranscriber:
    """Near-real-time transcription of a streaming audio source."""

    def __init__(
        self,
        chunk_duration: float = 5.0,
        sample_rate: int = 16000
    ):
        self.client = OpenAI()
        self.chunk_duration = chunk_duration
        self.sample_rate = sample_rate
        self.buffer = []

    async def process_audio_stream(self, audio_generator, callback):
        """Consume streaming audio and transcribe it chunk by chunk.

        `audio_generator` can be any async iterator of float samples in
        [-1.0, 1.0], for example one fed by a WebSocket connection.
        """
        samples_per_chunk = int(self.sample_rate * self.chunk_duration)

        async for audio_data in audio_generator:
            self.buffer.extend(audio_data)

            while len(self.buffer) >= samples_per_chunk:
                chunk = self.buffer[:samples_per_chunk]
                self.buffer = self.buffer[samples_per_chunk:]

                # Transcribe the chunk and hand the text to the callback
                transcript = await self._transcribe_chunk(chunk)
                if transcript:
                    await callback(transcript)

    async def _transcribe_chunk(self, audio_samples: list) -> str:
        """Transcribe one audio chunk via a temporary WAV file."""
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            temp_path = f.name
        try:
            with wave.open(temp_path, "wb") as wav:
                wav.setnchannels(1)
                wav.setsampwidth(2)  # 16-bit
                wav.setframerate(self.sample_rate)
                # Convert float samples to 16-bit integers, clamped to int16 range
                int_samples = [max(-32768, min(32767, int(s * 32767))) for s in audio_samples]
                wav.writeframes(struct.pack(f"{len(int_samples)}h", *int_samples))

            # Note: this API call is blocking; offload it in latency-sensitive code
            with open(temp_path, "rb") as audio_file:
                result = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file
                )
            return result.text
        finally:
            os.unlink(temp_path)


class VoiceAssistant:
    """Voice-based assistant combining transcription, chat, and speech output."""

    def __init__(self):
        self.client = OpenAI()
        self.transcriber = RealtimeTranscriber()
        self.conversation_history = []

    def process_voice_input(self, audio_path: str) -> dict:
        """Process voice input and generate a spoken response."""
        # Transcribe the user's audio
        with open(audio_path, "rb") as f:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=f
            )
        user_text = transcription.text

        # Add to the conversation history
        self.conversation_history.append({
            "role": "user",
            "content": user_text
        })

        # Generate a text response
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful voice assistant. Keep responses concise and conversational."},
                *self.conversation_history
            ]
        )
        assistant_text = response.choices[0].message.content
        self.conversation_history.append({
            "role": "assistant",
            "content": assistant_text
        })

        # Generate a speech response
        audio_response = self.client.audio.speech.create(
            model="tts-1",
            voice="nova",
            input=assistant_text
        )

        # Save the response audio
        response_path = "response.mp3"
        audio_response.stream_to_file(response_path)

        return {
            "user_text": user_text,
            "assistant_text": assistant_text,
            "audio_path": response_path
        }
# Usage
assistant = VoiceAssistant()
# Process voice input
result = assistant.process_voice_input("user_question.mp3")
print(f"User said: {result['user_text']}")
print(f"Assistant: {result['assistant_text']}")
print(f"Audio response: {result['audio_path']}")
Audio Analysis
Speaker Diarization Setup
from openai import OpenAI
from dataclasses import dataclass
import json


@dataclass
class SpeakerSegment:
    """A segment of speech from one speaker."""
    speaker_id: str
    start_time: float
    end_time: float
    text: str


class MeetingTranscriber:
    """Transcribe meetings with speaker identification."""

    def __init__(self):
        self.client = OpenAI()

    def transcribe_meeting(self, audio_path: str) -> dict:
        """Transcribe a meeting and analyze speakers."""
        # Get a detailed transcription with segment timestamps
        with open(audio_path, "rb") as f:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=f,
                response_format="verbose_json",
                timestamp_granularities=["segment"]
            )

        # Use an LLM to identify speakers and structure the content.
        # Note: this infers speakers from the transcript text alone; true
        # diarization requires acoustic analysis of the audio itself.
        # Segments are typed objects, so use attribute access.
        segments_text = "\n".join([
            f"[{s.start:.1f}s - {s.end:.1f}s]: {s.text}"
            for s in transcription.segments
        ])

        analysis_prompt = f"""Analyze this meeting transcript and identify different speakers.
Label speakers as Speaker 1, Speaker 2, etc.

Transcript with timestamps:
{segments_text}

Return as JSON:
{{
    "speakers": [{{"id": "Speaker 1", "description": "brief description"}}],
    "segments": [
        {{"speaker": "Speaker 1", "start": 0.0, "end": 5.0, "text": "..."}}
    ],
    "summary": "brief meeting summary",
    "action_items": ["list of action items"]
}}"""

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": analysis_prompt}],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    def generate_meeting_notes(self, analysis: dict) -> str:
        """Generate formatted meeting notes from the analysis."""
        notes = ["# Meeting Notes\n"]

        notes.append("## Summary")
        notes.append(analysis.get("summary", "No summary available."))
        notes.append("")

        notes.append("## Participants")
        for speaker in analysis.get("speakers", []):
            notes.append(f"- {speaker['id']}: {speaker.get('description', '')}")
        notes.append("")

        notes.append("## Transcript")
        for segment in analysis.get("segments", []):
            notes.append(f"**{segment['speaker']}** ({segment['start']:.0f}s): {segment['text']}")
        notes.append("")

        notes.append("## Action Items")
        for item in analysis.get("action_items", []):
            notes.append(f"- [ ] {item}")

        return "\n".join(notes)
# Usage
transcriber = MeetingTranscriber()
analysis = transcriber.transcribe_meeting("team_meeting.mp3")
notes = transcriber.generate_meeting_notes(analysis)
print(notes)
Audio Content Analysis
from openai import OpenAI
import json


class AudioAnalyzer:
    """Analyze audio content for various purposes."""

    def __init__(self):
        self.client = OpenAI()

    def analyze_podcast(self, audio_path: str) -> dict:
        """Analyze a podcast episode."""
        # Transcribe
        with open(audio_path, "rb") as f:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=f
            )

        # Analyze content
        prompt = f"""Analyze this podcast transcript:

{transcription.text}

Provide a comprehensive analysis as JSON:
{{
    "title_suggestion": "suggested episode title",
    "topics": ["main topics discussed"],
    "key_points": ["key takeaways"],
    "quotes": ["notable quotable moments"],
    "sentiment": "overall tone (positive/neutral/negative)",
    "target_audience": "who would benefit from this",
    "seo_keywords": ["relevant keywords"],
    "chapter_markers": [
        {{"title": "Introduction", "description": "brief description"}}
    ],
    "summary": "2-3 paragraph summary"
}}"""

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    def transcribe_and_translate(
        self,
        audio_path: str,
        target_languages: list[str]
    ) -> dict:
        """Transcribe and translate to multiple languages."""
        # Get English transcript
        with open(audio_path, "rb") as f:
            english = self.client.audio.translations.create(
                model="whisper-1",
                file=f
            )

        translations = {"en": english.text}

        # Translate to target languages
        for lang in target_languages:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": f"Translate the following text to {lang}. Maintain the original meaning and tone."
                    },
                    {"role": "user", "content": english.text}
                ]
            )
            translations[lang] = response.choices[0].message.content

        return translations
# Usage
analyzer = AudioAnalyzer()
# Analyze podcast
analysis = analyzer.analyze_podcast("episode.mp3")
print(f"Suggested title: {analysis['title_suggestion']}")
print(f"Topics: {analysis['topics']}")
# Multi-language transcription
translations = analyzer.transcribe_and_translate(
    "speech.mp3",
    ["es", "fr", "de"]
)
for lang, text in translations.items():
    print(f"{lang}: {text[:100]}...")
Voice AI Best Practices
- Pre-process audio to reduce noise and normalize volume (see the sketch after this list)
- Use appropriate sample rates (16 kHz is sufficient for Whisper, which resamples internally)
- Chunk long audio files to stay within the 25 MB API limit
- Cache transcriptions to avoid repeated API calls (also shown below)
- Consider latency requirements when choosing models
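The normalization and caching points above are straightforward to combine with the transcription call used throughout this section. The sketch below is a minimal illustration under stated assumptions, not a prescribed implementation: normalize_audio assumes ffmpeg is installed and uses its loudnorm filter plus a 16 kHz resample, and cached_transcribe keys a local JSON file (transcript_cache.json, an arbitrary choice) on a hash of the audio bytes so unchanged files never trigger a second API call.

import hashlib
import json
import subprocess
from pathlib import Path

from openai import OpenAI

CACHE_PATH = Path("transcript_cache.json")  # illustrative cache location


def normalize_audio(in_path: str, out_path: str) -> str:
    """Normalize loudness and resample to 16 kHz with ffmpeg (assumes ffmpeg is installed)."""
    subprocess.run(
        ["ffmpeg", "-y", "-i", in_path, "-af", "loudnorm", "-ar", "16000", out_path],
        capture_output=True, check=True
    )
    return out_path


def cached_transcribe(audio_path: str, client: OpenAI | None = None) -> str:
    """Transcribe an audio file, reusing a cached result when the file is unchanged."""
    client = client or OpenAI()
    key = hashlib.sha256(Path(audio_path).read_bytes()).hexdigest()

    cache = json.loads(CACHE_PATH.read_text()) if CACHE_PATH.exists() else {}
    if key in cache:
        return cache[key]  # Cache hit: no API call

    with open(audio_path, "rb") as f:
        result = client.audio.transcriptions.create(model="whisper-1", file=f)

    cache[key] = result.text
    CACHE_PATH.write_text(json.dumps(cache))
    return result.text

In a production pipeline you would typically normalize first and then pass the normalized file to cached_transcribe, so repeated runs over the same audio cost nothing.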
Practice Exercise
Build a voice-enabled assistant that:
- Accepts continuous voice input
- Transcribes in real time with speaker detection
- Generates natural voice responses
- Supports multiple languages
- Provides meeting summaries and action items
Aim for:
- Low-latency responses
- Graceful handling of poor audio quality
- Natural conversation flow
- Persistent conversation context
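As a starting point, the pieces from this section can be wired into a capture-transcribe-respond loop. The sketch below assumes the RealtimeTranscriber and VoiceAssistant classes defined earlier are available in the same module; microphone_chunks is a hypothetical placeholder you would implement with the audio capture library of your choice.

import asyncio


async def microphone_chunks():
    """Hypothetical placeholder: yield lists of float samples captured from a microphone."""
    # Replace with real capture code (e.g. an audio input callback pushing
    # samples onto an asyncio.Queue that this generator drains).
    yield []


async def run_assistant():
    transcriber = RealtimeTranscriber(chunk_duration=3.0)
    assistant = VoiceAssistant()

    async def on_transcript(text: str):
        # Each transcribed utterance arrives here; route it into the chat and
        # TTS pipeline (e.g. by refactoring assistant to accept text directly).
        print(f"Heard: {text}")

    await transcriber.process_audio_stream(microphone_chunks(), on_transcript)


if __name__ == "__main__":
    asyncio.run(run_assistant())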