How To Build a Data Processing Pipeline Using Luigi in Python on Ubuntu 20.04

The author selected the Free and Open Source Fund to receive a donation as part of the Write for DOnations program.

Introduction

Luigi is a Python package that manages long-running batch processing, which is the automated running of data processing jobs on batches of items. Luigi allows you to define a data processing job as a set of dependent tasks. For example, task B might depend on the output of task A, and task D on the outputs of tasks B and C. Luigi automatically works out which tasks it needs to run to complete a requested job.
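To make the dependency example concrete, here is a small standard-library sketch that works out a valid run order for tasks A to D. It is not Luigi's own implementation. It uses Python's graphlib module (Python 3.9+), and the DEPENDENCIES mapping and the required_tasks and run_order helpers are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph from the example above:
# each key maps a task to the tasks whose output it needs.
DEPENDENCIES = {
    "A": set(),
    "B": {"A"},
    "C": set(),
    "D": {"B", "C"},
}


def required_tasks(target, graph):
    """Collect the requested task plus everything it depends on, transitively."""
    needed, stack = set(), [target]
    while stack:
        task = stack.pop()
        if task not in needed:
            needed.add(task)
            stack.extend(graph[task])
    return needed


def run_order(target, graph):
    """Return one valid order in which to run the tasks needed for `target`."""
    needed = required_tasks(target, graph)
    # Keep only the part of the graph that the requested task actually needs.
    subgraph = {task: graph[task] & needed for task in needed}
    return list(TopologicalSorter(subgraph).static_order())


# A always runs before B, and both B and C run before D.
print(run_order("D", DEPENDENCIES))
print(run_order("B", DEPENDENCIES))  # ['A', 'B']
```

Requesting B never schedules C or D, because neither is needed to produce B's output.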

Overall, Luigi provides a framework to develop and manage data processing pipelines. It was originally developed by Spotify, which uses it to plumb together collections of tasks that fetch and process data from a variety of sources. Within Luigi, developers at Spotify built functionality to help with their batch processing needs, including failure handling, automatic resolution of dependencies between tasks, and visualization of task processing. Spotify uses Luigi to support batch processing jobs such as providing music recommendations to users, populating internal dashboards, and calculating lists of top songs.

In this tutorial, you will build a data processing pipeline to analyze the most common words from the most popular books on Project Gutenberg. To do this, you will build a pipeline using the Luigi package. You will use Luigi tasks, targets, dependencies, and parameters to build your pipeline.

Prerequisites

To complete this tutorial, you will need the following:

  • An Ubuntu server set up with a non-root user with sudo privileges. Follow the Initial Server Setup with Ubuntu 20.04 guide.
  • Python 3.6 or higher and virtualenv installed. Follow How To Install Python 3 and Set Up a Local Programming Environment on Ubuntu 20.04 to configure Python and install virtualenv. You’ll set up the environment and project folders in this tutorial.

Step 1 — Installing Luigi

In this step, you will create a clean sandbox environment for your Luigi installation.

First, create a project directory. This tutorial uses luigi-demo:

  • mkdir luigi-demo

Navigate into the newly created luigi-demo directory:

  • cd luigi-demo

Create a new virtual environment named luigi-venv:

  • python3 -m venv luigi-venv

And activate the newly created virtual environment:

  • . luigi-venv/bin/activate

You will see (luigi-venv) at the start of your terminal prompt, indicating which virtual environment is active:

Output
(luigi-venv) your_user@your_server:~/luigi-demo$

For this tutorial, you will need three libraries: luigi, beautifulsoup4, and requests. The requests library streamlines making HTTP requests; you will use it to download the Project Gutenberg book lists and the books to analyze. The beautifulsoup4 library provides functions to parse data from web pages; you will use it to parse out a list of the most popular books on the Project Gutenberg site.

Run the following command to install these libraries, along with the wheel package, using pip:

  • pip install wheel luigi beautifulsoup4 requests

You will get a response confirming the installation of the latest versions of the libraries and all of their dependencies:

Output
Successfully installed beautifulsoup4-4.9.1 certifi-2020.6.20 chardet-3.0.4 docutils-0.16 idna-2.10 lockfile-0.12.2 luigi-3.0.1 python-daemon-2.2.4 python-dateutil-2.8.1 requests-2.24.0 six-1.15.0 soupsieve-2.0.1 tornado-5.1.1 urllib3-1.25.10

You’ve installed the dependencies for your project. Now, you’ll move on to building your first Luigi task.

Step 2 — Creating a Luigi Task

In this step, you will create a “Hello World” Luigi task to demonstrate how Luigi tasks work.

A Luigi task defines a unit of work in your pipeline: the code to execute, plus the inputs and outputs that connect it to other tasks. Tasks are the building blocks you will create your pipeline from. You define each task as a class, which contains:

  • A run() method that holds the logic for executing the task.
  • An output() method that returns the artifacts generated by the task. The run() method populates these artifacts.
  • An optional requires() method that returns any other tasks in your pipeline whose output the current task needs. Inside run(), self.input() returns the output targets of those required tasks.
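The following standard-library sketch imitates this contract so you can see how the pieces connect without installing anything. ToyTask, MemoryTarget, SayHello, AddName, and run_pipeline are hypothetical stand-ins, not Luigi classes. requires() names the upstream tasks, input() returns their outputs, and run() fills in the task's own output. For simplicity, this sketch always uses lists. In real Luigi, input() mirrors whatever structure requires() returns, so a single task gives a single target.

```python
class MemoryTarget:
    """Hypothetical stand-in for a Luigi target: holds a value in memory."""

    def __init__(self):
        self.value = None


class ToyTask:
    """Hypothetical imitation of the requires()/output()/run() contract."""

    def __init__(self):
        self._target = MemoryTarget()

    def requires(self):
        return []  # no upstream tasks by default

    def input(self):
        # Outputs of the tasks returned by requires(), in the same order.
        return [task.output() for task in self.requires()]

    def output(self):
        return self._target

    def run(self):
        raise NotImplementedError


class SayHello(ToyTask):
    def run(self):
        self.output().value = "Hello"


class AddName(ToyTask):
    def __init__(self, upstream):
        super().__init__()
        self.upstream = upstream

    def requires(self):
        return [self.upstream]

    def run(self):
        # Read the upstream task's output through input(), then write our own.
        greeting = self.input()[0].value
        self.output().value = greeting + " Luigi!"


def run_pipeline(task):
    """Run every required task first (depth first), then the task itself."""
    for dependency in task.requires():
        run_pipeline(dependency)
    task.run()
    return task.output().value


print(run_pipeline(AddName(SayHello())))  # Hello Luigi!
```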

Create a new file hello-world.py:

  • nano hello-world.py

Now add the following code to your file:

hello-world.py

import luigi


class HelloLuigi(luigi.Task):

    def output(self):
        return luigi.LocalTarget('hello-luigi.txt')

    def run(self):
        with self.output().open("w") as outfile:
            outfile.write("Hello Luigi!")

You define HelloLuigi() as a Luigi task by making it a subclass of luigi.Task.

The output() method defines one or more Target outputs that your task produces. In the case of this example, you define a luigi.LocalTarget, which is a local file.

Note: Luigi allows you to connect to a variety of common data sources including AWS S3 buckets, MongoDB databases, and SQL databases. You can find a complete list of supported data sources in the Luigi docs.

The run() method contains the code you want to execute for your pipeline stage. In this example, you open the output() target file in write mode with self.output().open("w") as outfile, then write "Hello Luigi!" to it with outfile.write("Hello Luigi!").

To execute the task you created, run the following command:

  • python -m luigi --module hello-world HelloLuigi --local-scheduler

Here, you run the task using python -m instead of executing the luigi command directly. Luigi can only import your task module if it is on the Python path, and python -m adds the current directory to that path. Alternatively, you can add PYTHONPATH='.' to the front of your Luigi command, like so:

  • PYTHONPATH='.' luigi --module hello-world HelloLuigi --local-scheduler
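You can check the underlying Python behavior with a short standard-library sketch. A module file such as hello-world.py can only be imported by name once its directory is on sys.path. The temporary directory and the can_import helper are illustrative assumptions. The hyphenated name works here because importlib.import_module does not require the module name to be a valid identifier.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(module_name):
    """Return True if `module_name` can be imported from the current sys.path."""
    importlib.invalidate_caches()  # pick up files created after startup
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


with tempfile.TemporaryDirectory() as project_dir:
    # Create a stand-in for hello-world.py inside a throwaway project directory.
    Path(project_dir, "hello-world.py").write_text('GREETING = "Hello Luigi!"\n')

    before = can_import("hello-world")  # directory not on sys.path yet
    sys.path.insert(0, project_dir)     # roughly what PYTHONPATH='.' provides
    after = can_import("hello-world")
    sys.path.remove(project_dir)        # undo the change

print(before, after)
```

In a normal environment this prints False True. The same file becomes importable as soon as its directory is on the path.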

The --module hello-world argument tells Luigi which Python module to load, and HelloLuigi names the Luigi task in that module to execute.

The --local-scheduler flag tells Luigi to not connect to a Luigi scheduler and, instead, execute this task locally. (We explain the Luigi scheduler in Step 4.) Running tasks using the local-scheduler flag is only recommended for development work.

Luigi will output a summary of the executed tasks:

Output
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloLuigi()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

It will also create a new file, hello-luigi.txt, with the following content:

hello-luigi.txt

Hello Luigi! 

You have created a Luigi task that generates a file and then executed it using the Luigi local scheduler. Now, you’ll create a task that can extract a list of books from a web page.

Step 3 — Creating a Task to Extract a List of Books

In this step, you will create a Luigi task and define a run() method for the task to download a list of the most popular books on Project Gutenberg. You’ll define an output() method to store links to these books in a file. You will run the task using the Luigi local scheduler.

Create a new directory, data, inside your luigi-demo directory, along with two subdirectories, counts and downloads. This is where you will store the files defined in the output() methods of your tasks. Create the directories before running your tasks, because Python raises an exception when you try to write a file to a directory that does not exist yet:

  • mkdir data
  • mkdir data/counts
  • mkdir data/downloads
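If you prefer to create these directories from Python, for example in a setup script, pathlib can do it idempotently. This is an optional alternative to the mkdir commands above. The ensure_data_dirs helper is hypothetical, and the demo runs in a temporary directory instead of luigi-demo.

```python
import tempfile
from pathlib import Path


def ensure_data_dirs(base="."):
    """Create data/, data/counts/, and data/downloads/ under `base` if missing."""
    created = []
    for sub in ("data/counts", "data/downloads"):
        path = Path(base, sub)
        # parents=True also creates data/; exist_ok=True makes reruns harmless.
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created


with tempfile.TemporaryDirectory() as demo_dir:
    paths = ensure_data_dirs(demo_dir)
    ensure_data_dirs(demo_dir)  # a second call does nothing and raises no error
    all_exist = all(p.is_dir() for p in paths) and Path(demo_dir, "data").is_dir()

print(all_exist)
```

Run from the luigi-demo directory, ensure_data_dirs() with no argument creates the same layout as the three mkdir commands.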

Create a new file word-frequency.py:

  • nano word-frequency.py

Insert the following code, which is a Luigi task to extract a list of links to the top most-read books on Project Gutenberg:

word-frequency.py

import requests
import luigi
from bs4 import BeautifulSoup


class GetTopBooks(luigi.Task):
    """
    Get list of the most popular books from Project Gutenberg
    """

    def output(self):
        return luigi.LocalTarget("data/books_list.txt")

    def run(self):
        resp = requests.get("http://www.gutenberg.org/browse/scores/top")

        soup = BeautifulSoup(resp.content, "html.parser")

        # Find the "Top 100 EBooks yesterday" heading and the list after it
        pageHeader = soup.find_all("h2", string="Top 100 EBooks yesterday")[0]
        listTop = pageHeader.find_next_sibling("ol")

        with self.output().open("w") as f:
            for result in listTop.select("li>a"):
                if "/ebooks/" in result["href"]:
                    # Write one plain-text download URL per line
                    f.write("http://www.gutenberg.org{link}.txt.utf-8\n"
                        .format(
                            link=result["href"]
                        )
                    )

You define an output() target of file "data/books_list.txt" to store the list of books.

In the run() method, you:

  • use the requests library to download the HTML contents of the Project Gutenberg top books page.
  • use the BeautifulSoup library to parse the contents of the page. The BeautifulSoup library lets you extract information from web pages. To find out more about using the BeautifulSoup library, read the How To Scrape Web Pages with Beautiful Soup and Python 3 tutorial.
  • open the output file defined in the output() method.
  • iterate over the HTML structure to get all of the links in the Top 100 EBooks yesterday list. On this page, that means finding every link (<a>) that is a direct child of a list item (<li>). If a link’s href contains /ebooks/, you can assume it points to a book, so you write its plain-text download URL (the link with .txt.utf-8 appended) to your output() file.
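You can test the link filtering and URL building in run() offline by pulling them into a plain function. The sketch below uses hypothetical sample href values rather than live data from Project Gutenberg, and the to_download_urls name is illustrative. It applies the same /ebooks/ check and the same .txt.utf-8 suffix as the task.

```python
def to_download_urls(hrefs, base="http://www.gutenberg.org"):
    """Keep book links and turn them into plain-text download URLs.

    Mirrors the GetTopBooks.run() logic: any href containing /ebooks/ is
    assumed to be a book, and .txt.utf-8 is appended to request its text.
    """
    return [
        "{base}{link}.txt.utf-8".format(base=base, link=href)
        for href in hrefs
        if "/ebooks/" in href
    ]


# Hypothetical hrefs, shaped like the links on the top books page.
sample_hrefs = ["/ebooks/1342", "/browse/authors/a", "/ebooks/11"]
urls = to_download_urls(sample_hrefs)
print("\n".join(urls))
```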

Screenshot of the Project Gutenberg top books web page with the top ebooks links highlighted

Save and exit the file once you’re done.

Execute this new task using the following command:

  • python -m luigi --module word-frequency GetTopBooks --local-scheduler

Luigi will output a summary of the executed tasks:

Output
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 GetTopBooks()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

In the data directory, Luigi will create a new file (data/books_list.txt). Run the following command to output the contents of the file:

  • cat data/books_list.txt

This file contains a list of URLs extracted from the Project Gutenberg top books list:

Output
http://www.gutenberg.org/ebooks/1342.txt.utf-8
http://www.gutenberg.org/ebooks/11.txt.utf-8
http://www.gutenberg.org/ebooks/2701.txt.utf-8
http://www.gutenberg.org/ebooks/1661.txt.utf-8
http://www.gutenberg.org/ebooks/16328.txt.utf-8
http://www.gutenberg.org/ebooks/45858.txt.utf-8
http://www.gutenberg.org/ebooks/98.txt.utf-8
http://www.gutenberg.org/ebooks/84.txt.utf-8
http://www.gutenberg.org/ebooks/5200.txt.utf-8
http://www.gutenberg.org/ebooks/51461.txt.utf-8
...

You’ve created a task that can extract a list of books from a web page. In the next step, you’ll set up a central Luigi scheduler.

Step 4 — Running the Luigi Scheduler

Now, you’ll launch the Luigi scheduler to execute and visualize your tasks. You will take the task developed in Step 3 and run it using the Luigi scheduler.

So far, you have been running Luigi with the --local-scheduler flag to run your jobs locally without allocating work to a central scheduler. This is useful for development, but for production usage it is recommended to use the Luigi scheduler. The Luigi scheduler provides:

  • A central point to execute your tasks.
  • Visualization of the execution of your tasks.

To access the Luigi scheduler interface, you need to allow access to port 8082 through your firewall. To do this, run the following command:

  • sudo ufw allow 8082/tcp

To run the scheduler, execute the following command:

  • sudo sh -c ". luigi-venv/bin/activate ;luigid --background --port 8082"

Note: We have re-run the virtualenv activate script as root, before launching the Luigi scheduler as a background task. This is because when running sudo the virtualenv environment variables and aliases are not carried over.

If you do not want to run as root, you can run the Luigi scheduler as a background process for the current user. This command runs the Luigi scheduler in the background and hides messages from the scheduler background task. You can find out more about managing background processes in the terminal at How To Use Bash’s Job Control to Manage Foreground and Background Processes:

  • luigid --port 8082 > /dev/null 2> /dev/null &

Open a browser to access the Luigi user interface, either at http://your_server_ip:8082 or, if you have set up a domain for your server, at http://your_domain:8082.

Luigi default user interface

By default, Luigi tasks run using the central Luigi scheduler. To run one of your previous tasks with the scheduler, omit the --local-scheduler argument from the command. Re-run the task from Step 3 using the following command:

  • python -m luigi --module word-frequency GetTopBooks

Refresh the Luigi scheduler user interface. The GetTopBooks task now appears in the task list, along with its execution status.

Luigi User Interface after running the GetTopBooks Task

You will continue to refer back to this user interface to monitor the progress of your pipeline.

Note: If you’d like to secure your Luigi scheduler through HTTPS, you can serve it through Nginx. To set up an Nginx server using HTTPS, follow How To Secure Nginx with Let’s Encrypt on Ubuntu 20.04. See GitHub – Luigi – Pull Request 2785 for suggestions on a suitable Nginx configuration to connect the Luigi server to Nginx.

You’ve launched the Luigi Scheduler and used it to visualize your executed tasks. Next, you will create a task to download the list of books that the GetTopBooks() task outputs.

Step 5 — Downloading the Books

In this step you will create a Luigi task to download a specified book. You will define a dependency between this newly created task and the task created in Step 3.

First open your file:

  • nano word-frequency.py

Add an additional class following your GetTopBooks() task to the word-frequency.py file with the following code:

word-frequency.py

. . .
class DownloadBooks(luigi.Task):
    """
    Download a specified book
    """
    FileID = luigi.IntParameter()

    REPLACE_LIST = """.,"';_[]:*-"""

    def requires(self):
        return GetTopBooks()

    def output(self):
        return luigi.LocalTarget("data/downloads/{}.txt".format(self.FileID))

    def run(self):
        with self.input().open("r") as i:
            # Pick the URL on line FileID (counting from zero)
            URL = i.read().splitlines()[self.FileID]

            with self.output().open("w") as outfile:
                book_downloads = requests.get(URL)
                book_text = book_downloads.text

                # Replace punctuation with spaces so it doesn't stick to words
                for char in self.REPLACE_LIST:
                    book_text = book_text.replace(char, " ")

                book_text = book_text.lower()
                outfile.write(book_text)

In this task, you introduce a Parameter, in this case an integer parameter. Luigi parameters are inputs to your tasks that affect the execution of the pipeline. Here, the FileID parameter specifies which line (counting from zero) in your list of URLs to fetch.

You have also added a requires() method to your Luigi task. In this method, you define the Luigi task whose output you need before you can execute this task. Here, that is the GetTopBooks() task you defined in Step 3.

In the output() method, you define your target. You use the FileID parameter to name the file this step creates, in this case data/downloads/{FileID}.txt.

In the run() method, you:

  • open the list of books generated in the GetTopBooks() task.
  • get the URL from the line specified by parameter FileID.
  • use the requests library to download the contents of the book from the URL.
  • replace the special characters listed in REPLACE_LIST, such as periods, commas, colons, and semicolons, with spaces so they don’t get included in your word analysis.
  • convert the text to lowercase so you can compare words with different cases.
  • write the filtered output to the file specified in the output() method.
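You can sketch the non-network part of DownloadBooks.run() as plain functions: one selects the URL on line FileID (counting from zero), and the others clean the text. The function names and sample strings are hypothetical. clean_text_translate is an alternative that uses str.translate to replace every character in REPLACE_LIST in a single pass. It gives the same result as the loop of replace() calls that the task uses.

```python
REPLACE_LIST = """.,"';_[]:*-"""

# Map every character in REPLACE_LIST to a space in one translation table.
SPACE_TABLE = str.maketrans(REPLACE_LIST, " " * len(REPLACE_LIST))


def select_url(book_list_text, file_id):
    """Return the URL on line `file_id` (0-based) of the book list."""
    return book_list_text.splitlines()[file_id]


def clean_text_loop(text):
    """Same approach as the task: replace each listed character, then lowercase."""
    for char in REPLACE_LIST:
        text = text.replace(char, " ")
    return text.lower()


def clean_text_translate(text):
    """Alternative: one str.translate() pass instead of repeated replace() calls."""
    return text.translate(SPACE_TABLE).lower()


# Hypothetical book list with three URLs.
book_list = "http://example.org/a.txt\nhttp://example.org/b.txt\nhttp://example.org/c.txt\n"
print(select_url(book_list, 2))  # http://example.org/c.txt

sample = "It's a Dog-eat-dog world."
print(clean_text_loop(sample).split())  # ['it', 's', 'a', 'dog', 'eat', 'dog', 'world']
```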

Save and exit your file.

Run the new DownloadBooks() task using this command:

  • python -m luigi --module word-frequency DownloadBooks --FileID 2

In this command, you set the FileID parameter using the --FileID argument.

Note: Be careful when defining a parameter with an _ in its name. To reference it from the command line, replace each _ with a -. For example, a File_ID parameter would be referenced as --File-ID when calling a task from the terminal.
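The following tiny helper expresses this naming rule. It is a hypothetical function, not part of Luigi’s API. It mirrors the rule in the note and the --GlobalParams-NumberBooks style of flag used later in this tutorial.

```python
def cli_flag(param_name, task_name=None):
    """Build the command-line flag for a parameter.

    Underscores in the parameter name become dashes. When task_name is given,
    the flag is prefixed with it, as in --GlobalParams-NumberBooks.
    """
    name = param_name.replace("_", "-")
    if task_name:
        return "--{}-{}".format(task_name, name)
    return "--{}".format(name)


print(cli_flag("File_ID"))                      # --File-ID
print(cli_flag("FileID"))                       # --FileID
print(cli_flag("NumberBooks", "GlobalParams"))  # --GlobalParams-NumberBooks
```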

You will receive the following output:

Output
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 GetTopBooks()
* 1 ran successfully:
    - 1 DownloadBooks(FileID=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Note from the output that Luigi detected that the output of GetTopBooks() already exists and skipped running that task. By default, Luigi considers a task complete when its output() targets exist. This minimizes the number of tasks you have to execute, because you can reuse successful output from previous runs.
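You can sketch this skipping behavior with the standard library. A task counts as complete when its output file exists, and a build only runs tasks that are not complete. FileTask, build, and the file names are hypothetical. Luigi’s real scheduler does much more, but this sketch mirrors the default completeness check described above.

```python
import os
import tempfile


class FileTask:
    """Hypothetical task whose output is a single file."""

    def __init__(self, path, requires=(), text=""):
        self.path = path
        self._requires = list(requires)
        self.text = text

    def requires(self):
        return self._requires

    def complete(self):
        # Same idea as Luigi's default: complete when the output already exists.
        return os.path.exists(self.path)

    def run(self):
        with open(self.path, "w") as outfile:
            outfile.write(self.text)


def build(task, ran=None):
    """Run incomplete dependencies first, then the task; return what actually ran."""
    ran = [] if ran is None else ran
    if task.complete():
        return ran  # reuse the existing output and skip this task
    for dependency in task.requires():
        build(dependency, ran)
    task.run()
    ran.append(os.path.basename(task.path))
    return ran


with tempfile.TemporaryDirectory() as workdir:
    top_books = FileTask(os.path.join(workdir, "books_list.txt"), text="urls")
    download = FileTask(os.path.join(workdir, "2.txt"), requires=[top_books], text="book")

    first = build(download)   # runs both tasks
    second = build(download)  # nothing to do
    os.remove(download.path)
    third = build(download)   # only the download is rerun

print(first, second, third)
```

The third call matches what you saw in the Luigi output. The book list already exists, so only the task whose output is missing runs again.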

You have created a task that uses the output of another task and downloads a set of books to analyze. In the next step, you will create a task to count the most common words in a downloaded book.

Step 6 — Counting Words and Summarizing Results

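In this step, you will create a Luigi task to count the frequency of words in each of the books downloaded in Step 5. This is the first task that you will run once per book, so Luigi can execute its instances in parallel when you run it with more than one worker (for example, with the --workers option).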

First open your file again:

  • nano word-frequency.py

Add the following imports to the top of word-frequency.py:

word-frequency.py

from collections import Counter
import pickle

Add the following task to word-frequency.py, after your DownloadBooks() task. This task takes the output of the previous DownloadBooks() task for a specified book, and returns the most common words in that book:

word-frequency.py

class CountWords(luigi.Task):
    """
    Count the frequency of the most common words from a file
    """

    FileID = luigi.IntParameter()

    def requires(self):
        return DownloadBooks(FileID=self.FileID)

    def output(self):
        return luigi.LocalTarget(
            "data/counts/count_{}.pickle".format(self.FileID),
            format=luigi.format.Nop
        )

    def run(self):
        with self.input().open("r") as i:
            word_count = Counter(i.read().split())

            with self.output().open("w") as outfile:
                pickle.dump(word_count, outfile)

In requires(), you pass this task’s FileID parameter on to the DownloadBooks() task it depends on. When you specify that a task depends on another task, you also specify the parameters that the required task should be executed with.

In the run() method you:

  • open the file generated by the DownloadBooks() task.
  • use the Counter class from Python’s built-in collections module, which provides an easy way to count how often each word appears in a book.
  • use the pickle library to store the Counter object, so you can reuse it in a later task. pickle converts Python objects into a byte stream that you can store and later restore in another Python session. Because pickle produces binary output, you set the format argument of luigi.LocalTarget to luigi.format.Nop so the target can write it.
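A quick standard-library check shows why the target needs a binary-safe format. pickle.dump() writes bytes, so the file must be opened in binary mode, which is what setting format=luigi.format.Nop allows in the task. The file name and sample words below are hypothetical.

```python
import os
import pickle
import tempfile
from collections import Counter

word_count = Counter("the cat and the hat and the bat".split())
text_mode_error = None

with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "count_2.pickle")  # hypothetical file name

    # pickle.dump() produces bytes, so write and read the file in binary mode.
    with open(path, "wb") as outfile:
        pickle.dump(word_count, outfile)

    with open(path, "rb") as infile:
        restored = pickle.load(infile)

    # A text-mode file only accepts str, so the same dump fails there.
    try:
        with open(path, "w") as text_file:
            pickle.dump(word_count, text_file)
    except TypeError as error:
        text_mode_error = error

print(restored == word_count)          # True
print(restored.most_common(2))         # [('the', 3), ('and', 2)]
print(type(text_mode_error).__name__)  # TypeError
```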

Save and exit your file.

Run the new CountWords() task using this command:

  • python -m luigi --module word-frequency CountWords --FileID 2

Open the CountWords task graph view in the Luigi scheduler user interface.

Showing how to view a graph from the Luigi user interface

Deselect the Hide Done and Upstream Dependencies options. You will see the flow of execution through the tasks you have created.

Visualizing the execution of the CountWords task

You have created a task to count the most common words in a downloaded book and visualized the dependencies between those tasks. Next, you will define parameters that you can use to customize the execution of your tasks.

Step 7 — Defining Configuration Parameters

In this step, you will add configuration parameters to the pipeline. These will allow you to customize how many books to analyze and the number of words to include in the results.

When you want to set parameters that are shared among tasks, you can create a class that subclasses luigi.Config. Other pipeline stages can reference the parameters defined in this class, and you can set their values when you run a job, for example on the command line.

Add the following Config() class to the end of word-frequency.py. This will define two new parameters in your pipeline for the number of books to analyze and the number of most frequent words to include in the summary:

word-frequency.py

class GlobalParams(luigi.Config):
    NumberBooks = luigi.IntParameter(default=10)
    NumberTopWords = luigi.IntParameter(default=500)

Add the following class to word-frequency.py. This class aggregates the results from all of the CountWords() tasks to create a summary of the most frequent words:

word-frequency.py

class TopWords(luigi.Task):
    """
    Aggregate the count results from the different files
    """

    def requires(self):
        requiredInputs = []
        for i in range(GlobalParams().NumberBooks):
            requiredInputs.append(CountWords(FileID=i))
        return requiredInputs

    def output(self):
        return luigi.LocalTarget("data/summary.txt")

    def run(self):
        total_count = Counter()
        # self.input() returns one target per CountWords() task
        for target in self.input():
            with target.open("rb") as infile:
                nextCounter = pickle.load(infile)
                total_count += nextCounter

        with self.output().open("w") as f:
            for item in total_count.most_common(GlobalParams().NumberTopWords):
                f.write("{0: <15}{1}\n".format(*item))

In the requires() method, you can return a list when a task needs the output of multiple tasks. You use the GlobalParams().NumberBooks parameter to set the number of books you need word counts from.

In the output() method, you define a data/summary.txt output file that will be the final output of your pipeline.

In the run() method you:

  • create a Counter() object to store the total count.
  • for each file produced by the CountWords() tasks, open the file and “unpickle” it (convert it from a file back to a Python object).
  • add the loaded count to the total count.
  • write the most common words to the target output file, one word and its count per line.
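The aggregation in TopWords.run() boils down to adding Counter objects and formatting each (word, count) pair with the word left-aligned in a 15-character column. The sketch below uses small hypothetical counts in place of the pickled files.

```python
from collections import Counter

# Hypothetical per-book counts, standing in for the unpickled CountWords outputs.
book_counts = [
    Counter({"the": 5, "and": 2, "whale": 4}),
    Counter({"the": 3, "and": 4, "alice": 2}),
]

total_count = Counter()
for counts in book_counts:
    total_count += counts  # adds the counts word by word

# Same format string as TopWords: word left-aligned in 15 characters, then count.
lines = ["{0: <15}{1}".format(*item) for item in total_count.most_common(2)]
print("\n".join(lines))
```

With these inputs, the is counted 8 times and and 6 times, so those two words fill the summary.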

Run the pipeline with the following command:

  • python -m luigi --module word-frequency TopWords --GlobalParams-NumberBooks 15 --GlobalParams-NumberTopWords 750

Luigi will execute the remaining tasks needed to generate the summary of the top words:

Output
===== Luigi Execution Summary =====

Scheduled 31 tasks of which:
* 2 complete ones were encountered:
    - 1 CountWords(FileID=2)
    - 1 GetTopBooks()
* 29 ran successfully:
    - 14 CountWords(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
    - 14 DownloadBooks(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
    - 1 TopWords()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

You can visualize the execution of the pipeline from the Luigi scheduler. Select the GetTopBooks task in the task list and press the View Graph button.

Showing how to view a graph from the Luigi user interface

Deselect the Hide Done and Upstream Dependencies options.

Visualizing the execution of the TopWords Task

It will show the flow of processing that is happening in Luigi.

Open the data/summary.txt file:

  • cat data/summary.txt

You will find the calculated most common words:

Output
the            64593
and            41650
of             31896
to             31368
a              25265
i              23449
in             19496
it             16282
that           15907
he             14974
...

In this step, you have defined and used parameters to customize the execution of your tasks. You have generated a summary of the most common words for a set of books.

Find all the code for this tutorial in this repository.

Conclusion

This tutorial has introduced you to the Luigi data processing pipeline framework and its major features, including tasks, parameters, configuration parameters, and the Luigi scheduler.

Luigi supports connecting to a large number of common data sources out of the box. You can also scale it to run large, complex data pipelines. This provides a powerful framework to start solving your data processing challenges.

For more tutorials, check out our Data Analysis topic page and Python topic page.