"""
Module d'authentification NTLM pour IziProxy
"""
import gzip
import io
import ssl
import socket
import base64
import logging
import requests
from urllib.parse import urlparse
from urllib3.connection import HTTPSConnection
from urllib3.connectionpool import HTTPSConnectionPool
from iziproxy.secure_config import SecurePassword
# Configuration du logger
logger = logging.getLogger("iziproxy")
# Importations conditionnelles pour les dépendances NTLM
try:
from ntlm_auth.ntlm import NtlmContext
NTLM_AVAILABLE = True
except ImportError:
logger.debug("Package ntlm_auth non disponible, le support NTLM ne sera pas activé")
NTLM_AVAILABLE = False
# Essayer d'importer PyCryptodome pour le support MD4/MD5
try:
from Cryptodome.Hash import MD4, MD5
CRYPTODOME_AVAILABLE = True
except ImportError:
logger.debug("Package pycryptodomex non disponible, le patch MD4 ne sera pas activé")
CRYPTODOME_AVAILABLE = False
[docs]
def is_ntlm_auth_available():
"""
Vérifie si les dépendances pour l'authentification NTLM sont disponibles
Returns:
bool: True si les dépendances sont disponibles
"""
return NTLM_AVAILABLE
[docs]
class NtlmProxyManager:
"""
Gestionnaire de proxy NTLM pour l'authentification sur les proxys d'entreprise
Cette classe simplifie l'utilisation de l'authentification NTLM avec requests
"""
[docs]
def __init__(self):
"""
Initialise le gestionnaire NTLM
Raises:
ImportError: Si le package ntlm_auth n'est pas disponible
"""
# Vérifier les dépendances
if not NTLM_AVAILABLE:
raise ImportError("Le package ntlm_auth est requis pour utiliser l'authentification NTLM")
# Appliquer le patch MD4 si PyCryptodome est disponible
if CRYPTODOME_AVAILABLE:
self._patch_ntlm_auth_md4()
def _patch_ntlm_auth_md4(self):
"""
Applique un patch pour ntlm_auth afin d'utiliser PyCryptodome pour MD4/MD5
Cette méthode remplace l'implémentation hashlib de ntlm_auth par PyCryptodome,
qui fournit une implémentation native de MD4 (nécessaire pour NTLM)
"""
try:
import ntlm_auth.compute_hash
# Créer une implémentation de hashlib compatible avec ntlm_auth
class FakeHashlib:
@staticmethod
def new(name, data=b''):
if name.lower() == 'md4':
h = MD4.new()
h.update(data)
return h
elif name.lower() == 'md5':
h = MD5.new()
h.update(data)
return h
else:
raise ValueError(f"Unsupported hash type: {name}")
@staticmethod
def md5(data=b''):
h = MD5.new()
h.update(data)
return h
# Remplacer l'implémentation hashlib dans ntlm_auth
ntlm_auth.compute_hash.hashlib = FakeHashlib
logger.debug("Patch MD4/MD5 appliqué avec succès pour ntlm_auth")
except Exception as e:
logger.warning(f"Erreur lors de l'application du patch MD4: {e}")
[docs]
def create_ntlm_proxy_session(self, proxy_host, proxy_port, username, password, domain='', workstation='WORKSTATION', debug=False):
"""
Crée une session requests configurée pour utiliser l'authentification NTLM
Args:
proxy_host (str): Hôte du proxy
proxy_port (int): Port du proxy
username (str): Nom d'utilisateur
password (str): Mot de passe
domain (str, optional): Domaine (optionnel)
workstation (str, optional): Nom du poste de travail (optionnel)
debug (bool, optional): Activer le mode débogage
Returns:
requests.Session: Session configurée avec l'adaptateur NTLM
"""
session = requests.Session()
adapter = NtlmProxyAdapter(
proxy_host=proxy_host,
proxy_port=proxy_port,
username=username,
password=password,
domain=domain,
workstation=workstation,
debug=debug
)
session.mount('https://', adapter)
session.mount('http://', adapter)
session.proxies = {}
session.trust_env = False
logger.debug(f"Session NTLM créée pour {username}@{proxy_host}:{proxy_port}")
return session
[docs]
class NtlmProxyTunnel:
"""
Établit un tunnel HTTPS à travers un proxy avec authentification NTLM
Cette classe gère la création et l'authentification d'un tunnel SSL
à travers un proxy NTLM.
"""
[docs]
def __init__(self, proxy_host, proxy_port, username, password, domain='', workstation='WORKSTATION', debug=False):
"""
Initialise le tunnel proxy NTLM
Args:
proxy_host (str): Hôte du proxy
proxy_port (int): Port du proxy
username (str): Nom d'utilisateur
password (str ou SecurePassword): Mot de passe
domain (str, optional): Domaine (optionnel)
workstation (str, optional): Nom du poste de travail (optionnel)
debug (bool, optional): Activer le mode débogage
"""
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.username = username
# Assurer que le mot de passe est sécurisé
if isinstance(password, SecurePassword):
self.password = password
else:
self.password = SecurePassword(password)
self.domain = domain
self.workstation = workstation
self.debug = debug
[docs]
def open_tunnel(self, target_host, target_port=443):
"""
Ouvre un tunnel vers l'hôte cible via le proxy NTLM
Args:
target_host (str): Hôte de destination
target_port (int, optional): Port de destination (443 par défaut)
Returns:
socket: Socket SSL connecté à la destination
Raises:
Exception: Si l'établissement du tunnel échoue
"""
if self.debug:
logger.debug(f"Connexion au proxy {self.proxy_host}:{self.proxy_port}")
# Connexion initiale au proxy
sock = socket.create_connection((self.proxy_host, self.proxy_port))
# Création du contexte NTLM
context = NtlmContext(
username=self.username,
password=self.password.get_password(),
domain=self.domain,
workstation=self.workstation,
)
# Étape 1: Envoi du message NTLM Negotiate
negotiate_token = base64.b64encode(context.step()).decode('ascii')
self._send_connect(sock, target_host, target_port, negotiate_token)
# Réception de la réponse du proxy
response1 = self._recv_response(sock)
if b"407" not in response1:
# Si le proxy ne demande pas d'authentification, vérifiez si la connexion est établie
if b"200 connection established" in response1.lower():
if self.debug:
logger.debug("Tunnel établi avec succès sans authentification")
return self._wrap_socket(sock, target_host)
else:
raise Exception("Échec de l'établissement du tunnel proxy sans authentification.")
# Extraction du challenge NTLM
challenge_token = self._parse_ntlm_challenge(response1)
if self.debug:
logger.debug("Challenge NTLM reçu")
# Étape 2: Envoi du message NTLM Authenticate
authenticate_token = base64.b64encode(context.step(base64.b64decode(challenge_token))).decode('ascii')
self._send_connect(sock, target_host, target_port, authenticate_token)
# Réception de la réponse du proxy
response2 = self._recv_response(sock)
if self.debug:
logger.debug("Réponse du proxy après authentification")
logger.debug(response2.decode(errors='ignore').strip())
if b"200 connection established" not in response2.lower():
raise Exception("Échec de l'établissement du tunnel proxy après authentification.")
if self.debug:
logger.debug("Tunnel établi avec succès après authentification")
# Encapsulation SSL de la connexion
return self._wrap_socket(sock, target_host)
def _wrap_socket(self, sock, target_host):
"""
Encapsule la connexion avec SSL et retourne le socket SSL
Args:
sock (socket): Socket connecté au proxy
target_host (str): Nom d'hôte cible pour la vérification SSL
Returns:
socket: Socket SSL connecté
"""
ssl_context = ssl.create_default_context()
return ssl_context.wrap_socket(sock, server_hostname=target_host)
def _send_connect(self, sock, target_host, target_port, token):
"""
Envoie une requête CONNECT avec l'authentification NTLM
Args:
sock (socket): Socket connecté au proxy
target_host (str): Hôte de destination
target_port (int): Port de destination
token (str): Token d'authentification NTLM
"""
connect_request = (
f"CONNECT {target_host}:{target_port} HTTP/1.1\r\n"
f"Host: {target_host}:{target_port}\r\n"
f"Proxy-Authorization: NTLM {token}\r\n"
f"Proxy-Connection: Keep-Alive\r\n"
f"Connection: Keep-Alive\r\n"
f"\r\n"
)
sock.sendall(connect_request.encode('utf-8'))
def _recv_response(self, sock):
"""
Reçoit une réponse HTTP du proxy
Args:
sock (socket): Socket connecté au proxy
Returns:
bytes: Réponse du proxy
"""
buffer = b""
while b"\r\n\r\n" not in buffer:
data = sock.recv(4096)
if not data:
break
buffer += data
return buffer
def _parse_ntlm_challenge(self, response):
"""
Extrait le challenge NTLM de la réponse du proxy
Args:
response (bytes): Réponse du proxy
Returns:
str: Token de challenge NTLM
Raises:
Exception: Si le challenge NTLM n'est pas trouvé
"""
headers = response.decode(errors='ignore').split('\r\n')
for header in headers:
if header.lower().startswith('proxy-authenticate:'):
header_value = header[len('proxy-authenticate:'):].strip()
if header_value.startswith('NTLM'):
return header_value[len('NTLM'):].strip()
raise Exception("Challenge NTLM non trouvé dans la réponse du proxy")
# Adaptations des classes de connexion pour l'intégration NTLM
[docs]
class PatchedHTTPSConnection(HTTPSConnection):
"""
Connexion HTTPS patchée pour utiliser un socket SSL préconnecté
Cette classe modifie HTTPSConnection pour utiliser un socket SSL
déjà établi par le tunnel NTLM.
"""
[docs]
def __init__(self, host, ssl_sock, port=443, timeout=60):
"""
Initialise la connexion patchée
Args:
host (str): Hôte de destination
ssl_sock (socket): Socket SSL préconnecté
port (int, optional): Port de destination
timeout (int, optional): Timeout en secondes
"""
super().__init__(host, port=port, timeout=timeout)
self.sock = ssl_sock
self._custom_connected = True
[docs]
def connect(self):
"""
Méthode connect surchargée pour utiliser le socket préconnecté
"""
if getattr(self, '_custom_connected', False):
return
super().connect()
[docs]
class CustomHTTPSConnectionPool(HTTPSConnectionPool):
"""
Pool de connexions HTTPS personnalisé pour gérer le tunnel NTLM
Cette classe étend HTTPSConnectionPool pour créer des connexions
à travers un tunnel NTLM.
"""
[docs]
def __init__(self, tunnel, host, port):
"""
Initialise le pool de connexions
Args:
tunnel (NtlmProxyTunnel): Instance de NtlmProxyTunnel
host (str): Hôte de destination
port (int): Port de destination
"""
self.tunnel = tunnel
super().__init__(host=host, port=port)
def _new_conn(self):
"""
Crée une nouvelle connexion à travers le tunnel NTLM
Returns:
PatchedHTTPSConnection: Connexion HTTPS patchée
"""
ssl_sock = self.tunnel.open_tunnel(self.host, self.port)
return PatchedHTTPSConnection(self.host, ssl_sock, port=self.port)
[docs]
class NtlmProxyAdapter(requests.adapters.BaseAdapter):
"""
Adaptateur requests pour l'authentification NTLM avec les proxys
Cette classe implémente un adaptateur requests personnalisé qui gère
l'authentification NTLM pour les proxys d'entreprise.
"""
[docs]
def __init__(self, proxy_host, proxy_port, username, password, domain='', workstation='WORKSTATION', debug=False):
"""
Initialise l'adaptateur NTLM
Args:
proxy_host (str): Hôte du proxy
proxy_port (int): Port du proxy
username (str): Nom d'utilisateur
password (str): Mot de passe
domain (str, optional): Domaine (optionnel)
workstation (str, optional): Nom du poste de travail (optionnel)
debug (bool, optional): Activer le mode débogage
"""
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.username = username
self.password = SecurePassword(password)
self.domain = domain
self.workstation = workstation
self.debug = debug
[docs]
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
"""
Envoie une requête via le tunnel NTLM
Cette méthode établit un tunnel NTLM puis envoie la requête HTTP
à travers ce tunnel. Elle gère également la lecture et le traitement
de la réponse.
Args:
request (PreparedRequest): Requête requests à envoyer
stream (bool, optional): Activer le streaming
timeout (int, optional): Timeout en secondes
verify (bool, optional): Vérifier le certificat SSL
cert (str, optional): Certificat client
proxies (dict, optional): Configurations de proxy (ignorées)
Returns:
requests.Response: Réponse à la requête
Raises:
Exception: Si une erreur survient lors de l'envoi de la requête
"""
parsed_url = urlparse(request.url)
hostname = parsed_url.hostname
# Créer le tunnel NTLM
tunnel = NtlmProxyTunnel(
proxy_host=self.proxy_host,
proxy_port=self.proxy_port,
username=self.username,
password=self.password,
domain=self.domain,
workstation=self.workstation,
debug=self.debug
)
# Créer le pool de connexions et obtenir une connexion
pool = CustomHTTPSConnectionPool(tunnel=tunnel, host=hostname, port=443)
conn = pool._new_conn()
# Construire la requête HTTP
path = parsed_url.path or '/'
if parsed_url.query:
path += '?' + parsed_url.query
request_line = f"{request.method} {path} HTTP/1.1\r\n"
headers = ''.join(f"{k}: {v}\r\n" for k, v in request.headers.items())
full_request = (request_line +
f"Host: {hostname}\r\n" +
headers +
"\r\n").encode()
if request.body:
full_request += request.body if isinstance(request.body, bytes) else request.body.encode()
# Envoyer la requête
conn.sock.sendall(full_request)
# Étape 1: Lire les headers
response_buffer = b""
while b"\r\n\r\n" not in response_buffer:
chunk = conn.sock.recv(4096)
if not chunk:
raise Exception("Connexion fermée avant réception des headers")
response_buffer += chunk
header_data, _, remaining = response_buffer.partition(b"\r\n\r\n")
# Parser les headers
header_lines = header_data.decode(errors='ignore').split("\r\n")
status_line = header_lines[0]
headers = {}
for line in header_lines[1:]:
if ": " in line:
key, value = line.split(": ", 1)
headers[key.strip()] = value.strip()
# Vérifier Content-Length si disponible
content_length = None
if "Content-Length" in headers:
try:
content_length = int(headers["Content-Length"])
except ValueError:
content_length = None
# Étape 2: Lire le body
body_data = remaining
if content_length is not None:
# Lire exactement le nombre d'octets spécifié
while len(body_data) < content_length:
chunk = conn.sock.recv(content_length - len(body_data))
if not chunk:
break
body_data += chunk
else:
# Pas de Content-Length: lire jusqu'à fermeture
# Vérifier s'il s'agit d'un transfert chunked
is_chunked = headers.get("Transfer-Encoding", "").lower() == "chunked"
if is_chunked:
# Gestion du transfert chunked
decoded_body = b""
chunk_data = body_data
while True:
# Si pas de données chunk, lire plus
if not chunk_data:
chunk_data = conn.sock.recv(4096)
if not chunk_data:
break
# Trouver la taille du chunk
chunk_size_end = chunk_data.find(b"\r\n")
if chunk_size_end == -1:
# Taille incomplète, lire plus
more_data = conn.sock.recv(4096)
if not more_data:
break
chunk_data += more_data
continue
# Extraire la taille du chunk
try:
chunk_size_hex = chunk_data[:chunk_size_end].decode('ascii').strip()
chunk_size = int(chunk_size_hex, 16)
except (ValueError, UnicodeDecodeError):
# Format invalide
break
# Si taille zéro, fin du body
if chunk_size == 0:
break
# Calculer où le chunk se termine
chunk_end = chunk_size_end + 2 + chunk_size + 2
# Si le chunk n'est pas complet, lire plus
while len(chunk_data) < chunk_end:
more_data = conn.sock.recv(4096)
if not more_data:
break
chunk_data += more_data
# Extraire le chunk et ajouter au body
chunk_content = chunk_data[chunk_size_end + 2:chunk_size_end + 2 + chunk_size]
decoded_body += chunk_content
# Passer au chunk suivant
chunk_data = chunk_data[chunk_end:]
body_data = decoded_body
else:
# Transfert standard, lire jusqu'à fermeture
while True:
chunk = conn.sock.recv(4096)
if not chunk:
break
body_data += chunk
# Étape 3: Créer la réponse Requests
http_version, status_code, *reason_parts = status_line.split(' ', 2)
reason = ' '.join(reason_parts) if reason_parts else ""
response = requests.Response()
response.status_code = int(status_code)
response.reason = reason
response.headers = headers
response._content = body_data
response.url = request.url
response.request = request
response.encoding = requests.utils.get_encoding_from_headers(response.headers)
# Vérifiez si le contenu est compressé avec gzip
if 'Content-Encoding' in headers and headers['Content-Encoding'] == 'gzip':
# Utilisez un BytesIO pour décompresser le contenu gzip
with gzip.GzipFile(fileobj=io.BytesIO(body_data)) as gzip_file:
decompressed_data = gzip_file.read()
response._content = decompressed_data
else:
response._content = body_data
return response
[docs]
def close(self):
"""Ferme l'adaptateur et ses ressources associées"""
pass
[docs]
class NtlmProxyDict:
"""
Classe pour encapsuler une session NTLM comme un dictionnaire de proxy
Cette classe permet d'utiliser une session NTLM comme un dictionnaire
de proxy standard, compatible avec les API requests
"""
[docs]
def __init__(self, ntlm_session):
"""
Initialise un dictionnaire de proxy NTLM
Args:
ntlm_session (requests.Session): Session NTLM à encapsuler
"""
self.ntlm_session = ntlm_session
[docs]
def __getitem__(self, key):
"""
Retourne la session NTLM pour http ou https
Args:
key (str): Protocole ('http' ou 'https')
Returns:
requests.Session: Session NTLM
Raises:
KeyError: Si le protocole n'est pas supporté
"""
# Retourner la session NTLM pour http ou https
if key in ('http', 'https'):
return self.ntlm_session
# Lever une erreur pour les clés non supportées
raise KeyError(f"Proxy key '{key}' not found.")
[docs]
def __setitem__(self, key, value):
"""
Ne pas autoriser la modification des proxies
Raises:
NotImplementedError: Toujours levée car non supporté
"""
# Ne pas autoriser la modification des proxies
raise NotImplementedError("Modification of proxy settings is not allowed.")
[docs]
def get(self, key, default=None):
"""
Gérer l'appel pour un proxy
Args:
key (str): Protocole ('http' ou 'https')
default: Valeur par défaut si non trouvé
Returns:
requests.Session: Session NTLM ou default
"""
# Gérer l'appel pour un proxy
return self.__getitem__(key) if key in ('http', 'https') else default
[docs]
def keys(self):
"""
Retourne les clés supportées
Returns:
list: Liste des protocoles supportés
"""
# Retourne les clés supportées
return ['http', 'https']
[docs]
def __contains__(self, key):
"""
Permet de vérifier si une clé est dans le dictionnaire
Args:
key (str): Protocole à vérifier
Returns:
bool: True si le protocole est supporté
"""
# Permet de vérifier si une clé est dans le dictionnaire
return key in ('http', 'https')