Machine learning involves tasks that include data sourcing, data ingestion, data transformation, pre-processing data for use in training, training a model, and hosting the model. Additionally, to get value out of machine learning models, we need an architecture and process in place to repeatedly and consistently train new models and retrain existing models with new data.
For example, for a movie dataset from an external source on the internet:
- If we are on AWS, we could upload it to S3 and then load it into DynamoDB. The data could be ingested as a one-time full load (batch) or as a real-time stream (see the sketch after this list).
- We may need both batch and stream, or only one of them. In that case, we could full-load the data into DynamoDB and then stream new records into a Kinesis stream, using a Lambda function as a source simulator.
- We could create a schema on the data stored in S3 and DynamoDB and perform ETL on it to prepare it for the machine learning process.
- Once we have an S3 data lake ready, we could use Amazon SageMaker for model training and inference.
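As a rough sketch of the batch path above (using boto3; the bucket name "my-movie-datalake", the "movies" table with its "movie_id" key, and the CSV column names are all made up for illustration):

import csv
import boto3

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")

# One-time full load: upload the raw movie dataset to S3.
s3.upload_file("movies.csv", "my-movie-datalake", "raw/movies.csv")

# Batch-ingest the same records into DynamoDB.
table = dynamodb.Table("movies")  # assumes the table already exists
with open("movies.csv", newline="") as f, table.batch_writer() as batch:
    for row in csv.DictReader(f):
        batch.put_item(Item={"movie_id": row["id"],
                             "title": row["title"],
                             "year": row["year"]})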
As I start a small GitHub repo for my personal research notes (dumps...) on Data Pipelines (PRs with contributions are welcome!), this post adds my short intro to the topic.
ETL: Extract, Transform, and Load
These three conceptual steps are how most data pipelines are designed and structured, serving as a blueprint for how raw data is transformed to analysis-ready data:
- Extract: sensors wait for upstream data sources to land (e.g., logs, a database copy, or an external dataset).
- Transform: business logic is applied (e.g., filtering, grouping, and aggregation to translate raw data into analysis-ready datasets).
- Load: processed data is transported to a final destination.
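To make the three steps concrete, here is a toy, framework-free sketch; the file names and column names ("movie_id", "rating") are invented for the example:

import csv
from collections import defaultdict

def extract(path):
    # Extract: read raw rows from the upstream source (a CSV file here).
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def transform(rows):
    # Transform: group ratings by movie and compute the average.
    totals, counts = defaultdict(float), defaultdict(int)
    for row in rows:
        totals[row["movie_id"]] += float(row["rating"])
        counts[row["movie_id"]] += 1
    return {movie: totals[movie] / counts[movie] for movie in totals}

def load(averages, path):
    # Load: write the analysis-ready dataset to its final destination.
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["movie_id", "avg_rating"])
        writer.writerows(sorted(averages.items()))

load(transform(extract("ratings.csv")), "avg_ratings.csv")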
Airflow
Apache Airflow is a tool developed at Airbnb in 2014 and later open-sourced. It is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
You can use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
The key concepts are:
- DAG: a directed acyclic graph object that ties together all the tasks in a cohesive workflow and dictates the execution frequency (i.e., schedule).
- task: a unit of work to be executed that should be both atomic and idempotent. In Airflow, there are two types of tasks: Operators and Sensors (a minimal sketch of each follows this list).
- operator: a specific type of work to be executed.
- sensor: a blocking task that runs until a condition is met or until it times out.
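To illustrate the operator/sensor distinction, here is a minimal sketch of a custom one of each, using the Airflow 1.x import paths that the example below also uses (the file path and log message are made up):

import os
from airflow.models import BaseOperator
from airflow.operators.sensors import BaseSensorOperator

class PrintMessageOperator(BaseOperator):
    # An operator does one specific, ideally idempotent, piece of work.
    def execute(self, context):
        self.log.info("Processing data for %s", context["ds"])

class FileExistsSensor(BaseSensorOperator):
    # A sensor blocks (keeps poking) until its condition is met or it times out.
    def poke(self, context):
        return os.path.exists("/tmp/ready.flag")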
Airflow's architecture has the following components:
- job definitions (in source control).
- CLI: to test, run, backfill, describe and clear parts of your DAGs.
- web application: to explore your DAGs' definitions, their dependencies, progress, metadata, and logs (built with Flask).
- metadata repository (in MySQL or Postgres): keeps track of task job statuses.
- array of workers: runs task instances in a distributed fashion.
- scheduler: fires up the task instances that are ready.
Here is a very simple toy example of an Airflow DAG that, every day, waits for one second after the execution date is reached and then prints the date in bash:
from datetime import datetime, timedelta
from airflow.models import DAG  # Import the DAG class
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import TimeDeltaSensor

default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
}

dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # This is a daily DAG.
)

# t0 and t1 are examples of tasks created by instantiating operators
t0 = TimeDeltaSensor(
    task_id='wait_a_second',
    delta=timedelta(seconds=1),
    dag=dag,
)

t1 = BashOperator(
    task_id='print_date_in_bash',
    bash_command='date',
    dag=dag,
)

t1.set_upstream(t0)
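Since Airflow 1.8, the same dependency can also be declared with the bitshift composition operators, which many DAG files use instead of set_upstream/set_downstream:

t0 >> t1  # equivalent to t1.set_upstream(t0)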
Luigi
Luigi is Spotify's Python module for building complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, etc. It also comes with built-in Hadoop support.
The basic unit of Luigi is a task class that models an atomic ETL operation in three parts: a requires() part that points to other tasks that must run before this one, the run() step that performs the data transformation, and the output() target. Tasks can ultimately feed into a final destination, such as a table on Redshift or a single file.
Here is an example of a simple workflow in Luigi:
import luigi


class WritePipelineTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/output_one.txt")

    def run(self):
        with self.output().open("w") as output_file:
            output_file.write("pipeline")


class AddMyTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/output_two.txt")

    def requires(self):
        return WritePipelineTask()

    def run(self):
        with self.input().open("r") as input_file:
            line = input_file.read()
        with self.output().open("w") as output_file:
            decorated_line = "My " + line
            output_file.write(decorated_line)
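To actually execute the pipeline, Luigi needs an entry point. A minimal one (assuming the classes above live in a file called my_tasks.py, a name chosen just for this example) is:

if __name__ == "__main__":
    luigi.build([AddMyTask()], local_scheduler=True)
    # Or from the command line, provided the module is importable:
    #   luigi --module my_tasks AddMyTask --local-scheduler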
Airflow vs. Luigi
| | Airflow | Luigi |
|---|---|---|
| Web dashboard | very nice | minimal |
| Built-in scheduler | yes | no |
| Separates output data and task state | yes | no |
| Calendar scheduling | yes | no, use cron |
| Parallelism | yes, via workers | threads per worker |
| Finds newly deployed tasks | yes | no |
| Persists state | yes, to a database | sort of |
| Syncs tasks to workers | yes | no |
| Scheduling | yes | no |
Learning References
- Data science resources.
- Lorte data pipelining.
- Incubator Airflow data pipelining
- Udemy's Airflow for Beginners.
- Awesome Airflow Resources.
- Airflow in Kubernetes.
- Astronomer: Airflow as a Service.
- Data pipeline samples.
- GCP Dataflow templates.
- Awesome Scalability: a lot of articles and resources on the subject.
Enterprise Solutions
- Netflix data pipeline.
- Netflix data videos.
- Yelp data pipeline.
- Gusto data pipeline.
- 500px data pipeline.
- Twitter data pipeline.
- Coursera data pipeline.
- Cloudflare data pipeline.
- Pandora data pipeline.
- Heroku data pipeline.
- Zillow data pipeline.
- Airbnb data pipeline.
- Walmart data pipeline.
- Robinhood data pipeline.
- Lyft data pipeline.
- Slack data pipeline.
- Remind data pipeline.
- Wish data pipeline.
- Databricks data pipeline.