pip install 对应驱动(如 pymysql),通过
import 导入库,创建数据库连接对象,再执行 SQL 语句即可
以下是关于 Python 如何调用数据库包 的完整指南,涵盖主流数据库类型、核心库的使用方式、最佳实践及典型场景示例,本文将通过结构化内容帮助您快速掌握 Python 与数据库交互的核心技能。
常用数据库驱动包概览
| 数据库类型 | 推荐库 | 特点 |
|---|---|---|
| SQLite | sqlite3 (内置) |
轻量级文件型数据库,无需服务器,适合小型应用 |
| MySQL | pymysql, mysql-connector-python |
高性能开源关系型数据库,广泛用于 Web 开发 |
| PostgreSQL | psycopg2 |
功能丰富的对象-关系型数据库,支持 JSON/GIS 等高级特性 |
| MariaDB | mariadb |
MySQL 分支,兼容性强,性能优化更好 |
| Microsoft SQL Server | pyodbc, pymssql |
企业级商业数据库,需注意 TLS/SSL 配置 |
| Oracle | cx_Oracle |
大型企业级数据库,依赖底层客户端库 |
| MongoDB | pymongo |
NoSQL 文档型数据库,适合灵活的数据模型 |
| Redis | redis-py |
内存键值存储,常用于缓存、会话管理等场景 |
| 通用接口 | SQLAlchemy |
ORM 框架,提供统一 API 操作多种数据库 |
核心库使用详解
SQLite(原生 sqlite3 模块)
import sqlite3
# 创建/连接数据库(自动创建同名文件)
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
age INTEGER)''')
# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Alice", 25))
conn.commit() # 必须提交事务!
# 查询数据
cursor.execute("SELECT FROM users")
print(cursor.fetchall()) # [(1, 'Alice', 25)]
# 参数化查询防注入
cursor.execute("SELECT FROM users WHERE age > ?", (20,))
# 关闭连接
conn.close()
关键点:
单文件存储,零配置启动
️ 并发写入需加锁(默认线程不安全)
适合原型开发、移动应用本地存储
MySQL(以 pymysql 为例)
import pymysql
# 建立连接
conn = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='mydb',
charset='utf8mb4' # 支持 Emoji
)
try:
with conn.cursor() as cursor:
# 批量插入
sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
data = [('Phone', 999), ('Laptop', 1999)]
cursor.executemany(sql, data)
conn.commit()
# 带占位符的复杂查询
cursor.execute("SELECT FROM products WHERE price < %s", (1500,))
print(cursor.fetchone()) # 返回第一条匹配记录
finally:
conn.close()
关键配置:
️ charset='utf8mb4' 解决中文乱码问题
远程连接需开放防火墙端口(默认3306)
️ 生产环境建议配合连接池(如 DBUtils)
PostgreSQL(psycopg2 标准方案)
import psycopg2
from psycopg2 import sql
# 连接字符串格式更灵活
conn = psycopg2.connect(
dbname="mydb",
user="postgres",
password="your_password",
host="localhost",
port=5432
)
# 使用 Compositional API 构建动态 SQL(防止注入)
query = sql.SQL("SELECT FROM orders WHERE status = {}").format(
sql.Literal('shipped')
)
with conn.cursor() as cursor:
cursor.execute(query)
# 分页查询示例
page_size = 10
offset = 20
cursor.execute(sql.SQL("SELECT FROM large_table LIMIT %s OFFSET %s"), [page_size, offset])
print(cursor.fetchmany(page_size))
# 事务回滚示例
try:
cursor.execute("UPDATE account SET balance = balance 100 WHERE id=1")
cursor.execute("UPDATE account SET balance = balance + 100 WHERE id=2")
except Exception as e:
conn.rollback()
print(f"Transaction failed: {e}")
else:
conn.commit()
优势特性:
强大的窗口函数支持(RANK(), OVER())
JSONB 字段原生支持
TIMESTAMPTZ 时区处理更完善
MongoDB(pymongo 异步友好)
from pymongo import MongoClient
from bson.objectid import ObjectId
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']
collection = db['users']
# 插入文档
result = collection.insert_one({
"name": "Bob",
"email": "bob@example.com",
"roles": ["user", "editor"]
})
print("Inserted ID:", result.inserted_id)
# 复杂查询
pipeline = [
{"$match": {"age": {"$gte": 18}}},
{"$sort": {"created_at": -1}},
{"$limit": 5}
]
results = list(collection.aggregate(pipeline))
# 更新操作
collection.update_many(
{"status": "active"},
{"$set": {"last_login": datetime.now()}}
)
# 删除操作
collection.delete_one({"_id": ObjectId("5f8d...")})
设计模式建议:
️ 嵌套文档 vs 引用关系的选择
定期清理过期日志(TTL Indexes)
️ 敏感字段加密存储(Encryption at Rest)
通用操作流程对照表
| 操作步骤 | SQL 数据库实现 | MongoDB 实现 |
|---|---|---|
| 建立连接 | psycopg2.connect() |
MongoClient() |
| 创建集合/表 | CREATE TABLE SQL 语句 |
首次插入自动创建集合 |
| 插入数据 | cursor.execute(INSERT) |
collection.insert_one/many() |
| 查询数据 | cursor.execute(SELECT) → fetch |
collection.find().to_list() |
| 更新数据 | UPDATE ... WHERE 语句 |
update_one/many() |
| 删除数据 | DELETE ... WHERE 语句 |
delete_one/many() |
| 事务管理 | BEGIN; COMMIT/ROLLBACK; |
多文档事务(WiredTiger 引擎) |
| 索引创建 | CREATE INDEX 语句 |
create_index() 方法 |
进阶技巧与最佳实践
连接池管理
from DBUtils.PooledDB import PooledDB
pool = PooledDB(
creator=pymysql,
maxconnections=20, # 最大连接数
blocking=True, # 超限时等待而非报错
ping=0 # 检测无效连接的频率
)
# 使用时从池中获取连接
conn = pool.connection()
适用场景:高并发 Web 服务、长时间运行的任务队列
ORM 框架选择
| ORM | 特点 | 适用场景 |
|---|---|---|
| SQLAlchemy | 功能强大,支持多数据库,声明式映射 | 复杂业务系统,微服务架构 |
| Peewee | 轻量级,API 简洁,学习曲线平缓 | 中小型项目,快速迭代 |
| Django Model | 全功能集成,自带管理后台 | Django 项目标配 |
| Tortoise ORM | async/await 原生支持 | 异步编程需求 |
安全防护要点
- 禁止字符串拼接 SQL(永远使用参数化查询)
- 敏感信息移出代码(使用环境变量/Secret Manager)
- 最小权限原则分配数据库账号
- ️ 启用 SSL/TLS 加密传输(尤其公共网络环境)
常见错误排查手册
| 错误类型 | 典型特征 | 解决方案 |
|---|---|---|
| OperationalError | Access denied for user |
检查用户名/密码/主机权限 |
| ProgrammingError | syntax error at or near... |
检查 SQL 语法,使用三引号包裹长语句 |
| InterfaceError | Cursor closed |
确保连接未提前关闭 |
| DataError | invalid input syntax |
校验数据类型(如日期格式) |
| NotConnectedError | no connection to the server |
检查数据库服务是否启动 |
| DeprecationWarning | implicitly waiting |
显式指定等待超时时间 |
相关问答 FAQs
Q1: 为什么有时会出现 “No module named ‘XXX'” 的错误?
A: 这是最常见的导入错误,主要原因包括:①未安装对应库(需执行 pip install pymysql);②虚拟环境未激活;③拼写错误(注意大小写),建议使用 pip list 查看已安装包列表,并通过 python -m site 查看 sys.path 确认路径。
Q2: 如何处理大数据量的逐行处理以避免内存溢出?
A: 采用服务器端游标(Server-Side Cursor):
# PyMySQL 示例
with conn.cursor(pymysql.cursors.SSCursor) as cursor:
cursor.execute("SELECT FROM huge_table")
while True:
row = cursor.fetchone()
if not row:
break
process(row) # 逐行处理逻辑
这种方式只会加载当前行到内存,特别适合百万级数据的导出/分析场景,对于 MongoDB,可使用 find().batch_size(100) 实现类似效果。
您已掌握 Python 与各类数据库交互的核心方法,实际开发中建议根据具体需求选择合适的数据库类型和驱动库,并始终遵循安全编码规范,如需进一步学习特定数据库的高级特性,可参考官方
