Cómo usar ThreadPoolExecutor en Python 3

El autor seleccionó el COVID-19 Relief Fund para que reciba una donación como parte del programa Write for DOnations.

Introducción

Los subprocesos de Python son una especie de paralelismo que le permiten a su programa ejecutar varios procedimientos a la vez. Este paralelismo en Python también se puede lograr utilizando varios procesos, pero los subprocesos son particularmente adecuados para acelerar las aplicaciones que implican una cantidad considerable de E/S (entrada/salida).

Algunos ejemplos de operaciones limitadas por las E/S son la realización de solicitudes web y la lectura de datos de archivos. A diferencia de las operaciones limitadas por las E/S, las operaciones limitadas por la CPU (como realizar cálculos matemáticos con la biblioteca estándar de Python) no se benefician mucho de los subprocesos de Python.

Python 3 incluye la utilidad ThreadPoolExecutor para ejecutar código en subprocesos.

En este tutorial, utilizaremos ThreadPoolExecutor para realizar solicitudes de red de forma rápida. Definiremos una función que se pueda invocar desde subprocesos, utilizaremos ThreadPoolExecutor para ejecutar esa función y procesaremos los resultados de esas ejecuciones.

Para los fines de este tutorial, realizaremos solicitudes de red para verificar la existencia de páginas de Wikipedia.

Nota: El hecho de que las operaciones limitadas por las E/S se beneficien más de los subprocesos que las limitadas por la CPU se debe a una idiosincrasia de Python denominada bloqueo global de intérpretes. Si desea obtener más información sobre el bloqueo global de intérpretes de Python, puede consultar la documentación oficial de Python.

Requisitos previos

Para aprovechar este tutorial al máximo, es recomendable tener algunos conocimientos de programación en Python y un entorno de programación local de Python con requests instalado.

Puede revisar estos tutoriales para encontrar la información básica necesaria:

  • Cómo programar en Python 3
  • Cómo instalar Python 3 y configurar un entorno de programación local en Ubuntu 18.04

  • Para instalar el paquete requests en su entorno de programación local de Python, puede ejecutar este comando:

  • pip install --user requests==2.23.0

Paso 1: Definir una función que se ejecute en subprocesos

Vamos a comenzar por definir una función que nos gustaría ejecutar con la ayuda de subprocesos.

Puede abrir este archivo usando nano o el editor de texto o entorno de desarrollo que prefiera:

  • nano wiki_page_function.py

Para los fines de este tutorial, vamos a escribir una función que determine si una página de Wikipedia existe o no:

wiki_page_function.py

import requests  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status 

La función get_wiki_page_existence acepta dos argumentos: una URL a una página de Wikipedia (wiki_page_url) y un timeout que indica la cantidad de segundos que se debe esperar una respuesta de esa URL.

get_wiki_page_existence utiliza el paquete requests para realizar una solicitud web a esa URL. Se devuelve una cadena que indica si la página existe o no dependiendo del código de estado de response, la respuesta HTTP. Los diferentes códigos de estado representan los distintos resultados que puede tener una solicitud HTTP. En este procedimiento, se asume que un código de estado 200, “correcto”, indica que la página de Wikipedia existe y un código de estado 404, “no encontrada”, que la página de Wikipedia no existe.

Como se describe en la sección Requisitos previos, deberá tener el paquete requests instalado para poder ejecutar esta función.

Intentemos ejecutar la función añadiendo la url y la invocación a la función después de la función get_wiki_page_existence:

wiki_page_function.py

. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url)) 

Una vez que haya añadido el código, guarde y cierre el archivo.

Si ejecutamos este código:

  • python wiki_page_function.py

Veremos un resultado como el siguiente:

Outputhttps://en.wikipedia.org/wiki/Ocean - exists 

La invocación de la función get_wiki_page_existence con una página de Wikipedia válida devuelve una cadena que confirma que la página efectivamente existe.

Advertencia: En general, no es seguro compartir objetos o estados de Python entre subprocesos sin tener especial cuidado para evitar errores de simultaneidad. A la hora de definir una función que se ejecute en un subproceso, lo mejor es definir una que realice una sola tarea y no publique ni comparta su estado con otros subprocesos. La función get_wiki_page_existence es un ejemplo de una función con estas características.

Paso 2: Usar ThreadPoolExecutor para ejecutar una función en subprocesos

Ahora que tenemos una función que se puede invocar con subprocesos, podemos usar ThreadPoolExecutor para invocar esa función varias veces de forma rápida.

Agregue el siguiente código resaltado a su programa en wiki_page_function.py:

wiki_page_function.py

import requests import concurrent.futures  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) 

Veamos cómo funciona este código:

  • concurrent.futures se importa para darnos acceso a ThreadPoolExecutor.
  • Se utiliza una instrucción with para crear un executor de una instancia de ThreadPoolExecutor que limpia de forma rápida los subprocesos al completarse.
  • Se envían (submit) cuatro tareas al executor para cada una de las URL de la lista wiki_page_urls.
  • Cada invocación a submit devuelve una instancia Future que se almacena en la lista de futures.
  • La función as_completed espera que se complete cada invocación a get_wiki_page_existence de Future para que podamos imprimir su resultado.

Si volvemos a ejecutar este programa con el siguiente comando:

  • python wiki_page_function.py

Veremos un resultado como el siguiente:

Outputhttps://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists 

Este resultado es lógico: tres de las URL son páginas de Wikipedia válidas y una de ellas, this_page_does_not_exist, no lo es. Tenga en cuenta que su resultado puede tener un orden distinto a este. La función concurrent.futures.as_completed de este ejemplo devuelve resultados tan pronto estén disponibles, independientemente del orden en que se presentaron las tareas.

Paso 3: Procesar excepciones de funciones ejecutadas en subprocesos

En el paso anterior, get_wiki_page_existence devolvió correctamente un valor para todas nuestras invocaciones. En este paso, veremos que ThreadPoolExecutor también puede crear excepciones generadas en invocaciones de funciones con subprocesos.

Consideremos el siguiente bloque de código de ejemplo:

wiki_page_function.py

import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status   wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(             executor.submit(                 get_wiki_page_existence, wiki_page_url=url, timeout=0.00001             )         )     for future in concurrent.futures.as_completed(futures):         try:             print(future.result())         except requests.ConnectTimeout:             print("ConnectTimeout.") 

Este bloque de código es casi idéntico al que utilizamos en el paso 2, pero tiene dos diferencias clave:

  • Ahora, pasamos timeout=0.00001 a get_wiki_page_existence. Como el paquete requests no puede completar su solicitud web a Wikipedia en 0.00001 segundos, creará una excepción ConnectTimeout.
  • Tomamos las excepciones ConnectTimeout que generó future.result() e imprimimos una cadena cada vez que lo hacemos.

Si volvemos a ejecutar el programa, veremos el siguiente resultado:

OutputConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout. 

Se imprimen cuatro mensajes de ConnectTimeout (uno para cada una de nuestras cuatro wiki_page_urls), dado que no se pudo completar ninguna de ellas en 0.00001 segundos y cada una de las cuatro invocaciones a get_wiki_page_existence generó la excepción ConnectTimeout.

Aprendió que si la invocación a una función enviada a ThreadPoolExecutor crea una excepción, esa excepción se puede generar normalmente al invocar Future.result. Invocar Future.result en todas sus invocaciones enviadas garantiza que su programa no omita ninguna excepción que se haya generado a partir de su función de subprocesos.

Paso 4: Comparar el tiempo de ejecución con y sin subprocesos

Ahora, vamos a verificar que usar ThreadPoolExecutor efectivamente hace que su programa sea más rápido.

Primero, vamos a medir el tiempo de ejecución de get_wiki_page_existence sin subprocesos:

wiki_page_function.py

import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls:     print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start) 

En el código de ejemplo, invocamos nuestra función get_wiki_page_existence con cincuenta URL de páginas de Wikipedia distintas, una por una. Usamos la función time.time() para imprimir la cantidad de segundos que toma la ejecución de nuestro programa.

Si volvemos a ejecutar este código como antes, veremos un resultado similar al siguiente:

OutputRunning without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182 

Se omitieron las entradas 2 a 47 de este resultado para mayor brevedad.

La cantidad de segundos que se imprima después de Without threads time será distinta cuando lo ejecute en su máquina, lo que es normal, dado que solo está recibiendo un número de referencia para realizar una comparación con una solución que utiliza ThreadPoolExecutor. En este caso, tomó ~5.803 segundos.

Ejecutemos las mismas cincuenta URL de Wikipedia a través de get_wiki_page_existence, pero, esta vez, utilizando ThreadPoolExecutor:

wiki_page_function.py

import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) print("Threaded time:", time.time() - threaded_start) 

El código es el mismo que el que creamos en el Paso 2, solo agregamos algunas instrucciones de impresión que nos muestran la cantidad de segundos que toma la ejecución de nuestro código.

Si volvemos a ejecutar el programa, veremos lo siguiente:

OutputRunning threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543 

Nuevamente, la cantidad de segundos que se imprime después de Threaded time será diferente en su computadora (al igual que el orden de su resultado).

Ahora, puede comparar el tiempo de ejecución de la búsqueda de las cincuenta URL de páginas de Wikipedia con y sin subprocesos.

En la máquina utilizada en este tutorial, la ejecución sin subprocesos llevó ~5.803 segundos y con subprocesos, ~1.220 segundos. Nuestro programa se ejecutó considerablemente más rápido con subprocesos.

Conclusión

En este tutorial, aprendió a usar la utilidad ThreadPoolExecutor de Python 3 para ejecutar de forma eficiente código limitado por las E/S. Creó una función que se puede invocar desde subprocesos, aprendió a recuperar resultados y excepciones de ejecuciones de esa función y notó la mejora de desempeño que se obtiene al utilizar subprocesos.

Ahora, puede obtener más información sobre otras funciones de simultaneidad que ofrece el módulo concurrent.futures.