Observability for Data Pipelines — Grafana, OpenTelemetry, and What to Measure

Implement observability for data pipelines using OpenTelemetry traces, Prometheus metrics, and Grafana dashboards. Know what to measure and alert on.

· projects · 3 minutes

Data pipeline observability is fundamentally different from application observability. Applications serve requests; pipelines transform data. The failure modes, the metrics that matter, and the debugging workflows are distinct. Here’s how I think about it.

The Three Pillars, Adapted for Data

The standard observability pillars are metrics, logs, and traces. For data pipelines, I’d reframe them:

Metrics: Pipeline runtime duration, record counts in vs. out, data freshness (time since last successful load), error rates, resource utilization (memory, CPU, slot usage). These answer “is my pipeline healthy right now?”

Logs: Structured logs from pipeline tasks, query execution details, error stack traces. These answer “what went wrong?”

Traces: End-to-end lineage of a data record from source to serving layer, including which pipeline stages it passed through and how long each took. These answer “where is the bottleneck?”

OpenTelemetry as the Foundation

OpenTelemetry (OTel) provides a vendor-neutral SDK for instrumenting your code. Instead of wiring directly to Grafana, Datadog, or Cloud Monitoring, you instrument with OTel and then configure an exporter for your backend. This decouples your code from your observability vendor.

For a Python pipeline:

```python
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# Set up tracing
trace_provider = TracerProvider()
trace_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)
trace.set_tracer_provider(trace_provider)

# Set up metrics
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://otel-collector:4317"),
    export_interval_millis=30000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))

# Instrument your pipeline
tracer = trace.get_tracer("etl-pipeline")
meter = metrics.get_meter("etl-pipeline")

records_processed = meter.create_counter(
    "pipeline.records.processed",
    description="Total records processed",
)
pipeline_duration = meter.create_histogram(
    "pipeline.duration.seconds",
    description="Pipeline stage duration",
)
```

Instrumenting Pipeline Stages

```python
import time

def extract(source_path: str) -> list[dict]:
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", source_path)
        start = time.perf_counter()
        data = read_from_source(source_path)  # your source reader
        duration = time.perf_counter() - start
        pipeline_duration.record(duration, {"stage": "extract"})
        records_processed.add(len(data), {"stage": "extract"})
        span.set_attribute("record_count", len(data))
        return data
```

Each pipeline stage gets a span (for traces) and emits metrics (record counts, duration). When you view the trace in Grafana Tempo, you see the full pipeline execution as a waterfall — extract → transform → load — with timing for each stage.

The Grafana Stack

A common open-source observability stack:

  • Grafana — Dashboards and alerting
  • Prometheus — Metrics storage (or Grafana Mimir for scale)
  • Loki — Log aggregation
  • Tempo — Distributed tracing

The OTel Collector sits in front, receiving data from your pipelines and routing it to each backend. Your pipeline code only speaks OTLP (OpenTelemetry Protocol); the collector handles fan-out.

```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

exporters:
  prometheus:
    endpoint: 0.0.0.0:8889
  loki:
    endpoint: http://loki:3100/loki/api/v1/push
  otlp/tempo:
    endpoint: tempo:4317

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [prometheus]
    logs:
      receivers: [otlp]
      exporters: [loki]
    traces:
      receivers: [otlp]
      exporters: [otlp/tempo]
```

What to Alert On

Not everything that’s measurable is worth alerting on. For data pipelines, I focus on:

  1. Data freshness. If the max(event_ts) in your serving table is older than your SLA, something is broken or delayed. This is the single most important metric for stakeholders.

  2. Record count anomalies. If today’s load has 90% fewer records than the trailing 7-day average, either the source is broken or your pipeline dropped data. Use a simple statistical threshold, not a fixed number.

  3. Pipeline failure. If a DAG run or Job fails after exhausting retries, alert immediately.

  4. Duration drift. If a pipeline that normally runs in 10 minutes starts taking 45, investigate before it becomes an outage.

What NOT to alert on: transient retries that succeed, minor record count variance, or infrastructure metrics that haven’t correlated with user-visible issues.
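Checks 1 and 2 need only a few lines. A sketch using just the standard library; the three-sigma threshold and UTC timestamps are assumptions, and in practice you'd evaluate these as Prometheus alert rules or emit the results through the meter shown earlier:

```python
from datetime import datetime, timedelta, timezone
from statistics import mean, stdev

def is_stale(max_event_ts: datetime, sla: timedelta) -> bool:
    """Freshness: alert if the newest record is older than the SLA."""
    return datetime.now(timezone.utc) - max_event_ts > sla

def is_count_anomaly(today: int, trailing: list[int], n_sigma: float = 3.0) -> bool:
    """Record-count anomaly: flag loads far below the trailing daily counts.

    A z-score against recent history adapts as volume grows,
    unlike a fixed "alert if under N rows" threshold.
    """
    mu, sigma = mean(trailing), stdev(trailing)
    if sigma == 0:
        return today != trailing[0]  # flat history: any change is notable
    return (today - mu) / sigma < -n_sigma
```

The one-sided check is deliberate: a sudden surge in records is worth a dashboard panel, but a collapse is what pages someone.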

Data Quality as Observability

Observability doesn’t stop at infrastructure. Extend it to data quality: null rates, schema drift detection, distribution shifts in key columns. Tools like Great Expectations or dbt tests can emit metrics to the same Grafana stack, giving you a unified view of pipeline health and data health.
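The simplest of these signals can be computed inline and emitted as gauges through the same meter. A minimal sketch; the helper names are illustrative, not part of Great Expectations or dbt:

```python
def null_rate(rows: list[dict], column: str) -> float:
    """Fraction of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    nulls = sum(1 for r in rows if r.get(column) is None)
    return nulls / len(rows)

def schema_drift(rows: list[dict], expected: set[str]) -> set[str]:
    """Columns that appeared or disappeared relative to the expected schema."""
    seen = set().union(*(r.keys() for r in rows)) if rows else set()
    return seen ^ expected  # symmetric difference: anything unexpected either way
```

Recording these per batch, with the table name as a metric attribute, lets a single Grafana panel surface both a pipeline that stopped running and a pipeline that runs fine but loads garbage.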

Takeaway: Instrument your pipelines with OpenTelemetry for vendor-neutral observability. Focus alerts on data freshness, record count anomalies, and failures — the signals that map to stakeholder impact. Grafana + Prometheus + Loki + Tempo gives you a complete open-source stack.
