
TensorFlow Anomaly Detection

Introduction

Anomaly detection is the process of identifying data points, events, or observations that deviate significantly from the norm or expected behavior. These unusual patterns can indicate critical incidents such as:

  • Fraudulent transactions in financial systems
  • Malfunctioning equipment in manufacturing
  • Network intrusions in cybersecurity
  • Medical abnormalities in healthcare data

TensorFlow provides powerful tools for implementing anomaly detection through its deep learning and probabilistic modeling capabilities. In this tutorial, we'll explore how to build effective anomaly detection systems using TensorFlow, focusing on techniques that are accessible to beginners while providing robust detection capabilities.

Understanding Anomaly Detection

Anomalies (also called outliers) fall into three general categories:

  1. Point Anomalies: Individual data points that are far from the normal pattern
  2. Contextual Anomalies: Data points that are anomalous in a specific context
  3. Collective Anomalies: Collections of data points that together form an anomaly

TensorFlow enables us to detect these anomalies through several approaches:

  • Statistical methods (a simple z-score baseline is sketched right after this list)
  • Machine learning-based methods (especially autoencoders)
  • Deep learning techniques
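
Before moving to neural approaches, it helps to see how simple the statistical route can be. Below is a minimal, illustrative z-score baseline; the function name zscore_anomalies and the synthetic data are our own illustration, not part of any library:

python
import numpy as np

def zscore_anomalies(data, threshold=3.0):
    """Flag rows whose z-score exceeds `threshold` in any feature."""
    mean = data.mean(axis=0)
    std = data.std(axis=0) + 1e-12        # avoid division by zero
    z = np.abs((data - mean) / std)       # per-feature z-scores
    return (z > threshold).any(axis=1)    # True = point anomaly

# Quick check on synthetic data with one injected outlier
rng = np.random.default_rng(0)
data = rng.normal(0, 1, size=(100, 3))
data[0] = [8.0, 8.0, 8.0]
print(zscore_anomalies(data)[:5])         # first entry should be True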

Setting Up Your Environment

First, let's set up our environment with the necessary libraries:

python
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Check TensorFlow version
print(f"TensorFlow version: {tf.__version__}")

Method 1: Autoencoders for Anomaly Detection

Autoencoders are neural networks that learn to compress data and then reconstruct it. When trained on normal data, they struggle to accurately reconstruct anomalies, making the reconstruction error a good indicator of anomalies.

Step 1: Create a Simple Dataset with Anomalies

Let's create a synthetic dataset with some anomalies:

python
# Generate normal data
np.random.seed(42)
normal_data = np.random.normal(0, 1, size=(1000, 10))

# Generate anomalies
anomalies = np.random.normal(5, 1, size=(50, 10))

# Combine normal and anomalous data
all_data = np.vstack([normal_data, anomalies])

# Labels: 0 for normal, 1 for anomaly
labels = np.hstack([np.zeros(1000), np.ones(50)])

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    all_data, labels, test_size=0.2, random_state=42,
    stratify=labels  # Ensure both sets have normal and anomalous samples
)

# Train only on normal data (filter out anomalies)
X_train_normal = X_train[y_train == 0]

# Normalize data
scaler = StandardScaler()
X_train_normal = scaler.fit_transform(X_train_normal)
X_test = scaler.transform(X_test)

Step 2: Build an Autoencoder Model

Now, let's build an autoencoder using TensorFlow's Keras API:

python
def build_autoencoder(input_dim, encoding_dim=5):
    # Encoder
    input_layer = tf.keras.layers.Input(shape=(input_dim,))
    encoder = tf.keras.layers.Dense(encoding_dim, activation='relu')(input_layer)

    # Decoder (linear output, since the standardized inputs can be negative)
    decoder = tf.keras.layers.Dense(input_dim, activation='linear')(encoder)

    # Autoencoder model
    autoencoder = tf.keras.models.Model(inputs=input_layer, outputs=decoder)

    # Compile model
    autoencoder.compile(optimizer='adam', loss='mse')

    return autoencoder

# Build model
input_dim = X_train_normal.shape[1]
autoencoder = build_autoencoder(input_dim)
autoencoder.summary()

Step 3: Train the Autoencoder

Now let's train our autoencoder on the normal data only:

python
# Train the model
history = autoencoder.fit(
    X_train_normal, X_train_normal,  # Input = Output for autoencoders
    epochs=50,
    batch_size=32,
    validation_split=0.1,
    verbose=1
)

# Plot training history
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Autoencoder Training')
plt.xlabel('Epochs')
plt.ylabel('Mean Squared Error')
plt.legend()
plt.show()

Step 4: Detect Anomalies

Now we can use our trained autoencoder to detect anomalies in the test set:

python
# Predict reconstructions
reconstructions = autoencoder.predict(X_test)

# Calculate reconstruction error
mse = np.mean(np.power(X_test - reconstructions, 2), axis=1)

# Plot reconstruction error
plt.figure(figsize=(12, 6))
plt.hist(mse, bins=50)
plt.xlabel('Reconstruction Error')
plt.ylabel('Count')
plt.title('Reconstruction Error Distribution')
plt.show()

# Choose a threshold (adjust it to your needs). For simplicity we take the 95th
# percentile of the test-set error; in practice you would derive the threshold
# from reconstruction errors on held-out normal data, as in the VAE example below.
threshold = np.percentile(mse, 95)
print(f"Threshold: {threshold}")

# Identify anomalies
predicted_anomalies = mse > threshold

# Performance metrics
from sklearn.metrics import classification_report, confusion_matrix

print("Confusion Matrix:")
print(confusion_matrix(y_test, predicted_anomalies))
print("\nClassification Report:")
print(classification_report(y_test, predicted_anomalies))

# Visualize results
plt.figure(figsize=(12, 6))
plt.scatter(range(len(y_test)), mse, c=y_test, cmap='coolwarm', alpha=0.7)
plt.axhline(y=threshold, color='r', linestyle='-', label='Threshold')
plt.xlabel('Test Sample Index')
plt.ylabel('Reconstruction Error')
plt.title('Anomaly Detection Results')
plt.colorbar(label='Actual Class')
plt.legend()
plt.show()

Method 2: Using TensorFlow Probability for Anomaly Detection

TensorFlow Probability (TFP) provides more sophisticated tools for probabilistic modeling and anomaly detection. Let's implement a Variational Autoencoder (VAE) for anomaly detection. The model below is written with core TensorFlow ops; a short note after Step 2 shows how TFP's distributions relate to the loss we compute by hand.

Step 1: Import TensorFlow Probability

python
import tensorflow_probability as tfp
tfd = tfp.distributions

print(f"TensorFlow Probability version: {tfp.__version__}")

Step 2: Build a Variational Autoencoder (VAE)

python
class VAE(tf.keras.Model):
    def __init__(self, input_dim, latent_dim=2):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim

        # Encoder
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(input_dim,)),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(latent_dim + latent_dim)  # Mean and log variance
        ])

        # Decoder
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(input_dim)
        ])

    def encode(self, x):
        mean_log_var = self.encoder(x)
        mean, log_var = tf.split(mean_log_var, num_or_size_splits=2, axis=1)
        return mean, log_var

    def reparameterize(self, mean, log_var):
        eps = tf.random.normal(shape=tf.shape(mean))
        return mean + tf.exp(log_var * 0.5) * eps

    def decode(self, z):
        return self.decoder(z)

    def call(self, x):
        mean, log_var = self.encode(x)
        z = self.reparameterize(mean, log_var)
        x_reconstructed = self.decode(z)
        return x_reconstructed, mean, log_var

# Define loss function
def vae_loss(x, x_reconstructed, mean, log_var):
    # Reconstruction loss
    reconstruction_loss = tf.reduce_mean(
        tf.reduce_sum(tf.square(x - x_reconstructed), axis=1)
    )

    # KL divergence
    kl_loss = -0.5 * tf.reduce_mean(
        tf.reduce_sum(1 + log_var - tf.square(mean) - tf.exp(log_var), axis=1)
    )

    return reconstruction_loss + kl_loss

# Build and compile the model
vae = VAE(input_dim=input_dim, latent_dim=2)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
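
The vae_loss above computes the KL term in closed form with plain TensorFlow ops, which is why TFP barely appears in the code itself. If you prefer to work with explicit distribution objects, the same quantity can be expressed through TFP; the helper below (kl_via_tfp is our name, not a TFP API) is a sketch of that equivalence:

python
def kl_via_tfp(mean, log_var):
    # Approximate posterior q(z|x) = N(mean, exp(log_var)), independent dimensions
    posterior = tfd.Independent(
        tfd.Normal(loc=mean, scale=tf.exp(0.5 * log_var)),
        reinterpreted_batch_ndims=1)
    # Standard normal prior p(z) = N(0, I)
    prior = tfd.Independent(
        tfd.Normal(loc=tf.zeros_like(mean), scale=tf.ones_like(mean)),
        reinterpreted_batch_ndims=1)
    # Analytic KL(q || p), one value per sample, averaged over the batch;
    # this matches the closed-form kl_loss used in vae_loss above
    return tf.reduce_mean(tfd.kl_divergence(posterior, prior))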

Step 3: Train the VAE

python
@tf.function
def train_step(x):
    with tf.GradientTape() as tape:
        x_reconstructed, mean, log_var = vae(x)
        loss = vae_loss(x, x_reconstructed, mean, log_var)

    gradients = tape.gradient(loss, vae.trainable_variables)
    optimizer.apply_gradients(zip(gradients, vae.trainable_variables))
    return loss

# Training loop
epochs = 30
batch_size = 32
# Cast to float32 so the inputs match the model's default float32 weights
train_dataset = tf.data.Dataset.from_tensor_slices(
    X_train_normal.astype('float32')
).shuffle(1000).batch(batch_size)

loss_history = []

for epoch in range(epochs):
    epoch_loss = 0.0
    num_batches = 0

    for batch in train_dataset:
        loss = train_step(batch)
        epoch_loss += float(loss)  # plain Python float for logging
        num_batches += 1

    avg_loss = epoch_loss / num_batches
    loss_history.append(avg_loss)

    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# Plot training progress
plt.figure(figsize=(10, 6))
plt.plot(loss_history)
plt.title('VAE Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

Step 4: Detect Anomalies with VAE

python
# Compute reconstruction error for test data
def compute_reconstruction_error(model, x):
    x = tf.cast(x, tf.float32)  # match the model's float32 weights
    x_reconstructed, _, _ = model(x)
    mse = tf.reduce_mean(tf.square(x - x_reconstructed), axis=1)
    return mse.numpy()

# Calculate reconstruction error for all test samples
test_loss = compute_reconstruction_error(vae, X_test)

# Find threshold (95th percentile of reconstruction error on normal training data)
train_loss = compute_reconstruction_error(vae, X_train_normal)
threshold = np.percentile(train_loss, 95)

# Identify anomalies
predicted_anomalies = test_loss > threshold

# Performance metrics
print("Confusion Matrix:")
print(confusion_matrix(y_test, predicted_anomalies))
print("\nClassification Report:")
print(classification_report(y_test, predicted_anomalies))

# Visualize results
plt.figure(figsize=(12, 6))
plt.scatter(range(len(y_test)), test_loss, c=y_test, cmap='coolwarm', alpha=0.7)
plt.axhline(y=threshold, color='r', linestyle='-', label='Threshold')
plt.xlabel('Test Sample Index')
plt.ylabel('Reconstruction Error')
plt.title('VAE Anomaly Detection Results')
plt.colorbar(label='Actual Class')
plt.legend()
plt.show()

Real-World Application: Credit Card Fraud Detection

Let's apply anomaly detection to a real-world problem: credit card fraud detection.

python
# For this example, we'll use a synthetic dataset similar to credit card transactions
# In practice, you would load a real dataset like the Credit Card Fraud Detection dataset from Kaggle

# Generate synthetic credit card transaction data
np.random.seed(42)
n_samples = 10000
n_features = 30

# Normal transactions
normal_transactions = np.random.normal(0, 1, size=(n_samples, n_features))

# Fraudulent transactions (1% of all transactions)
n_fraud = int(n_samples * 0.01)
fraud_transactions = np.random.normal(3, 2, size=(n_fraud, n_features))

# Add some realistic features
# Time of day (represented as hour, normalized)
time_normal = np.random.normal(12, 4, (n_samples, 1)) # Most transactions during day
time_fraud = np.random.normal(3, 2, (n_fraud, 1)) # Fraudulent more common late night

# Transaction amount
amount_normal = np.abs(np.random.normal(50, 30, (n_samples, 1))) # Normal transactions ~$50
amount_fraud = np.abs(np.random.normal(200, 150, (n_fraud, 1))) # Fraudulent often larger

# Combine features
normal_transactions = np.hstack([normal_transactions, time_normal, amount_normal])
fraud_transactions = np.hstack([fraud_transactions, time_fraud, amount_fraud])

# Combine datasets
X = np.vstack([normal_transactions, fraud_transactions])
y = np.hstack([np.zeros(n_samples), np.ones(n_fraud)])

# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Use only normal data for training
X_train_normal = X_train[y_train == 0]

# Standardize data
scaler = StandardScaler()
X_train_normal = scaler.fit_transform(X_train_normal)
X_test = scaler.transform(X_test)

# Create and train the autoencoder
input_dim = X_train_normal.shape[1]
autoencoder = build_autoencoder(input_dim, encoding_dim=10)

# Train the model
history = autoencoder.fit(
    X_train_normal, X_train_normal,
    epochs=20,
    batch_size=64,
    validation_split=0.1,
    verbose=1
)

# Detect anomalies
reconstructions = autoencoder.predict(X_test)
mse = np.mean(np.power(X_test - reconstructions, 2), axis=1)

# Find threshold from the reconstruction errors of the normal samples. Using y_test
# here is a shortcut for illustration; in practice you would use a labeled validation
# set or the training-data error distribution instead.
threshold = np.percentile(mse[y_test == 0], 99)  # Higher percentile to reduce false positives
predictions = mse > threshold

# Evaluate results
print("Confusion Matrix:")
print(confusion_matrix(y_test, predictions))
print("\nClassification Report:")
print(classification_report(y_test, predictions))

# Calculate financial impact
# Assume average fraudulent transaction is $200 and cost of investigating false positive is $10
true_positives = np.sum((predictions == 1) & (y_test == 1))
false_positives = np.sum((predictions == 1) & (y_test == 0))
false_negatives = np.sum((predictions == 0) & (y_test == 1))

savings = true_positives * 200 # Saved from detecting fraud
investigation_cost = false_positives * 10 # Cost of investigating false alarms
missed_fraud = false_negatives * 200 # Cost of undetected fraud

print(f"\nFinancial Impact Analysis:")
print(f"Money saved from detected fraud: ${savings:.2f}")
print(f"Cost of investigating false alarms: ${investigation_cost:.2f}")
print(f"Money lost from undetected fraud: ${missed_fraud:.2f}")
print(f"Net financial impact: ${savings - investigation_cost - missed_fraud:.2f}")

Improving Your Anomaly Detection Model

Here are several ways to improve your anomaly detection models:

  1. Feature Engineering: Create domain-specific features that might help identify anomalies (a hypothetical sketch appears after this list).

  2. Ensemble Methods: Combine multiple anomaly detection techniques:

python
def ensemble_anomaly_detection(X, autoencoder, vae, isolation_forest,
                               threshold_ae, threshold_vae, threshold_if):
    # Autoencoder prediction
    reconstructions = autoencoder.predict(X)
    mse_ae = np.mean(np.power(X - reconstructions, 2), axis=1)
    pred_ae = mse_ae > threshold_ae

    # VAE prediction
    mse_vae = compute_reconstruction_error(vae, X)
    pred_vae = mse_vae > threshold_vae

    # Isolation Forest prediction (a fitted sklearn.ensemble.IsolationForest;
    # lower decision_function scores mean more anomalous)
    scores_if = isolation_forest.decision_function(X)
    pred_if = scores_if < threshold_if

    # Combine predictions (majority voting): at least 2 of the 3 models must agree
    ensemble_pred = np.vstack([pred_ae, pred_vae, pred_if])
    return np.sum(ensemble_pred, axis=0) >= 2

  3. Dynamic Thresholding: Adjust thresholds based on time or context:

python
def dynamic_threshold(reconstruction_errors, window_size=100, percentile=95):
    """Calculate dynamic thresholds over sliding windows."""
    thresholds = []
    for i in range(len(reconstruction_errors)):
        start = max(0, i - window_size)
        window = reconstruction_errors[start:i+1]
        if len(window) > 20:  # Ensure enough data points
            thresholds.append(np.percentile(window, percentile))
        else:
            # Too few points yet: use the percentile of everything seen so far
            thresholds.append(np.percentile(reconstruction_errors[:i+1], percentile))
    return np.array(thresholds)
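
Returning to point 1 above, here is a hypothetical feature-engineering sketch for transaction-style data. The column names (amount, hour, card_id) and the helper add_engineered_features are illustrative assumptions, not part of any particular dataset:

python
import numpy as np
import pandas as pd

def add_engineered_features(df):
    df = df.copy()
    # Log-scale the amount so unusually large transactions stand out without
    # letting a few huge values dominate the scale
    df["log_amount"] = np.log1p(df["amount"])
    # Cyclical encoding of the hour so 23:00 and 01:00 end up close together
    df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
    df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
    # Spend relative to each card's own typical transaction
    df["amount_vs_card_mean"] = (
        df["amount"] / df.groupby("card_id")["amount"].transform("mean")
    )
    return df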

Summary

In this tutorial, you've learned how to:

  1. Implement anomaly detection using autoencoders in TensorFlow
  2. Use TensorFlow Probability for more advanced anomaly detection with VAEs
  3. Apply these techniques to a real-world fraud detection problem
  4. Evaluate and improve your anomaly detection models

Anomaly detection is a critical part of many machine learning systems, helping to identify unusual patterns that could indicate problems, opportunities, or security threats. The techniques we've covered provide a solid foundation for building effective anomaly detection systems using TensorFlow.

Exercises

  1. Time Series Anomaly Detection: Extend the techniques learned here to detect anomalies in time series data like stock prices or sensor readings.

  2. Image Anomaly Detection: Build a convolutional autoencoder to detect anomalies in image data (e.g., manufacturing defects).

  3. Compare Methods: Try implementing other anomaly detection methods like One-Class SVM or Isolation Forest and compare their performance with the autoencoder approaches.

  4. Hyperparameter Tuning: Experiment with different architectures and hyperparameters for your autoencoder to improve its performance.

  5. Real Dataset Challenge: Apply what you've learned to a real-world dataset such as the Credit Card Fraud Detection dataset from Kaggle.

Happy anomaly hunting!


