Observabilidade com Fluentd e Data Prepper usando Entropia de Shannon
Como usar Fluentd, Data Prepper e OpenSearch para detectar anomalias em traces e logs com entropia de Shannon
No artigo sobre Observabilidade no Kubernetes, mostrei como instrumentar um cluster com Prometheus, Grafana e Jaeger. Mas e quando o volume de logs e traces cresce a ponto de voce nao conseguir mais identificar o que e normal e o que e anomalia so de olhar um dashboard?
E aqui que a entropia de Shannon entra como ferramenta de deteccao, e onde o OpenSearch com Fluentd e Data Prepper entra como infraestrutura para processar tudo em escala.
A ideia e simples: calcular a entropia de campos especificos dos seus logs e traces. Se a entropia mudar bruscamente, algo esta diferente. Isso pode ser um ataque, um bug, ou uma mudanca de comportamento que voce precisaria saber.
O fluxo completo
Antes de detalhar cada componente, vale entender o pipeline de ponta a ponta:
Aplicacao
|
v
Fluentd (coleta logs e traces)
|
v
Data Prepper (transforma, enriquece, calcula entropia)
|
v
OpenSearch (indexa e visualiza)Onde:
- Fluentd: e o agente de coleta que roda como DaemonSet no Kubernetes
- Data Prepper: e o processador que recebe os dados do Fluentd, aplica transformacoes e calcula a entropia de campos selecionados
- OpenSearch: e o backend de busca e visualizacao onde voce consulta e cria dashboards
Fluentd como coletor
O Fluentd e um dos projetos mais maduros do ecossistema CNCF. Ele coleta logs de containers via /var/log/containers/, parser o JSON de saida e envia para o destino configurado.
Configuracao como DaemonSet
A configuracao abaixo faz o Fluentd coletar logs de todos os pods do cluster e enviar para o Data Prepper via HTTP:
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: observability
data:
fluent.conf: |
# coleta logs de containers do Kubernetes
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
# enriquece com metadados do pod
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
# envia para o Data Prepper
<match **>
@type http
endpoint http://data-prepper.opensearch:2021/log/ingest
content_type json
json_array true
<buffer>
flush_interval 5s
</buffer>
</match>A configuracao de sincronizacao com o Data Prepper:
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: observability
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
spec:
serviceAccountName: fluentd
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-1
volumeMounts:
- name: varlog
mountPath: /var/log
- name: config
mountPath: /fluentd/etc/fluent.conf
subPath: fluent.conf
volumes:
- name: varlog
hostPath:
path: /var/log
- name: config
configMap:
name: fluentd-configOnde:
- @type tail: le os arquivos de log conforme sao escritos
- @type kubernetes_metadata: adiciona labels e annotations do pod ao log
- @type http: envia os registros processados para o Data Prepper via HTTP
- flush_interval 5s: envia batches a cada 5 segundos
Coletando traces com Fluentd
Alem de logs, o Fluentd pode coletar traces se a sua aplicacao exporta no formato OTLP. Para isso, adicionamos uma segunda source:
# coleta traces no formato OTLP HTTP
<source>
@type http
@id input_traces
port 4318
<parse>
@type json
</parse>
</source>Isso permite que a aplicacao envie traces diretamente para o Fluentd na porta 4318, que e a porta padrao do OTLP HTTP.
Data Prepper como processador
O Data Prepper e o componente que transforma e enriquece os dados antes de envia-los ao OpenSearch. E aqui que a magica acontece: vamos adicionar um processador que calcula a entropia de campos especificos dos logs e traces.
O que e o Data Prepper
O Data Prepper e um componente do ecossistema OpenSearch projetado para ingerir, transformar e enriquecer dados antes de envia-los ao OpenSearch. Ele roda como um servico independente e suporta pipelines configuraveis com sources, processors e sinks.
A formula de entropia que vamos usar como base:
$$ H(X) = -\sum p(x) \log_2 p(x) $$
Onde p(x) e a probabilidade de cada valor aparecer no campo analisado. Quanto mais diverso, maior a entropia.
Pipeline de logs
A configuracao abaixo define um pipeline que recebe logs do Fluentd, calcula a entropia de campos como URL, User-Agent e IP de origem, e envia ao OpenSearch:
data-prepper-config:
ssl: false
pipeline:
name: logs-entropy
source:
http:
port: 2021
path: /log/ingest
processor:
# parser inicial para logs do Kubernetes
- parse_json:
destination: log_parsed
# remove campos irrelevantes para economia de espaco
- delete_entries:
with_keys:
- log_raw
# calcula entropia dos campos criticos usando grok
- grok:
match:
message: "%{IP:client_ip} %{WORD:http_method} %{URIPATH:url_path}"
# adiciona timestamp de processamento
- add_entries:
entries:
- key: processed_at
value: "${now}"
sink:
- opensearch:
hosts: ["https://opensearch.opensearch:9200"]
index: logs-entropy-%{yyyy.MM.dd}
username: admin
password: admin
ssl: falseOnde:
- source: recebe os logs via HTTP na porta 2021, que e onde o Fluentd envia
- processor: transforma os dados antes de indexar
- sink: envia ao OpenSearch particionando por dia
Pipeline de traces
Para traces, o Data Prepper tem suporte nativo ao formato OTLP. A configuracao recebe traces, enriquece com metadados e envia ao OpenSearch:
pipeline:
name: traces-entropy
source:
otlp_trace:
port: 21890
processor:
# remove spans internos de alta cardinalidade
- delete_entries:
with_keys:
- span_attributes.internal_debug
# converte duration para milissegundos
- convert_entry_type:
keys:
- duration_in_nanos
type: integer
sink:
- opensearch:
hosts: ["https://opensearch.opensearch:9200"]
index: traces-entropy-%{yyyy.MM.dd}
username: admin
password: admin
ssl: falseOnde:
- otlp_trace: source nativo do Data Prepper para traces no formato OpenTelemetry
- delete_entries: remove campos que geram cardinalidade excessiva
- convert_entry_type: garante tipos corretos para agregacao no OpenSearch
Calculando entropia no Data Prepper
O Data Prepper nao tem um processor nativo de entropia, mas podemos implementar isso com um processor customizado ou com a funcao de script. A abordagem mais pratica e usar o processor grok para extrair campos e depois calcular a entropia via OpenSearch Dashboards com Painless scripting.
Uma alternativa e criar um sidecar em Python que le do Fluentd, calcula a entropia e reenvia ao Data Prepper:
from collections import Counter
from math import log2
import json
import sys
def shannon(values):
"""calcula a entropia de Shannon para uma lista de valores"""
total = len(values)
if total == 0:
return 0.0
counts = Counter(values)
return -sum(
(n / total) * log2(n / total)
for n in counts.values()
)
def calculate_field_entropy(logs, field):
"""extrai um campo de cada log e calcula a entropia"""
values = [
log.get(field, "")
for log in logs
if log.get(field)
]
return shannon(values)
# le logs do stdin (pipe do Fluentd)
logs = [json.loads(line) for line in sys.stdin]
# calcula entropia de campos relevantes
fields = ["url_path", "http_method", "client_ip", "user_agent"]
results = {}
for field in fields:
results[f"entropy_{field}"] = round(calculate_field_entropy(logs, field), 4)
# adiciona resultado ao ultimo log como enriquecimento
enrichment = {"entropy_scores": results}
print(json.dumps(enrichment))Onde:
- shannon(): implementa a formula de entropia de Shannon
- calculate_field_entropy(): extrai um campo de cada registro e calcula a entropia
- fields: define quais campos serao analisados
- results: dicionario com a entropia de cada campo
Para rodar esse sidecar no Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
name: entropy-calculator
namespace: observability
spec:
replicas: 1
selector:
matchLabels:
app: entropy-calculator
template:
metadata:
labels:
app: entropy-calculator
spec:
containers:
- name: entropy
image: python:3.11-slim
command: ["python", "/app/entropy.py"]
volumeMounts:
- name: script
mountPath: /app
volumes:
- name: script
configMap:
name: entropy-scriptO que procurar nos dados
Apos o pipeline estar rodando, os logs e traces chegam ao OpenSearch ja com os scores de entropia. Agora basta criar dashboards e alertas.
Entropia de URLs
URLs com entropia baixa indicam trafego repetitivo e previsivel:
/login
/login
/login
/loginEntropia baixa. Normal para um servico de autenticacao.
/api/v1/user/123
/api/v1/user/456
/api/v1/user/789
/random-endpoint-q8fj2
/admin-debug-panelEntropia alta. Pode indicar scanner ou ataque de forca bruta.
Entropia de User-Agent
User-Agents com entropia muito alta em uma janela curta sao tipicos de botnets que rotacionam identificadores:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/605.1
python-requests/2.28.0
curl/7.81.0Cinco agentes distintos em poucos segundos de um mesmo IP e anomalo.
Entropia de IPs de origem
Para um servico interno que normalmente recebe requisicoes de 3 a 5 IPs de load balancers, uma explosao de IPs distintos indica:
- Possivel vazamento de servico interno para a internet
- Uso de proxy rotativo
- Ataque DDoS
Entropia de trace span names
Em traces, os nomes dos spans representam operacoes da aplicacao. Se a entropia dos nomes dos spans sobe abruptamente, pode indicar:
- Novas rotas sendo descobertas por um scanner
- Erro gerando stack traces com nomes aleatorios
- Microsservico gerando operacoes nao documentadas
Dashboards e alertas no OpenSearch
Criando um indice pattern
Antes de criar dashboards, configure o indice pattern no OpenSearch Dashboards:
logs-entropy-*Isso permite consultar todos os logs processados pelo pipeline de entropia.
Dashboard de entropia por servico
No OpenSearch Dashboards, crie visualizacoes que mostram a entropia ao longo do tempo para cada campo monitorado. Use agregacoes do tipo terms agrupadas por servico e calcule a entropia com Painless scripting:
{
"size": 0,
"query": {
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
},
"aggs": {
"by_service": {
"terms": {
"field": "kubernetes.namespace_name.keyword",
"size": 20
},
"aggs": {
"url_entropy": {
"cardinality": {
"field": "url_path.keyword"
}
}
}
}
}
}Onde:
- range: filtra os ultimos 60 minutos
- terms: agrupa por namespace do Kubernetes
- cardinality: conta valores unicos, que e um proxy para entropia
Alerta baseado em mudanca de entropia
No OpenSearch, crie um monitor que dispara quando a cardinalidade de um campo excede um limiar:
{
"name": "entropia-url-alta",
"type": "monitor",
"monitor_type": "query_level_monitor",
"inputs": [
{
"search": {
"indices": ["logs-entropy-*"],
"query": {
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "now-5m"
}
}
}
]
}
},
"aggs": {
"unique_urls": {
"cardinality": {
"field": "url_path.keyword"
}
}
}
}
}
}
],
"triggers": [
{
"name": "entropia-alta",
"condition": {
"script": {
"source": "ctx.results[0].aggregations.unique_urls.value > params.threshold",
"params": {
"threshold": 500
}
}
}
}
]
}Onde:
- monitor_type: define o tipo de monitoramento como query nivel
- indices: aponta para os indices de logs com entropia
- cardinality: conta valores unicos de URL como proxy de entropia
- threshold: dispara o alerta se mais de 500 URLs unicas em 5 minutos
Um detector pratico com Fluentd e Python
Para quem quer algo mais leve sem depender do Data Prepper no meio, pode usar o Fluentd com um filter customizado em Python via sidecar:
from collections import Counter
from math import log2
from flask import Flask, request
import requests
app = Flask(__name__)
def shannon(values):
"""calcula entropia de Shannon para uma lista de valores"""
total = len(values)
if total == 0:
return 0.0
counts = Counter(values)
return -sum(
(n / total) * log2(n / total)
for n in counts.values()
)
@app.route("/entropy", methods=["POST"])
def compute_entropy():
"""recebe um batch de logs e retorna a entropia por campo"""
logs = request.get_json()
fields = ["url_path", "client_ip", "http_method", "user_agent"]
result = {}
for field in fields:
values = [log.get(field, "") for log in logs if log.get(field)]
result[f"entropy_{field}"] = round(shannon(values), 4)
# envia resultado para o OpenSearch
requests.post(
"http://opensearch.opensearch:9200/entropy-metrics/_doc",
json=result,
auth=("admin", "admin"),
headers={"Content-Type": "application/json"}
)
return {"status": "ok", "entropy": result}
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)O Fluentd envia batches de logs para esse servico via output plugin HTTP, e o servico calcula a entropia e grava no OpenSearch.
Para integrar com o Fluentd, adicione um output alem do que ja existe:
# envia batches para o calculador de entropia
<match **>
@type copy
<store>
@type http
endpoint http://entropy-calculator.observability:8080/entropy
content_type json
json_array true
<buffer>
flush_interval 10s
chunk_limit_size 2m
</buffer>
</store>
<store>
@type http
endpoint http://data-prepper.opensearch:2021/log/ingest
content_type json
json_array true
<buffer>
flush_interval 5s
</buffer>
</store>
</match>Onde:
- @type copy: duplica o fluxo de dados para dois destinos
- store 1: envia para o calculador de entropia a cada 10 segundos
- store 2: envia para o Data Prepper a cada 5 segundos
- chunk_limit_size: limita o tamanho do batch para nao estourar memoria
Deploy completo com Helm
Para nao ter que gerenciar cada componente separadamente, criei um values.yaml que sobe tudo junto:
opensearch:
enabled: true
replicas: 3
persistence:
size: 50Gi
dataPrepper:
enabled: true
pipelines:
logs:
source:
http:
port: 2021
sink:
opensearch:
hosts: ["https://opensearch:9200"]
fluentd:
enabled: true
extras:
outputs:
- endpoint: http://data-prepper:2021/log/ingest
type: http
entropyCalculator:
enabled: true
image: python:3.11-slim
port: 8080Instale com:
helm repo add opensearch https://opensearch-project.github.io/helm-charts
helm repo update
helm install observability opensearch/opensearch -f values.yamlInterpretando os resultados
Com o pipeline rodando, voce tera no OpenSearch dois tipos de indice:
- logs-entropy-YYYY.MM.dd: logs com metadados enriquecidos
- entropy-metrics: scores de entropia calculados em janelas de tempo
A correlacao entre esses dois indices permite criar dashboards que mostram:
- A entropia de cada campo ao longo do tempo
- Os servicos com maior variacao de entropia
- Alertas quando a entropia ultrapassa o percentil 95 do historico
E importante nao olhar apenas para o valor absoluto da entropia. O que importa e a mudanca em relacao ao historico. Um servico de login com entropia de URLs igual a 0.8 pode ser normal, mas se ele estava em 0.2 na hora anterior, algo mudou e voce precisa investigar.
O mesmo vale para traces. Se um microsservico normalmente gera 5 span names distintos e de repente passa a gerar 50, a entropia vai disparar e o alerta vai disparar junto. Nao e necessario saber o que e o ataque especificamente. A entropia te diz que algo esta diferente.