Tutorials

Step-by-step guides for building AI applications

Hands-on tutorials for building artificial intelligence applications from start to finish.

Tutorial 1: A complete RAG chatbot

Build a chatbot that answers questions about your documents.

Prerequisites

pip install anthropic chromadb sentence-transformers pypdf
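
The chatbot in step 4 uses the Anthropic client, which reads its API key from the ANTHROPIC_API_KEY environment variable:

export ANTHROPIC_API_KEY="your-api-key"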

Step 1: Load the documents

from pypdf import PdfReader
from pathlib import Path

def load_pdf(path: str) -> str:
    """Extract the text from a PDF."""
    reader = PdfReader(path)
    text = ""
    for page in reader.pages:
        text += (page.extract_text() or "") + "\n"  # guard against pages with no extractable text
    return text

def load_documents(folder: str) -> list[dict]:
    """Load every PDF and text file in a folder."""
    documents = []
    folder_path = Path(folder)

    for file_path in folder_path.glob("*.pdf"):
        text = load_pdf(str(file_path))
        documents.append({
            "source": file_path.name,
            "content": text
        })

    for file_path in folder_path.glob("*.txt"):
        text = file_path.read_text(encoding="utf-8")
        documents.append({
            "source": file_path.name,
            "content": text
        })

    print(f"Loaded {len(documents)} documents")
    return documents

Step 2: Split into chunks

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split the text into overlapping chunks."""
    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size

        # Try to cut at a sentence boundary
        if end < len(text):
            # Look for the last period in the window
            last_period = text.rfind('.', start + chunk_size - 100, end)
            if last_period > start:
                end = last_period + 1

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        start = end - overlap

    return chunks
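
A quick sanity check of the chunking behavior, on a synthetic text with no sentence boundaries (values are illustrative):

sample = "word " * 500  # 2500 characters, no periods
chunks = chunk_text(sample, chunk_size=1000, overlap=200)
print(len(chunks))       # 4 chunks with these settings
print(len(chunks[0]))    # close to 1000 characters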

def process_documents(documents: list[dict]) -> list[dict]:
    """Split all documents into chunks."""
    all_chunks = []

    for doc in documents:
        chunks = chunk_text(doc["content"])
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                "id": f"{doc['source']}_{i}",
                "source": doc["source"],
                "content": chunk
            })

    print(f"Created {len(all_chunks)} chunks")
    return all_chunks

Step 3: Build the vector store

import chromadb
from sentence_transformers import SentenceTransformer

def create_vector_store(chunks: list[dict], collection_name: str = "documents"):
    """Build the vector store from the chunks."""
    # Initialize the embedding model
    embedder = SentenceTransformer('all-MiniLM-L6-v2')

    # Create the Chroma client
    client = chromadb.PersistentClient(path="./chroma_db")

    # Drop the collection if it already exists
    try:
        client.delete_collection(collection_name)
    except Exception:
        pass

    # Create the collection
    collection = client.create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )

    # Embed and add the chunks in batches
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]

        # Generate the embeddings
        texts = [c["content"] for c in batch]
        embeddings = embedder.encode(texts).tolist()

        # Add to the collection
        collection.add(
            ids=[c["id"] for c in batch],
            embeddings=embeddings,
            documents=texts,
            metadatas=[{"source": c["source"]} for c in batch]
        )

        print(f"Indexed {min(i+batch_size, len(chunks))}/{len(chunks)}")

    print(f"Vector store built with {len(chunks)} chunks")
    return collection

Step 4: Build the chatbot

from anthropic import Anthropic
from sentence_transformers import SentenceTransformer
import chromadb

class RAGChatbot:
    def __init__(self, collection_name: str = "documents"):
        self.llm = Anthropic()
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.client.get_collection(collection_name)
        self.conversation = []

    def search(self, query: str, n_results: int = 5) -> list[dict]:
        """Recherche les chunks pertinents."""
        query_embedding = self.embedder.encode(query).tolist()

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )

        return [
            {
                "content": doc,
                "source": meta["source"],
                "score": 1 - dist  # Convertir distance en similarité
            }
            for doc, meta, dist in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0]
            )
        ]

    def chat(self, user_message: str) -> str:
        """Answer a question using RAG."""
        # Retrieve the relevant context
        relevant_docs = self.search(user_message)

        # Build the context block
        context = "\n\n---\n\n".join([
            f"[Source: {doc['source']}]\n{doc['content']}"
            for doc in relevant_docs
        ])

        # Build the system prompt
        system_prompt = """You are an assistant that answers questions using only the information provided in the context.

Rules:
- Use ONLY information from the context
- If the answer is not in the context, say so clearly
- Cite your sources when relevant
- Be precise and concise"""

        # Add the user message together with the context
        augmented_message = f"""CONTEXT:
{context}

QUESTION: {user_message}"""

        self.conversation.append({
            "role": "user",
            "content": augmented_message
        })

        # Call the LLM
        response = self.llm.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system=system_prompt,
            messages=self.conversation
        )

        assistant_message = response.content[0].text

        self.conversation.append({
            "role": "assistant",
            "content": assistant_message
        })

        return assistant_message

    def reset(self):
        """Réinitialise la conversation."""
        self.conversation = []
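
Note that self.conversation grows with every exchange, and each user turn embeds the full retrieved context, so long sessions will eventually exceed the model's context window. A minimal mitigation sketch, keeping only the most recent turns (the cap of 10 is an arbitrary choice):

def trim_history(conversation: list[dict], max_turns: int = 10) -> list[dict]:
    """Keep only the last max_turns user/assistant exchanges."""
    return conversation[-max_turns * 2:]

# Inside RAGChatbot.chat(), before calling the LLM:
# self.conversation = trim_history(self.conversation)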

Step 5: Usage

# Indexing (run once)
documents = load_documents("./my_documents")
chunks = process_documents(documents)
create_vector_store(chunks)

# Chat loop
chatbot = RAGChatbot()

while True:
    question = input("\nYou: ")
    if question.lower() in ['quit', 'exit', 'q']:
        break

    response = chatbot.chat(question)
    print(f"\nAssistant: {response}")

Tutorial 2: Image classification

Build a custom image classifier with transfer learning.

Prerequisites

pip install torch torchvision pillow matplotlib

Step 1: Prepare the data

import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

def prepare_data(data_dir: str, batch_size: int = 32):
    """Build the dataloaders for training and validation."""

    # Training transforms (with data augmentation)
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Validation transforms
    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Load the datasets
    train_dataset = datasets.ImageFolder(f"{data_dir}/train", train_transform)
    val_dataset = datasets.ImageFolder(f"{data_dir}/val", val_transform)

    # Create the dataloaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    print(f"Classes: {train_dataset.classes}")
    print(f"Train: {len(train_dataset)} images")
    print(f"Val: {len(val_dataset)} images")

    return train_loader, val_loader, train_dataset.classes
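
datasets.ImageFolder infers labels from sub-folder names, so prepare_data assumes a layout like this (class names are illustrative):

data/
├── train/
│   ├── cats/
│   │   └── cat_001.jpg
│   └── dogs/
│       └── dog_001.jpg
└── val/
    ├── cats/
    └── dogs/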

Step 2: Build the model

import torch.nn as nn
from torchvision import models

def create_model(num_classes: int, pretrained: bool = True):
    """Build a ResNet with transfer learning."""

    # Load a pretrained ResNet (weights API, torchvision >= 0.13)
    weights = models.ResNet50_Weights.DEFAULT if pretrained else None
    model = models.resnet50(weights=weights)

    # Freeze the backbone
    for param in model.parameters():
        param.requires_grad = False

    # Replace the classification head
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(num_features, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )

    return model
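
Since the backbone is frozen, only the new head should be trainable. A quick check (the class count here is arbitrary):

model = create_model(num_classes=3)
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f"Trainable parameters: {trainable:,} / {total:,}")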

Step 3: Train the model

import torch.optim as optim
from tqdm import tqdm

def train_model(model, train_loader, val_loader, num_epochs: int = 10):
    """Entraîne le modèle."""

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 20)

        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_correct += (preds == labels).sum().item()

        train_loss = train_loss / len(train_loader.dataset)
        train_acc = train_correct / len(train_loader.dataset)

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validation"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()

        val_loss = val_loss / len(val_loader.dataset)
        val_acc = val_correct / len(val_loader.dataset)

        print(f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")

        # Save the best model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")
            print("Best model saved!")

        scheduler.step()

    print(f"\nMeilleure accuracy: {best_acc:.4f}")
    return model

Step 4: Use the model

from PIL import Image

def predict(model, image_path: str, classes: list[str]):
    """Predict the class of an image."""

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Preprocessing
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Load and transform the image
    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0).to(device)

    # Prediction
    with torch.no_grad():
        outputs = model(input_tensor)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)

    # Top-3 predictions
    top3_prob, top3_idx = torch.topk(probabilities, 3)

    results = []
    for prob, idx in zip(top3_prob, top3_idx):
        results.append({
            "class": classes[idx],
            "probability": prob.item()
        })

    return results

# Example usage
train_loader, val_loader, classes = prepare_data("./data")
model = create_model(num_classes=len(classes))
model = train_model(model, train_loader, val_loader, num_epochs=10)

# Prediction
results = predict(model, "test_image.jpg", classes)
for r in results:
    print(f"{r['class']}: {r['probability']:.2%}")

Tutorial 3: ML prediction API

Deploy an ML model with FastAPI.

Prerequisites

pip install fastapi uvicorn scikit-learn joblib pandas

Step 1: Train and save the model

from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import joblib
import pandas as pd

# Load the data
df = pd.read_csv("data.csv")
X = df.drop("target", axis=1)
y = df["target"]

# Build the pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train
pipeline.fit(X, y)

# Save
joblib.dump(pipeline, "model.joblib")
joblib.dump(list(X.columns), "features.joblib")
print("Model saved!")

Step 2: Build the API

# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, field_validator
import joblib
import numpy as np

app = FastAPI(
    title="ML Prediction API",
    description="API for serving a classification model",
    version="1.0.0"
)

# Load the model at startup
model = joblib.load("model.joblib")
feature_names = joblib.load("features.joblib")

class PredictionInput(BaseModel):
    features: dict[str, float]

    @field_validator('features')
    @classmethod
    def check_features(cls, v):
        missing = set(feature_names) - set(v.keys())
        if missing:
            raise ValueError(f"Missing features: {missing}")
        return v

class PredictionOutput(BaseModel):
    prediction: int
    probability: float
    probabilities: dict[str, float]

class BatchInput(BaseModel):
    items: list[PredictionInput]

class BatchOutput(BaseModel):
    predictions: list[PredictionOutput]

@app.get("/")
def root():
    return {"message": "API de Prédiction ML", "version": "1.0.0"}

@app.get("/health")
def health():
    return {"status": "healthy", "model_loaded": model is not None}

@app.get("/features")
def get_features():
    return {"features": feature_names}

@app.post("/predict", response_model=PredictionOutput)
def predict(input_data: PredictionInput):
    try:
        # Prepare the input
        features = [input_data.features[f] for f in feature_names]
        X = np.array(features).reshape(1, -1)

        # Predict
        prediction = model.predict(X)[0]
        probabilities = model.predict_proba(X)[0]

        return PredictionOutput(
            prediction=int(prediction),
            probability=float(max(probabilities)),
            probabilities={
                str(i): float(p)
                for i, p in enumerate(probabilities)
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch", response_model=BatchOutput)
def predict_batch(input_data: BatchInput):
    predictions = []
    for item in input_data.items:
        result = predict(item)
        predictions.append(result)
    return BatchOutput(predictions=predictions)

Step 3: Run the API

uvicorn api:app --reload --host 0.0.0.0 --port 8000
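
A quick smoke test from another terminal (the feature names below are the same placeholders used in step 4; substitute your own):

curl http://localhost:8000/health
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": {"feature1": 1.5, "feature2": 2.3, "feature3": 0.8}}'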

Step 4: Test the API

import requests

# Single prediction test
response = requests.post(
    "http://localhost:8000/predict",
    json={
        "features": {
            "feature1": 1.5,
            "feature2": 2.3,
            "feature3": 0.8
        }
    }
)
print(response.json())

# Batch test
response = requests.post(
    "http://localhost:8000/predict/batch",
    json={
        "items": [
            {"features": {"feature1": 1.0, "feature2": 2.0, "feature3": 0.5}},
            {"features": {"feature1": 1.5, "feature2": 2.5, "feature3": 0.7}}
        ]
    }
)
print(response.json())

Step 5: Dockerize

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.joblib features.joblib api.py ./

EXPOSE 8000

CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
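
Build and start the service, then probe the health endpoint (with the legacy Compose v1 binary, use docker-compose instead):

docker compose up --build -d
curl http://localhost:8000/health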

Tutorial 4: Real-time sentiment analysis

Analyze the sentiment of tweets or reviews as they stream in.

Prerequisites

pip install transformers torch streamlit pandas plotly

Step 1: Build the analyzer

# sentiment_analyzer.py
from transformers import pipeline
import pandas as pd
from datetime import datetime

class SentimentAnalyzer:
    def __init__(self):
        self.analyzer = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment"
        )
        self.history = []

    def analyze(self, text: str) -> dict:
        result = self.analyzer(text)[0]

        # The model's labels look like "5 stars"; keep the leading digit
        stars = int(result['label'][0])
        sentiment = "positive" if stars >= 4 else "negative" if stars <= 2 else "neutral"

        analysis = {
            "text": text,
            "sentiment": sentiment,
            "stars": stars,
            "confidence": result['score'],
            "timestamp": datetime.now().isoformat()
        }

        self.history.append(analysis)
        return analysis

    def analyze_batch(self, texts: list[str]) -> list[dict]:
        return [self.analyze(text) for text in texts]

    def get_stats(self) -> dict:
        if not self.history:
            return {}

        df = pd.DataFrame(self.history)

        return {
            "total": len(df),
            "positive": len(df[df['sentiment'] == 'positive']),
            "negative": len(df[df['sentiment'] == 'negative']),
            "neutral": len(df[df['sentiment'] == 'neutral']),
            "average_stars": df['stars'].mean(),
            "average_confidence": df['confidence'].mean()
        }

    def get_history(self) -> pd.DataFrame:
        return pd.DataFrame(self.history)
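
A quick usage check before wiring up the UI (the example sentence is arbitrary; the first call downloads the model from the Hugging Face Hub):

from sentiment_analyzer import SentimentAnalyzer

analyzer = SentimentAnalyzer()
print(analyzer.analyze("This product is great, I love it!"))
print(analyzer.get_stats())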

Step 2: Streamlit interface

# app.py
import streamlit as st
import plotly.express as px
from sentiment_analyzer import SentimentAnalyzer

st.set_page_config(page_title="Sentiment Analysis", layout="wide")

# Initialize the analyzer
if 'analyzer' not in st.session_state:
    st.session_state.analyzer = SentimentAnalyzer()

analyzer = st.session_state.analyzer

st.title("Real-Time Sentiment Analysis")

# Input
col1, col2 = st.columns([2, 1])

with col1:
    text_input = st.text_area("Enter a text to analyze:", height=100)

    if st.button("Analyze", type="primary"):
        if text_input:
            with st.spinner("Analyzing..."):
                result = analyzer.analyze(text_input)

            # Display the result
            sentiment_color = {
                "positive": "green",
                "negative": "red",
                "neutral": "gray"
            }

            st.markdown(f"""
            ### Result
            - **Sentiment**: :{sentiment_color[result['sentiment']]}[{result['sentiment'].upper()}]
            - **Rating**: {'⭐' * result['stars']}
            - **Confidence**: {result['confidence']:.2%}
            """)

with col2:
    st.subheader("Statistics")
    stats = analyzer.get_stats()

    if stats:
        st.metric("Total analyzed", stats['total'])
        st.metric("Average rating", f"{stats['average_stars']:.1f} ⭐")

        # Sentiment distribution chart
        fig = px.pie(
            values=[stats['positive'], stats['neutral'], stats['negative']],
            names=['Positive', 'Neutral', 'Negative'],
            color_discrete_sequence=['green', 'gray', 'red']
        )
        )
        st.plotly_chart(fig, use_container_width=True)

# History
st.subheader("Analysis history")
history = analyzer.get_history()

if not history.empty:
    st.dataframe(
        history[['text', 'sentiment', 'stars', 'confidence', 'timestamp']],
        use_container_width=True
    )

    # Download as CSV
    csv = history.to_csv(index=False)
    st.download_button(
        "Download CSV",
        csv,
        "sentiment_analysis.csv",
        "text/csv"
    )

Step 3: Run the app

streamlit run app.py

General best practices

Project structure

my_ai_project/
├── data/
│   ├── raw/              # Raw data
│   ├── processed/        # Processed data
│   └── models/           # Saved models
├── notebooks/            # Exploration and prototyping
├── src/
│   ├── data/            # Loading and preprocessing
│   ├── models/          # Model definitions
│   ├── training/        # Training
│   └── inference/       # Inference
├── tests/               # Unit tests
├── api/                 # Serving API
├── requirements.txt
├── Dockerfile
└── README.md

Dependency management

# requirements.txt
torch>=2.0.0
transformers>=4.30.0
anthropic>=0.18.0
chromadb>=0.4.0
fastapi>=0.100.0
uvicorn>=0.23.0
scikit-learn>=1.3.0
pandas>=2.0.0
numpy>=1.24.0

Configuration

# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # API Keys
    anthropic_api_key: str
    openai_api_key: str | None = None

    # Model settings
    model_name: str = "claude-sonnet-4-20250514"
    max_tokens: int = 1024
    temperature: float = 0.7

    # Database
    chroma_path: str = "./chroma_db"

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()
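
The matching .env file (all values are placeholders):

# .env (never commit this file)
ANTHROPIC_API_KEY=your-anthropic-key
MODEL_NAME=claude-sonnet-4-20250514
API_PORT=8000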

Logging

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Usage
logger.info("Application starting")
logger.error("Error during processing", exc_info=True)

Summary

Tutorial              | Technologies                              | Difficulty
RAG chatbot           | Anthropic, Chroma, Sentence Transformers  | Intermediate
Image classification  | PyTorch, ResNet, Transfer Learning        | Intermediate
ML API                | FastAPI, scikit-learn, Docker             | Beginner
Sentiment analysis    | Transformers, Streamlit                   | Beginner
