基于Python的数据导出和邮件发送
背景
由于运维工作需要,经常需要将一些数据从数据库中导出,发送给运营和需求部门,天天去手动查询,又有点太费时间了,于是研究学习了Python的基本功能,通过Python脚本和Linux 的crontab命令实现了每天自动化的数据查询和邮件发送。代码实现
定义了以下几个代码模块实现了配置文件读取、日志记录、数据库连接访问查询、导出到xlsx和带附件的邮件发送功能。
1、demo.py 示例文件
2、config.yml 配置文件
3、common_log.py 实现日志记录
4、common_db.py 实现数据库连接和访问
5、common_xlsx.py 实现数据表格的处理
6、common_email.py 实现带附件的邮件发送1、demo.py 示例# coding: utf-8 import common_db as mydb import common_xlsx as my_xlsx import common_log as mylog import common_email as my_email if __name__ == "__main__": phone = "13********7" sql = """ select orderno,orderAmount,actualAmount,phone order where phone= "{0}" """.format(phone) # 查询订单 result = mydb.select_by_parameters(sql) filename = "测试.xlsx" sheet_name = "订单查询" # 邮件接收人 receivers = "zhangsan@aliyun.com" receivers_cc = "lisi@aliyun.com" # 邮件抄送人 if len(result) > 0: # 创建工作表 my_xlsx.create_xlsx(filename) mylog.logger.info("创建工作表成功"+filename) # 将查询结果放入工作表 my_xlsx.create_sheet_in_xlsx(filename, sheet_name, result,0) mylog.logger.info("创建工作簿成功"+sheet_name) # 对工作簿求和 my_xlsx.sum_col_for_sheet(filename, sheet_name) mylog.logger.info("求和汇总成功") # 删除空工作表 my_xlsx.delete_sheet_from_xlsx(filename,"Sheet") # 发送邮件 my_email.to_send_email("测试查询结果", receivers, receivers_cc, filename, "订单查询结果.xlsx") mylog.logger.info("发送邮件成功")2、config.yml——配置文件mysql: host: 192.168.x.x port: 3306 username: xxxx password: xxxx database: xxxx log: log_path: D:log log_size: 8 log_num: 3 email: smtp: smtp.126.com # 发送方邮件地址 from: xxxx@aliyun.com # 发送方授权码 password: fdsafafdsafsa 3、common_log.py 实现日志记录import logging.handlers import logging import yaml import os import sys # 提供日志功能 class logger: # 先读取XML文件中的配置数据 # 由于config.xml放置在与当前文件相同的目录下,因此通过 __file__ 来获取XML文件的目录,然后再拼接成绝对路径 # 这里利用了lxml库来解析XML # root = etree.parse(os.path.join(os.path.dirname(__file__), "config.xml")).getroot() # 先读取yml中的配置数据 # 由于 # 读取日志文件保存路径 with open("config.yml", "r") as f: result = yaml.load(f, Loader=yaml.FullLoader) config_log = result["log"] logpath = config_log["log_path"] # root.find("logpath").text # 读取日志文件容量,转换为字节 logsize = 1024*1024*int(config_log["log_size"]) # 读取日志文件保存个数 lognum = int(config_log["log_num"]) # 日志文件名:由用例脚本的名称,结合日志保存路径,得到日志文件的绝对路径 logname = os.path.join(logpath, sys.argv[0].split("/")[-1].split(".")[0])+".log" # 初始化logger log = logging.getLogger() # 日志格式,可以根据需要设置 fmt = logging.Formatter("[%(asctime)s][%(filename)s][line:%(lineno)d][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") # 日志输出到文件,这里用到了上面获取的日志名称,大小,保存个数 handle1 = logging.handlers.RotatingFileHandler(logname, maxBytes=logsize, backupCount=lognum) handle1.setFormatter(fmt) # 同时输出到屏幕,便于实施观察 handle2 = logging.StreamHandler(stream=sys.stdout) handle2.setFormatter(fmt) log.addHandler(handle1) log.addHandler(handle2) # 设置日志基本,这里设置为INFO,表示只有INFO级别及以上的会打印 log.setLevel(logging.INFO) # 日志接口,用户只需调用这里的接口即可,这里只定位了INFO, WARNING, ERROR三个级别的日志,可根据需要定义更多接口 @classmethod def info(cls, msg): cls.log.info(msg) return @classmethod def warning(cls, msg): cls.log.warning(msg) return @classmethod def error(cls, msg): cls.log.error(msg) return4、common_db.py 实现数据库连接和访问import time import pymysql from common_log import * # 连接数据库 def get_connection(): _conn_status = True _max_retries_count = 10 # 设置最大重试次数 _conn_retries_count = 0 # 初始重试次数 _conn_timeout = 3 # 连接超时时间为3秒 with open("config.yml", "r") as f: result = yaml.load(f, Loader=yaml.FullLoader) config_mysql = result["mysql"] while _conn_status and _conn_retries_count <= _max_retries_count: try: connect = pymysql.connect(host=config_mysql["host"], user=config_mysql["username"], password=config_mysql["password"], database=config_mysql["database"], port=config_mysql["port"]) _conn_status = False # 如果conn成功则_status为设置为False则退出循环,返回db连接对象 logger.info("连接数据库成功") return connect except Exception as e: _conn_retries_count += 1 logger.info("第%s次连接数据库失败"%(_conn_retries_count)) logger.error(e) time.sleep(3) continue # 查询函数 def select_by_parameters(sql, params=None): try: connect = get_connection() cursor = connect.cursor(pymysql.cursors.DictCursor) cursor.execute(sql, params) result = cursor.fetchall() return result except Exception as e: logger.error("执行查询报错") logger.info(sql) logger.error(e) finally: try: cursor.close() except Exception as e: logger.error("关闭游标对象报错") logger.error(e) try: connect.close() except Exception as e: print(e) print("数据库链接关闭异常")5、common_xlsx.py 实现数据表格的处理import openpyxl import os from openpyxl.styles import PatternFill, Border, Side, Alignment, Protection, Font, colors from openpyxl.utils import get_column_letter # 定义边框 thin_border = Border(left=Side(style="thin", color="FFFFFF"), right=Side(style="thin", color="FFFFFF"), top=Side(style="thin", color="FFFFFF"), bottom=Side(style="thin", color="FFFFFF")) # 居中对齐 alignment_center = Alignment(horizontal="center", vertical="center") # 右对齐 alignment_right = Alignment(horizontal="center", vertical="center") # 双行填充 fill_double = PatternFill(fgColor="FFDCE6F1", fill_type="solid") # 单行填充 fill_single = PatternFill(fgColor="FFB8CCE4", fill_type="solid") # 表头填充 fill_head = PatternFill(fgColor="FF366092", fill_type="solid") font_head = Font(bold=True, color="FFFFFFFF") # 1、创建xlsx脚本——在指定的filepath,创建指定的filename的xlsx文件 def create_xlsx(filename): wb = openpyxl.Workbook() wb.save(filename) # 2、增加工作簿脚本——读取指定路径下的xlsx文件,在工作表第index位置增加一个工作簿,并将mysql查询结果result写入到该工作簿 def create_sheet_in_xlsx(filename, sheet_name, result, index): # 加载文件 wb = openpyxl.load_workbook(filename) # 在指定位置创建工作表 wb.create_sheet(sheet_name, index) # 获取新建的工作表 ws = wb[sheet_name] j = 1 if len(result) > 0: # 写表头 for key, value in (result[0].items()): ws.cell(1, j, format(key)).border = thin_border # 定义对齐方式 ws.cell(1, j).alignment = alignment_center # 字体 颜色为白色 ws.cell(1, j).font = font_head # 填充 ws.cell(1, j).fill = fill_head # 边框 j = j + 1 ws.row_dimensions[1].height = 30 # 写数据 # 根据结果集行数量进行循环 for i in range(len(result)): # 循环当前行,定义j变量为列使用 j = 1 for key, value in (result[i].items()): if i % 2 == 0: ws.cell(i+2, j).fill = fill_double else: ws.cell(i+2, j).fill = fill_single ws.cell(i + 2, j, value) ws.cell(i + 2, j).border = thin_border ws.cell(i + 2, j).alignment = alignment_center ws.row_dimensions[i + 2].height = 20 if "时间" in format(key): ws.cell(i+2, j).alignment = alignment_center ws.cell(i+2, j).number_format = "yyyy-mm-dd hh:mm:ss" j = j + 1 continue # 如果字段名称是数量的话,不保留小数 if "数量" in format(key): ws.cell(i+2, j).number_format = "0" j = j + 1 continue # 如果是字段名称包含金额的话,则右对齐 if "金额" in format(key): ws.cell(i+2, j).alignment = alignment_right ws.cell(i+2, j).number_format = "#,##0.00" j = j + 1 continue j = j + 1 # 保存工作表 wb.save(filename) # 3、删除工作簿 def delete_sheet_from_xlsx(filename,sheet_name): wb = openpyxl.load_workbook(filename) wb.remove_sheet(wb[sheet_name]) wb.save(filename) # 4、往工作簿中追加行 将mysql的查询结果追加到工作表sheet_name末尾 def add_result_to_sheet(filename,new_sheet_name,result): wb = openpyxl.load_workbook(filename) ws = wb[new_sheet_name] # 获取最大行 mr = ws.max_row if len(result) > 0: for i in range(len(result)): # 循环当前行,定义j变量为列使用 j = 1 for key, value in (result[i].items()): if i % 2 == 0: ws.cell(i + mr, j).fill = fill_double else: ws.cell(i + mr, j).fill = fill_single ws.cell(i + mr, j, value) ws.cell(i + mr, j).border = thin_border ws.cell(i + mr, j).alignment = alignment_center ws.row_dimensions[i + 2].height = 20 if "时间" in format(key): ws.cell(i + mr, j).number_format = "yyyy-mm-dd hh:mm:ss" j = j + 1 continue # 如果字段名称是数量的话,不保留小数 if "数量" in format(key): ws.cell(i + mr, j).number_format = "0" j = j + 1 continue # 如果是字段名称包含金额的话,则右对齐 if "金额" in format(key): ws.cell(i + mr, j).alignment = alignment_right ws.cell(i + mr, j).number_format = "#,##0.00" j = j + 1 continue j = j + 1 wb.save(filename) # 5、删除工作簿中最大行 def delete_max_row_from_sheet(filename,sheet_name): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] ws.delete_rows(ws.max_row) wb.save(filename) # 6、删除工作簿中指定列 def delete_col_from_sheet(filename,sheet_name,colno): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] ws.delete_cols(colno) wb.save(filename) # 7、对表格中金额和数量字段进行求和 def sum_col_for_sheet(filename,sheet_name): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] for col in list(ws.columns): l = [c.value for c in col] if "金额" in l[0] or "数量" in l[0]: # 本列行的数量 row_size = len(col) + 1 # 本列的列号是 col_no = col[1].column col_code = get_column_letter(col[1].column) ws.cell(row_size, col_no, "=sum(" + str(col_code) + str(2) + ":" + str(col_code) + str( row_size - 1) + ")").border = thin_border ws.cell(row_size, col_no).font = font_head # 填充 ws.cell(row_size, col_no).fill = fill_head # 测试添加样式 if "金额" in l[0]: ws.cell(row_size, col_no).number_format = "#,##0.00" ws.cell(row_size, col_no).alignment = alignment_right if "数量" in l[0]: ws.cell(row_size, col_no).number_format = "0" ws.cell(row_size, col_no).alignment = alignment_center ws.row_dimensions[col_no].height = 20 wb.save(filename)6、common_email.py 实现带附件的邮件发送from email.mime.text import MIMEText from email.header import Header from email.mime.multipart import MIMEMultipart from smtplib import SMTP_SSL from common_log import * import yaml # # file_Name是路径名称加文件名和扩展名; # new_file_name是在邮件附件中显示的名称 # receivers是收件人列表,中间逗号隔开 # receivers_cc是抄送人列表 # mail_subject是邮件主题 def to_send_email(mail_subject,receivers,receivers_cc,file_name,new_file_name): with open("config.yml", "r") as f: result = yaml.load(f, Loader=yaml.FullLoader) config_email = result["email"] password = config_email["password"] msg = MIMEMultipart("related") msgAlternative = MIMEMultipart("alternative") msgAlternative.attach(MIMEText("见附件
", "html", "utf-8")) msg.attach(msgAlternative) # file_name 是指文件路径加名称和扩展名 file1 = MIMEText( open(file_name, "rb").read(), "base64", "utf-8" ) file1["Content-Type"] = "application/octet-stream" # new_file_name是指邮件附件中显示的名称 file1.add_header("Content-Disposition", "attachment",filename=new_file_name) msg.attach(file1) msg["Subject"] = Header(mail_subject, "utf-8").encode() msg["From"] = config_email["from"] msg["To"] = receivers msg["Cc"] = receivers_cc try: smtp = SMTP_SSL(config_email["smtp"]) smtp.login(msg["From"], password) smtp.sendmail(msg["From"], msg["To"].split(",") + msg["Cc"].split(","), msg.as_string()) logger.info("发送邮件成功,邮件接收人是:%s,邮件抄送人是:%s"%(receivers, receivers_cc)) except Exception as e: logger.error("发送邮件出现错误,邮件接收人是:%s,邮件抄送人是:%s" % (receivers, receivers_cc)) logger.error(e) finally: try: smtp.quit() except Exception as e: logger.error(e)