Conteúdos

Observabilidade com Fluentd e Data Prepper usando Entropia de Shannon

Como usar Fluentd, Data Prepper e OpenSearch para detectar anomalias em traces e logs com entropia de Shannon

No artigo sobre Observabilidade no Kubernetes, mostrei como instrumentar um cluster com Prometheus, Grafana e Jaeger. Mas e quando o volume de logs e traces cresce a ponto de voce nao conseguir mais identificar o que e normal e o que e anomalia so de olhar um dashboard?

E aqui que a entropia de Shannon entra como ferramenta de deteccao, e onde o OpenSearch com Fluentd e Data Prepper entra como infraestrutura para processar tudo em escala.

A ideia e simples: calcular a entropia de campos especificos dos seus logs e traces. Se a entropia mudar bruscamente, algo esta diferente. Isso pode ser um ataque, um bug, ou uma mudanca de comportamento que voce precisaria saber.

Antes de detalhar cada componente, vale entender o pipeline de ponta a ponta:

Aplicacao
    |
    v
Fluentd (coleta logs e traces)
    |
    v
Data Prepper (transforma, enriquece, calcula entropia)
    |
    v
OpenSearch (indexa e visualiza)

Onde:

  • Fluentd: e o agente de coleta que roda como DaemonSet no Kubernetes
  • Data Prepper: e o processador que recebe os dados do Fluentd, aplica transformacoes e calcula a entropia de campos selecionados
  • OpenSearch: e o backend de busca e visualizacao onde voce consulta e cria dashboards

O Fluentd e um dos projetos mais maduros do ecossistema CNCF. Ele coleta logs de containers via /var/log/containers/, parser o JSON de saida e envia para o destino configurado.

A configuracao abaixo faz o Fluentd coletar logs de todos os pods do cluster e enviar para o Data Prepper via HTTP:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: observability
data:
  fluent.conf: |
    # coleta logs de containers do Kubernetes
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_key time
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    # enriquece com metadados do pod
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    # envia para o Data Prepper
    <match **>
      @type http
      endpoint http://data-prepper.opensearch:2021/log/ingest
      content_type json
      json_array true
      <buffer>
        flush_interval 5s
      </buffer>
    </match>

A configuracao de sincronizacao com o Data Prepper:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: observability
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-1
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: config
              mountPath: /fluentd/etc/fluent.conf
              subPath: fluent.conf
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: config
          configMap:
            name: fluentd-config

Onde:

  • @type tail: le os arquivos de log conforme sao escritos
  • @type kubernetes_metadata: adiciona labels e annotations do pod ao log
  • @type http: envia os registros processados para o Data Prepper via HTTP
  • flush_interval 5s: envia batches a cada 5 segundos

Alem de logs, o Fluentd pode coletar traces se a sua aplicacao exporta no formato OTLP. Para isso, adicionamos uma segunda source:

# coleta traces no formato OTLP HTTP
<source>
  @type http
  @id input_traces
  port 4318
  <parse>
    @type json
  </parse>
</source>

Isso permite que a aplicacao envie traces diretamente para o Fluentd na porta 4318, que e a porta padrao do OTLP HTTP.

O Data Prepper e o componente que transforma e enriquece os dados antes de envia-los ao OpenSearch. E aqui que a magica acontece: vamos adicionar um processador que calcula a entropia de campos especificos dos logs e traces.

O Data Prepper e um componente do ecossistema OpenSearch projetado para ingerir, transformar e enriquecer dados antes de envia-los ao OpenSearch. Ele roda como um servico independente e suporta pipelines configuraveis com sources, processors e sinks.

A formula de entropia que vamos usar como base:

$$ H(X) = -\sum p(x) \log_2 p(x) $$

Onde p(x) e a probabilidade de cada valor aparecer no campo analisado. Quanto mais diverso, maior a entropia.

A configuracao abaixo define um pipeline que recebe logs do Fluentd, calcula a entropia de campos como URL, User-Agent e IP de origem, e envia ao OpenSearch:

data-prepper-config:
  ssl: false

pipeline:
  name: logs-entropy
  source:
    http:
      port: 2021
      path: /log/ingest
  processor:
    # parser inicial para logs do Kubernetes
    - parse_json:
        destination: log_parsed
    # remove campos irrelevantes para economia de espaco
    - delete_entries:
        with_keys:
          - log_raw
    # calcula entropia dos campos criticos usando grok
    - grok:
        match:
          message: "%{IP:client_ip} %{WORD:http_method} %{URIPATH:url_path}"
    # adiciona timestamp de processamento
    - add_entries:
        entries:
          - key: processed_at
            value: "${now}"
  sink:
    - opensearch:
        hosts: ["https://opensearch.opensearch:9200"]
        index: logs-entropy-%{yyyy.MM.dd}
        username: admin
        password: admin
        ssl: false

Onde:

  • source: recebe os logs via HTTP na porta 2021, que e onde o Fluentd envia
  • processor: transforma os dados antes de indexar
  • sink: envia ao OpenSearch particionando por dia

Para traces, o Data Prepper tem suporte nativo ao formato OTLP. A configuracao recebe traces, enriquece com metadados e envia ao OpenSearch:

pipeline:
  name: traces-entropy
  source:
    otlp_trace:
      port: 21890
  processor:
    # remove spans internos de alta cardinalidade
    - delete_entries:
        with_keys:
          - span_attributes.internal_debug
    # converte duration para milissegundos
    - convert_entry_type:
        keys:
          - duration_in_nanos
          type: integer
  sink:
    - opensearch:
        hosts: ["https://opensearch.opensearch:9200"]
        index: traces-entropy-%{yyyy.MM.dd}
        username: admin
        password: admin
        ssl: false

Onde:

  • otlp_trace: source nativo do Data Prepper para traces no formato OpenTelemetry
  • delete_entries: remove campos que geram cardinalidade excessiva
  • convert_entry_type: garante tipos corretos para agregacao no OpenSearch

O Data Prepper nao tem um processor nativo de entropia, mas podemos implementar isso com um processor customizado ou com a funcao de script. A abordagem mais pratica e usar o processor grok para extrair campos e depois calcular a entropia via OpenSearch Dashboards com Painless scripting.

Uma alternativa e criar um sidecar em Python que le do Fluentd, calcula a entropia e reenvia ao Data Prepper:

from collections import Counter
from math import log2
import json
import sys

def shannon(values):
    """calcula a entropia de Shannon para uma lista de valores"""
    total = len(values)
    if total == 0:
        return 0.0
    counts = Counter(values)

    return -sum(
        (n / total) * log2(n / total)
        for n in counts.values()
    )

def calculate_field_entropy(logs, field):
    """extrai um campo de cada log e calcula a entropia"""
    values = [
        log.get(field, "")
        for log in logs
        if log.get(field)
    ]

    return shannon(values)

# le logs do stdin (pipe do Fluentd)
logs = [json.loads(line) for line in sys.stdin]

# calcula entropia de campos relevantes
fields = ["url_path", "http_method", "client_ip", "user_agent"]
results = {}

for field in fields:
    results[f"entropy_{field}"] = round(calculate_field_entropy(logs, field), 4)

# adiciona resultado ao ultimo log como enriquecimento
enrichment = {"entropy_scores": results}

print(json.dumps(enrichment))

Onde:

  • shannon(): implementa a formula de entropia de Shannon
  • calculate_field_entropy(): extrai um campo de cada registro e calcula a entropia
  • fields: define quais campos serao analisados
  • results: dicionario com a entropia de cada campo

Para rodar esse sidecar no Kubernetes:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: entropy-calculator
  namespace: observability
spec:
  replicas: 1
  selector:
    matchLabels:
      app: entropy-calculator
  template:
    metadata:
      labels:
        app: entropy-calculator
    spec:
      containers:
        - name: entropy
          image: python:3.11-slim
          command: ["python", "/app/entropy.py"]
          volumeMounts:
            - name: script
              mountPath: /app
      volumes:
        - name: script
          configMap:
            name: entropy-script

Apos o pipeline estar rodando, os logs e traces chegam ao OpenSearch ja com os scores de entropia. Agora basta criar dashboards e alertas.

URLs com entropia baixa indicam trafego repetitivo e previsivel:

/login
/login
/login
/login

Entropia baixa. Normal para um servico de autenticacao.

/api/v1/user/123
/api/v1/user/456
/api/v1/user/789
/random-endpoint-q8fj2
/admin-debug-panel

Entropia alta. Pode indicar scanner ou ataque de forca bruta.

User-Agents com entropia muito alta em uma janela curta sao tipicos de botnets que rotacionam identificadores:

Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/605.1
python-requests/2.28.0
curl/7.81.0

Cinco agentes distintos em poucos segundos de um mesmo IP e anomalo.

Para um servico interno que normalmente recebe requisicoes de 3 a 5 IPs de load balancers, uma explosao de IPs distintos indica:

  • Possivel vazamento de servico interno para a internet
  • Uso de proxy rotativo
  • Ataque DDoS

Em traces, os nomes dos spans representam operacoes da aplicacao. Se a entropia dos nomes dos spans sobe abruptamente, pode indicar:

  • Novas rotas sendo descobertas por um scanner
  • Erro gerando stack traces com nomes aleatorios
  • Microsservico gerando operacoes nao documentadas

Antes de criar dashboards, configure o indice pattern no OpenSearch Dashboards:

logs-entropy-*

Isso permite consultar todos os logs processados pelo pipeline de entropia.

No OpenSearch Dashboards, crie visualizacoes que mostram a entropia ao longo do tempo para cada campo monitorado. Use agregacoes do tipo terms agrupadas por servico e calcule a entropia com Painless scripting:

{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1h"
      }
    }
  },
  "aggs": {
    "by_service": {
      "terms": {
        "field": "kubernetes.namespace_name.keyword",
        "size": 20
      },
      "aggs": {
        "url_entropy": {
          "cardinality": {
            "field": "url_path.keyword"
          }
        }
      }
    }
  }
}

Onde:

  • range: filtra os ultimos 60 minutos
  • terms: agrupa por namespace do Kubernetes
  • cardinality: conta valores unicos, que e um proxy para entropia

No OpenSearch, crie um monitor que dispara quando a cardinalidade de um campo excede um limiar:

{
  "name": "entropia-url-alta",
  "type": "monitor",
  "monitor_type": "query_level_monitor",
  "inputs": [
    {
      "search": {
        "indices": ["logs-entropy-*"],
        "query": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m"
                    }
                  }
                }
              ]
            }
          },
          "aggs": {
            "unique_urls": {
              "cardinality": {
                "field": "url_path.keyword"
              }
            }
          }
        }
      }
    }
  ],
  "triggers": [
    {
      "name": "entropia-alta",
      "condition": {
        "script": {
          "source": "ctx.results[0].aggregations.unique_urls.value > params.threshold",
          "params": {
            "threshold": 500
          }
        }
      }
    }
  ]
}

Onde:

  • monitor_type: define o tipo de monitoramento como query nivel
  • indices: aponta para os indices de logs com entropia
  • cardinality: conta valores unicos de URL como proxy de entropia
  • threshold: dispara o alerta se mais de 500 URLs unicas em 5 minutos

Para quem quer algo mais leve sem depender do Data Prepper no meio, pode usar o Fluentd com um filter customizado em Python via sidecar:

from collections import Counter
from math import log2
from flask import Flask, request
import requests

app = Flask(__name__)

def shannon(values):
    """calcula entropia de Shannon para uma lista de valores"""
    total = len(values)
    if total == 0:
        return 0.0
    counts = Counter(values)

    return -sum(
        (n / total) * log2(n / total)
        for n in counts.values()
    )

@app.route("/entropy", methods=["POST"])
def compute_entropy():
    """recebe um batch de logs e retorna a entropia por campo"""
    logs = request.get_json()

    fields = ["url_path", "client_ip", "http_method", "user_agent"]
    result = {}

    for field in fields:
        values = [log.get(field, "") for log in logs if log.get(field)]
        result[f"entropy_{field}"] = round(shannon(values), 4)

    # envia resultado para o OpenSearch
    requests.post(
        "http://opensearch.opensearch:9200/entropy-metrics/_doc",
        json=result,
        auth=("admin", "admin"),
        headers={"Content-Type": "application/json"}
    )

    return {"status": "ok", "entropy": result}

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

O Fluentd envia batches de logs para esse servico via output plugin HTTP, e o servico calcula a entropia e grava no OpenSearch.

Para integrar com o Fluentd, adicione um output alem do que ja existe:

# envia batches para o calculador de entropia
<match **>
  @type copy
  <store>
    @type http
    endpoint http://entropy-calculator.observability:8080/entropy
    content_type json
    json_array true
    <buffer>
      flush_interval 10s
      chunk_limit_size 2m
    </buffer>
  </store>
  <store>
    @type http
    endpoint http://data-prepper.opensearch:2021/log/ingest
    content_type json
    json_array true
    <buffer>
      flush_interval 5s
    </buffer>
  </store>
</match>

Onde:

  • @type copy: duplica o fluxo de dados para dois destinos
  • store 1: envia para o calculador de entropia a cada 10 segundos
  • store 2: envia para o Data Prepper a cada 5 segundos
  • chunk_limit_size: limita o tamanho do batch para nao estourar memoria

Para nao ter que gerenciar cada componente separadamente, criei um values.yaml que sobe tudo junto:

opensearch:
  enabled: true
  replicas: 3
  persistence:
    size: 50Gi

dataPrepper:
  enabled: true
  pipelines:
    logs:
      source:
        http:
          port: 2021
      sink:
        opensearch:
          hosts: ["https://opensearch:9200"]

fluentd:
  enabled: true
  extras:
    outputs:
      - endpoint: http://data-prepper:2021/log/ingest
        type: http

entropyCalculator:
  enabled: true
  image: python:3.11-slim
  port: 8080

Instale com:

helm repo add opensearch https://opensearch-project.github.io/helm-charts
helm repo update
helm install observability opensearch/opensearch -f values.yaml

Com o pipeline rodando, voce tera no OpenSearch dois tipos de indice:

  • logs-entropy-YYYY.MM.dd: logs com metadados enriquecidos
  • entropy-metrics: scores de entropia calculados em janelas de tempo

A correlacao entre esses dois indices permite criar dashboards que mostram:

  1. A entropia de cada campo ao longo do tempo
  2. Os servicos com maior variacao de entropia
  3. Alertas quando a entropia ultrapassa o percentil 95 do historico

E importante nao olhar apenas para o valor absoluto da entropia. O que importa e a mudanca em relacao ao historico. Um servico de login com entropia de URLs igual a 0.8 pode ser normal, mas se ele estava em 0.2 na hora anterior, algo mudou e voce precisa investigar.

O mesmo vale para traces. Se um microsservico normalmente gera 5 span names distintos e de repente passa a gerar 50, a entropia vai disparar e o alerta vai disparar junto. Nao e necessario saber o que e o ataque especificamente. A entropia te diz que algo esta diferente.