gladilov.org.ru gladilov.org.ua

Проброс событий в ClickHouse с использованием Vector

От устройства снаружи ИС на фронтальные балансировщики приходят сообщения в локейшн

/<device_location>/status

Файл журнала

/var/log/<device_access_log_file_path>.status.access.log

разбирается с помощью vector, пересылающий события на внутренний балансировщик. Между фронтальными и внутренними балансировщиками есть сетевая связность по порту TCP

<port>

Конфиг vector на фронтальных балансировщиках: Показать

sources:
  device-status:
    type: "file"
    max_line_bytes: 1638400
    include:
      - /var/log/<device_access_log_file_path>.status.access.log

transforms:
  device-status_filter:
    type: "filter"
    inputs:
      - device-status
    condition:
      .message != ""

  device-status_transform:
    type: "remap"
    inputs:
      - device-status_filter
    source: |
      .message = parse_jsonI(.message)
      .message.traffic_source = .host
      . = .message

sinks:
  sink_clickhouse:
    type: "clickhouse"
    inputs:
      - device-status_transform
    endpoint: "http://<inner_BGP_IP>:<port>"
    format: "json_as_string"
    healthcheck:
        enabled: false
    auth:
      strategy: "basic"
      user: "<clickhouse_DB_user>"
      password: "<clickhouse_DB_password>"
    database: "<clickhouse_DB>"
    table: "<clickhouse_table>"

Конфиг внутреннего балансировщика, пробрасывающего события в clickhouse: Показать

stream {
include /etc/<path_to_balancer_config>/log-format-s.conf;

upstream clickhouse8123 {
    server <clickhouse_BGP_IP>:8123;
    server <clickhouse_node1_IP>:8123 backup;
    server <clickhouse_node2_IP>:8123 backup;
}

server {
    listen <port>;
    proxy_pass clickhouse8123;
    access_log /var/log/<access_log_file_path> main_json_mini_s;
    error_log /var/log/<error_log_file_path>;
}
}

Команды clickhouse для создания БД, таблиц и представлений: Показать

# Создание БД
CREATE DATABASE vector ON CLUSTER <cluster_name>;

DROP TABLE IF EXISTS vector.device_status_logs ON CLUSTER <cluster_name>;
# Создание таблицы
CREATE TABLE vector.device_status_logs ON CLUSTER <cluster_name> (
        'message' String
)
ENGINE = ReplicatedReplacingMergeTree('/сlickhouse/{cluster}/tables/validator_status_logs','{replica}')
ORDER BY tuple();

DROP VIEW IF EXISTS vector.device_status_logs_view ON CLUSTER <cluster_name>;
# Создание представления
CREATE MATERIALIZED VIEW vector.device_status_logs_view ON CLUSTER <cluster_name> (
        'time_stamp' DateTime,
        'req' String,
        'req_body' String
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{cluster}/tables/device_status_logs_view','{replica}')
ORDER BY time_stamp
SETTINGS index_granularity = 8192
AS SELECT
        parseDateTimeBestEffortOrNull(JSONExtractString(message, 'timestamp')) AS time_stamp,
        simpleJSONExtractRaw(message, 'req') AS req,
        simpleJSONExtractRaw(message, 'req_body') AS req_body
FROM (
        SELECT message
        FROM vector.device_status_logs
);

Популярное
Наверх