Comment utiliser ThreadPoolExecutor en Python 3

L'auteur a choisi le COVID-19 Relief Fund pour recevoir un don dans le cadre du programme Write for DOnations.

Introduction

Les threads Python sont une forme de parallélisme qui permet à votre programme d'exécuter plusieurs procédures à la fois. Le parallélisme en Python peut également être réalisé en utilisant des processus multiples, mais les threads sont particulièrement bien adaptés pour accélérer les applications qui impliquent des quantités importantes d'entrées/sorties (input/output).

Les opérations liées aux entrées/sorties comprennent, par exemple, les requêtes web et la lecture des données des fichiers. Contrairement aux opérations liées aux entrées/sorties, les opérations liées au CPU (comme l'exécution de calculs mathématiques avec la bibliothèque standard Python) ne bénéficieront pas beaucoup des threads Python.

Python 3 inclut l'utilitaire ThreadPoolExecutor pour exécuter du code dans un thread.

Au cours de ce tutoriel, nous utiliserons ThreadPoolExecutor pour effectuer rapidement des requêtes réseau. Nous allons définir une fonction bien adaptée à l'invocation dans les threads, utiliser ThreadPoolExecutor pour exécuter cette fonction, et traiter les résultats de ces exécutions.

Pour ce tutoriel, nous allons faire des requêtes réseau pour vérifier l'existence de pages Wikipédia.

Note : le fait que les opérations liées aux entrées/sorties bénéficient davantage des threads que les opérations liées au CPU est causé par une idiosyncrasie en Python appelée verrouillage global de l'interpréteur. Si vous le souhaitez, vous pouvez en apprendre davantage sur le verrouillage global de l'interpréteur de Python dans la documentation officielle de Python.

Conditions préalables

Pour tirer le meilleur parti de ce tutoriel, il est recommandé de se familiariser avec la programmation en Python et d'avoir un environnement de programmation Python local avec des requêtes installé.

Vous pouvez consulter ces tutoriels pour obtenir les informations de base nécessaires :

  • Comment coder en Python 3
  • Comment installer Python 3 et configurer un environnement de programmation local sur Ubuntu 18.04

  • Pour installer le paquet requests dans votre environnement de programmation Python local, vous pouvez exécuter cette commande :

  • pip install --user requests==2.23.0

Étape 1 — Définition d'une fonction à exécuter en threads

Commençons par définir une fonction que nous aimerions exécuter à l'aide de threads.

En utilisant nano ou votre éditeur de texte/environnement de développement préféré, vous pouvez ouvrir ce fichier :

  • nano wiki_page_function.py

Pour ce tutoriel, nous allons écrire une fonction qui détermine si une page Wikipédia existe ou non :

wiki_page_function.py

import requests  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status 

La fonction get_wiki_page_existence accepte deux arguments : une URL vers une page Wikipédia (wiki_page_url), et un timeout de quelques secondes pour obtenir une réponse de cette URL.

get_wiki_page_existence utilise le paquet requests pour faire une requête web à cette URL. En fonction du code d'état de la response HTTP, une chaîne de caractères qui décrit si la page existe ou non est renvoyée. Les différents codes d'état représentent les différents résultats d'une requête HTTP. Cette procédure suppose qu'un code d'état 200 “réussi” signifie que la page Wikipédia existe, et qu'un code d'état 404 “non trouvé” signifie que la page Wikipédia n'existe pas.

Comme décrit dans la section Prérequis, vous aurez besoin du paquet requests installé pour exécuter cette fonction.

Essayons d'exécuter la fonction en ajoutant l’url et l'appel de fonction après la fonction get_wiki_page_existence :

wiki_page_function.py

. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url)) 

Une fois que vous avez ajouté le code, enregistrez et fermez le fichier.

Si nous exécutons ce code :

  • python wiki_page_function.py

Nous verrons une sortie comme celle-ci :

Outputhttps://en.wikipedia.org/wiki/Ocean - exists 

L'appel de la fonction get_wiki_page_existence avec une page Wikipédia valide renvoie une chaîne de caractères qui confirme que la page existe bel et bien.

Avertissement : en général, il n'est pas sûr de partager des objets ou des états Python entre les threads sans prendre un soin particulier pour éviter les bogues de concurrence. Lors de la définition d'une fonction à exécuter dans un thread, il est préférable de définir une fonction qui effectue un seul travail et qui ne partage ni ne publie l'état à d'autres threads. get_wiki_page_existence est un exemple d'une telle fonction.

Étape 2 — Utilisation de ThreadPoolExecutor pour exécuter une fonction dans Threads

Maintenant que nous disposons d'une fonction bien adaptée à l'invocation avec des threads, nous pouvons utiliser ThreadPoolExecutor pour effectuer de multiples invocations de cette fonction de manière opportune.

Ajoutons le code surligné suivant à votre programme dans wiki_page_function.py :

wiki_page_function.py

import requests import concurrent.futures  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) 

Voyons comment ce code fonctionne :

  • concurrent.futures est importé pour nous donner accès à ThreadPoolExecutor.
  • Un énoncé with est utilisé pour créer un executord'instance ThreadPoolExecutor qui nettoiera rapidement les threads dès leur achèvement.
  • Quatre emplois sont submitted à l’executor : un pour chacune des URL de la liste wiki_page_urls.
  • Chaque appel à submit renvoie une instance Future qui est stockée dans la liste futures.
  • La fonction as_completed attend que chaque appel Future get_wiki_page_existence soit terminé pour que nous puissions imprimer son résultat.

Si nous exécutons à nouveau ce programme, avec la commande suivante :

  • python wiki_page_function.py

Nous verrons une sortie comme celle-ci :

Outputhttps://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists 

Cette sortie est logique : 3 des URLs sont des pages Wikipédia valides, et l'une d'entre elles, this_page_does_not_exist, ne l'est pas. Notez que votre sortie peut être ordonnée différemment de cette sortie. Dans cet exemple, la fonction concurrent.futures.as_completed renvoie les résultats dès qu'ils sont disponibles, quel que soit l'ordre dans lequel les emplois ont été soumis.

Étape 3 — Traitement des exceptions aux fonctions exécutées dans Threads

Au cours de l'étape précédente, get_wiki_page_existence a réussi à retourner une valeur pour toutes nos invocations. Dans cette étape, nous verrons que ThreadPoolExecutor peut également lever les exceptions générées dans les invocations de fonctions threadées.

Considérons l'exemple de bloc de code suivant :

wiki_page_function.py

import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status   wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(             executor.submit(                 get_wiki_page_existence, wiki_page_url=url, timeout=0.00001             )         )     for future in concurrent.futures.as_completed(futures):         try:             print(future.result())         except requests.ConnectTimeout:             print("ConnectTimeout.") 

Ce bloc de code est presque identique à celui que nous avons utilisé à l'étape 2, mais il présente deux différences essentielles :

  • Nous passons maintenant timeout=0.00001 à get_wiki_page_existence. Comme le paquet requests ne pourra pas terminer sa requête web à Wikipedia en 0,00001 seconde, cela entraînera une exception ConnectTimeout.
  • Nous attrapons les exceptions ConnectTimeout soulevées par future.result() et imprimons une chaîne de caractères à chaque fois que nous le faisons.

Si nous relançons le programme, nous obtiendrons la sortie suivante :

OutputConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout. 

Quatre messages ConnectTimeout sont imprimés – un pour chacun de nos quatre wiki_page_urls, car aucun d'entre eux n'a pu être terminé en 0.00001 seconde et chacun des quatre appels get_wiki_page_existence a soulevé l'exception ConnectTimeout.

Vous avez maintenant vu que si un appel de fonction soumis à un ThreadPoolExecutor soulève une exception, cette exception peut être soulevée normalement en appelant Future.result. Appeler Future.result sur toutes vos invocations soumises garantit que votre programme ne manquera aucune exception soulevée par votre fonction threadée.

Étape 4 — Comparaison du temps d'exécution avec et sans threads

Maintenant, vérifions que l'utilisation de ThreadPoolExecutor rend réellement votre programme plus rapide.

Tout d'abord, chronométrons get_wiki_page_existence si nous le faisons fonctionner sans threads :

wiki_page_function.py

import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls:     print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start) 

Dans l'exemple de code, nous appelons notre fonction get_wiki_page_existence avec cinquante URL de pages Wikipédia différentes, une par une. Nous utilisons la fonction time.time() pour imprimer le nombre de secondes qu'il faut pour exécuter notre programme.

Si nous exécutons à nouveau ce code comme auparavant, nous obtiendrons la sortie suivante :

OutputRunning without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182 

Les entrées 2–47 dans cette sortie ont été omises par concision.

Le nombre de secondes affiché après Without threads time (Temps écoulé sans threads) sera différent lorsque vous l'exécuterez sur votre machine – ce n'est pas grave, vous obtenez juste un nombre de base à comparer avec une solution qui utilise ThreadPoolExecutor. Dans ce cas, il était de ~5.803 secondes.

Exécutons les mêmes cinquante URLs de Wikipédia dans get_wiki_page_existence, mais cette fois en utilisant ThreadPoolExecutor :

wiki_page_function.py

import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) print("Threaded time:", time.time() - threaded_start) 

Le code est le même que celui que nous avons créé à l'étape 2, mais avec l'ajout de quelques énoncés imprimés qui nous indiquent le nombre de secondes qu'il faut pour exécuter notre code.

Si nous relançons le programme, nous obtiendrons la sortie suivante :

OutputRunning threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543 

Là encore, le nombre de secondes imprimées après Threaded time (temps threadé écoulé) sera différent sur votre ordinateur (ainsi que l'ordre de votre sortie).

Vous pouvez maintenant comparer le temps d'exécution pour récupérer les cinquante URLs des pages Wikipédia avec et sans threads.

Sur la machine utilisée dans ce tutoriel, l'exécution sans threads a pris ~5.803 secondes, et celle avec threads a pris ~1.220 secondes. Notre programme a fonctionné beaucoup plus rapidement avec les threads.

Conclusion

Dans ce tutoriel, vous avez appris à utiliser l'utilitaire ThreadPoolExecutor en Python 3 pour exécuter efficacement du code lié aux entrées/sorties. Vous avez créé une fonction bien adaptée à l'invocation dans les threads, appris comment récupérer à la fois la sortie et les exceptions des exécutions threadées de cette fonction, et observé l'augmentation des performances obtenue en utilisant les threads.

De là, vous pouvez en savoir plus sur les autres fonctions de concurrence offertes par le module concurrent.futures.