Tutorials

Step-by-step guides for building AI applications

Hands-on tutorials for building artificial-intelligence applications from start to finish.
Tutorial 1: A complete RAG chatbot

Build a chatbot that answers questions about your own documents.
Prerequisites

```bash
pip install anthropic chromadb sentence-transformers pypdf
```

Step 1: Load the documents
```python
from pypdf import PdfReader
from pathlib import Path

def load_pdf(path: str) -> str:
    """Extract the text of a PDF file."""
    reader = PdfReader(path)
    text = ""
    for page in reader.pages:
        # extract_text() can return None on image-only pages
        text += (page.extract_text() or "") + "\n"
    return text

def load_documents(folder: str) -> list[dict]:
    """Load every PDF and text document in a folder."""
    documents = []
    folder_path = Path(folder)
    for file_path in folder_path.glob("*.pdf"):
        text = load_pdf(str(file_path))
        documents.append({
            "source": file_path.name,
            "content": text
        })
    for file_path in folder_path.glob("*.txt"):
        text = file_path.read_text()
        documents.append({
            "source": file_path.name,
            "content": text
        })
    print(f"Loaded {len(documents)} documents")
    return documents
```

Step 2: Split the text into chunks
```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        # Try to cut at the end of a sentence
        if end < len(text):
            # Look for the last period near the end of the window
            last_period = text.rfind('.', start + chunk_size - 100, end)
            if last_period > start:
                end = last_period + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = end - overlap
    return chunks

def process_documents(documents: list[dict]) -> list[dict]:
    """Split every document into chunks."""
    all_chunks = []
    for doc in documents:
        chunks = chunk_text(doc["content"])
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                "id": f"{doc['source']}_{i}",
                "source": doc["source"],
                "content": chunk
            })
    print(f"Created {len(all_chunks)} chunks")
    return all_chunks
```
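Before indexing anything, it can help to sanity-check the chunker on a throwaway string and confirm that consecutive chunks really overlap. A quick sketch (the sample text and the sizes are arbitrary):

```python
# Build a synthetic text of short sentences and chunk it with small sizes.
sample = ". ".join(f"Sentence number {i}" for i in range(200)) + "."
chunks = chunk_text(sample, chunk_size=300, overlap=50)

print(f"{len(chunks)} chunks produced")
print(repr(chunks[0][-40:]))  # tail of the first chunk...
print(repr(chunks[1][:40]))   # ...reappears at the head of the second
```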
Step 3: Create the vector store

```python
import chromadb
from sentence_transformers import SentenceTransformer

def create_vector_store(chunks: list[dict], collection_name: str = "documents"):
    """Build the vector store from the chunks."""
    # Initialize the embedding model
    embedder = SentenceTransformer('all-MiniLM-L6-v2')
    # Create the Chroma client
    client = chromadb.PersistentClient(path="./chroma_db")
    # Drop the collection if it already exists
    try:
        client.delete_collection(collection_name)
    except Exception:
        pass
    # Create the collection
    collection = client.create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )
    # Embed and add the chunks in batches
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        # Generate the embeddings
        texts = [c["content"] for c in batch]
        embeddings = embedder.encode(texts).tolist()
        # Add to the collection
        collection.add(
            ids=[c["id"] for c in batch],
            embeddings=embeddings,
            documents=texts,
            metadatas=[{"source": c["source"]} for c in batch]
        )
        print(f"Indexed {min(i+batch_size, len(chunks))}/{len(chunks)}")
    print(f"Vector store created with {len(chunks)} chunks")
    return collection
```
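Before wiring the chatbot together, you can query the collection directly to check that the index returns sensible neighbors. A minimal sketch (the question is a placeholder):

```python
from sentence_transformers import SentenceTransformer
import chromadb

embedder = SentenceTransformer('all-MiniLM-L6-v2')
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

query = "What does the report conclude?"  # placeholder question
results = collection.query(
    query_embeddings=[embedder.encode(query).tolist()],
    n_results=3,
    include=["documents", "metadatas", "distances"]
)
# Print each hit with its source and cosine distance (lower is closer)
for doc, meta, dist in zip(results["documents"][0],
                           results["metadatas"][0],
                           results["distances"][0]):
    print(f"{meta['source']} (distance {dist:.3f}): {doc[:80]}...")
```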
Step 4: Build the chatbot

```python
from anthropic import Anthropic
from sentence_transformers import SentenceTransformer
import chromadb

class RAGChatbot:
    def __init__(self, collection_name: str = "documents"):
        self.llm = Anthropic()
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.client.get_collection(collection_name)
        self.conversation = []

    def search(self, query: str, n_results: int = 5) -> list[dict]:
        """Retrieve the most relevant chunks for a query."""
        query_embedding = self.embedder.encode(query).tolist()
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )
        return [
            {
                "content": doc,
                "source": meta["source"],
                "score": 1 - dist  # convert cosine distance to similarity
            }
            for doc, meta, dist in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0]
            )
        ]

    def chat(self, user_message: str) -> str:
        """Answer a question using RAG."""
        # Retrieve the context
        relevant_docs = self.search(user_message)
        # Build the context block
        context = "\n\n---\n\n".join([
            f"[Source: {doc['source']}]\n{doc['content']}"
            for doc in relevant_docs
        ])
        # Build the system prompt
        system_prompt = """You are an assistant that answers questions using only the information provided in the context.
Rules:
- Use ONLY information from the context
- If the answer is not in the context, say so clearly
- Cite your sources when relevant
- Be precise and concise"""
        # Append the user message together with the retrieved context
        augmented_message = f"""CONTEXT:
{context}

QUESTION: {user_message}"""
        self.conversation.append({
            "role": "user",
            "content": augmented_message
        })
        # Call the LLM
        response = self.llm.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system=system_prompt,
            messages=self.conversation
        )
        assistant_message = response.content[0].text
        self.conversation.append({
            "role": "assistant",
            "content": assistant_message
        })
        return assistant_message

    def reset(self):
        """Reset the conversation."""
        self.conversation = []
```

Step 5: Usage
```python
# Indexing (run once)
documents = load_documents("./my_documents")
chunks = process_documents(documents)
create_vector_store(chunks)

# Using the chatbot
chatbot = RAGChatbot()
while True:
    question = input("\nYou: ")
    if question.lower() in ['quit', 'exit', 'q']:
        break
    response = chatbot.chat(question)
    print(f"\nAssistant: {response}")
```
Tutorial 2: Image classification

Build a custom image classifier with transfer learning.
Prerequisites

```bash
pip install torch torchvision pillow matplotlib
```

Step 1: Prepare the data
```python
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

def prepare_data(data_dir: str, batch_size: int = 32):
    """Build the training and validation dataloaders."""
    # Training transforms (with data augmentation)
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    # Validation transforms
    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    # Load the datasets
    train_dataset = datasets.ImageFolder(f"{data_dir}/train", train_transform)
    val_dataset = datasets.ImageFolder(f"{data_dir}/val", val_transform)
    # Build the dataloaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    print(f"Classes: {train_dataset.classes}")
    print(f"Train: {len(train_dataset)} images")
    print(f"Val: {len(val_dataset)} images")
    return train_loader, val_loader, train_dataset.classes
```
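`ImageFolder` derives the class labels from sub-directory names, so the code above assumes a layout along these lines (the class names are examples):

```text
data/
├── train/
│   ├── cats/
│   │   ├── img_001.jpg
│   │   └── ...
│   └── dogs/
│       └── ...
└── val/
    ├── cats/
    │   └── ...
    └── dogs/
        └── ...
```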
Step 2: Create the model

```python
import torch.nn as nn
from torchvision import models

def create_model(num_classes: int, pretrained: bool = True):
    """Create a ResNet model for transfer learning."""
    # Load a pretrained ResNet (the weights API replaces the deprecated pretrained flag)
    weights = models.ResNet50_Weights.DEFAULT if pretrained else None
    model = models.resnet50(weights=weights)
    # Freeze the backbone layers
    for param in model.parameters():
        param.requires_grad = False
    # Replace the classification head
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(num_features, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return model
```
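With the backbone frozen, only the new head is trained. If validation accuracy plateaus, a common follow-up is to unfreeze the last residual stage and fine-tune it at a smaller learning rate. A sketch (the attribute name `layer4` is torchvision's; the learning rates are illustrative):

```python
import torch.optim as optim

def unfreeze_last_stage(model):
    """Make the final ResNet stage trainable again for fine-tuning."""
    for param in model.layer4.parameters():
        param.requires_grad = True

unfreeze_last_stage(model)
# Use a smaller learning rate for pretrained weights than for the new head.
optimizer = optim.Adam([
    {"params": model.layer4.parameters(), "lr": 1e-4},
    {"params": model.fc.parameters(), "lr": 1e-3},
])
```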
Step 3: Train the model

```python
import torch.optim as optim
from tqdm import tqdm

def train_model(model, train_loader, val_loader, num_epochs: int = 10):
    """Train the model and keep the best checkpoint."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 20)
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_correct += (preds == labels).sum().item()
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_correct / len(train_loader.dataset)
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validation"):
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
        val_loss = val_loss / len(val_loader.dataset)
        val_acc = val_correct / len(val_loader.dataset)
        print(f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")
        # Save the best model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")
            print("Best model saved!")
        scheduler.step()
    print(f"\nBest accuracy: {best_acc:.4f}")
    return model
```

Step 4: Use the model
```python
from PIL import Image

def predict(model, image_path: str, classes: list[str]):
    """Predict the class of a single image."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    # Preprocessing
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    # Load and transform the image
    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0).to(device)
    # Prediction
    with torch.no_grad():
        outputs = model(input_tensor)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)
    # Top-3 predictions
    top3_prob, top3_idx = torch.topk(probabilities, 3)
    results = []
    for prob, idx in zip(top3_prob, top3_idx):
        results.append({
            "class": classes[idx],
            "probability": prob.item()
        })
    return results

# Example usage
train_loader, val_loader, classes = prepare_data("./data")
model = create_model(num_classes=len(classes))
model = train_model(model, train_loader, val_loader, num_epochs=10)

# Prediction
results = predict(model, "test_image.jpg", classes)
for r in results:
    print(f"{r['class']}: {r['probability']:.2%}")
```
Tutorial 3: ML prediction API

Deploy an ML model with FastAPI.
Prerequisites

```bash
pip install fastapi uvicorn scikit-learn joblib pandas
```

Step 1: Train and save the model
```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import joblib
import pandas as pd

# Load the data
df = pd.read_csv("data.csv")
X = df.drop("target", axis=1)
y = df["target"]

# Build the pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train
pipeline.fit(X, y)

# Save
joblib.dump(pipeline, "model.joblib")
joblib.dump(list(X.columns), "features.joblib")
print("Model saved!")
```
Step 2: Create the API

```python
# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, field_validator
import joblib
import numpy as np

app = FastAPI(
    title="ML Prediction API",
    description="API for serving a classification model",
    version="1.0.0"
)

# Load the model at startup
model = joblib.load("model.joblib")
feature_names = joblib.load("features.joblib")

class PredictionInput(BaseModel):
    features: dict[str, float]

    @field_validator('features')
    @classmethod
    def check_features(cls, v):
        missing = set(feature_names) - set(v.keys())
        if missing:
            raise ValueError(f"Missing features: {missing}")
        return v

class PredictionOutput(BaseModel):
    prediction: int
    probability: float
    probabilities: dict[str, float]

class BatchInput(BaseModel):
    items: list[PredictionInput]

class BatchOutput(BaseModel):
    predictions: list[PredictionOutput]

@app.get("/")
def root():
    return {"message": "ML Prediction API", "version": "1.0.0"}

@app.get("/health")
def health():
    return {"status": "healthy", "model_loaded": model is not None}

@app.get("/features")
def get_features():
    return {"features": feature_names}

@app.post("/predict", response_model=PredictionOutput)
def predict(input_data: PredictionInput):
    try:
        # Arrange the features in training order
        features = [input_data.features[f] for f in feature_names]
        X = np.array(features).reshape(1, -1)
        # Prediction
        prediction = model.predict(X)[0]
        probabilities = model.predict_proba(X)[0]
        return PredictionOutput(
            prediction=int(prediction),
            probability=float(max(probabilities)),
            probabilities={
                str(i): float(p)
                for i, p in enumerate(probabilities)
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch", response_model=BatchOutput)
def predict_batch(input_data: BatchInput):
    predictions = []
    for item in input_data.items:
        result = predict(item)
        predictions.append(result)
    return BatchOutput(predictions=predictions)
```

Step 3: Launch the API
```bash
uvicorn api:app --reload --host 0.0.0.0 --port 8000
```

Step 4: Test the API
```python
import requests

# Single prediction
response = requests.post(
    "http://localhost:8000/predict",
    json={
        "features": {
            "feature1": 1.5,
            "feature2": 2.3,
            "feature3": 0.8
        }
    }
)
print(response.json())

# Batch prediction
response = requests.post(
    "http://localhost:8000/predict/batch",
    json={
        "items": [
            {"features": {"feature1": 1.0, "feature2": 2.0, "feature3": 0.5}},
            {"features": {"feature1": 1.5, "feature2": 2.5, "feature3": 0.7}}
        ]
    }
)
print(response.json())
```
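The Pydantic validator also means malformed requests never reach the model. You can check that by leaving a feature out (the feature names below are the same placeholders as above):

```python
# A request missing "feature3" should be rejected with a 422 validation error.
response = requests.post(
    "http://localhost:8000/predict",
    json={"features": {"feature1": 1.0, "feature2": 2.0}}
)
print(response.status_code)  # expected: 422
print(response.json())
```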
Step 5: Dockerize

```dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.joblib features.joblib api.py ./
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
```
```yaml
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    restart: unless-stopped
    healthcheck:
      # Note: curl is not installed in python:3.11-slim by default;
      # add it in the Dockerfile or use a Python-based healthcheck.
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
```
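The Dockerfile copies a `requirements.txt` that this tutorial never spelled out. A minimal one for this API might look like the following (versions are indicative):

```text
fastapi>=0.100.0
uvicorn>=0.23.0
scikit-learn>=1.3.0
joblib>=1.3.0
pandas>=2.0.0
numpy>=1.24.0
```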
Tutorial 4: Real-time sentiment analysis

Analyze the sentiment of tweets or reviews as they stream in.
Prerequisites

```bash
pip install transformers torch streamlit pandas plotly
```

Step 1: Build the analyzer
```python
# sentiment_analyzer.py
from transformers import pipeline
import pandas as pd
from datetime import datetime

class SentimentAnalyzer:
    def __init__(self):
        self.analyzer = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment"
        )
        self.history = []

    def analyze(self, text: str) -> dict:
        result = self.analyzer(text)[0]
        # The model returns labels like "4 stars"; keep the leading digit
        stars = int(result['label'][0])
        sentiment = "positive" if stars >= 4 else "negative" if stars <= 2 else "neutral"
        analysis = {
            "text": text,
            "sentiment": sentiment,
            "stars": stars,
            "confidence": result['score'],
            "timestamp": datetime.now().isoformat()
        }
        self.history.append(analysis)
        return analysis

    def analyze_batch(self, texts: list[str]) -> list[dict]:
        return [self.analyze(text) for text in texts]

    def get_stats(self) -> dict:
        if not self.history:
            return {}
        df = pd.DataFrame(self.history)
        return {
            "total": len(df),
            "positive": len(df[df['sentiment'] == 'positive']),
            "negative": len(df[df['sentiment'] == 'negative']),
            "neutral": len(df[df['sentiment'] == 'neutral']),
            "average_stars": df['stars'].mean(),
            "average_confidence": df['confidence'].mean()
        }

    def get_history(self) -> pd.DataFrame:
        return pd.DataFrame(self.history)
```
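You can smoke-test the analyzer on its own before building any interface (the sample reviews are placeholders):

```python
from sentiment_analyzer import SentimentAnalyzer

analyzer = SentimentAnalyzer()
results = analyzer.analyze_batch([
    "This product is fantastic, I use it every day!",
    "Terrible experience, it broke after two days.",
])
for r in results:
    print(f"{r['sentiment']:>8} ({r['stars']} stars, {r['confidence']:.0%}): {r['text']}")

print(analyzer.get_stats())
```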
Step 2: Streamlit interface

```python
# app.py
import streamlit as st
import plotly.express as px
from sentiment_analyzer import SentimentAnalyzer

st.set_page_config(page_title="Sentiment Analysis", layout="wide")

# Initialize the analyzer once per session
if 'analyzer' not in st.session_state:
    st.session_state.analyzer = SentimentAnalyzer()
analyzer = st.session_state.analyzer

st.title("Real-Time Sentiment Analysis")

# Input
col1, col2 = st.columns([2, 1])
with col1:
    text_input = st.text_area("Enter a text to analyze:", height=100)
    if st.button("Analyze", type="primary"):
        if text_input:
            with st.spinner("Analyzing..."):
                result = analyzer.analyze(text_input)
            # Display the result
            sentiment_color = {
                "positive": "green",
                "negative": "red",
                "neutral": "gray"
            }
            st.markdown(f"""
            ### Result
            - **Sentiment**: :{sentiment_color[result['sentiment']]}[{result['sentiment'].upper()}]
            - **Rating**: {'⭐' * result['stars']}
            - **Confidence**: {result['confidence']:.2%}
            """)

with col2:
    st.subheader("Statistics")
    stats = analyzer.get_stats()
    if stats:
        st.metric("Total analyzed", stats['total'])
        st.metric("Average rating", f"{stats['average_stars']:.1f} ⭐")
        # Pie chart of the sentiment distribution
        fig = px.pie(
            values=[stats['positive'], stats['neutral'], stats['negative']],
            names=['Positive', 'Neutral', 'Negative'],
            color_discrete_sequence=['green', 'gray', 'red']
        )
        st.plotly_chart(fig, use_container_width=True)

# History
st.subheader("Analysis history")
history = analyzer.get_history()
if not history.empty:
    st.dataframe(
        history[['text', 'sentiment', 'stars', 'confidence', 'timestamp']],
        use_container_width=True
    )
    # Download
    csv = history.to_csv(index=False)
    st.download_button(
        "Download CSV",
        csv,
        "sentiment_analysis.csv",
        "text/csv"
    )
```

Step 3: Launch the application
```bash
streamlit run app.py
```

General best practices
Project structure
```text
my_ai_project/
├── data/
│   ├── raw/            # Raw data
│   ├── processed/      # Processed data
│   └── models/         # Saved models
├── notebooks/          # Exploration and prototyping
├── src/
│   ├── data/           # Loading and preprocessing
│   ├── models/         # Model definitions
│   ├── training/       # Training
│   └── inference/      # Inference
├── tests/              # Unit tests
├── api/                # Serving API
├── requirements.txt
├── Dockerfile
└── README.md
```

Dependency management
```text
# requirements.txt
torch>=2.0.0
transformers>=4.30.0
anthropic>=0.18.0
chromadb>=0.4.0
fastapi>=0.100.0
uvicorn>=0.23.0
scikit-learn>=1.3.0
pandas>=2.0.0
numpy>=1.24.0
```

Configuration
```python
# config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # API keys
    anthropic_api_key: str
    openai_api_key: str | None = None

    # Model settings
    model_name: str = "claude-sonnet-4-20250514"
    max_tokens: int = 1024
    temperature: float = 0.7

    # Database
    chroma_path: str = "./chroma_db"

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    class Config:
        env_file = ".env"

settings = Settings()
```
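`Settings` pulls its values from environment variables or a `.env` file at the project root; field names match variable names case-insensitively. A matching example (the key is a placeholder, and `.env` should stay out of version control):

```text
# .env
ANTHROPIC_API_KEY=sk-ant-xxxxxxxx
MODEL_NAME=claude-sonnet-4-20250514
MAX_TOKENS=1024
TEMPERATURE=0.7
```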
Logging

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Usage
logger.info("Application starting")
logger.error("Error while processing", exc_info=True)
```

Summary
| Tutorial | Technologies | Difficulty |
|---|---|---|
| RAG chatbot | Anthropic, Chroma, Sentence Transformers | Intermediate |
| Image classification | PyTorch, ResNet, Transfer Learning | Intermediate |
| ML API | FastAPI, scikit-learn, Docker | Beginner |
| Sentiment analysis | Transformers, Streamlit | Beginner |