所需的支持文件:
xlsxwriter>=3.0.0
pymysql>=1.0.0
pandas>=1.5.0
报告生成脚本:
#!/usr/bin/python3
# coding: utf-8
import xlsxwriter
import pymysql
import time
import sqlite3
import pandas as pd
from datetime import datetime, timedelta
import os
import configparser
import re
import calendar
class ZabbixDailyReport:
def __init__(self):
# Zabbix数据库配置
self.db_config = {
'host': 'localhost',
'port': 3306,
'user': 'zabbix',
'password': 'Ruipos@123',
'database': 'zabbix',
'charset': 'utf8mb4'
}
# 报表配置
self.report_dir = "/tmp/zabbix_reports"
self.local_db_file = os.path.join(self.report_dir, "zabbix_history.db")
os.makedirs(self.report_dir, exist_ok=True)
# 初始化本地数据库
self.init_local_database()
def init_local_database(self):
"""初始化SQLite数据库用于存储历史数据"""
conn = sqlite3.connect(self.local_db_file)
cursor = conn.cursor()
# 创建主机指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS host_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
report_date TEXT NOT NULL,
hostname TEXT NOT NULL,
ip TEXT,
host_groups TEXT,
cpu_current REAL,
cpu_max REAL,
cpu_avg REAL,
memory_current REAL,
memory_max REAL,
memory_avg REAL,
disk_current REAL,
disk_max REAL,
disk_avg REAL,
created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(report_date, hostname)
)
''')
# 创建索引以提高查询性能
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date_host ON host_metrics (report_date, hostname)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_hostname ON host_metrics (hostname)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON host_metrics (report_date)')
conn.commit()
conn.close()
print("本地数据库初始化完成")
def connect_zabbix_db(self):
"""连接Zabbix数据库"""
try:
conn = pymysql.connect(**self.db_config)
return conn
except Exception as e:
print(f"连接Zabbix数据库失败: {e}")
return None
def is_valid_host(self, hostname, groups):
"""判断是否为有效的主机(过滤掉云服务、模板等)"""
# 要排除的关键词列表
exclude_patterns = [
'{#', 'API', 'Controller', 'Scheduler', 'Kubelet', 'Consul',
'AWS.', 'Azure', 'Control-M', 'Cosmos', 'CLOUD_SQL', 'GCE',
'OpenStack', 'Acronis', 'YugabyteDB', 'OCI', 'NUTANIX',
'Template', 'Discovery', 'Hypervisor', 'VMware'
]
# 检查主机名是否包含排除关键词
hostname_upper = hostname.upper()
for pattern in exclude_patterns:
if pattern.upper() in hostname_upper:
return False
# 检查主机组是否包含排除关键词
if groups:
groups_upper = groups.upper()
for pattern in exclude_patterns:
if pattern.upper() in groups_upper:
return False
# 排除模板和自动发现的主机
if hostname.startswith('Template') or hostname.startswith('Discovery'):
return False
return True
def get_all_hosts(self):
"""获取所有启用的主机(过滤掉云服务和特殊主机)"""
conn = self.connect_zabbix_db()
if not conn:
return []
try:
cursor = conn.cursor()
query = """
SELECT
h.hostid, h.host, h.name, h.status,
hi.ip, hi.port, hi.type,
GROUP_CONCAT(DISTINCT hg.name) as host_groups
FROM hosts h
LEFT JOIN interface hi ON h.hostid = hi.hostid AND hi.main = 1
LEFT JOIN hosts_groups hhg ON h.hostid = hhg.hostid
LEFT JOIN hstgrp hg ON hhg.groupid = hg.groupid
WHERE h.status = 0
GROUP BY h.hostid, h.host, h.name, hi.ip, hi.port, hi.type
"""
cursor.execute(query)
hosts = cursor.fetchall()
# 转换为字典列表并过滤
host_list = []
for host in hosts:
hostname = host[1] # h.host
groups = host[7] if host[7] else ''
# 过滤无效主机
if self.is_valid_host(hostname, groups):
host_list.append({
'hostid': host[0],
'host': hostname,
'name': host[2],
'status': host[3],
'ip': host[4],
'port': host[5],
'type': host[6],
'groups': groups
})
print(f"获取到 {len(host_list)} 台有效主机(已过滤云服务和特殊主机)")
# 显示过滤后的主机列表
if host_list:
print("有效主机列表:")
for host in host_list:
print(f" - {host['host']} ({host['ip']}) - 组: {host['groups']}")
return host_list
except Exception as e:
print(f"获取主机列表失败: {e}")
return []
finally:
conn.close()
def get_item_value(self, hostid, item_key):
"""获取监控项的最新值"""
conn = self.connect_zabbix_db()
if not conn:
return None
try:
cursor = conn.cursor()
query = """
SELECT i.itemid, i.value_type, h.value, h.clock
FROM items i
LEFT JOIN history h ON i.itemid = h.itemid
WHERE i.hostid = %s AND i.key_ = %s
ORDER BY h.clock DESC
LIMIT 1
"""
cursor.execute(query, (hostid, item_key))
result = cursor.fetchone()
if result:
return {
'itemid': result[0],
'value_type': result[1],
'value': result[2],
'clock': result[3]
}
return None
except Exception as e:
print(f"获取监控项 {item_key} 失败: {e}")
return None
finally:
conn.close()
def get_item_history_stats(self, hostid, item_key, hours=24, end_time=None):
"""获取监控项的历史统计信息"""
conn = self.connect_zabbix_db()
if not conn:
return {'current': 0, 'max': 0, 'avg': 0}
try:
cursor = conn.cursor()
# 计算时间范围
if end_time is None:
end_time = int(time.time())
else:
# 如果指定了结束时间,确保是当天的23:59:59
end_time = int(end_time.replace(hour=23, minute=59, second=59).timestamp())
start_time = end_time - hours * 3600
# 首先检查监控项是否存在
query_item = "SELECT itemid, value_type FROM items WHERE hostid = %s AND key_ = %s"
cursor.execute(query_item, (hostid, item_key))
item_info = cursor.fetchone()
if not item_info:
print(f"监控项不存在: {item_key} for hostid: {hostid}")
return {'current': 0, 'max': 0, 'avg': 0}
itemid, value_type = item_info
# 选择正确的历史表
if value_type == 0: # 浮点数
history_table = "history"
elif value_type == 3: # 数值
history_table = "history_uint"
else:
history_table = "history_str" # 字符串
# 获取当前值(最新值)
query_current = f"""
SELECT value FROM {history_table}
WHERE itemid = %s AND clock >= %s AND clock <= %s
ORDER BY clock DESC LIMIT 1
"""
cursor.execute(query_current, (itemid, start_time, end_time))
current_result = cursor.fetchone()
current_value = float(current_result[0]) if current_result else 0
# 获取最大值和平均值
query_stats = f"""
SELECT MAX(value), AVG(value)
FROM {history_table}
WHERE itemid = %s AND clock >= %s AND clock <= %s
"""
cursor.execute(query_stats, (itemid, start_time, end_time))
stats_result = cursor.fetchone()
max_value = float(stats_result[0]) if stats_result and stats_result[0] else 0
avg_value = float(stats_result[1]) if stats_result and stats_result[1] else 0
return {
'current': round(current_value, 2),
'max': round(max_value, 2),
'avg': round(avg_value, 2)
}
except Exception as e:
print(f"获取监控项统计 {item_key} 失败: {e}")
return {'current': 0, 'max': 0, 'avg': 0}
finally:
conn.close()
def get_host_metrics_for_date(self, host, target_date):
"""获取指定日期的主机监控数据"""
hostid = host['hostid']
hostname = host.get('name', host['host'])
ip = host.get('ip', 'N/A')
print(f"正在获取主机 {hostname} 在 {target_date} 的监控数据...")
# 将目标日期转换为datetime对象
target_datetime = datetime.strptime(target_date, "%Y-%m-%d")
# 获取各项指标
cpu_usage = self.get_item_history_stats(hostid, "system.cpu.util", 24, target_datetime)
memory_usage = self.get_item_history_stats(hostid, "vm.memory.util", 24, target_datetime)
disk_usage = self.get_item_history_stats(hostid, "vfs.fs.size[/,pused]", 24, target_datetime)
# 如果主要监控项不存在,尝试备用监控项
if cpu_usage['current'] == 0:
cpu_usage = self.get_item_history_stats(hostid, "system.cpu.load[all,avg1]", 24, target_datetime)
if memory_usage['current'] == 0:
memory_usage = self.get_item_history_stats(hostid, "vm.memory.size[pused]", 24, target_datetime)
metrics = {
'hostname': hostname,
'ip': ip,
'groups': host.get('groups', ''),
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'disk_usage': disk_usage
}
return metrics
def get_missing_dates(self):
"""获取需要补全数据的日期列表"""
conn = sqlite3.connect(self.local_db_file)
cursor = conn.cursor()
# 获取数据库中已有的最新日期
cursor.execute("SELECT MAX(report_date) FROM host_metrics")
result = cursor.fetchone()
last_date = result[0] if result[0] else None
conn.close()
if not last_date:
# 如果数据库为空,从11月1日开始(假设你的数据从11月开始)
start_date = datetime(2024, 11, 1).date()
else:
# 从最后日期+1天开始
start_date = datetime.strptime(last_date, "%Y-%m-%d").date() + timedelta(days=1)
end_date = datetime.now().date()
# 生成需要补全的日期列表
missing_dates = []
current_date = start_date
while current_date <= end_date:
missing_dates.append(current_date.strftime("%Y-%m-%d"))
current_date += timedelta(days=1)
print(f"需要补全的日期: {missing_dates}")
return missing_dates
def save_metrics_to_local_db(self, hosts_metrics, target_date=None):
"""保存指标数据到本地数据库"""
conn = sqlite3.connect(self.local_db_file)
cursor = conn.cursor()
if target_date is None:
target_date = datetime.now().strftime("%Y-%m-%d")
saved_count = 0
for host in hosts_metrics:
try:
cursor.execute('''
INSERT OR REPLACE INTO host_metrics
(report_date, hostname, ip, host_groups,
cpu_current, cpu_max, cpu_avg,
memory_current, memory_max, memory_avg,
disk_current, disk_max, disk_avg)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
target_date,
host['hostname'],
host['ip'],
host['groups'],
host['cpu_usage']['current'],
host['cpu_usage']['max'],
host['cpu_usage']['avg'],
host['memory_usage']['current'],
host['memory_usage']['max'],
host['memory_usage']['avg'],
host['disk_usage']['current'],
host['disk_usage']['max'],
host['disk_usage']['avg']
))
saved_count += 1
except Exception as e:
print(f"保存主机 {host['hostname']} 数据失败: {e}")
conn.commit()
conn.close()
print(f"成功保存 {saved_count} 台主机在 {target_date} 的监控数据到本地数据库")
return saved_count
def fill_missing_data(self):
"""补全缺失的日期数据"""
missing_dates = self.get_missing_dates()
if not missing_dates:
print("没有需要补全的数据")
return True
# 获取所有主机
hosts = self.get_all_hosts()
if not hosts:
print("没有找到可用的主机")
return False
total_filled = 0
for date_str in missing_dates:
print(f"\n=== 补全 {date_str} 的数据 ===")
hosts_metrics = []
for host in hosts:
metrics = self.get_host_metrics_for_date(host, date_str)
hosts_metrics.append(metrics)
time.sleep(0.1) # 避免数据库压力过大
# 保存到本地数据库
saved_count = self.save_metrics_to_local_db(hosts_metrics, date_str)
total_filled += saved_count
print(f"\n数据补全完成! 共补全 {len(missing_dates)} 天的数据,{total_filled} 条记录")
return True
def get_monthly_data(self, year=None, month=None):
"""获取月度数据,如果未指定年月则获取当前月数据"""
if year is None:
year = datetime.now().year
if month is None:
month = datetime.now().month
# 计算月份的第一天和最后一天
first_day = datetime(year, month, 1)
if month == 12:
last_day = datetime(year + 1, 1, 1) - timedelta(days=1)
else:
last_day = datetime(year, month + 1, 1) - timedelta(days=1)
# 如果当前日期小于最后一天,则使用当前日期
today = datetime.now()
if today < last_day:
last_day = today
start_date = first_day.strftime("%Y-%m-%d")
end_date = last_day.strftime("%Y-%m-%d")
conn = sqlite3.connect(self.local_db_file)
query = """
SELECT report_date, hostname, ip, host_groups,
cpu_current, cpu_max, cpu_avg,
memory_current, memory_max, memory_avg,
disk_current, disk_max, disk_avg
FROM host_metrics
WHERE report_date >= ? AND report_date <= ?
ORDER BY report_date, hostname
"""
df = pd.read_sql_query(query, conn, params=[start_date, end_date])
conn.close()
print(f"获取到 {len(df)} 条月度数据,时间范围: {start_date} 至 {end_date}")
return df
def generate_monthly_report(self, year=None, month=None):
"""生成月度详细报表"""
if year is None:
year = datetime.now().year
if month is None:
month = datetime.now().month
# 获取月度数据
monthly_data = self.get_monthly_data(year, month)
if monthly_data.empty:
print(f"没有找到 {year}年{month}月 的监控数据")
return False
# 生成Excel文件名
excel_file = os.path.join(self.report_dir, f"zabbix_monthly_report_{year}{month:02d}.xlsx")
workbook = xlsxwriter.Workbook(excel_file)
# 设置格式
header_format = workbook.add_format({
'bold': True, 'fg_color': '#366092', 'font_color': 'white',
'border': 1, 'align': 'center', 'text_wrap': True
})
cell_format = workbook.add_format({'border': 1, 'align': 'center'})
percent_format = workbook.add_format({'border': 1, 'align': 'center', 'num_format': '0.00%'})
warning_percent_format = workbook.add_format({
'border': 1, 'align': 'center', 'num_format': '0.00%',
'bg_color': '#FFC7CE', 'font_color': '#9C0006'
})
# 月度明细工作表
worksheet = workbook.add_worksheet('月度明细')
# 设置列头
headers = [
'日期', '主机名', 'IP地址', '主机组',
'CPU当前使用率', 'CPU最大使用率', 'CPU平均使用率',
'内存当前使用率', '内存最大使用率', '内存平均使用率',
'磁盘当前使用率', '磁盘最大使用率', '磁盘平均使用率'
]
for col, header in enumerate(headers):
worksheet.write(0, col, header, header_format)
# 写入数据
row = 1
for index, record in monthly_data.iterrows():
# 基础信息
worksheet.write(row, 0, record['report_date'], cell_format)
worksheet.write(row, 1, record['hostname'], cell_format)
worksheet.write(row, 2, record['ip'], cell_format)
worksheet.write(row, 3, record['host_groups'], cell_format)
# CPU使用率数据(带颜色标记)
cpu_current_format = warning_percent_format if record['cpu_current'] > 80 else percent_format
cpu_max_format = warning_percent_format if record['cpu_max'] > 80 else percent_format
cpu_avg_format = warning_percent_format if record['cpu_avg'] > 80 else percent_format
worksheet.write(row, 4, record['cpu_current'] / 100, cpu_current_format)
worksheet.write(row, 5, record['cpu_max'] / 100, cpu_max_format)
worksheet.write(row, 6, record['cpu_avg'] / 100, cpu_avg_format)
# 内存使用率数据(带颜色标记)
memory_current_format = warning_percent_format if record['memory_current'] > 80 else percent_format
memory_max_format = warning_percent_format if record['memory_max'] > 80 else percent_format
memory_avg_format = warning_percent_format if record['memory_avg'] > 80 else percent_format
worksheet.write(row, 7, record['memory_current'] / 100, memory_current_format)
worksheet.write(row, 8, record['memory_max'] / 100, memory_max_format)
worksheet.write(row, 9, record['memory_avg'] / 100, memory_avg_format)
# 磁盘使用率数据(带颜色标记)
disk_current_format = warning_percent_format if record['disk_current'] > 80 else percent_format
disk_max_format = warning_percent_format if record['disk_max'] > 80 else percent_format
disk_avg_format = warning_percent_format if record['disk_avg'] > 80 else percent_format
worksheet.write(row, 10, record['disk_current'] / 100, disk_current_format)
worksheet.write(row, 11, record['disk_max'] / 100, disk_max_format)
worksheet.write(row, 12, record['disk_avg'] / 100, disk_avg_format)
row += 1
# 设置列宽
column_widths = [12, 20, 15, 25, 12, 12, 12, 12, 12, 12, 12, 12, 12]
for col, width in enumerate(column_widths):
worksheet.set_column(col, col, width)
# 创建统计工作表
self._create_monthly_stats_sheet(workbook, monthly_data, header_format, cell_format)
workbook.close()
print(f"月度报表已生成: {excel_file}")
return excel_file
def _create_monthly_stats_sheet(self, workbook, monthly_data, header_format, cell_format):
"""创建月度统计工作表"""
stats_sheet = workbook.add_worksheet('月度统计')
stats_sheet.set_column('A:A', 25)
stats_sheet.set_column('B:B', 15)
# 计算统计信息
total_days = len(monthly_data['report_date'].unique())
total_hosts = len(monthly_data['hostname'].unique())
total_records = len(monthly_data)
# 计算告警统计
high_cpu_days = len(monthly_data[monthly_data['cpu_current'] > 80])
high_memory_days = len(monthly_data[monthly_data['memory_current'] > 80])
high_disk_days = len(monthly_data[monthly_data['disk_current'] > 80])
# 计算告警主机数
high_cpu_hosts = len(monthly_data[monthly_data['cpu_current'] > 80]['hostname'].unique())
high_memory_hosts = len(monthly_data[monthly_data['memory_current'] > 80]['hostname'].unique())
high_disk_hosts = len(monthly_data[monthly_data['disk_current'] > 80]['hostname'].unique())
stats_data = [
['统计项目', '数值'],
['总天数', total_days],
['总主机数', total_hosts],
['总记录数', total_records],
['', ''],
['告警统计', ''],
['CPU使用率>80%的天数', high_cpu_days],
['内存使用率>80%的天数', high_memory_days],
['磁盘使用率>80%的天数', high_disk_days],
['', ''],
['告警主机统计', ''],
['CPU告警主机数', high_cpu_hosts],
['内存告警主机数', high_memory_hosts],
['磁盘告警主机数', high_disk_hosts],
['', ''],
['报表生成时间', datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
['', ''],
['颜色说明', ''],
['红色背景', '使用率超过80%']
]
for row, data in enumerate(stats_data):
for col, value in enumerate(data):
if row == 0 or row == 5 or row == 10 or row == 17:
stats_sheet.write(row, col, value, header_format)
else:
stats_sheet.write(row, col, value, cell_format)
def generate_today_report(self, hosts_metrics):
"""生成今日详细报表"""
today = datetime.now().strftime("%Y%m%d")
excel_file = os.path.join(self.report_dir, f"zabbix_daily_report_{today}.xlsx")
workbook = xlsxwriter.Workbook(excel_file)
# 设置格式
header_format = workbook.add_format({
'bold': True, 'fg_color': '#366092', 'font_color': 'white',
'border': 1, 'align': 'center'
})
cell_format = workbook.add_format({'border': 1, 'align': 'center'})
percent_format = workbook.add_format({'border': 1, 'align': 'center', 'num_format': '0.00%'})
# 主机资源工作表
worksheet = workbook.add_worksheet('今日监控')
headers = [
'主机名', 'IP地址', '主机组',
'CPU当前使用率', 'CPU最大使用率', 'CPU平均使用率',
'内存当前使用率', '内存最大使用率', '内存平均使用率',
'磁盘当前使用率', '磁盘最大使用率', '磁盘平均使用率'
]
for col, header in enumerate(headers):
worksheet.write(0, col, header, header_format)
for row, host in enumerate(hosts_metrics, 1):
worksheet.write(row, 0, host['hostname'], cell_format)
worksheet.write(row, 1, host['ip'], cell_format)
worksheet.write(row, 2, host['groups'], cell_format)
worksheet.write(row, 3, host['cpu_usage']['current'] / 100, percent_format)
worksheet.write(row, 4, host['cpu_usage']['max'] / 100, percent_format)
worksheet.write(row, 5, host['cpu_usage']['avg'] / 100, percent_format)
worksheet.write(row, 6, host['memory_usage']['current'] / 100, percent_format)
worksheet.write(row, 7, host['memory_usage']['max'] / 100, percent_format)
worksheet.write(row, 8, host['memory_usage']['avg'] / 100, percent_format)
worksheet.write(row, 9, host['disk_usage']['current'] / 100, percent_format)
worksheet.write(row, 10, host['disk_usage']['max'] / 100, percent_format)
worksheet.write(row, 11, host['disk_usage']['avg'] / 100, percent_format)
# 设置列宽
worksheet.set_column('A:A', 20)
worksheet.set_column('B:B', 15)
worksheet.set_column('C:C', 20)
for col in range(3, 12):
worksheet.set_column(col, col, 12)
# 创建统计工作表
stats_sheet = workbook.add_worksheet('统计信息')
stats_sheet.set_column('A:A', 25)
stats_sheet.set_column('B:B', 15)
# 计算统计信息
total_hosts = len(hosts_metrics)
high_cpu_hosts = len([h for h in hosts_metrics if h['cpu_usage']['current'] > 80])
high_memory_hosts = len([h for h in hosts_metrics if h['memory_usage']['current'] > 80])
high_disk_hosts = len([h for h in hosts_metrics if h['disk_usage']['current'] > 80])
stats_data = [
['统计项目', '数量'],
['总主机数', total_hosts],
['CPU使用率>80%', high_cpu_hosts],
['内存使用率>80%', high_memory_hosts],
['磁盘使用率>80%', high_disk_hosts],
['报表生成时间', datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
['数据时间范围', '最近24小时']
]
for row, data in enumerate(stats_data):
for col, value in enumerate(data):
if row == 0:
stats_sheet.write(row, col, value, header_format)
else:
stats_sheet.write(row, col, value, cell_format)
workbook.close()
print(f"今日报表已生成: {excel_file}")
return excel_file
def generate_daily_report(self):
"""生成每日报表"""
print("开始生成Zabbix每日监控报表...")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 获取所有主机
hosts = self.get_all_hosts()
if not hosts:
print("没有找到可用的主机")
return False
# 获取每个主机的监控数据
hosts_metrics = []
for host in hosts:
metrics = self.get_host_metrics_for_date(host, datetime.now().strftime("%Y-%m-%d"))
hosts_metrics.append(metrics)
time.sleep(0.1) # 避免数据库压力过大
# 保存到本地数据库
self.save_metrics_to_local_db(hosts_metrics)
# 生成今日报表
self.generate_today_report(hosts_metrics)
print("每日监控报表生成完成!")
return True
def main():
"""主函数"""
reporter = ZabbixDailyReport()
# 首先补全缺失的数据
print("=== 检查并补全缺失数据 ===")
reporter.fill_missing_data()
# 生成今日报表
print("\n=== 生成今日报表 ===")
success_daily = reporter.generate_daily_report()
# 生成月度报表
print("\n=== 生成月度报表 ===")
current_date = datetime.now()
success_monthly = reporter.generate_monthly_report(current_date.year, current_date.month)
if success_daily and success_monthly:
print("\n所有报表生成成功!")
return 0
else:
print("\n部分报表生成失败!")
return 1
if __name__ == "__main__":
# 安装依赖: pip3 install xlsxwriter pymysql pandas
exit(main())
邮件发送脚本(提示:请更新个人的邮箱地址和授权码)
#!/usr/bin/python3
# coding: utf-8
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from datetime import datetime
import glob
import time
class ReportSender:
def __init__(self):
# 邮件配置 - 使用正确的465端口和SSL
self.email_config = {
'smtp_server': 'smtp.qq.com',
'smtp_port': 465, # 修正:使用465端口
'sender_email': 'yourqq@qq.com',
'sender_password': '授权码',
'receiver_email': 'yourqq@qq.com'
}
# 报表目录
self.report_dir = "/tmp/zabbix_reports"
def send_report_email(self, attachment_path=None, report_type="daily"):
"""发送报表邮件"""
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = self.email_config['sender_email']
msg['To'] = self.email_config['receiver_email']
current_date = datetime.now().strftime("%Y-%m-%d")
if report_type == "monthly":
subject = f"Zabbix月度监控报表 - {datetime.now().strftime('%Y年%m月')}"
body = f"""
尊敬的收件人:
附件是{datetime.now().strftime('%Y年%m月')}的Zabbix监控系统月度报表。
报表包含以下内容:
- 月度明细:当月每天所有主机的CPU、内存、磁盘使用率数据
- 月度统计:总体统计信息和告警分析
报表生成时间:{current_date}
如有任何问题,请及时联系。
此致
敬礼
Zabbix监控系统
"""
else:
subject = f"Zabbix每日监控报表 - {current_date}"
body = f"""
尊敬的收件人:
附件是{current_date}的Zabbix监控系统每日报表。
报表包含以下内容:
- 今日监控:所有主机的CPU、内存、磁盘使用率数据
- 统计信息:总体统计和告警情况
报表生成时间:{current_date}
如有任何问题,请及时联系。
此致
敬礼
Zabbix监控系统
"""
msg['Subject'] = subject
# 添加邮件正文
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# 添加附件
if attachment_path and os.path.exists(attachment_path):
with open(attachment_path, 'rb') as file:
attach = MIMEApplication(file.read(), _subtype='vnd.openxmlformats-officedocument.spreadsheetml.sheet')
attach.add_header('Content-Disposition', 'attachment',
filename=os.path.basename(attachment_path))
msg.attach(attach)
print(f"已添加附件: {os.path.basename(attachment_path)}")
else:
print("警告: 附件文件不存在")
return False
# 连接SMTP服务器并发送邮件 - 使用正确的SSL连接
print("正在连接SMTP服务器...")
# 修正:使用SMTP_SSL连接465端口
server = smtplib.SMTP_SSL(self.email_config['smtp_server'], self.email_config['smtp_port'])
server.set_debuglevel(1) # 开启调试信息
print("正在登录邮箱...")
server.login(self.email_config['sender_email'], self.email_config['sender_password'])
print("正在发送邮件...")
server.send_message(msg)
server.quit()
print(f"? 邮件发送成功: {subject}")
return True
except smtplib.SMTPAuthenticationError as e:
print(f"? SMTP认证失败: {e}")
print("请检查邮箱密码/授权码是否正确")
return False
except Exception as e:
print(f"? 邮件发送失败: {e}")
return False
def find_latest_report(self, report_type="daily"):
"""查找最新的报表文件"""
if report_type == "monthly":
pattern = os.path.join(self.report_dir, "zabbix_monthly_report_*.xlsx")
else:
pattern = os.path.join(self.report_dir, "zabbix_daily_report_*.xlsx")
files = glob.glob(pattern)
if not files:
print(f"未找到{report_type}报表文件")
return None
# 返回最新的文件
latest_file = max(files, key=os.path.getctime)
print(f"找到最新{report_type}报表: {os.path.basename(latest_file)}")
return latest_file
def send_latest_daily_report(self):
"""发送最新的日报表"""
print("=== 发送最新日报表 ===")
report_file = self.find_latest_report("daily")
if report_file:
return self.send_report_email(report_file, "daily")
return False
def send_latest_monthly_report(self):
"""发送最新的月报表"""
print("=== 发送最新月报表 ===")
report_file = self.find_latest_report("monthly")
if report_file:
return self.send_report_email(report_file, "monthly")
return False
def list_available_reports(self):
"""列出所有可用的报表文件"""
print("=== 可用报表文件 ===")
# 日报表
daily_files = glob.glob(os.path.join(self.report_dir, "zabbix_daily_report_*.xlsx"))
print("日报表:")
for file in sorted(daily_files):
print(f" ???? {os.path.basename(file)}")
# 月报表
monthly_files = glob.glob(os.path.join(self.report_dir, "zabbix_monthly_report_*.xlsx"))
print("月报表:")
for file in sorted(monthly_files):
print(f" ???? {os.path.basename(file)}")
return daily_files + monthly_files
def main():
"""主函数"""
sender = ReportSender()
print("Zabbix报表邮件发送工具")
print("=" * 50)
# 列出可用报表
sender.list_available_reports()
print()
# 发送最新的日报表
print("1. 发送最新日报表")
success_daily = sender.send_latest_daily_report()
print()
# 发送最新的月报表
print("2. 发送最新月报表")
success_monthly = sender.send_latest_monthly_report()
if success_daily or success_monthly:
print("\n? 邮件发送任务完成!")
return 0
else:
print("\n? 邮件发送失败!")
return 1
if __name__ == "__main__":
exit(main())
月度调度脚本(判断当月的具体天数,安排相应的月份任务)
#!/usr/bin/python3
# coding: utf-8
import sys
import os
import logging
from datetime import datetime, timedelta
# 添加脚本路径
sys.path.append('/app/scripts')
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('/var/log/monthly_report.log')
]
)
logger = logging.getLogger(__name__)
class MonthlyScheduler:
def __init__(self):
self.report_dir = "/tmp/zabbix_reports"
def get_previous_month(self):
"""获取上个月的年份和月份"""
today = datetime.now()
# 如果是1月,上个月就是去年的12月
if today.month == 1:
prev_year = today.year - 1
prev_month = 12
else:
prev_year = today.year
prev_month = today.month - 1
logger.info(f"计算上个月: {prev_year}年{prev_month}月")
return prev_year, prev_month
def run_monthly_report(self):
"""执行月报生成和发送任务"""
try:
logger.info("=== 开始执行月度报告任务 ===")
# 获取上个月份
year, month = self.get_previous_month()
# 导入报表生成模块 - 使用正确的文件名 report1.py
from report1 import ZabbixDailyReport
# 生成月报
logger.info(f"开始生成 {year}年{month}月 监控报表")
reporter = ZabbixDailyReport()
report_file = reporter.generate_monthly_report(year, month)
if not report_file or not os.path.exists(report_file):
logger.error("月报生成失败")
return False
logger.info(f"月报生成成功: {report_file}")
# 导入邮件发送模块 - 使用正确的文件名 send_mail.py
from send_mail import ReportSender
# 发送邮件
logger.info("开始发送月报邮件")
sender = ReportSender()
success = sender.send_specific_report(report_file, "monthly")
if success:
logger.info("月报邮件发送成功")
logger.info("=== 月度报告任务完成 ===")
return True
else:
logger.error("月报邮件发送失败")
return False
except Exception as e:
logger.error(f"月度报告任务执行异常: {e}")
return False
def main():
"""主函数"""
scheduler = MonthlyScheduler()
success = scheduler.run_monthly_report()
if success:
print("月度报告任务执行成功")
return 0
else:
print("月度报告任务执行失败")
return 1
if __name__ == "__main__":
exit(main())
Dockerfile:
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装系统依赖(包括vim)
#RUN apt-get update && apt-get install -y \
# vim \
# && rm -rf /var/lib/apt/lists/*
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
# 创建必要的目录
RUN mkdir -p /app/scripts /tmp/zabbix_reports /var/log
# 复制脚本文件
COPY report1.py /app/scripts/
COPY send_mail.py /app/scripts/
COPY monthly_scheduler.py /app/scripts/
# 设置执行权限
RUN chmod +x /app/scripts/*.py
# 创建日志文件
RUN touch /var/log/monthly_report.log /var/log/daily_report.log && \
chmod 666 /var/log/monthly_report.log /var/log/daily_report.log
# 设置环境变量
ENV PYTHONPATH=/app/scripts
# 启动命令 - 保持容器运行
CMD ["tail", "-f", "/dev/null"]
最后自行配置crontab,根据个人需求实现每月自动发送邮件的功能,便于监控工作。希望能对屏幕前的你有所帮助。


雷达卡


京公网安备 11010802022788号







