为 Python 应用程序配置 ELK 日志堆栈

ELK 日志堆栈(Elasticsearch B.V. 官方称“Elastic Stack”),是由 ElasticSearch、Kibana 以及 Logstash 等组件组成的工具集,被业界广泛用于进行日志处理等任务。其中 ElasticSearch 组件本身也是一个强大不可替代的开源搜索引擎。

一张网页截图,展示了 ELK 堆栈中的 Kibana 组件的可视化日志的能力。在网页截图的左侧,展示了由 ELK 整理的日志字段,右侧有一副柱状图,按时间顺序展示了日志数量。下方的主要部分逐条展示了日志内容
来自 demo.elastic.co 的 ELK 日志堆栈的官方在线示例截图,展示了 ELK 堆栈对于日志数据的强大处理能力

ElasticSearch 及其周边组件的功能非常强大,该堆栈可配置的灵活部分也很多。比如分布式集群、索引生命周期管理以及索引副本等功能。为简单起见,本文章将介绍在单台服务器配置 ELK 堆栈,以实现将 Python 应用程序的日志输出接入 ElasticSearch 的步骤。关于 ElasticSearch 的更多信息,请访问:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

安装 ELK 堆栈

Elastic 官方提供了多种在机器上安装 ELK 堆栈的方式。我们以一台安装有 Ubuntu 22.04 LTS 系统的服务器为例,演示如何通过 apt 包管理器安装 ELK 日志堆栈的方法。对于其他操作系统或安装方式,请参阅官方文档以了解更多信息。

今天我们的示例将会安装 ELK 堆栈的全部组件:ElasticSearch,本身是一个性能强大且易于拓展的数据和搜索引擎;Kibana,可视化数据并提供图形界面的组件;以及 Logstash,从任何方式收集日志,并输出至任意位置的可配置的数据流管线;这些组件将驱动整个 Python 应用程序的日志收集与处理流程。

安装 Java Runtime Environment(JRE)

ELK 堆栈主要由 Java 编写,因此第一步骤是要在系统中安装 Java 运行时环境(Java Runtime Environment,JRE)。我们以开源的 OpenJDK 为例,使用以下命令进行安装:

sudo apt install default-jre

在 Ubuntu 22.04 LTS 系统中,这将会安装 openjdk-11-jre 包。

安装配置 ElasticSearch

ElasticSearch 通过 apt 包管理器的安装非常简单,使用以下命令进行安装:

sudo apt install elasticsearch

安装完成后,注意安装脚本给出的超级用户的密码,以及最后一部分提示:

Reset the password of the elastic built-in superuser with 
'/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic'.


Generate an enrollment token for Kibana instances with 
 '/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana'.

Generate an enrollment token for Elasticsearch nodes with 
'/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s node'.

ElasticSearch 默认的超级用户的用户名是 elastic,密码是安装日志上显示的密码。如果忘记,可以通过执行上方日志的命令将其重置。

ElasticSearch 安装完毕后,使用 root 权限编辑 /etc/elasticsearch/elasticsearch.yml 文件,并修改或添加以下配置:

network.host: localhost
http.host: localhost

安装配置 Kibana

使用以下命令在服务器上安装 Kibana:

sudo apt install kibana

Kibana 安装好后,在服务器本地浏览器访问 http://localhost:5601/,以执行 Kibana 的初始化配置。配置过程中,可能需要填写 ElasticSearch 的配对令牌(enrollment token),该令牌的内容可以通过执行上一小节安装 ElasticSearch 后打印的提示命令获得。

如果用户是通过 SSH 连接到远程服务器进行配置的,可以考虑在 SSH 连接参数后追加 -L 5601:localhost:5601,通过 SSH 端口转发,从而在本地客户端浏览器中访问 localhost:5601 以达到相同效果

通过超级用户 elastic 以及对应密码登录 Kibana 面板后,我们鼓励新增两个单独的用户分别用作日志管理与日志推送,以实现权限控制。登录 Kibana 面板后,点击左侧的“Management” > “Stack Management”,然后首先新建用户的权限。我们以两个用户名称为 userpython-logger 为例:

权限名称集群权限索引权限
userkibana_admin对于 *(全部索引),拥有 all 权限
python-loggermonitor
management
对于 *(全部索引),拥有 all 权限

添加完用户权限后,我们实际添加名为 userpython-logger 的用户。添加过程中,要记得赋予用户同名的用户权限。事实上,这些用户名和权限名称都是可以自定义的,这里仅为示例。

添加完成后,编辑 /etc/kibana/kibana.yml 文件,添加或修改以下配置项:

elasticsearch.hosts: ['https://localhost:9200']
不想每次登录 Kibana 都输入用户名密码?

如果你不希望每次登录 Kibana 面板时都输入用户名密码,可以在 /etc/kibana/kibana.yml 文件中追加以下配置项。注意,务必锁好 Kibana 面板的访问控制,否则任意用户都可以看到你的日志(并有权限篡改它们!):

xpack.security.authc.providers:
  anonymous.anonymous1:
    order: 0
    credentials:
      username: "你的用户名"
      password: "你的密码"

安装配置 Logstash

Logstash 是控制日志从哪里收集、输出到哪里的一个“管道”管理器。我们通过以下命令安装 Logstash:

sudo apt install logstash

Logstash 的配置位于 /etc/logstash/conf.d/ 目录中,如果该目录中没有文件,则新建任何后缀名为 .conf 的文件,均可以添加一个“日志管道”。作为示例,我们在该目录中创建一个名为 01-python.conf 的文件,具体内容如下:

input {
  tcp {
    port => 12345
    codec => json
    tags => "python-service"
  }
}
output {
  if "python-service" in [tags] {
    elasticsearch {
      index => "python-service-%{+YYYY-MM-dd}"
      ssl_enabled => true
      ssl_verification_mode => none
      user => "python-logger"
      password => "对应密码"
    }
  }
}

在上述文件中,我们定义了一个从 TCP 12345 端口接受 JSON 格式的日志输入,将其输出到本地 ElasticSearch 的日志管线。在 ElasticSearch 中,每个日志都需要在一个索引(Index)中。业界惯常的索引命名习惯是按天分割索引文件,也就是上边的 index => "python-service-%{+YYYY-MM-dd}" 一行。这将创建名称例如 python-service-2023-12-31python-service-2024-01-01 这样的索引。

添加好 Logstash 的配置文件后,我们通过执行以下命令应用更改:

sudo systemctl restart logstash

可以通过以下命令查看 Logstash 本身的运行日志,以诊断任何报错:

sudo journalctl -u logstash

理论上重新启动 Logstash 后,我们可以通过 sudo netstat -unltp 注意到 Logstash 已经在监听 TCP 12345 端口。我们可以使用下列示例 Python 脚本向 Logstash 发送内容,并登录 Kibana 进行查看:

import logging
import logstash # 需要 pip install python-logstash
import sys

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.TCPLogstashHandler(host, 12345, version=1))

test_logger.error('Hello, world! This is an error')
test_logger.info('Hello, world! This is an info')
test_logger.warning('Hello, world! This is a warning')

同样地,请为服务器端的 TCP 12345 端口(或者其他你喜欢的端口数字)设置恰当的访问权限控制,以避免互联网上的任意用户向日志索引中追加内容。

Logstash 本身的配置文件功能很多,可以从多种数据源及不同数据格式中接收日志,并输出到指定位置(不仅包括 ElasticSearch)。关于 Logstash 配置文档的更多信息,请查看:https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html

配置 Python 应用程序

到目前为止,ELK 堆栈的全部内容已经配置完成了。现在我们开始配置 Python 应用程序。如果你使用的是 Django,则需要在 LOGGING 配置项中进行类似这样的配置:

LOGGING = {
  ...
  'handlers': {
      'logstash': {
          'level': 'DEBUG',
          'class': 'logstash.LogstashHandler',
          'host': 'localhost',
          'port': 12345,
          'version': 1, # Version of logstash event schema. Default value: 0 (for backward compatibility of the library)
          'message_type': 'logstash',  # 'type' field in logstash message. Default value: 'logstash'.
          'fqdn': False, # Fully qualified domain name. Default value: false.
          'tags': ['tag1', 'tag2'], # list of tags. Default: None.
      },
  },
  'loggers': {
      'django.request': {
          'handlers': ['logstash'],
          'level': 'DEBUG',
          'propagate': True,
      },
  },
  ...
}

如果你使用其他框架或直接调用 logger 进行日志输出,配置方式可能像这样:

import logging
import logstash
import sys

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 12345, version=1))

配置日志自动清除

默认情况下 ElasticSearch 不会删除任何日志信息。这可能会导致日志量增大时,服务器磁盘空间不足。这里给出了一个我使用 Python 编写的自动清除 xx 天前索引的脚本,可以通过定时运行(如 crontab)脚本的方式,实现日志的按天滚动清除:

代码详见
from datetime import datetime, timedelta
from typing import List
from dataclasses import dataclass
import requests
from requests.auth import HTTPBasicAuth

@dataclass(frozen=True)
class IndexConfig(object):
    delete_threshold: timedelta
    delete_no_suffix: bool = False

ELASTIC_USERNAME = "管理用户的用户名"
ELASTIC_PASSWORD = "对应密码"
ELASTIC_AUTH = HTTPBasicAuth(username=ELASTIC_USERNAME, password=ELASTIC_PASSWORD)
ELASTIC_API_BASEURL = "https://localhost:9200"
ELASTIC_INDEX_PREFIXES_CONFIG = {
    "python-service": IndexConfig(delete_threshold=timedelta(days=7)) # 在这里设置要自动删除的索引名称,以及间隔几天自动清除旧索引
}

def get_all_indexes() -> List[str]:
    return requests.get(f"{ELASTIC_API_BASEURL}/{index_prefix}*", auth=ELASTIC_AUTH, verify=False).json()

def delete_index(index_name: str):
    print(f"Delete index: {index_name}")
    return requests.delete(f"{ELASTIC_API_BASEURL}/{index_name}", auth=ELASTIC_AUTH, verify=False).json()

for index_prefix, index_config in ELASTIC_INDEX_PREFIXES_CONFIG.items():
    for index in get_all_indexes():
        index_suffix = index[len(index_prefix)+1:]
        
        if not index_suffix:
            if index_config.delete_no_suffix:
                delete_index(index)
            continue
        
        try:
            index_datetime = datetime.fromisoformat(index_suffix)
        except ValueError:
            print(f"Warning: index_datetime invalid parsing: {index_suffix}.")
            continue
        
        if datetime.now() - index_datetime >= index_config.delete_threshold:
            delete_index(index)

上述脚本需要 Python 的第三方库 requests。

其他登陆方式

Kibana 可以配置更多的登陆方式,比如使用 Google 单点登录(Google SSO),但这些功能属于 ELK 的高级订阅层级支持的功能。如果要了解更多信息,可以参考 https://www.elastic.co/guide/en/cloud/current/ec-securing-clusters-oidc-op.html

参考资料