TensorFlow ML Pipelines
Introduction
Machine Learning (ML) projects involve much more than just building and training models. They require multiple steps like data validation, preprocessing, model training, evaluation, and deployment, all of which need to work seamlessly together. TensorFlow Extended (TFX) provides a framework to build ML pipelines that automate and standardize these steps.
In this tutorial, we'll explore TensorFlow ML Pipelines as part of TFX. By the end, you'll understand how to create end-to-end ML workflows that are reproducible, scalable, and production-ready.
What are ML Pipelines?
ML pipelines are sequences of steps that automate the machine learning workflow. Each step performs a specific task, such as data ingestion, validation, preprocessing, or model training. The output of one step becomes the input for the next step.
TFX pipelines offer several advantages:
- Automation: Reduce manual effort by automating repetitive tasks
- Consistency: Ensure that each step of the ML workflow is performed in the same way every time
- Reproducibility: Make it easier to reproduce results and debug issues
- Scalability: Handle large datasets and complex models efficiently
- Production-readiness: Build pipelines that can be deployed to production environments
TFX Pipeline Components
A TFX pipeline consists of several components, each responsible for a specific part of the ML workflow:
- ExampleGen: Ingests and splits data
- StatisticsGen: Computes statistics on the data
- SchemaGen: Infers a schema for the data
- ExampleValidator: Validates the data against the schema
- Transform: Performs feature engineering
- Trainer: Trains the model
- Evaluator: Evaluates the model's performance
- Pusher: Deploys the model to production
Let's see how these components work together to create a complete pipeline.
Building a Simple TFX Pipeline
Let's create a simple TFX pipeline for a classification task using the Palmer Penguins dataset, predicting a penguin's species from its body measurements. First, we need to install the required libraries:
# Install TFX
!pip install tfx
Now, let's import the necessary libraries:
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx  # public TFX 1.x API; provides tfx.proto, tfx.orchestration, etc.
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator
from tfx.components import Transform, Trainer, Evaluator, Pusher
from tfx.orchestration import pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner
1. Setting Up Our Pipeline
First, we'll define our pipeline parameters:
pipeline_name = 'penguin_classification_pipeline'
pipeline_root = '/tmp/tfx_pipeline_output'
data_root = '/tmp/tfx_penguin_data'  # local directory of CSV files (CsvExampleGen expects a directory, not a URL)
module_file = '/tmp/penguin_module.py'  # We'll create this file later
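The dataset itself lives in the TFX repository, so we first download it into data_root. Here's a small helper sketch for that step (the /tmp paths above are just examples):
import os
import urllib.request

# Download the processed Penguin CSV into the local data directory.
os.makedirs(data_root, exist_ok=True)
urllib.request.urlretrieve(
    'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/penguins_processed.csv',
    os.path.join(data_root, 'data.csv'))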
2. Creating Pipeline Components
Now, we'll create our pipeline components one by one:
ExampleGen - for data ingestion
example_gen = CsvExampleGen(input_base=data_root)
StatisticsGen - to compute statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
SchemaGen - to infer schema
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'],
                       infer_feature_shape=True)
ExampleValidator - to validate data against schema
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
Transform - for feature engineering
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file)
Trainer - to train the model
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.EvalArgs(num_steps=500))
Evaluator - to evaluate model performance
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'])
Pusher - to deploy the model
pusher = Pusher(
    model=trainer.outputs['model'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory='/tmp/model_serving')))
3. Creating and Running the Pipeline
Now let's connect all components to create our pipeline:
components = [
    example_gen,
    statistics_gen,
    schema_gen,
    example_validator,
    transform,
    trainer,
    evaluator,
    pusher,
]
metadata_path = '/tmp/tfx_metadata.db'  # SQLite database for ML Metadata

p = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=components,
    enable_cache=True,
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
        metadata_path))
# Run the pipeline
LocalDagRunner().run(p)
4. Creating the Module File
For our pipeline to work, we need to create a module file that defines the preprocessing and model functions. Let's create penguin_module.py (in a notebook, you can write it to disk with the %%writefile /tmp/penguin_module.py cell magic):
import tensorflow as tf
import tensorflow_transform as tft
# Define features and label columns
_FEATURE_KEYS = ['culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g']
_LABEL_KEY = 'species'
def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs."""
    outputs = {}
    # Scale numerical features to the [0, 1] range.
    for key in _FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(inputs[key])
    # The label is already an integer index in the processed dataset, so pass it through unchanged.
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]
    return outputs
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example."""
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn
def _gzip_reader_fn(filenames):
    """Returns a record reader for the gzip-compressed TFRecords that TFX writes."""
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def run_fn(fn_args):
    """Train the model."""
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=fn_args.train_files,
        batch_size=32,
        features=tf_transform_output.transformed_feature_spec(),
        reader=_gzip_reader_fn,
        label_key=_LABEL_KEY)

    eval_dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=fn_args.eval_files,
        batch_size=32,
        features=tf_transform_output.transformed_feature_spec(),
        reader=_gzip_reader_fn,
        label_key=_LABEL_KEY)

    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(_LABEL_KEY)

    # Create a simple Keras model
    inputs = {
        key: tf.keras.layers.Input(shape=(1,), name=key)
        for key in _FEATURE_KEYS
    }
    x = tf.keras.layers.concatenate(list(inputs.values()))
    x = tf.keras.layers.Dense(16, activation='relu')(x)
    x = tf.keras.layers.Dense(8, activation='relu')(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    signatures = {
        'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name='examples'))
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
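After a successful run, the Pusher copies the trained SavedModel into /tmp/model_serving under a timestamped subdirectory. Here's a hedged sketch of how you might load that model and call its serving signature directly; the feature values below are made-up inputs, not real measurements:
import glob
import tensorflow as tf

# Pick the most recently pushed model version (directory names are timestamps).
latest_model_dir = sorted(glob.glob('/tmp/model_serving/*'))[-1]
loaded_model = tf.saved_model.load(latest_model_dir)
serve_fn = loaded_model.signatures['serving_default']

# The serving signature expects serialized tf.Example protos with the raw features.
example = tf.train.Example(features=tf.train.Features(feature={
    'culmen_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[0.5])),
    'culmen_depth_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[0.4])),
    'flipper_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[0.6])),
    'body_mass_g': tf.train.Feature(float_list=tf.train.FloatList(value=[0.5])),
}))
print(serve_fn(examples=tf.constant([example.SerializeToString()])))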
Real-World Application: Customer Churn Prediction
Let's explore a more practical example. Imagine you're building a customer churn prediction pipeline for a telecom company. The pipeline would include:
- Data ingestion: Load customer data from a CSV file or database
- Data validation: Check for anomalies and ensure the data meets expectations
- Feature engineering: Create features like customer lifetime value, call frequency, etc.
- Model training: Train a binary classification model to predict churn
- Model evaluation: Evaluate using metrics like AUC, precision, and recall
- Model deployment: Deploy the model to a serving infrastructure
Here's how you might structure such a pipeline:
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components import (
    CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator,
    Transform, Trainer, Evaluator, Pusher, Resolver
)
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
# Pipeline name and root directory
pipeline_name = 'telecom_churn_prediction'
pipeline_root = '/tmp/tfx/pipelines/' + pipeline_name
data_root = '/path/to/telecom/data'
module_file = '/path/to/churn_module.py'
serving_model_dir = '/tmp/tfx/serving_models/' + pipeline_name
# Create the pipeline components
# 1. Ingest data from CSV
example_gen = CsvExampleGen(
    input_base=data_root,
    output_config=example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=9),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
        ])
    )
)
# 2. Compute statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# 3. Create schema
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
# 4. Validate examples against schema
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)
# 5. Perform feature engineering
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file
)
# 6. Train the model
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
# 7. Get the latest blessed model for evaluation comparison
model_resolver = Resolver(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
)
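# NOTE: `eval_config` is referenced by the Evaluator below, so it has to be defined first.
# A minimal sketch using TensorFlow Model Analysis; the label column name 'churn' and the
# AUC lower bound are assumptions for this hypothetical telecom dataset, not fixed values.
import tensorflow_model_analysis as tfma

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='churn')],  # assumed label column
    slicing_specs=[tfma.SlicingSpec()],  # evaluate on the overall dataset
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='AUC',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.7})))
        ])
    ])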
# 8. Evaluate the model
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)
# 9. Deploy the model if it's good enough
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir)
    )
)
# Create and run the pipeline
tfx_pipeline = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen, statistics_gen, schema_gen, example_validator,
        transform, trainer, model_resolver, evaluator, pusher
    ],
    enable_cache=True
)
BeamDagRunner().run(tfx_pipeline)
Pipeline Orchestration Options
TFX pipelines can be orchestrated (run and managed) in various ways:
- LocalDagRunner: For local development and testing
- BeamDagRunner: For distributed processing with Apache Beam
- KubeflowDagRunner: For running on Kubernetes with Kubeflow
- AirflowDagRunner: For scheduling with Apache Airflow
For production use cases, you would typically use Kubeflow or Airflow to orchestrate your pipelines.
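To make the switch concrete, the same Pipeline object can be handed to different runners. Here's a minimal sketch using the two runners already imported in this tutorial (assuming p is the penguin pipeline defined earlier):
from tfx.orchestration.local.local_dag_runner import LocalDagRunner
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# During development: run everything in-process on your machine.
LocalDagRunner().run(p)

# For larger datasets: execute the same pipeline on Apache Beam,
# which can distribute the work across multiple workers.
BeamDagRunner().run(p)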
Best Practices for TFX Pipelines
When building TFX pipelines, consider these best practices:
- Start simple: Begin with a basic pipeline and add complexity gradually
- Test components individually: Debug each component before integrating it (see the notebook sketch after this list)
- Set up CI/CD: Automate testing and deployment of your pipelines
- Monitor pipeline runs: Track performance and identify bottlenecks
- Version your artifacts: Keep track of datasets, models, and other artifacts
- Document your pipelines: Make it easy for others to understand your workflow
- Handle failures gracefully: Implement error handling and recovery mechanisms
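For testing components individually, TFX ships an interactive context for notebooks that runs one component at a time. A minimal sketch, reusing the component definitions from the penguin pipeline above:
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Each call executes a single component and records its artifacts,
# so you can inspect outputs before wiring the full pipeline together.
context = InteractiveContext()
context.run(example_gen)
context.run(statistics_gen)
context.show(statistics_gen.outputs['statistics'])  # visualize the computed statistics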
Summary
TensorFlow ML Pipelines with TFX provide a powerful framework for building end-to-end machine learning workflows. They help you automate repetitive tasks, ensure consistency across runs, and make your models production-ready.
In this tutorial, you've learned:
- What ML pipelines are and why they're important
- The core components of a TFX pipeline
- How to build a simple classification pipeline
- A more complex real-world application for customer churn prediction
- Different orchestration options for running pipelines
- Best practices for developing and maintaining pipelines
By using TFX pipelines, you can focus on solving business problems rather than worrying about the plumbing between different stages of your ML workflow.
Exercises
- Modify the penguin classification pipeline to use a different dataset
- Add a component to perform hyperparameter tuning (a starting-point sketch using the Tuner component follows this list)
- Implement a custom component that performs data augmentation
- Adapt the pipeline to run on Kubeflow or Airflow
- Add model validation criteria based on specific performance thresholds
- Create a pipeline that performs incremental training on new data
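For the hyperparameter-tuning exercise, here's a hedged starting point: TFX provides a Tuner component that works with KerasTuner. The sketch below assumes your module file also defines a tuner_fn and reuses the penguin pipeline's Transform outputs:
from tfx.components import Tuner, Trainer

# Tuner searches for good hyperparameters using the tuner_fn defined in the module file.
tuner = Tuner(
    module_file=module_file,  # must define tuner_fn(fn_args) in addition to run_fn
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=tfx.proto.TrainArgs(num_steps=500),
    eval_args=tfx.proto.EvalArgs(num_steps=100))

# Trainer can then consume the best hyperparameters found by the Tuner.
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    hyperparameters=tuner.outputs['best_hyperparameters'],
    train_args=tfx.proto.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.EvalArgs(num_steps=500))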
Happy building with TensorFlow ML Pipelines!