Friday, April 24, 2026
Data Engineering · Lv.2 Beginner · 30 min read
Series: Data Engineering Playbook · Part 6/7

Data Engineering Playbook — Part 6: AI-Native Data Engineering (Feature Store, MLOps, RAG)

With over 85% of ML projects failing to reach production, the data engineer's core role in 2026 is eliminating training-serving skew with Feature Stores, automating the model lifecycle with MLOps, and powering LLMs with real-time context through RAG pipelines. This guide covers everything from Point-in-Time join implementation and MLflow experiment tracking to vector DB selection strategies and agentic pipeline guardrails, with code examples throughout.

Series Overview

  • Part 1 — Overview & 2026 Key Trends (published)
  • Part 2 — Data Architecture Design (published)
  • Part 3 — Building Data Pipelines (published)
  • Part 4 — Data Quality & Governance (published)
  • Part 5 — Cloud & Infrastructure (FinOps, IaC) (published)
  • Part 6 — AI-Native Data Engineering (Feature Store, MLOps, RAG) (current)
  • Part 7 — DataOps & Team Operations Playbook (coming soon)

Table of Contents

  1. 2026: Data Engineering Becomes AI Infrastructure
  2. Feature Store — The Data Supply Chain for ML
  3. MLOps — Automating the Model Lifecycle
  4. Data Pipelines for LLMs
  5. RAG Pipeline Design in Practice
  6. Vector Database Selection Guide
  7. Agentic Data Pipelines
  8. Accelerating Data Engineering with AI Copilots
  9. AI Data Pipeline Governance
  10. Production Checklist

1. 2026: Data Engineering Becomes AI Infrastructure

Data engineering's role has fundamentally expanded in 2026. What began as moving and transforming data has evolved into designing and operating the foundational infrastructure on which entire AI systems run.

According to Gartner's 2025 AI report, more than 85% of ML projects never reach production, and fewer than 40% of those that do sustain business value beyond 12 months. The primary cause of failure isn't the models themselves — it's the absence of data infrastructure.

The core AI infrastructure components that data engineers are now responsible for:

| Component | Role |
|---|---|
| Feature Store | Single source of truth for training and serving features |
| Data Versioning | Dataset version tracking for model reproducibility |
| RAG Pipeline | Infrastructure supplying real-time context to LLMs |
| Vector Database | Embedding storage and semantic search |
| Feature Serving | Low-latency (millisecond-level) real-time feature delivery |
| ML Pipeline Orchestration | Automated training, evaluation, and deployment |
| Model Monitoring | Data drift detection and performance degradation alerts |

2. Feature Store

What is a Feature Store?

A Feature Store is a system that centrally stores, manages, and serves ML features. Features are the input variables ML models use for prediction — values like "purchases in the last 30 days" or "average user session duration."

Without a Feature Store, three critical problems emerge:

Problem 1: Training-Serving Skew
  Training:  user_age = (current date - birth date) / 365
  Serving:   user_age = raw database age column (integer)
  → Same concept computed differently → model performance degrades

Problem 2: Feature Redundancy
  Team A: computes average purchase amount (Spark job)
  Team B: computes the same feature (different SQL)
  → Duplicate compute costs + risk of divergent values

Problem 3: Data Leakage
  Feature values recorded after each training label's timestamp leak into training
  → inflated offline metrics that do not hold up in production
  → Feature Store's Point-in-Time join prevents this

Feature Store Architecture
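The core architectural idea can be sketched as a toy, in-memory store: every feature value enters through a single write path, lands in an append-only offline history (used for training), and is materialized into an online view that keeps only the latest value per entity (used for serving). `ToyFeatureStore` and its methods are hypothetical and are not Feast's API; they only illustrate why one ingestion path removes training-serving skew.

```python
from collections import defaultdict
from datetime import datetime
from typing import Any


class ToyFeatureStore:
    """Hypothetical in-memory sketch: one write path feeding an offline and an online store."""

    def __init__(self) -> None:
        # Offline store: full history per (entity, feature), used to build training sets
        self.offline: dict[tuple[str, str], list[tuple[datetime, Any]]] = defaultdict(list)
        # Online store: latest (timestamp, value) per entity and feature, used for serving
        self.online: dict[str, dict[str, tuple[datetime, Any]]] = defaultdict(dict)

    def write(self, entity_id: str, feature: str, ts: datetime, value: Any) -> None:
        # Both stores receive the same computed value, so training and serving cannot diverge
        self.offline[(entity_id, feature)].append((ts, value))
        current = self.online[entity_id].get(feature)
        if current is None or ts >= current[0]:  # late-arriving older values don't overwrite
            self.online[entity_id][feature] = (ts, value)

    def get_online(self, entity_id: str, features: list[str]) -> dict[str, Any]:
        row = self.online.get(entity_id, {})
        return {f: row[f][1] if f in row else None for f in features}

    def get_history(self, entity_id: str, feature: str) -> list[tuple[datetime, Any]]:
        return sorted(self.offline[(entity_id, feature)], key=lambda pair: pair[0])


store = ToyFeatureStore()
store.write("c001", "purchase_count_30d", datetime(2026, 1, 10), 2)
store.write("c001", "purchase_count_30d", datetime(2026, 1, 20), 3)
print(store.get_online("c001", ["purchase_count_30d", "avg_order_value"]))
# {'purchase_count_30d': 3, 'avg_order_value': None}
```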

Point-in-Time Join — Preventing Data Leakage

The cornerstone Feature Store capability. During model training, each training record is joined only with the feature values that were already known at its timestamp (the latest value at or before it), never with values computed later.

# Point-in-Time join example using Feast

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# Training data: retrieve customer features as of each order's timestamp
# (prevents future data inclusion)
training_df = pd.DataFrame({
    "customer_id": ["c001", "c002", "c003"],
    "event_timestamp": pd.to_datetime([
        "2026-01-15 10:00:00",
        "2026-02-20 14:30:00",
        "2026-03-05 09:15:00",
    ]),
    "label": [1, 0, 1]  # churn indicator
})

# Feast automatically retrieves feature values as of each timestamp
feature_vector = store.get_historical_features(
    entity_df=training_df,
    features=[
        "customer_stats:purchase_count_30d",   # purchases in last 30 days
        "customer_stats:avg_order_value",       # average order value
        "customer_stats:days_since_last_order", # days since last order
        "customer_stats:total_lifetime_value",  # total lifetime value
    ]
).to_df()

print(feature_vector)
# customer_id | event_timestamp | purchase_count_30d | avg_order_value | ...
# c001        | 2026-01-15      | 3                 | 45.20           | ...
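Conceptually, a Point-in-Time join is an "as-of" lookup: for each training row, take the most recent feature value whose timestamp is at or before the row's `event_timestamp`, optionally discarding values older than a TTL (Feast applies the feature view's `ttl` in a similar way). The stdlib sketch below illustrates that logic; the function names and sample data are hypothetical, and this is not how Feast implements the join internally.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Any, Optional

FeatureHistory = list[tuple[datetime, Any]]  # (feature_timestamp, value), sorted by time


def as_of_lookup(history: FeatureHistory, event_ts: datetime,
                 ttl: Optional[timedelta] = None) -> Any:
    """Return the latest value with feature_timestamp <= event_ts, or None."""
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_ts)  # first index strictly after event_ts
    if idx == 0:
        return None  # no value existed yet at event time
    ts, value = history[idx - 1]
    if ttl is not None and event_ts - ts > ttl:
        return None  # value too stale to use
    return value


def point_in_time_join(entity_rows: list[dict], histories: dict[str, FeatureHistory],
                       feature: str, ttl: Optional[timedelta] = None) -> list[dict]:
    """Attach `feature` to each row using only values known at the row's event_timestamp."""
    joined = []
    for row in entity_rows:
        history = histories.get(row["customer_id"], [])
        joined.append({**row, feature: as_of_lookup(history, row["event_timestamp"], ttl)})
    return joined


# Hypothetical purchase_count_30d history for one customer
histories = {"c001": [(datetime(2026, 1, 1), 1), (datetime(2026, 1, 14), 3),
                      (datetime(2026, 1, 20), 7)]}
rows = [{"customer_id": "c001", "event_timestamp": datetime(2026, 1, 15, 10), "label": 1}]
print(point_in_time_join(rows, histories, "purchase_count_30d"))
# purchase_count_30d is 3: the value 7 from 2026-01-20 lies after the label's timestamp
```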

Real-Time Feature Serving

# Real-time feature retrieval at inference

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Assumes churn_model is a fitted scikit-learn classifier loaded at startup

def predict_churn(customer_id: str) -> float:
    """Real-time churn prediction: fetches latest features from the Feature Store"""

    # Fetch features from the online store (e.g. Redis) in milliseconds
    feature_vector = store.get_online_features(
        features=[
            "customer_stats:purchase_count_30d",
            "customer_stats:avg_order_value",
            "customer_stats:days_since_last_order",
        ],
        entity_rows=[{"customer_id": customer_id}]
    ).to_dict()

    # Model inference: scikit-learn expects a 2D input (one row per sample),
    # and predict_proba returns the churn probability rather than a 0/1 label
    probabilities = churn_model.predict_proba([[
        feature_vector["purchase_count_30d"][0],
        feature_vector["avg_order_value"][0],
        feature_vector["days_since_last_order"][0],
    ]])

    return float(probabilities[0, 1])
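One detail the snippet glosses over: when an entity has no row in the online store (a brand-new customer, or an expired value), Feast typically returns `None` for those features, and most models cannot accept `None`. The helper below is a hypothetical sketch that assembles the model input in a fixed column order, substitutes explicit defaults, and reports which features were missing so the gap can be monitored. The default values are assumptions to agree on with the model owner.

```python
from typing import Any, Optional

# Hypothetical fallback values; the key order must match the training column order
FEATURE_DEFAULTS = {
    "purchase_count_30d": 0,
    "avg_order_value": 0.0,
    "days_since_last_order": 365,
}
FEATURE_ORDER = list(FEATURE_DEFAULTS)


def to_model_input(online: dict[str, list[Optional[Any]]]) -> tuple[list[Any], list[str]]:
    """Turn a get_online_features(...).to_dict() result for one entity into a model input row."""
    row, missing = [], []
    for name in FEATURE_ORDER:
        values = online.get(name) or [None]
        value = values[0]
        if value is None:
            missing.append(name)       # record the gap for monitoring
            value = FEATURE_DEFAULTS[name]
        row.append(value)
    return row, missing


row, missing = to_model_input({"customer_id": ["c999"], "purchase_count_30d": [None],
                               "avg_order_value": [None], "days_since_last_order": [None]})
print(row, missing)
# [0, 0.0, 365] ['purchase_count_30d', 'avg_order_value', 'days_since_last_order']
```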

Feature Store Tool Comparison

| Tool | Characteristics | Best For | Open Source |
|---|---|---|---|
| Feast | Lightweight, broad store support, active community | Teams building their own infra | Yes |
| Tecton | Enterprise-grade, real-time transforms, Databricks integration | Enterprise, strong real-time requirements | No |
| Databricks Feature Store | Full Lakehouse integration, Unity Catalog support | Databricks-native environments | Partial |
| Vertex AI Feature Store | GCP managed, deep Vertex AI integration | GCP-centric environments | No |
| Hopsworks | Open-source full-stack, real-time + batch | Mid-to-large teams wanting self-hosted | Yes |

3. MLOps

The 7-Stage MLOps Pipeline

MLOps integrates ML development (Dev) and operations (Ops) to deploy and operate ML systems reliably and repeatably. According to Gartner, organizations with mature MLOps practices ship ML features 4x faster and detect model drift 28x faster than those without.

MLflow in Practice — Experiment Tracking

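# MLflow experiment tracking: logging every training run
# Assumes X_train, X_test, y_train, y_test were prepared earlier
# (e.g. from the Feature Store's get_historical_features output)

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score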

mlflow.set_experiment("churn-prediction-v2")

with mlflow.start_run(run_name="rf-baseline"):

    # ① Log hyperparameters
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "min_samples_split": 10,
        "class_weight": "balanced",
    }
    mlflow.log_params(params)

    # ② Log data version (for reproducibility)
    mlflow.log_param("dataset_version", "v2026-04-19")
    mlflow.log_param("feature_store_snapshot", "20260419-0600")

    # ③ Train model
    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)

    # ④ Log performance metrics
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "roc_auc": roc_auc_score(y_test, y_prob),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)

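    # ⑤ Save model artifact with input/output schema (for governance)
    mlflow.sklearn.log_model(
        model,
        artifact_path="churn-model",  # MLflow 3 deprecates artifact_path in favor of name=
        registered_model_name="CustomerChurnPredictor",
        # Signature pairs the training inputs with the model's predictions on them
        signature=mlflow.models.infer_signature(X_train, model.predict(X_train)),
    )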

    print(f"ROC-AUC: {metrics['roc_auc']:.4f}")
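The hand-typed `dataset_version` string above is only as reliable as the person typing it. One option, sketched here under the assumption that the training rows fit in memory as JSON-serializable dicts, is to also log a content fingerprint: a hash of the canonicalized rows that changes whenever the data changes. `dataset_fingerprint` is a hypothetical helper, not an MLflow API; its result could be passed to `mlflow.log_param`.

```python
import hashlib
import json
from typing import Any, Iterable


def dataset_fingerprint(rows: Iterable[dict[str, Any]]) -> str:
    """Order-independent SHA-256 fingerprint of a dataset's rows."""
    # Canonicalize each row (sorted keys, fixed separators) so key order doesn't matter
    row_hashes = sorted(
        hashlib.sha256(
            json.dumps(row, sort_keys=True, separators=(",", ":"), default=str).encode()
        ).hexdigest()
        for row in rows
    )
    # Hash the sorted row hashes so row order doesn't matter either
    combined = hashlib.sha256("\n".join(row_hashes).encode()).hexdigest()
    return combined[:16]  # a short prefix is enough for a run parameter


train_rows = [{"customer_id": "c001", "label": 1}, {"customer_id": "c002", "label": 0}]
print(dataset_fingerprint(train_rows))
# e.g. mlflow.log_param("dataset_fingerprint", dataset_fingerprint(train_rows))
```

Sorting the row hashes makes the fingerprint insensitive to row order; drop the sort if order is meaningful (for example, time-ordered splits).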

Model Registry — Stage Management

# MLflow model registry: promoting Staging → Production
# Note: registry stages are deprecated since MLflow 2.9 in favor of model
# version aliases (client.set_registered_model_alias); the stage flow is shown here.

from mlflow.tracking import MlflowClient

client = MlflowClient()

def promote_to_production(model_name: str, run_id: str, min_auc: float = 0.85):
    """Promote the model version produced by run_id to Production if it meets the threshold"""
    run = client.get_run(run_id)
    auc = run.data.metrics.get("roc_auc", 0.0)

    if auc < min_auc:
        raise ValueError(f"ROC-AUC {auc:.4f} < threshold {min_auc}. Promotion rejected.")

    # Look up the version registered by this run, not merely the latest Staging version
    versions = client.search_model_versions(f"name='{model_name}' and run_id='{run_id}'")
    if not versions:
        raise LookupError(f"No registered version of {model_name} for run {run_id}")
    version = versions[0].version

    # archive_existing_versions=True moves the current Production version to Archived
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )

    print(f"✅ Model v{version} → Production (AUC: {auc:.4f})")
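The function above only checks an absolute floor. A common refinement is a champion/challenger gate: the candidate must clear the floor and also beat the current Production model by a minimum margin. It is shown here as a hypothetical pure function so the rule can be unit-tested without an MLflow server; the parameter names and the 0.005 default margin are assumptions.

```python
from typing import Optional


def should_promote(candidate_auc: float, production_auc: Optional[float],
                   min_auc: float = 0.85, min_uplift: float = 0.005) -> tuple[bool, str]:
    """Decide whether a challenger model may replace the current champion."""
    if candidate_auc < min_auc:
        return False, f"AUC {candidate_auc:.4f} below floor {min_auc}"
    if production_auc is None:
        return True, "no Production model yet"
    uplift = candidate_auc - production_auc
    if uplift < min_uplift:
        return False, f"uplift {uplift:+.4f} below required {min_uplift}"
    return True, f"beats Production by {uplift:+.4f}"


print(should_promote(0.891, 0.884))  # (True, 'beats Production by +0.0070')
```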

Model Monitoring — Drift Detection

# Data and model drift monitoring with Evidently AI
# (legacy Report API from evidently < 0.7; newer releases reorganized these imports)

from evidently.report import Report
from evidently.metric_preset import (
    ClassificationPreset,
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
)

def monitor_model_drift(reference_df, current_df, output_path: str, column_mapping=None):
    """Weekly drift report, scheduled by Airflow.

    ClassificationPreset needs target and prediction columns; pass an Evidently
    ColumnMapping if they are not named "target" and "prediction".
    """
    report = Report(metrics=[
        DataDriftPreset(),      # detect feature distribution shifts
        TargetDriftPreset(),    # detect prediction/target distribution shifts
        DataQualityPreset(),    # data quality summary
        ClassificationPreset(), # classification model performance metrics
    ])

    report.run(reference_data=reference_df, current_data=current_df,
               column_mapping=column_mapping)
    report.save_html(output_path)

    result = report.as_dict()
    # The first metric expanded from DataDriftPreset is DatasetDriftMetric
    drift_detected = result["metrics"][0]["result"]["dataset_drift"]

    if drift_detected:
        # trigger_retraining_pipeline and send_slack_alert are helpers defined elsewhere
        trigger_retraining_pipeline()
        send_slack_alert(
            channel="#ml-alerts",
            message="⚠️ Data drift detected: model retraining pipeline started"
        )

    return drift_detected
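To make "distribution shift" concrete: Evidently chooses a statistical test per column (for numeric data, for example, Kolmogorov-Smirnov or Wasserstein distance depending on sample size). A widely used alternative metric, swapped in here because it is easy to compute by hand, is the Population Stability Index (PSI). The stdlib sketch below computes PSI for one numeric feature using bins fixed at reference-data quantiles. Rules of thumb often read PSI above roughly 0.1 as moderate and above 0.25 as significant drift, but those cut-offs are conventions, not standards.

```python
import math
from bisect import bisect_right


def psi(reference: list[float], current: list[float], bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index of `current` against `reference` for one numeric feature."""
    ref_sorted = sorted(reference)
    # Bin edges at reference quantiles, so each bin holds roughly equal reference mass
    edges = [ref_sorted[int(len(ref_sorted) * i / bins)] for i in range(1, bins)]

    def proportions(values: list[float]) -> list[float]:
        counts = [0] * bins
        for v in values:
            counts[bisect_right(edges, v)] += 1
        return [max(c / len(values), eps) for c in counts]  # eps avoids log(0)

    ref_p, cur_p = proportions(reference), proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))


reference = [float(i % 100) for i in range(1000)]      # roughly uniform on 0..99
shifted = [float(i % 100) + 30.0 for i in range(1000)]  # same shape, shifted by +30
print(psi(reference, reference), psi(reference, shifted))
```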

MLOps Core Tool Stack 2026

| Area | Tools |
|---|---|
| Data Versioning | DVC, lakeFS, Delta Lake Time Travel |
| Experiment Tracking | MLflow 3 (GenAI features included), Weights & Biases |
| Feature Store | Feast, Tecton, Databricks Feature Store |
| Model Registry | MLflow Registry, Hugging Face Hub |
| ML Pipelines | Kubeflow, Metaflow, Vertex AI Pipelines |
| Model Serving | BentoML, Ray Serve, Seldon, FastAPI |
| Monitoring | Evidently AI, WhyLabs, Arize AI |

4. Data Pipelines for LLMs

Data Engineering in the LLM Era

LLMs have expanded the data engineer's responsibility from structured data to everything — including text, PDFs, images, audio, and video.

"The quality of what AI produces is dependent on the quality of the data it learns from or references. Without good data pipelines, there is no good AI."

LLM Pre-Training Data Pipeline

  1. Raw Data Collection — web crawling / public datasets
  2. Deduplication — MinHash / SimHash to remove identical and near-duplicate documents
  3. Quality Filtering — remove short documents, spam, ads, adult content; detect target language
  4. PII Removal — mask emails, phone numbers, identification numbers
  5. Tokenization — BPE / SentencePiece to convert text → tokens
  6. Dataset Versioning — DVC / lakeFS
  7. Storage — S3 / GCS (Parquet / JSONL)
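Step 2's near-duplicate detection can be illustrated with a small MinHash sketch: each document becomes a set of word 3-gram shingles, each of k salted hash functions keeps its minimum over that set, and the fraction of positions where two signatures agree estimates the Jaccard similarity of the two shingle sets. This is a toy stdlib version with illustrative parameters; production pipelines usually rely on a library implementation plus locality-sensitive hashing (LSH) so they never compare every pair of documents.

```python
import hashlib


def shingles(text: str, n: int = 3) -> set[str]:
    """Word n-grams of a lower-cased, whitespace-normalized document."""
    words = text.lower().split()
    return {" ".join(words[i:i + n]) for i in range(max(len(words) - n + 1, 1))}


def minhash_signature(text: str, num_hashes: int = 128) -> list[int]:
    """One minimum per salted hash function over the document's shingles."""
    doc_shingles = shingles(text)
    sig = []
    for seed in range(num_hashes):
        salt = seed.to_bytes(4, "big")  # a different salt acts as a different hash function
        sig.append(min(
            int.from_bytes(hashlib.blake2b(s.encode(), digest_size=8, salt=salt).digest(), "big")
            for s in doc_shingles
        ))
    return sig


def estimated_jaccard(sig_a: list[int], sig_b: list[int]) -> float:
    """Fraction of signature positions where the two documents agree."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)


doc_a = "the quick brown fox jumps over the lazy dog near the river bank today"
doc_b = "the quick brown fox jumps over the lazy dog near the river bank tonight"
doc_c = "quarterly revenue grew in every region except the northern territories"
sa, sb, sc = (minhash_signature(d) for d in (doc_a, doc_b, doc_c))
print(estimated_jaccard(sa, sb), estimated_jaccard(sa, sc))  # true Jaccard: 11/13 and 0
```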

Fine-Tuning Data Pipeline

# High-quality dataset construction pipeline for fine-tuning

import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class FineTuningExample:
    """Fine-tuning training data format (instruction fine-tuning)"""
    system: str
    instruction: str
    input: Optional[str]
    output: str
    source: str           # data provenance (traceability)
    quality_score: float  # quality score (0.0-1.0)


def build_finetuning_dataset(raw_examples: list) -> list:
    """Raw examples (dicts) → fine-tuning dataset transformation pipeline.

    deduplicate, validate_format and compute_quality_score are not shown here;
    each step takes one example and returns it (possibly modified) or None to drop it.
    """
    pipeline = [
        filter_low_quality,     # drop short outputs and quality_score < 0.7
        remove_pii,             # mask PII
        deduplicate,            # remove duplicate examples (needs state across calls)
        validate_format,        # format validation
        compute_quality_score,  # re-score quality
    ]

    processed = raw_examples
    for step in pipeline:
        before = len(processed)
        # Apply the step, then discard the examples it rejected (returned None)
        processed = [result for result in (step(ex) for ex in processed) if result is not None]
        after = len(processed)
        print(f"[{step.__name__}] {before} → {after} (removed {before - after})")

    return processed


def filter_low_quality(example: dict) -> Optional[dict]:
    if len(example.get("output", "")) < 50:
        return None
    if example.get("quality_score", 0) < 0.7:
        return None
    return example


# The phone and ID patterns match Korean formats (mobile 010-1234-5678,
# resident registration number 900101-1234567); adapt them to your locale
PII_PATTERNS = {
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b': '[EMAIL]',
    r'\b\d{3}-\d{4}-\d{4}\b': '[PHONE]',
    r'\b\d{6}-\d{7}\b': '[ID_NUMBER]',
}


def remove_pii(example: dict) -> dict:
    """Mask personally identifiable information in the text fields"""
    for field in ["instruction", "input", "output"]:
        if example.get(field):
            for pattern, replacement in PII_PATTERNS.items():
                example[field] = re.sub(pattern, replacement, example[field])
    return example
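Of the steps referenced but not defined above, `deduplicate` is the subtle one: it cannot be a stateless per-example function, because whether an example is a duplicate depends on what has already been seen. A hypothetical factory that fits the `step(example) -> example | None` shape by closing over a set of content hashes is sketched below. It catches exact duplicates after case and whitespace normalization only; near-duplicates need something like the MinHash approach from step 2 of the pre-training pipeline.

```python
import hashlib
from typing import Callable, Optional


def make_deduplicator(fields: tuple[str, ...] = ("instruction", "input", "output")
                      ) -> Callable[[dict], Optional[dict]]:
    """Return a stateful step that drops examples whose normalized content was already seen."""
    seen: set[str] = set()

    def deduplicate(example: dict) -> Optional[dict]:
        # Normalize case and whitespace so trivial variants hash identically
        key_text = "\x1f".join(
            " ".join(str(example.get(f) or "").lower().split()) for f in fields
        )
        digest = hashlib.sha256(key_text.encode()).hexdigest()
        if digest in seen:
            return None
        seen.add(digest)
        return example

    return deduplicate


deduplicate = make_deduplicator()  # create a fresh one per pipeline run
examples = [{"instruction": "Define ETL", "output": "Extract, transform, load."},
            {"instruction": "define  etl", "output": "extract, transform, load."},
            {"instruction": "Define ELT", "output": "Extract, load, transform."}]
kept = [ex for ex in map(deduplicate, examples) if ex is not None]
print(len(kept))  # 2
```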

5. RAG Pipeline Design in Practice

What is RAG (Retrieval-Augmented Generation)?

RAG is a pattern where an LLM, instead of relying solely on its model weights, retrieves relevant information in real time from an external knowledge base and uses it as context when generating responses.

Why RAG? Compared to fine-tuning, RAG requires no model retraining when knowledge changes, lets responses cite specific source documents for fact verification (grounding), and is usually far more cost-effective to keep current.

RAG Pipeline — Two-Stage Architecture

Production RAG Pipeline Implementation

# production_rag.py
# Enterprise-grade RAG pipeline implementation

from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Weaviate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
import weaviate

# --- Stage 1: Indexing Pipeline ---

class DocumentIndexer:
    def __init__(self, weaviate_client, embedding_model):
        self.client = weaviate_client
        self.embeddings = embedding_model
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,    # characters by default; balances LLM context vs. retrieval precision
            chunk_overlap=100, # prevents information loss at chunk boundaries
            separators=["\n\n", "\n", ".", " "],
        )

    def index_documents(self, documents: list, collection_name: str):
        """Chunk → embed → store documents in vector DB"""
        chunks = []
        for doc in documents:
            splits = self.splitter.split_text(doc["content"])
            for i, chunk in enumerate(splits):
                chunks.append({
                    "content": chunk,
                    "source": doc["source"],
                    "doc_id": doc["id"],
                    "chunk_index": i,
                    "updated_at": doc.get("updated_at"),
                })

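        # Incremental update: skip already-indexed chunks.
        # _filter_new_chunks is not shown; it would compare (doc_id, chunk_index,
        # updated_at) against what the collection already holds.
        new_chunks = self._filter_new_chunks(chunks, collection_name)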
        if new_chunks:
            Weaviate.from_texts(
                texts=[c["content"] for c in new_chunks],
                embedding=self.embeddings,
                client=self.client,
                index_name=collection_name,
                metadatas=[{k: v for k, v in c.items() if k != "content"}
                           for c in new_chunks]
            )
            print(f"✅ Indexed {len(new_chunks)} chunks")


# --- Stage 2: Retrieval & Generation Pipeline ---

class RAGChain:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        # as_retriever() accepts search_type "similarity", "mmr" or
        # "similarity_score_threshold"; "hybrid" raises an error. For keyword +
        # vector hybrid search on Weaviate, use a hybrid-capable retriever such as
        # langchain_community's WeaviateHybridSearchRetriever, whose alpha
        # parameter runs from 0 (keyword only) to 1 (vector only).
        self.retriever = vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 6},  # number of chunks to retrieve
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an internal data analytics expert.
Answer exclusively based on the context provided below.
If the information is not in the context, respond with "I cannot find that information."
Always cite the source document(s) supporting your answer.

Context:
{context}"""),
            ("human", "{question}")
        ])

    def build_chain(self):
        """Build the RAG chain as one LCEL runnable (question in, chat message out)"""
        return (
            {"context": self.retriever | self._format_docs,
             "question": RunnablePassthrough()}
            | self.prompt
            | self.llm
        )

    def _format_docs(self, docs):
        formatted = []
        for doc in docs:
            source = doc.metadata.get("source", "Unknown")
            formatted.append(f"[Source: {source}]\n{doc.page_content}")
        return "\n\n---\n\n".join(formatted)

    def query(self, question: str) -> dict:
        # Retrieve once and reuse the same documents for the prompt and the citations
        retrieved_docs = self.retriever.invoke(question)
        response = (self.prompt | self.llm).invoke({
            "context": self._format_docs(retrieved_docs),
            "question": question,
        })

        return {
            "answer": response.content,
            "sources": [doc.metadata.get("source") for doc in retrieved_docs],
            "retrieved_chunks": len(retrieved_docs),
        }
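The `alpha` knob in hybrid search blends a keyword score (BM25) with a vector-similarity score. Because the two live on different scales, they are normalized before blending; Weaviate's relativeScoreFusion, for instance, min-max scales each result list before combining them with alpha. The sketch below is a simplified stdlib illustration of that idea with made-up scores, not Weaviate's exact implementation.

```python
def min_max(scores: dict[str, float]) -> dict[str, float]:
    """Scale scores to [0, 1]; a list whose scores are all equal maps to 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def hybrid_rank(keyword: dict[str, float], vector: dict[str, float],
                alpha: float = 0.7, k: int = 6) -> list[tuple[str, float]]:
    """alpha=0 -> keyword only, alpha=1 -> vector only."""
    kw, vec = min_max(keyword), min_max(vector)
    docs = set(kw) | set(vec)
    # A document missing from one result list contributes 0 for that component
    fused = {d: alpha * vec.get(d, 0.0) + (1 - alpha) * kw.get(d, 0.0) for d in docs}
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)[:k]


bm25 = {"doc_a": 12.1, "doc_b": 3.4, "doc_c": 7.9}      # hypothetical keyword scores
cosine = {"doc_b": 0.91, "doc_c": 0.88, "doc_d": 0.52}  # hypothetical vector similarities
print(hybrid_rank(bm25, cosine, alpha=0.7))
```

Here doc_c, which is strong on both signals without topping either list, ends up first after fusion; that is the behavior hybrid search is meant to reward.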

Chunking Strategy Guide

Optimal chunking strategy by document type:

① Narrative documents (wikis, reports)
   → RecursiveCharacterTextSplitter
   → chunk_size=800-1,200, chunk_overlap=100-150 (characters, the splitter's default unit)
   → respects paragraph boundaries

② Structured documents (FAQ, manuals)
   → structure-based splitting (section/header units)
   → MarkdownHeaderTextSplitter

③ Code documentation
   → split by function/class
   → language-aware splitting (RecursiveCharacterTextSplitter.from_language)

④ Documents containing tables
   → separate tables into dedicated chunks
   → keep each table as a single chunk

Chunk size optimization principles:
  Too small: context fragmentation → poor retrieval relevance
  Too large: noise increases → wastes LLM context window
  Practical recommendation: 500-1,000 tokens (adjust by document type); chunk_size
  counts characters unless the splitter is built with a token-based length function
  (e.g. RecursiveCharacterTextSplitter.from_tiktoken_encoder)
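To see how chunk_size and overlap interact, here is a deliberately simple fixed-window splitter over a list of tokens. Whitespace-split words stand in for real tokenizer tokens, an assumption made to keep the example dependency-free. Each window starts chunk_size - overlap tokens after the previous one, so the last overlap tokens of one chunk reappear at the start of the next. Real splitters such as RecursiveCharacterTextSplitter additionally try to break on paragraph and sentence separators.

```python
def sliding_window_chunks(tokens: list[str], chunk_size: int, overlap: int) -> list[list[str]]:
    """Split tokens into windows of `chunk_size`, each sharing `overlap` tokens with the previous."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reaches the end of the document
    return chunks


words = "one two three four five six seven eight nine ten".split()
for chunk in sliding_window_chunks(words, chunk_size=4, overlap=1):
    print(chunk)
# ['one', 'two', 'three', 'four']
# ['four', 'five', 'six', 'seven']
# ['seven', 'eight', 'nine', 'ten']
```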

6. Vector Database Selection Guide

What is a Vector DB?

A vector database stores embeddings (high-dimensional floating-point vectors) and performs semantic similarity search (ANN, Approximate Nearest Neighbor) in milliseconds.

2026 Vector DB Comparison

| DB | Architecture | Scale | Managed | Strengths | Best For |
|---|---|---|---|---|---|
| pgvector | PostgreSQL extension | ~50M vectors | AWS RDS, Supabase | SQL integration, familiar ops | Already using PostgreSQL, under 10M vectors |
| Pinecone | Managed cloud-only | Hundreds of millions | ✅ Fully managed | Zero ops burden, fast start | Starting without infra investment |
| Weaviate | Open-source distributed | Hundreds of millions | ✅ (Weaviate Cloud) | Built-in embedding modules, GraphQL | Multimodal, self-hosting flexibility |
| Qdrant | Rust-based open-source | Hundreds of millions | ✅ (Qdrant Cloud) | High performance, excellent filtering | High-perf requirements, K8s capable |
| Milvus | Open-source distributed | Billions | ✅ (Zilliz) | Massive-scale expansion | 1B+ vectors |
| Chroma | Lightweight local | Millions | | Simple install, fast prototyping | Dev/test, PoC |

Vector DB Decision Tree
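The comparison table's "Best For" column can be read as a rough decision procedure. The function below encodes one such reading: the 10M-vector (pgvector) and 1B-vector (Milvus) thresholds come from the table, while the order of the checks and the parameter names are assumptions, so treat it as a starting point for discussion rather than a rule.

```python
def suggest_vector_db(expected_vectors: int, uses_postgres: bool,
                      wants_zero_ops: bool, is_prototype: bool,
                      needs_multimodal: bool = False) -> str:
    """Hypothetical encoding of the comparison table's 'Best For' column."""
    if is_prototype:
        return "Chroma"      # dev/test, PoC
    if expected_vectors >= 1_000_000_000:
        return "Milvus"      # 1B+ vectors
    if uses_postgres and expected_vectors < 10_000_000:
        return "pgvector"    # reuse existing PostgreSQL operations
    if wants_zero_ops:
        return "Pinecone"    # fully managed, no infra investment
    if needs_multimodal:
        return "Weaviate"    # built-in embedding modules
    return "Qdrant"          # high performance and filtering, self-hosted on K8s


print(suggest_vector_db(3_000_000, uses_postgres=True, wants_zero_ops=False, is_prototype=False))
# pgvector
```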

pgvector in Practice

-- Enable pgvector (the extension must already be installed on the database server)

CREATE EXTENSION IF NOT EXISTS vector;

-- Create embeddings table (1536 dimensions = OpenAI text-embedding-3-small)
CREATE TABLE document_embeddings (
    id          BIGSERIAL PRIMARY KEY,
    content     TEXT NOT NULL,
    embedding   VECTOR(1536),
    source      TEXT,
    doc_id      TEXT,
    created_at  TIMESTAMP DEFAULT NOW(),
    department  TEXT,
    language    TEXT DEFAULT 'en'
);

-- Create HNSW index (critical for ANN search performance)
-- m: max connections per graph node (higher = better recall, more memory)
-- ef_construction: candidate list size during build (higher = better index, slower build)
CREATE INDEX ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

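-- Semantic similarity search
-- <=> is cosine distance, so 1 - distance = cosine similarity.
-- WHERE filters are applied after the HNSW index scan; with selective filters,
-- raise hnsw.ef_search (e.g. SET hnsw.ef_search = 100;) or fewer than 5 rows may come back.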
SELECT
    content,
    source,
    1 - (embedding <=> $1::vector) AS similarity_score
FROM document_embeddings
WHERE department = 'finance'
  AND language = 'en'
ORDER BY embedding <=> $1::vector
LIMIT 5;

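-- Hybrid search (vector + full-text combined)
-- Caveats: the @@ filter keeps only rows that contain the keywords, ts_rank is not
-- normalized to [0, 1] (so the 0.7/0.3 weights are approximate), and ordering by
-- combined_score cannot use the HNSW index. On large tables, store a tsvector column
-- with a GIN index and fuse separate vector and keyword result lists instead.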
SELECT
    content,
    source,
    (1 - (embedding <=> $1::vector)) * 0.7 +
    ts_rank(to_tsvector('english', content),
            plainto_tsquery('english', $2)) * 0.3
    AS combined_score
FROM document_embeddings
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
ORDER BY combined_score DESC
LIMIT 10;
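Because ts_rank and cosine similarity are on different scales, a weighted sum of raw scores is fragile. A rank-based alternative is Reciprocal Rank Fusion (RRF): run the vector query and the keyword query separately (each with its own LIMIT), then score each document by the sum of 1 / (k + rank) over the lists it appears in, with k = 60 as the commonly used constant. A stdlib sketch over the two result lists, with hypothetical document IDs:

```python
from collections import defaultdict


def reciprocal_rank_fusion(result_lists: list[list[str]], k: int = 60,
                           limit: int = 10) -> list[tuple[str, float]]:
    """Fuse ranked lists of document IDs (best first) with RRF."""
    scores: dict[str, float] = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:limit]


vector_hits = ["d7", "d2", "d9", "d4"]  # e.g. ids from ORDER BY embedding <=> $1 LIMIT 20
keyword_hits = ["d2", "d5", "d7"]       # e.g. ids from ORDER BY ts_rank(...) DESC LIMIT 20
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))
# d2 (2nd by vector, 1st by keyword) edges out d7 (1st by vector, 3rd by keyword)
```

Only ranks matter, so no score normalization is needed and either query can still use its own index.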

7. Agentic Data Pipelines

How Agentic AI Is Reshaping Data Engineering

One of the most significant shifts in 2026 is the integration of agentic AI into data pipelines. Traditional pipelines execute pre-defined scripts; agentic pipelines perceive their environment, make decisions, and act autonomously.

In March 2026, Databricks released Genie Code — an agent capable of autonomously building pipelines, debugging failures, deploying dashboards, and maintaining production systems.

Self-Healing Pipeline Pattern

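# self_healing_pipeline.py
# AI agent autonomously diagnoses and recovers from pipeline failures.
# execute_tool (dispatches a tool call to your orchestrator) and
# send_pagerduty_alert are helpers not shown here.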

import anthropic
import json

client = anthropic.Anthropic()

PIPELINE_TOOLS = [
    {
        "name": "get_pipeline_logs",
        "description": "Retrieve pipeline execution logs",
        "input_schema": {
            "type": "object",
            "properties": {
                "pipeline_id": {"type": "string"},
                "lines": {"type": "integer", "default": 100}
            },
            "required": ["pipeline_id"]
        }
    },
    {
        "name": "check_data_quality",
        "description": "Check data quality metrics",
        "input_schema": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string"},
                "date": {"type": "string"}
            },
            "required": ["table_name"]
        }
    },
    {
        "name": "restart_pipeline",
        "description": "Restart the pipeline. Use only after root cause analysis.",
        "input_schema": {
            "type": "object",
            "properties": {
                "pipeline_id": {"type": "string"},
                "backfill_date": {"type": "string", "description": "backfill start date"}
            },
            "required": ["pipeline_id"]
        }
    },
    {
        "name": "escalate_to_human",
        "description": "Escalate complex issues to a human operator",
        "input_schema": {
            "type": "object",
            "properties": {
                "issue_summary": {"type": "string"},
                "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]}
            },
            "required": ["issue_summary", "severity"]
        }
    }
]


def diagnose_and_heal(pipeline_id: str, error_message: str):
    """Agent autonomously diagnoses pipeline failure and attempts recovery"""
    messages = [
        {
            "role": "user",
            "content": f"""Pipeline '{pipeline_id}' has encountered an error.

Error message: {error_message}

Diagnose in this order:
1. Retrieve pipeline logs to identify the root cause
2. Check whether it is a data quality issue
3. If automatic recovery is safe, restart the pipeline
4. If it is a complex issue, escalate to a human

Log all actions and your reasoning."""
        }
    ]

    # Agent loop: runs until resolved, escalated, or out of iterations
    max_iterations = 5
    for _ in range(max_iterations):
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            tools=PIPELINE_TOOLS,
            messages=messages
        )

        if response.stop_reason == "end_turn":
            summary = "".join(b.text for b in response.content if b.type == "text")
            print(f"✅ Agent diagnosis complete: {summary}")
            return

        if response.stop_reason != "tool_use":
            break  # e.g. max_tokens: stop instead of resending the same conversation

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f"🔧 Agent action: {block.name}({block.input})")
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result)
                })

                if block.name == "escalate_to_human":
                    send_pagerduty_alert(block.input)
                    return

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    # Unresolved after the iteration budget (or an unexpected stop): never fail silently
    send_pagerduty_alert({
        "issue_summary": f"Agent could not resolve '{pipeline_id}': {error_message}",
        "severity": "high",
    })
Limitations and Guardrails for Agentic Pipelines

Agentic systems are powerful, but every autonomous action requires clear audit trails and human escalation paths.

Agentic pipeline governance principles:

✅ Permitted (low risk)
  - Log retrieval, status checks
  - Sending alerts
  - Retry within retry limits

⚠️ Conditionally permitted (medium risk)
  - Schedule modifications
  - Automatic backfill (with range limits)
  - Query optimization suggestions (human approval before execution)

❌ Prohibited (high risk — human decision required)
  - Deleting production data
  - Schema changes
  - Access permission changes
  - Provisioning resources beyond cost threshold
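These tiers can be enforced in code rather than left to the prompt. Below is a minimal, hypothetical policy gate that a tool dispatcher such as `execute_tool` in the agent example could call before running any action: low-risk actions run, medium-risk actions run only within limits, and anything unknown or high-risk is refused so a human can decide. Every decision is appended to an audit log. The action names, the 7-day backfill limit, and the audit-record shape are all assumptions.

```python
import json
from datetime import date, datetime, timezone

ALLOWED = {"get_pipeline_logs", "check_data_quality", "send_alert", "escalate_to_human"}
CONDITIONAL = {"restart_pipeline"}  # medium risk: allowed only within limits
MAX_BACKFILL_DAYS = 7               # hypothetical range limit
AUDIT_LOG: list[str] = []           # stand-in for a durable, append-only audit store


def authorize(action: str, params: dict, today: date) -> tuple[bool, str]:
    """Return (allowed, reason); actions not listed are denied by default."""
    if action in ALLOWED:
        return True, "low-risk action"
    if action in CONDITIONAL:
        backfill = params.get("backfill_date")
        if backfill is None:
            return True, "restart without backfill"
        days = (today - date.fromisoformat(backfill)).days
        if 0 <= days <= MAX_BACKFILL_DAYS:
            return True, f"backfill of {days} day(s) within limit"
        return False, f"backfill of {days} day(s) outside the {MAX_BACKFILL_DAYS}-day limit"
    return False, "not on the allowlist; human decision required"


def audited_authorize(action: str, params: dict, today: date) -> bool:
    allowed, reason = authorize(action, params, today)
    # Record every decision, allowed or not, with a UTC timestamp for later review
    AUDIT_LOG.append(json.dumps({
        "at": datetime.now(timezone.utc).isoformat(),
        "action": action, "params": params, "allowed": allowed, "reason": reason,
    }))
    return allowed


print(audited_authorize("restart_pipeline", {"pipeline_id": "p1", "backfill_date": "2026-04-20"},
                        today=date(2026, 4, 24)))  # True: 4-day backfill
print(audited_authorize("drop_table", {"table": "gold.orders"}, today=date(2026, 4, 24)))  # False
```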

8. Accelerating Data Engineering with AI Copilots

State of AI Copilots in 2026

AI Copilots and autonomous workflows have become standard components of the data engineering toolkit. Key strengths of AI-assisted tooling in 2026:

  • AI pair programming (GitHub Copilot, Snowflake Cortex Code): 30-40% reduction in development time
  • Automated anomaly detection: AI models analyze query execution logs, data distributions, and lineage graphs to surface early warning signals
  • dbt Copilot: natural language SQL model generation, optimization suggestions, automated documentation
  • Databricks Genie Code: agent autonomously builds complex pipelines, debugs, and deploys

AI Copilot in Practice

-- Natural language request:
-- "Create a dbt model that aggregates revenue by month and country from the orders table.
--  Exclude cancelled orders and include month-over-month growth rate."

-- AI-generated dbt model: models/marts/fct_monthly_revenue_by_country.sql

with orders as (
    select * from {{ ref('stg_orders') }}
    -- != also drops rows whose order_status is NULL; confirm that is intended
    where order_status != 'cancelled'
),

monthly_revenue as (
    select
        DATE_TRUNC('month', ordered_at) as month,
        country,
        SUM(order_amount_usd)           as revenue,
        COUNT(*)                        as order_count
    from orders
    group by 1, 2
),

with_prev as (
    select
        month,
        country,
        revenue,
        order_count,
        -- LAG returns the previous row that has data, so a month with no orders
        -- for a country is skipped rather than counted as zero revenue
        LAG(revenue) OVER (
            PARTITION BY country
            ORDER BY month
        )                               as prev_month_revenue
    from monthly_revenue
),

with_growth as (
    select
        *,
        ROUND(
            (revenue - prev_month_revenue)
            / NULLIF(prev_month_revenue, 0) * 100, 2
        )                               as mom_growth_pct
    from with_prev
)

select * from with_growth

Validating AI Copilot Output

Validation checklist for AI Copilot output:

□ Does the business logic match requirements?
□ Are JOIN conditions correct? (risk of data fan-out or data loss)
□ Is NULL handling correct?
□ Are there timezone issues in date/time handling?
□ Is the GROUP BY column set complete?
□ Are PARTITION BY and ORDER BY in window functions as intended?
□ Is the query efficient, leveraging indexes or partitions?

Principle: AI writes the first draft; humans verify and own it.
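One practical way to work through this checklist is to recompute the model's logic on a tiny hand-made fixture and compare the result with the SQL output. The sketch below is a hypothetical reference implementation of the copilot-generated revenue model above; the fixture rows are invented, and it deliberately mirrors the SQL's semantics (`!=` also drops NULL statuses, and LAG treats "previous month" as the previous month that has data for that country).

```python
from collections import defaultdict
from datetime import date


def monthly_revenue_with_growth(orders: list[dict]) -> list[dict]:
    """Reference implementation of fct_monthly_revenue_by_country for small fixtures."""
    revenue: dict[tuple[str, date], float] = defaultdict(float)
    for o in orders:
        # SQL `order_status != 'cancelled'` is NULL (so filtered out) when status is NULL
        if o["order_status"] is None or o["order_status"] == "cancelled":
            continue
        month = o["ordered_at"].replace(day=1)  # DATE_TRUNC('month', ordered_at)
        revenue[(o["country"], month)] += o["order_amount_usd"]

    rows: list[dict] = []
    prev: dict[str, float] = {}
    for country, month in sorted(revenue):  # PARTITION BY country ORDER BY month
        rev = revenue[(country, month)]
        p = prev.get(country)
        growth = None if not p else round((rev - p) / p * 100, 2)  # NULLIF(prev, 0)
        rows.append({"country": country, "month": month, "revenue": rev,
                     "prev_month_revenue": p, "mom_growth_pct": growth})
        prev[country] = rev
    return rows


fixture = [
    {"country": "KR", "ordered_at": date(2026, 1, 5), "order_amount_usd": 100.0, "order_status": "paid"},
    {"country": "KR", "ordered_at": date(2026, 1, 20), "order_amount_usd": 50.0, "order_status": "cancelled"},
    {"country": "KR", "ordered_at": date(2026, 2, 3), "order_amount_usd": 120.0, "order_status": "paid"},
    {"country": "US", "ordered_at": date(2026, 2, 9), "order_amount_usd": 80.0, "order_status": None},
]
for row in monthly_revenue_with_growth(fixture):
    print(row)
```

The US order with a NULL status disappears entirely, which is exactly the kind of behavior the "Is NULL handling correct?" item asks you to confirm with the business owner.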

9. AI Data Pipeline Governance

Requirements for AI-Ready Data

The quality of ML model outputs is directly tied to the quality of training and inference data. AI-ready data refers to the properties data must have for AI to produce trustworthy results.

AI-Ready Data Checklist:

① Completeness: Are all required fields populated?
   → NULL rate < 0.1% (for training data)

② Free of bias: Is sampling free of skew toward specific groups?
   → Review sample distribution across demographics, regions, time zones

③ Label quality: For classification models, are labels accurate?
   → Human review + cross-validation required

④ Temporal consistency: Are feature distributions similar between
   training and serving data?
   → Continuously monitored via drift monitoring

⑤ PII removal: Is personally identifiable information removed from
   training data?
   → EU AI Act compliance requirement

⑥ Data lineage: Is the provenance and transformation history of
   training data traceable?
   → Essential for model audits and regulatory compliance
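The completeness criterion in ① is straightforward to automate. A stdlib sketch that computes the NULL rate per required field and flags fields above the 0.1% target; treating empty strings as missing is an assumption you may or may not want.

```python
def null_rates(rows: list[dict], required: list[str]) -> dict[str, float]:
    """Share of rows where each required field is absent, None, or an empty string."""
    if not rows:
        return {field: 1.0 for field in required}
    return {
        field: sum(1 for r in rows if r.get(field) in (None, "")) / len(rows)
        for field in required
    }


def completeness_violations(rows: list[dict], required: list[str],
                            max_null_rate: float = 0.001) -> dict[str, float]:
    """Fields whose NULL rate exceeds the threshold (0.001 = 0.1%)."""
    return {f: rate for f, rate in null_rates(rows, required).items() if rate > max_null_rate}


rows = [{"customer_id": f"c{i:04d}", "age": None if i < 3 else 30} for i in range(1000)]
print(completeness_violations(rows, ["customer_id", "age"]))  # {'age': 0.003}
```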

Model Card — The Data Engineer's Role

# model_card.yaml
# Sections a data engineer must complete at model deployment

model_name: "CustomerChurnPredictor-v2.3"

training_data:
  description: "Subscriber purchase history from 2024-01 to 2026-03"
  source: "gold.fct_customer_orders"
  size:
    rows: 2_450_000
    positive_samples: 312_000
    negative_samples: 2_138_000
  class_balance: "1:6.8 (churn:retained)"
  data_version: "snapshot-2026-04-01"

  limitations:
    - "Excludes customers with fewer than 30 days since signup"
    - "Does not include B2B customers (separate model required)"
    - "Does not reflect churn patterns prior to 2024"

  pii_handling:
    fields_removed: ["customer_name", "email", "phone"]
    fields_tokenized: ["customer_id"]
    compliance: ["GDPR", "Data Protection Act"]

metrics:
  overall:
    roc_auc: 0.891
    precision: 0.784
    recall: 0.823
  by_segment:
    premium_customers:
      roc_auc: 0.921
    standard_customers:
      roc_auc: 0.867

monitoring:
  drift_threshold: 0.15
  retraining_trigger: "weekly AUC < 0.85"
  data_freshness_sla: "24 hours"
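Numbers in a model card drift out of sync with the data just as code comments do. A hypothetical pre-deployment check that recomputes the card's derived size fields (total rows and class ratio) from the sample counts is sketched below. The card is represented as a plain dict, as if it had been loaded from the YAML with a parser such as PyYAML, and the 0.1 tolerance on the ratio is an assumption.

```python
def check_model_card(card: dict, tolerance: float = 0.1) -> list[str]:
    """Return inconsistencies between the card's sample counts and its stated class balance."""
    problems = []
    size = card["training_data"]["size"]
    pos, neg, rows = size["positive_samples"], size["negative_samples"], size["rows"]
    if pos + neg != rows:
        problems.append(f"rows={rows} but positive+negative={pos + neg}")
    # class_balance is written as "1:<ratio> (...)"; compare the ratio with neg/pos
    stated = float(card["training_data"]["class_balance"].split(":")[1].split()[0])
    actual = neg / pos
    if abs(stated - actual) > tolerance:
        problems.append(f"class_balance states 1:{stated} but data gives 1:{actual:.2f}")
    return problems


card = {"training_data": {
    "size": {"rows": 2_450_000, "positive_samples": 312_000, "negative_samples": 2_138_000},
    "class_balance": "1:6.8 (churn:retained)",
}}
print(check_model_card(card))  # [] since 2,138,000 / 312,000 ≈ 6.85, within 0.1 of 6.8
```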

10. Production Checklist

Feature Store

  • Are the offline store (training) and online store (serving) separated?
  • Does Point-in-Time join prevent data leakage?
  • Are feature definitions version-controlled in code?
  • Is the same feature transformation logic used for training and serving?
  • Is feature drift monitoring configured?

MLOps

  • Are all experiments tracked with MLflow or W&B?
  • Is the training dataset version-controlled (DVC/lakeFS)?
  • Is there a Staging → Production promotion workflow in the model registry?
  • Is the model deployment CI/CD pipeline automated?
  • Are data drift and model performance degradation monitored?
  • Are retraining trigger conditions defined?

RAG Pipeline

  • Is the chunking strategy optimized for the document type?
  • Is metadata filtering configured in the vector DB?
  • Is hybrid search (vector + keyword) applied?
  • Is incremental index update automated when documents change?
  • Is source citation implemented in RAG responses?
  • Are there prompt guidelines to prevent hallucination?

Vector Database

  • Is the right DB selected for the expected vector count and QPS?
  • Are HNSW index parameters tuned for the accuracy-performance trade-off?
  • Is there a re-indexing plan for embedding model upgrades?

Agentic AI

  • Is the permitted action scope (allowed/prohibited) clearly defined?
  • Are all autonomous actions recorded in audit logs?
  • Are human escalation paths and trigger conditions configured?

AI Data Governance

  • Is PII fully removed or masked from training data?
  • Is a Model Card authored and maintained?
  • Is training data lineage traceable?
  • Is bias checking included in the training pipeline?
  • Has compliance with EU AI Act and relevant regulations been reviewed?

Closing Thoughts

In 2026, the data engineer's role has evolved from "pipeline plumber" to "AI infrastructure architect."

Delivering features reliably, automating the model lifecycle, building RAG infrastructure that gives LLMs real-time context, designing governance frameworks for safe agentic AI operation — all of this is now the modern data engineer's responsibility.

One principle matters more than any technology: AI writes the first draft; humans verify and own it. No matter how advanced automation becomes, the ultimate accountability for data trustworthiness and the fairness of AI decisions belongs to people.

The final part of the playbook covers how to sustain all of this within teams and organizations — DataOps culture and team operations.


Part 7 Preview: DataOps & Team Operations Playbook (Final Part)

  • Building a DataOps culture — treating pipelines as products
  • Data engineering team structure and role design
  • On-call operations and runbook authoring
  • Defining and measuring data SLA/SLO/SLI
  • Data team career paths and skills development roadmap
  • Data engineering outlook for 2026-2028

Written: April 2026 | References: Databricks, MLflow, Gartner AI Report 2025, Evidently AI, Atlan, KDnuggets
