TensorFlow ML Pipelines

Introduction

Machine Learning (ML) projects involve much more than just building and training models. They require multiple steps like data validation, preprocessing, model training, evaluation, and deployment, all of which need to work seamlessly together. TensorFlow Extended (TFX) provides a framework to build ML pipelines that automate and standardize these steps.

In this tutorial, we'll explore TensorFlow ML Pipelines as part of TFX. By the end, you'll understand how to create end-to-end ML workflows that are reproducible, scalable, and production-ready.

What are ML Pipelines?

ML pipelines are sequences of steps that automate the machine learning workflow. Each step performs a specific task, such as data ingestion, validation, preprocessing, or model training. The output of one step becomes the input for the next step.

TFX pipelines offer several advantages:

  • Automation: Reduce manual effort by automating repetitive tasks
  • Consistency: Ensure that each step of the ML workflow is performed in the same way every time
  • Reproducibility: Make it easier to reproduce results and debug issues
  • Scalability: Handle large datasets and complex models efficiently
  • Production-readiness: Build pipelines that can be deployed to production environments

TFX Pipeline Components

A TFX pipeline consists of several components, each responsible for a specific part of the ML workflow:

  1. ExampleGen: Ingests and splits data
  2. StatisticsGen: Computes statistics on the data
  3. SchemaGen: Infers a schema for the data
  4. ExampleValidator: Validates the data against the schema
  5. Transform: Performs feature engineering
  6. Trainer: Trains the model
  7. Evaluator: Evaluates the model's performance
  8. Pusher: Deploys the model to production

Let's see how these components work together to create a complete pipeline.

Building a Simple TFX Pipeline

Let's create a simple TFX pipeline for a classification task using the Palmer Penguins dataset. First, we need to install the required libraries:

python
# Install TFX (the '!' prefix runs a shell command in notebook environments such as Colab)
!pip install tfx
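
After the install completes, it's worth confirming that compatible versions of TensorFlow and TFX were picked up (a quick sanity check; the exact versions will vary):

python
import tensorflow as tf
from tfx import v1 as tfx

# Print the installed versions so you can cross-check the TFX/TensorFlow compatibility matrix
print('TensorFlow version:', tf.__version__)
print('TFX version:', tfx.__version__)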

Now, let's import the necessary libraries:

python
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx  # TFX 1.x public API; provides tfx.proto, tfx.orchestration, etc.
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator
from tfx.components import Transform, Trainer, Evaluator, Pusher
from tfx.orchestration import pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

1. Setting Up Our Pipeline

First, we'll define our pipeline parameters:

python
pipeline_name = 'penguin_classification_pipeline'
pipeline_root = '/tmp/tfx_pipeline_output'
# CsvExampleGen expects a directory of CSV files rather than a URL, so download
# https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/penguins_processed.csv
# into this directory before running the pipeline.
data_root = '/tmp/tfx_penguin_data'
module_file = '/tmp/penguin_module.py'  # We'll create this file later

2. Creating Pipeline Components

Now, we'll create our pipeline components one by one:

ExampleGen - for data ingestion

python
example_gen = CsvExampleGen(input_base=data_root)

StatisticsGen - to compute statistics

python
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

SchemaGen - to infer schema

python
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

ExampleValidator - to validate data against schema

python
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
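
If you want to check what these data components produce before wiring up the full pipeline, TFX's InteractiveContext (used in the official notebook tutorials) lets you run and inspect components one at a time. A minimal sketch, assuming you are working in a notebook:

python
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Each run() call executes a single component and records its artifacts locally
context = InteractiveContext()
context.run(example_gen)
context.run(statistics_gen)
context.run(schema_gen)
context.run(example_validator)

# Render the computed statistics and any detected anomalies inline in the notebook
context.show(statistics_gen.outputs['statistics'])
context.show(example_validator.outputs['anomalies'])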

Transform - for feature engineering

python
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file)

Trainer - to train the model

python
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.EvalArgs(num_steps=500))

Evaluator - to evaluate model performance

python
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'])

Pusher - to deploy the model

python
pusher = Pusher(
    model=trainer.outputs['model'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory='/tmp/model_serving')))

3. Creating and Running the Pipeline

Now let's connect all components to create our pipeline:

python
components = [
    example_gen,
    statistics_gen,
    schema_gen,
    example_validator,
    transform,
    trainer,
    evaluator,
    pusher,
]

p = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=components,
    enable_cache=True,
    # Store ML Metadata in a local SQLite database under the pipeline root
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
        pipeline_root + '/metadata.db'))

# Run the pipeline
LocalDagRunner().run(p)
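
Once the run finishes, the Pusher copies the trained SavedModel into a timestamped subdirectory of /tmp/model_serving. A quick way to sanity-check the exported model (a sketch, assuming at least one successful run has completed):

python
import os
import tensorflow as tf

serving_dir = '/tmp/model_serving'
# Pick the most recent export (subdirectories are named by push timestamp)
latest = max(os.listdir(serving_dir), key=int)
loaded = tf.saved_model.load(os.path.join(serving_dir, latest))
print(loaded.signatures['serving_default'].structured_input_signature)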

4. Creating the Module File

For our pipeline to work, we need to create a module file that defines the preprocessing and model functions. Let's create the penguin_module.py:

python
import tensorflow as tf
import tensorflow_transform as tft

# Define the feature and label columns
_FEATURE_KEYS = ['culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g']
_LABEL_KEY = 'species'


def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs."""
    outputs = {}

    # Scale numerical features to the [0, 1] range
    for key in _FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(inputs[key])

    # The label in penguins_processed.csv is already an integer index, so pass it through
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]

    return outputs


def _gzip_reader_fn(filenames):
    """Reads the GZIP-compressed TFRecord files that TFX components write."""
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example."""

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn


def run_fn(fn_args):
    """Train the model."""
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=fn_args.train_files,
        batch_size=32,
        features=tf_transform_output.transformed_feature_spec(),
        reader=_gzip_reader_fn,
        label_key=_LABEL_KEY)

    eval_dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=fn_args.eval_files,
        batch_size=32,
        features=tf_transform_output.transformed_feature_spec(),
        reader=_gzip_reader_fn,
        label_key=_LABEL_KEY)

    # Create a simple Keras model
    inputs = {
        key: tf.keras.layers.Input(shape=(1,), name=key)
        for key in _FEATURE_KEYS
    }
    x = tf.keras.layers.concatenate(list(inputs.values()))
    x = tf.keras.layers.Dense(16, activation='relu')(x)
    x = tf.keras.layers.Dense(8, activation='relu')(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    signatures = {
        'serving_default':
            _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
                tf.TensorSpec(shape=[None], dtype=tf.string, name='examples'))
    }

    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
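
The pipeline expects this code to live at the path we set in module_file. In a notebook you can save it with the %%writefile cell magic, or write it out explicitly; a minimal sketch, assuming the module source is held in a Python string:

python
# In Jupyter/Colab, prepending this cell magic to the module code saves it directly:
#   %%writefile /tmp/penguin_module.py
# Equivalently, from plain Python:
module_source = "..."  # the penguin_module.py code shown above, as a string
with open('/tmp/penguin_module.py', 'w') as f:
    f.write(module_source)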

Real-World Application: Customer Churn Prediction

Let's explore a more practical example. Imagine you're building a customer churn prediction pipeline for a telecom company. The pipeline would include:

  1. Data ingestion: Load customer data from a CSV file or database
  2. Data validation: Check for anomalies and ensure the data meets expectations
  3. Feature engineering: Create features like customer lifetime value, call frequency, etc.
  4. Model training: Train a binary classification model to predict churn
  5. Model evaluation: Evaluate using metrics like AUC, precision, and recall
  6. Model deployment: Deploy the model to a serving infrastructure

Here's how you might structure such a pipeline:

python
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma  # used to define the Evaluator's eval_config
from tfx.components import (
    CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator,
    Transform, Trainer, Evaluator, Pusher, Resolver
)
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing

# Pipeline name and root directory
pipeline_name = 'telecom_churn_prediction'
pipeline_root = '/tmp/tfx/pipelines/' + pipeline_name
data_root = '/path/to/telecom/data'
module_file = '/path/to/churn_module.py'
serving_model_dir = '/tmp/tfx/serving_models/' + pipeline_name

# Create the pipeline components
# 1. Ingest data from CSV
example_gen = CsvExampleGen(
    input_base=data_root,
    output_config=example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=9),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
        ])
    )
)

# 2. Compute statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

# 3. Create schema
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

# 4. Validate examples against schema
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)

# 5. Perform feature engineering
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file
)

# 6. Train the model
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)

# 7. Get the latest blessed model for evaluation comparison
model_resolver = Resolver(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
)

# 8. Evaluate the model against the latest blessed baseline.
# The label key and the AUC threshold below are illustrative; adjust them to your dataset.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='churned')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='AUC',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.7})))
        ])
    ])

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)

# 9. Deploy the model if it's good enough
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir)
    )
)

# Create and run the pipeline
tfx_pipeline = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen, statistics_gen, schema_gen, example_validator,
        transform, trainer, model_resolver, evaluator, pusher
    ],
    enable_cache=True
)

BeamDagRunner().run(tfx_pipeline)
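
The churn module_file would follow the same structure as the penguin module: a preprocessing_fn plus a run_fn. As a sketch of the feature-engineering half, using hypothetical telecom column names:

python
import tensorflow_transform as tft

# Hypothetical column names; adjust to your actual churn dataset
_NUMERIC_FEATURES = ['tenure_months', 'monthly_charges', 'total_charges']
_CATEGORICAL_FEATURES = ['contract_type', 'payment_method']
_LABEL_KEY = 'churned'

def preprocessing_fn(inputs):
    """Scales numeric columns and maps categorical strings to vocabulary indices."""
    outputs = {}
    for key in _NUMERIC_FEATURES:
        outputs[key] = tft.scale_to_z_score(inputs[key])
    for key in _CATEGORICAL_FEATURES:
        outputs[key] = tft.compute_and_apply_vocabulary(inputs[key])
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]
    return outputs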

Pipeline Orchestration Options

TFX pipelines can be orchestrated (run and managed) in various ways:

  1. LocalDagRunner: For local development and testing
  2. BeamDagRunner: For distributed processing with Apache Beam
  3. KubeflowDagRunner: For running on Kubernetes with Kubeflow
  4. AirflowDagRunner: For scheduling with Apache Airflow

For production use cases, you would typically use Kubeflow or Airflow to orchestrate your pipelines.
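
Switching orchestrators mostly means swapping the runner at the end of your pipeline definition. As a sketch, compiling the penguin pipeline for Kubeflow Pipelines V2 might look like this (assuming TFX 1.x, where this runner writes a pipeline definition file that you then submit to your Kubeflow or Vertex environment):

python
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(),
    output_filename='penguin_pipeline.json')
# Instead of executing locally, this emits a pipeline spec to submit to Kubeflow
runner.run(p)  # p is the pipeline object defined earlier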

Best Practices for TFX Pipelines

When building TFX pipelines, consider these best practices:

  1. Start simple: Begin with a basic pipeline and add complexity gradually
  2. Test components individually: Debug each component before integrating them
  3. Set up CI/CD: Automate testing and deployment of your pipelines
  4. Monitor pipeline runs: Track performance and identify bottlenecks
  5. Version your artifacts: Keep track of datasets, models, and other artifacts
  6. Document your pipelines: Make it easy for others to understand your workflow
  7. Handle failures gracefully: Implement error handling and recovery mechanisms

Summary

TensorFlow ML Pipelines with TFX provide a powerful framework for building end-to-end machine learning workflows. They help you automate repetitive tasks, ensure consistency across runs, and make your models production-ready.

In this tutorial, you've learned:

  • What ML pipelines are and why they're important
  • The core components of a TFX pipeline
  • How to build a simple classification pipeline
  • A more complex real-world application for customer churn prediction
  • Different orchestration options for running pipelines
  • Best practices for developing and maintaining pipelines

By using TFX pipelines, you can focus on solving business problems rather than worrying about the plumbing between different stages of your ML workflow.

Exercises

  1. Modify the penguin classification pipeline to use a different dataset
  2. Add a component to perform hyperparameter tuning
  3. Implement a custom component that performs data augmentation
  4. Adapt the pipeline to run on Kubeflow or Airflow
  5. Add model validation criteria based on specific performance thresholds
  6. Create a pipeline that performs incremental training on new data

Happy building with TensorFlow ML Pipelines!


