Skip to content

Tutorial: Análise Avançada de Chamadas de Call Center

Este tutorial demonstra como construir um pipeline robusto para processar gravações de áudio de um call center. Ao final, você terá um fluxo automatizado que:

  1. Transcreve o áudio de chamadas identificando os diferentes interlocutores (diarização).
  2. Anonimiza dados sensíveis (como CPF, e-mail, telefone) diretamente na transcrição.
  3. Executa uma Análise Estruturada para extrair métricas de qualidade, sentimento e outros KPIs.
  4. Prepara os dados para serem enviados a um banco de dados, BI ou planilha.

Vamos combinar dois recursos poderosos da SipPulse AI: a Transcrição de Áudio Avançada e as Análises Estruturadas.

Pré-requisitos

  • Uma conta ativa na plataforma SipPulse AI.
  • Uma Análise Estruturada já criada na plataforma para avaliar as chamadas. (Ver Passo 1).
  • Gravações de chamadas em um formato de áudio compatível (ex: .wav, .mp3).
  • Conhecimento básico de Python ou TypeScript (Node.js) para executar os scripts.

Passo 1: Criar a Análise Estruturada

Antes de processar as chamadas, precisamos definir quais informações queremos extrair. Faremos isso com uma Análise Estruturada. Pense nela como um formulário que a IA preencherá para cada chamada.

Para um guia detalhado, consulte a documentação de Análises Estruturadas.

  1. Navegue até Análises Estruturadas e clique em Nova Análise.
  2. Dê um nome claro, como analise_qualidade_chamada.
  3. Na descrição, explique o objetivo: "Extrai métricas de qualidade, sentimento do cliente e resolução do problema a partir de transcrições de chamadas de suporte."
  4. Adicione os campos que você deseja extrair. Por exemplo:
    • resumo_chamada (Tipo: Texto): "Um resumo conciso de todo o diálogo."
    • sentimento_cliente (Tipo: Enum, Opções: positivo, negativo, neutro): "O sentimento geral do cliente durante a chamada."
    • problema_resolvido (Tipo: Booleano): "O problema do cliente foi resolvido? (true/false)"
    • nota_desempenho_agente (Tipo: Número): "Nota de 1 a 5 para o desempenho do atendente."
  5. Salve a análise e copie o ID dela. Você precisará dele no próximo passo.

Passo 2: Construir o Pipeline de Processamento

Agora, vamos criar o script que orquestra todo o processo. O fluxo será o seguinte para cada arquivo de áudio:

  1. Enviar para Transcrição: Chamar a API de STT com o parâmetro diarization e um objeto de configuração para anonymize, especificando as entidades a serem redigidas.
  2. Receber Transcrição Anônima: Obter o texto da transcrição já com os dados sensíveis ofuscados.
  3. Enviar para Análise: Usar o texto transcrito como entrada para a Análise Estruturada que criamos.
  4. Combinar Resultados: Juntar a transcrição e os insights estruturados em um único objeto.

Abaixo estão os exemplos de código completos para automatizar este fluxo.

python
import os
import requests
import json
from typing import List, Dict, Any

# --- Configuração ---
SIPPULSE_API_KEY = os.environ.get("SIPPULSE_API_KEY")
API_BASE_URL = "https://api.sippulse.ai"
# Cole o ID da Análise Estruturada criada no Passo 1
ANALYSIS_ID = "YOUR_STRUCTURED_ANALYSIS_ID"
# Defina as entidades que deseja anonimizar na transcrição
# Consulte a documentação de Anonimização para ver todas as entidades disponíveis
ANONYMIZE_CONFIG = {
    "entities": ["CPF", "EMAIL", "PERSON", "LOCATION", "NUMBER"],
    "sequence": 4 # Anonimiza números com 4 ou mais dígitos
}

def process_call_recording(file_path: str, analysis_id: str) -> Dict[str, Any]:
    """
    Transcreve, anonimiza e analisa uma única gravação de chamada.
    """
    if not SIPPULSE_API_KEY:
        raise ValueError("A variável de ambiente SIPPULSE_API_KEY não está definida.")

    headers = {"api-key": SIPPULSE_API_KEY}

    # 1. Transcrever com diarização e anonimização
    print(f"Processando arquivo: {file_path}")
    with open(file_path, "rb") as audio_file:
        files = {"file": (os.path.basename(file_path), audio_file)}
        payload = {
            "model": "pulse-precision",
            "format": "diarization", # Retorna texto com identificação de falantes
            "anonymize": json.dumps(ANONYMIZE_CONFIG) # Ativa e configura a anonimização
        }
        asr_response = requests.post(
            f"{API_BASE_URL}/v1/asr/transcribe",
            headers=headers,
            files=files,
            data=payload
        )
        asr_response.raise_for_status()
        transcription_data = asr_response.json()
        anonymized_text = transcription_data.get("text")

    if not anonymized_text:
        raise Exception("Falha ao obter texto anonimizado da transcrição.")

    # 2. Executar Análise Estruturada
    analysis_response = requests.post(
        f"{API_BASE_URL}/structured-analyses/{analysis_id}/execute",
        headers=headers,
        json={"content": anonymized_text}
    )
    analysis_response.raise_for_status()
    analysis_result = analysis_response.json().get("content")

    # 3. Combinar resultados
    return {
        "filename": os.path.basename(file_path),
        "transcription": transcription_data,
        "quality_analysis": analysis_result
    }

def save_to_database(data: Dict[str, Any]):
    """
    Função de exemplo para salvar os resultados.
    Substitua pela sua lógica de integração (ex: salvar em um banco de dados,
    enviar para uma API de BI ou adicionar a uma planilha Google Sheets).
    """
    print("\n--- Resultado Final para Armazenamento ---")
    print(json.dumps(data, indent=2, ensure_ascii=False))
    print("-----------------------------------------\n")


if __name__ == "__main__":
    # Simula uma lista de arquivos de uma pasta de gravações
    recordings_path = "./call_recordings" # Crie esta pasta e adicione seus áudios
    if not os.path.exists(recordings_path):
        os.makedirs(recordings_path)
        print(f"Pasta '{recordings_path}' criada. Adicione seus arquivos de áudio .wav ou .mp3 nela.")
    
    audio_files = [os.path.join(recordings_path, f) for f in os.listdir(recordings_path) if f.endswith(('.wav', '.mp3'))]

    if not audio_files:
        print("Nenhum arquivo de áudio encontrado na pasta 'call_recordings'.")

    for audio_file in audio_files:
        try:
            final_result = process_call_recording(audio_file, ANALYSIS_ID)
            save_to_database(final_result)
        except Exception as e:
            print(f"Erro ao processar o arquivo {audio_file}: {e}")
typescript
import fs from 'fs/promises';
import path from 'path';
import FormData from 'form-data';
import fetch from 'node-fetch'; // `npm install node-fetch`

// --- Configuração ---
const SIPPULSE_API_KEY = process.env.SIPPULSE_API_KEY;
const API_BASE_URL = 'https://api.sippulse.ai';
// Cole o ID da Análise Estruturada criada no Passo 1
const ANALYSIS_ID = 'YOUR_STRUCTURED_ANALYSIS_ID';
// Defina as entidades que deseja anonimizar na transcrição
// Consulte a documentação de Anonimização para ver todas as entidades disponíveis
const ANONYMIZE_CONFIG = {
    entities: ["CPF", "EMAIL", "PERSON", "LOCATION", "NUMBER"],
    sequence: 4, // Anonimiza números com 4 ou mais dígitos
};

async function processCallRecording(filePath: string, analysisId: string): Promise<any> {
  if (!SIPPULSE_API_KEY) {
    throw new Error('A variável de ambiente SIPPULSE_API_KEY não está definida.');
  }

  const headers = { 'api-key': SIPPULSE_API_KEY };

  // 1. Transcrever com diarização e anonimização
  console.log(`Processando arquivo: ${filePath}`);
  const fileBuffer = await fs.readFile(filePath);
  const form = new FormData();
  form.append('file', fileBuffer, path.basename(filePath));
  form.append('model', 'pulse-precision');
  form.append('format', 'diarization'); // Retorna texto com identificação de falantes
  form.append('anonymize', JSON.stringify(ANONYMIZE_CONFIG)); // Ativa e configura a anonimização

  const asrResponse = await fetch(`${API_BASE_URL}/v1/asr/transcribe`, {
    method: 'POST',
    headers: { ...headers, ...form.getHeaders() },
    body: form,
  });

  if (!asrResponse.ok) {
    throw new Error(`Erro na transcrição: ${await asrResponse.text()}`);
  }
  const transcriptionData: any = await asrResponse.json();
  const anonymizedText = transcriptionData?.text;

  if (!anonymizedText) {
    throw new Error('Falha ao obter texto anonimizado da transcrição.');
  }

  // 2. Executar Análise Estruturada
  const analysisResponse = await fetch(`${API_BASE_URL}/structured-analyses/${analysisId}/execute`, {
      method: 'POST',
      headers: { ...headers, 'Content-Type': 'application/json' },
      body: JSON.stringify({ content: anonymizedText }),
  });

  if (!analysisResponse.ok) {
      throw new Error(`Erro na análise estruturada: ${await analysisResponse.text()}`);
  }
  const analysisResult = (await analysisResponse.json() as any).content;

  // 3. Combinar resultados
  return {
    filename: path.basename(filePath),
    transcription: transcriptionData,
    quality_analysis: analysisResult,
  };
}

function saveToDatabase(data: any) {
  /**
   * Função de exemplo para salvar os resultados.
   * Substitua pela sua lógica de integração (ex: salvar em um banco de dados,
   * enviar para uma API de BI ou adicionar a uma planilha Google Sheets).
   */
  console.log('\n--- Resultado Final para Armazenamento ---');
  console.log(JSON.stringify(data, null, 2));
  console.log('-----------------------------------------\n');
}

(async () => {
  // Simula uma lista de arquivos de uma pasta de gravações
  const recordingsPath = './call_recordings'; // Crie esta pasta e adicione seus áudios
  try {
    await fs.access(recordingsPath);
  } catch {
    await fs.mkdir(recordingsPath);
    console.log(`Pasta '${recordingsPath}' criada. Adicione seus arquivos de áudio .wav ou .mp3 nela.`);
  }

  const files = await fs.readdir(recordingsPath);
  const audioFiles = files
    .filter(f => f.endsWith('.wav') || f.endsWith('.mp3'))
    .map(f => path.join(recordingsPath, f));

  if (audioFiles.length === 0) {
    console.log("Nenhum arquivo de áudio encontrado na pasta 'call_recordings'.");
    return;
  }

  for (const audioFile of audioFiles) {
    try {
      const finalResult = await processCallRecording(audioFile, ANALYSIS_ID);
      saveToDatabase(finalResult);
    } catch (error) {
      console.error(`Erro ao processar o arquivo ${audioFile}:`, error);
    }
  }
})();

Passo 3: Executar e Armazenar os Resultados

  1. Configure o Ambiente:
    • Crie uma pasta chamada call_recordings no mesmo diretório do seu script.
    • Coloque alguns arquivos de áudio de chamadas dentro dela.
    • Defina a variável de ambiente SIPPULSE_API_KEY com sua chave da API.
    • Substitua YOUR_STRUCTURED_ANALYSIS_ID no script pelo ID real da sua análise.
  2. Execute o Script:
    • Para Python: python nome_do_script.py
    • Para Node.js: node nome_do_script.js
  3. Verifique a Saída: O script irá iterar sobre cada arquivo de áudio, processá-lo e imprimir um objeto JSON final no console. Este objeto contém o nome do arquivo, a transcrição completa com diarização e o resultado da sua análise de qualidade.

A função save_to_database é um placeholder. É aqui que você adicionaria a lógica para enviar o JSON final para seu destino, como:

  • Inserir em uma tabela do PostgreSQL ou MongoDB.
  • Adicionar uma nova linha em uma planilha do Google Sheets.
  • Enviar para uma plataforma de BI como Power BI ou Looker.

Passo 4 (Opcional): Criando um Endpoint para Automação Contínua

Para uma automação completa, em vez de executar o script manualmente sobre uma pasta, você pode encapsular a lógica em um endpoint de API. Seu sistema de telefonia (ou qualquer outro sistema que gera as gravações) pode, então, simplesmente enviar o arquivo de áudio para este endpoint sempre que uma nova chamada for finalizada.

Isso cria um fluxo de trabalho desacoplado e escalável. Abaixo estão exemplos de como criar um servidor web simples para este propósito usando FastAPI (Python) e Express (Node.js).

Dica de Produção

Para ambientes de produção, é recomendado que o processamento do arquivo (que pode levar vários segundos) seja executado de forma assíncrona em uma fila de tarefas (como Celery para Python ou BullMQ para Node.js). O endpoint retornaria uma resposta imediata (202 Accepted) e o processamento ocorreria em segundo plano.

python
# Salve como `server.py`.
import os
import shutil
from fastapi import FastAPI, UploadFile, File, HTTPException

# Supondo que o script do Passo 2 foi salvo como `pipeline.py`
from pipeline import process_call_recording, save_to_database, ANALYSIS_ID

app = FastAPI()

@app.post("/process-recording/")
async def handle_recording_upload(file: UploadFile = File(...)):
    temp_dir = "./temp_recordings"
    os.makedirs(temp_dir, exist_ok=True)
    temp_file_path = os.path.join(temp_dir, file.filename)

    try:
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        print(f"Recebido novo arquivo para processar: {file.filename}")
        # O FastAPI executa chamadas síncronas em um thread pool para não bloquear.
        final_result = process_call_recording(temp_file_path, ANALYSIS_ID)
        save_to_database(final_result)

        return {"filename": file.filename, "status": "processed"}
    except Exception as e:
        print(f"Erro ao processar {file.filename}: {e}")
        raise HTTPException(status_code=500, detail=f"Falha ao processar o arquivo: {str(e)}")
    finally:
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

# Para executar:
# 1. Salve o código Python do Passo 2 em um arquivo chamado `pipeline.py`.
# 2. Instale FastAPI e Uvicorn: pip install "fastapi[all]"
# 3. Execute o servidor: uvicorn server:app --reload
typescript
// Salve como `server.js`.
import express from 'express';
import multer from 'multer';
import fs from 'fs/promises';
import path from 'path';

// Supondo que o código do Passo 2 foi salvo como `pipeline.js` e as funções foram exportadas.
import { processCallRecording, saveToDatabase, ANALYSIS_ID } from './pipeline.js';

const app = express();
const port = 3000;

const tempDir = './temp_recordings';
const upload = multer({ dest: tempDir });

app.post('/process-recording', upload.single('file'), async (req, res) => {
  if (!req.file) {
    return res.status(400).send({ message: 'Nenhum arquivo enviado.' });
  }

  const tempFilePath = req.file.path;
  console.log(`Recebido novo arquivo para processar: ${req.file.originalname}`);

  try {
    const finalResult = await processCallRecording(tempFilePath, ANALYSIS_ID);
    saveToDatabase(finalResult);

    res.status(200).send({ filename: req.file.originalname, status: 'processed' });
  } catch (error) {
    console.error(`Erro ao processar ${req.file.originalname}:`, error);
    res.status(500).send({ message: 'Falha ao processar o arquivo.', error: error.message });
  } finally {
    // Limpa o arquivo temporário
    await fs.unlink(tempFilePath);
  }
});

app.listen(port, async () => {
  await fs.mkdir(tempDir, { recursive: true });
  console.log(`Servidor de processamento ouvindo na porta ${port}`);
});

// Para executar:
// 1. Salve o código Node.js do Passo 2 em `pipeline.js` e exporte as funções necessárias.
// 2. Instale Express e Multer: npm install express multer
// 3. Execute o servidor: node server.js

Conclusão

Parabéns! Você criou um pipeline de ponta a ponta que transforma gravações de chamadas brutas em insights acionáveis e estruturados, garantindo a privacidade com a anonimização automática.

Este processo, que antes exigia horas de trabalho manual, agora pode ser executado em minutos, permitindo que sua equipe de qualidade se concentre em usar os dados para treinar atendentes e melhorar a experiência do cliente.