Olympus Docs

Stream audit logs to Kafka

Push security events to a Kafka topic for downstream consumers

For SIEM, fraud detection, or compliance archiving, you'll want audit logs streamed to Kafka in real time rather than queried from Postgres.

Architecture

Kratos / Hydra / Athena → log to stdout
                        ↓
                podman/journald
                        ↓
                Vector / Fluent Bit
                        ↓
                  Kafka topic
                        │
        ┌───────────────┴──────────────┐
        ↓                              ↓
    SIEM (Splunk)              Long-term archive (S3)

Alternatively, or in addition, stream directly from the security_audit Postgres table with Debezium (Option B) when you need guaranteed durability.

Option A: Vector with podman logs

# /etc/vector/vector.toml
[sources.kratos_logs]
type = "docker_logs"
include_containers = ["ciam-kratos"]

[transforms.parse_kratos]
type = "remap"
inputs = ["kratos_logs"]
source = '''
. = parse_json!(.message)
.event_source = "kratos"
.timestamp = parse_timestamp!(.time, "%+")
'''

[transforms.filter_security]
type = "filter"
inputs = ["parse_kratos"]
condition = '''
includes(["login_succeeded", "login_failed", "password_changed", "mfa_enrolled",
          "mfa_removed", "recovery_started", "identity_created", "identity_deleted"],
          .msg)
'''

[sinks.kafka_audit]
type = "kafka"
inputs = ["filter_security"]
bootstrap_servers = "kafka1:9092,kafka2:9092"
topic = "olympus.audit"
encoding.codec = "json"
compression = "snappy"

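Run it with the Podman API socket mounted where the docker_logs source looks for the Docker socket (enable the socket first with systemctl enable --now podman.socket). Pass --config explicitly so Vector loads this TOML file rather than its default config path:

podman run -d --name vector \
  -v /var/run/podman/podman.sock:/var/run/docker.sock:ro \
  -v /etc/vector/vector.toml:/etc/vector/vector.toml:ro \
  timberio/vector:latest-alpine --config /etc/vector/vector.toml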

Option B: Debezium on security_audit

Use Postgres logical decoding to stream changes to Kafka via Debezium:

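# debezium connector config
{
  "name": "olympus-audit",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "ciam-postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "...",
    "database.dbname": "olympus",
    "topic.prefix": "olympus",
    "table.include.list": "public.security_audit",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered"
  }
}

With publication.autocreate.mode set to filtered, Debezium creates its publication only for the tables in table.include.list; the default (all_tables) creates a publication FOR ALL TABLES, which requires a Postgres superuser. Debezium names each topic {topic.prefix}.{schema}.{table}, so this connector writes to olympus.public.security_audit, not olympus.audit.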
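Debezium runs as a Kafka Connect source connector, so the config above is registered by POSTing it to a Connect worker's REST API (POST /connectors). A minimal JDK-only sketch, assuming the JSON is saved to a file and the worker is reachable at http://kafka-connect:8083 (a hypothetical host; pass your own URL as the second argument):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: register a connector through the Kafka Connect REST API (POST /connectors).
// DEFAULT_CONNECT_URL is a hypothetical placeholder, not a known Olympus hostname.
class ConnectorRegistration {
    static final String DEFAULT_CONNECT_URL = "http://kafka-connect:8083";

    // Builds the POST request; the body is the {"name": ..., "config": {...}} JSON.
    static HttpRequest buildCreateRequest(String connectUrl, String connectorJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // Usage: java ConnectorRegistration.java olympus-audit.json [connect-url]
    public static void main(String[] args) throws Exception {
        String json = Files.readString(Path.of(args[0]));
        String connectUrl = args.length > 1 ? args[1] : DEFAULT_CONNECT_URL;
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildCreateRequest(connectUrl, json), HttpResponse.BodyHandlers.ofString());
        // Kafka Connect returns 201 Created when the connector is created.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

POST fails if a connector with that name already exists. For re-runnable deploy scripts, PUT /connectors/olympus-audit/config with just the inner config object creates or updates the connector idempotently.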

Pros over Vector:

  • Captures every row, regardless of log format.
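  • Transactional consistency: only committed rows are emitted.
  • No dual write: the application writes only to Postgres. Delivery is still at-least-once, so consumers should deduplicate on event_id.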

Cons:

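  • Requires logical replication on Postgres (wal_level = logical).
  • More moving parts: Kafka Connect, plus a replication slot that retains WAL until Debezium consumes it.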

For most deployments, Vector reading container stdout is sufficient. Use Debezium when you must guarantee that no event is lost.

Topic strategy

Single topic with all events, or multiple?

Single topic (recommended):

  • olympus.audit carries all events; consumers filter on event_type.
  • Simpler to operate.

Topic-per-type:

  • olympus.audit.login
  • olympus.audit.mfa
  • Better for high-volume consumers that only care about one type. More operational overhead.
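With topic-per-type, whatever produces the events needs a mapping from event_type to topic. A sketch, assuming topics are chosen by event_type prefix; the prefix table and the AuditTopicRouter class are hypothetical, and unmatched types fall back to the single olympus.audit topic:

```java
import java.util.Map;

// Hypothetical router for the topic-per-type strategy.
class AuditTopicRouter {
    // Illustrative prefix table; the prefixes do not overlap, so iteration order is irrelevant.
    private static final Map<String, String> PREFIX_TO_TOPIC = Map.of(
            "login_", "olympus.audit.login",
            "mfa_", "olympus.audit.mfa");

    static String topicFor(String eventType) {
        for (Map.Entry<String, String> entry : PREFIX_TO_TOPIC.entrySet()) {
            if (eventType.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        // Everything else (password_changed, identity_created, ...) stays on the shared topic.
        return "olympus.audit";
    }
}
```

The fallback keeps rarely used event types from each needing a topic of its own, which limits the extra operational overhead.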

Schema

Standardize the JSON event. event_id is a ULID, so IDs sort by creation time:

{
  "schema_version": "1.0",
  "event_id": "01HZ...",
  "event_type": "login_succeeded",
  "timestamp": "2026-05-15T10:30:45.123Z",
  "actor": {
    "type": "identity",
    "id": "uuid"
  },
  "target": {
    "type": "identity",
    "id": "uuid"
  },
  "source": {
    "ip": "1.2.3.4",
    "user_agent": "Mozilla/...",
    "geo": "US-NY"
  },
  "outcome": "success",
  "metadata": {
    "method": "password",
    "aal": "aal1"
  }
}

Register a JSON Schema for the topic in a schema registry (Confluent Schema Registry or AWS Glue Schema Registry) so consumers can validate events.
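A ULID is a 48-bit millisecond timestamp followed by 80 random bits, encoded as 26 Crockford base32 characters, which is why event IDs sort lexicographically by time. If you generate event_id yourself rather than with a ULID library, a minimal sketch following the public ULID spec could look like this (the Ulid class is hypothetical, and it skips the spec's optional same-millisecond monotonicity):

```java
import java.security.SecureRandom;

// Sketch of ULID generation: 48-bit ms timestamp + 80 random bits, Crockford base32.
class Ulid {
    private static final char[] ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ".toCharArray();
    private static final SecureRandom RANDOM = new SecureRandom();

    static String next() {
        long randHi = RANDOM.nextInt() & 0xFFFFL; // top 16 of the 80 random bits
        long randLo = RANDOM.nextLong();          // low 64 random bits
        return encode(System.currentTimeMillis(), randHi, randLo);
    }

    // Encodes the timestamp (48 bits) and randomness (randHi: 16 bits, randLo: 64 bits).
    static String encode(long timestampMs, long randHi, long randLo) {
        if (timestampMs < 0 || timestampMs > 0xFFFFFFFFFFFFL) {
            throw new IllegalArgumentException("timestamp must fit in 48 bits");
        }
        char[] out = new char[26];
        // Last 16 chars: the 80 random bits, 5 bits per char, filled from the right.
        long hi = randHi & 0xFFFFL;
        long lo = randLo;
        for (int i = 25; i >= 10; i--) {
            out[i] = ALPHABET[(int) (lo & 31)];
            lo = (lo >>> 5) | ((hi & 31) << 59); // shift the 80-bit value right by 5
            hi >>>= 5;
        }
        // First 10 chars: the timestamp, so IDs sort by creation time.
        long ts = timestampMs;
        for (int i = 9; i >= 0; i--) {
            out[i] = ALPHABET[(int) (ts & 31)];
            ts >>>= 5;
        }
        return new String(out);
    }
}
```

Because the alphabet is in ascending ASCII order, comparing two IDs as strings orders them by timestamp first, which is what makes ULIDs convenient as sortable event keys.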

Consumers

SIEM

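Splunk, Datadog, and Elastic all offer Kafka integrations: subscribe to the topic and ingest.

# Splunk Connect for Kafka (a Kafka Connect sink)
connector.class: com.splunk.kafka.connect.SplunkSinkConnector
topics: olympus.audit
splunk.indexes: olympus_audit
splunk.hec.uri: ...
splunk.hec.token: ...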

Archive

Sink the topic to a long-term S3 bucket:

# Confluent S3 Sink Connector
connector.class: io.confluent.connect.s3.S3SinkConnector
topics: olympus.audit
s3.bucket.name: olympus-audit-archive
storage.class: io.confluent.connect.s3.storage.S3Storage
flush.size: 10000
format.class: io.confluent.connect.s3.format.json.JsonFormat

Add an S3 lifecycle rule that transitions objects to Glacier after 90 days, and retain them for 7+ years for compliance.

Real-time analysis

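A Kafka Streams (or Flink) job can flag brute-force attempts as they happen. In Kafka Streams:

// Assumes a hypothetical AuditEvent record type and a matching Serde named auditSerde
StreamsBuilder builder = new StreamsBuilder();
builder.stream("olympus.audit", Consumed.with(Serdes.String(), auditSerde))
  .filter((k, v) -> "login_failed".equals(v.eventType()))
  .groupBy((k, v) -> v.actor().id(), Grouped.with(Serdes.String(), auditSerde))
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count()
  .toStream()
  // Filter the stream, not the KTable: KTable.filter forwards tombstones for dropped rows
  .filter((windowedId, count) -> count != null && count >= 5)
  // Unwrap the windowed key so the alert topic gets a plain String key
  .map((windowedId, count) -> KeyValue.pair(windowedId.key(), count))
  .to("olympus.alerts.brute_force", Produced.with(Serdes.String(), Serdes.Long()));

This flags 5+ failed logins for the same identity within a 5-minute window and emits an alert (identity ID → failure count) to olympus.alerts.brute_force. The alert can fire again as the count keeps rising within a window, so deduplicate downstream.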
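To make the windowing concrete, here is a JDK-only sketch (not Kafka Streams) of the same idea: count login_failed events per identity in 5-minute tumbling windows aligned to the epoch, which is how Kafka Streams aligns TimeWindows that have no advance interval. The BruteForceDetector class is hypothetical; unlike the Streams job, it keeps state in memory and never evicts old windows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory detector: tumbling, epoch-aligned windows keyed by identity.
class BruteForceDetector {
    private final int threshold;
    private final long windowMs;
    private final Map<String, Integer> counts = new HashMap<>(); // never evicted in this sketch

    BruteForceDetector(int threshold, long windowMs) {
        this.threshold = threshold;
        this.windowMs = windowMs;
    }

    // Returns true exactly once per identity and window: when the count reaches the threshold.
    boolean recordFailure(String identityId, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowMs);
        String key = identityId + "@" + windowStart;
        int count = counts.merge(key, 1, Integer::sum);
        return count == threshold;
    }
}
```

This version alerts once per window (count == threshold) rather than on every update at or above the threshold. That choice is harder in Kafka Streams because record caching can coalesce updates, so intermediate counts may never be emitted; that is why the Streams job filters on >= instead.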

Reliability

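  • Vector buffers events while Kafka is unreachable, so it survives short outages. The default sink buffer is in memory (500 events); configure a disk buffer (buffer.type = "disk") if events must also survive a Vector restart.
  • Debezium tracks its position in a Postgres replication slot, so changes committed while Debezium or Kafka is down stay in the WAL and are delivered once it recovers (at-least-once). The slot retains WAL until then, so monitor disk usage.
  • Kafka itself: replicate the topic across 3 brokers (replication factor 3) with min.insync.replicas=2 on the topic, and have producers use acks=all.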
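The Kafka durability settings live in two places: replication factor and min.insync.replicas are topic settings, while acks is a producer setting. A sketch that keeps them together; the keys are standard Kafka config names, the bootstrap servers are reused from the Vector example, and the AuditDurabilityConfig class is hypothetical:

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical holder for the durability settings from the Reliability bullets.
class AuditDurabilityConfig {
    // Topic-level: 3 replicas; an acks=all write must reach at least 2 in-sync replicas.
    static final int REPLICATION_FACTOR = 3;
    static final Map<String, String> TOPIC_CONFIGS = Map.of("min.insync.replicas", "2");

    // Producer-level: wait for all in-sync replicas and avoid duplicates on retry.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092");
        props.setProperty("acks", "all");
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    // With acks=all, writes keep succeeding while at most this many replicas are down.
    static int tolerableReplicaFailures() {
        return REPLICATION_FACTOR - Integer.parseInt(TOPIC_CONFIGS.get("min.insync.replicas"));
    }
}
```

With kafka-clients on the classpath, producerProps() can be handed to a KafkaProducer, and the topic settings to AdminClient via new NewTopic("olympus.audit", partitions, (short) 3).configs(TOPIC_CONFIGS).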

Cost

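Kafka costs scale with throughput and retention.

Volume estimate (these figures assume ~100 bytes per event; the example event under Schema is several hundred bytes of raw JSON, so scale up accordingly if yours are similar):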

  • 10k MAU, average 5 audit events/day/user = 50k events/day = ~5MB/day.
  • 100k MAU = 50MB/day.
  • 1M MAU = 500MB/day.

Even at 1M MAU, a 100GB topic with 7-day retention is fine. Managed Kafka costs (Confluent Cloud, MSK) start around $200/mo.
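The volume figures above are plain multiplication. This sketch makes the inputs explicit so you can plug in your own numbers; bytesPerEvent is an assumption (about 100 bytes reproduces the bullets), the AuditVolumeEstimate class is hypothetical, and cluster disk usage also multiplies by retention and replication factor:

```java
// Hypothetical back-of-envelope estimator for audit topic storage.
class AuditVolumeEstimate {
    static long bytesPerDay(long mau, long eventsPerUserPerDay, long bytesPerEvent) {
        return mau * eventsPerUserPerDay * bytesPerEvent;
    }

    // Disk used across the cluster: daily volume x retention days x replication factor.
    static long retainedBytes(long bytesPerDay, int retentionDays, int replicationFactor) {
        return bytesPerDay * retentionDays * replicationFactor;
    }

    public static void main(String[] args) {
        long perDay = bytesPerDay(1_000_000, 5, 100); // 1M MAU, 5 events/user/day, ~100 B/event
        long retained = retainedBytes(perDay, 7, 3);  // 7-day retention, replication factor 3
        System.out.printf("%.1f MB/day, %.1f GB retained%n", perDay / 1e6, retained / 1e9);
    }
}
```

For 1M MAU this works out to 500 MB/day and 10.5 GB across three replicas at 7-day retention. At 500 bytes per event it is 2.5 GB/day and 52.5 GB, still under a 100 GB topic.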

For under 100k MAU, if you don't already run Kafka, a single broker is enough (at the cost of the replication guarantees under Reliability), or use Redpanda as a lighter, Kafka-API-compatible alternative.
