最近用到了数据库存储,代码记不住,写一个记录
环境准备
这里默认你安装了MySQL,笔者使用的是MySQL的最新版,容器部署,端口是3306
pip install mysql-connector-python
连接数据库
import mysql.connector
from mysql.connector import Error# 配置信息
DB_CONFIG = {'host': 'localhost','port': 3306,'user': 'root','password': 'your_password', #输入你的密码'charset': 'utf8mb4'
}def create_connection(database=None):"""创建数据库连接"""config = DB_CONFIG.copy()if database:config['database'] = databasetry:conn = mysql.connector.connect(**config)print("连接成功")return connexcept Error as e:print(f"连接失败: {e}")return None
charset='utf8mb4':支持完整 Unicode(包括 emoji)database参数可选:建库时先不指定数据库,建表时再指定- 使用
try-except捕获连接异常
创建数据库
DATABASE_NAME = 'test_db'def create_database():"""创建数据库(如果不存在)"""conn = create_connection() # 不指定数据库if not conn:returncursor = conn.cursor()try:cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME} "f"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")conn.commit()print(f"数据库 '{DATABASE_NAME}' 创建成功(或已存在)")except Error as e:print(f"建库失败: {e}")finally:cursor.close()conn.close()
创建数据表
def create_table():"""创建数据表(如果不存在)"""conn = create_connection(DATABASE_NAME) # 指定数据库if not conn:returncursor = conn.cursor()try:cursor.execute("""CREATE TABLE IF NOT EXISTS users (id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',username VARCHAR(50) NOT NULL UNIQUE COMMENT '用户名',email VARCHAR(100) COMMENT '邮箱',age INT DEFAULT 0 COMMENT '年龄',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表'""")conn.commit()print("数据表 'users' 创建成功(或已存在)")except Error as e:print(f"建表失败: {e}")finally:cursor.close()conn.close()
ENGINE=InnoDB:支持事务、行级锁、外键IF NOT EXISTS:幂等操作,重复执行不报错finally中关闭 cursor 和 connection,防止资源泄漏
使用示例
if __name__ == '__main__':create_database()create_table()
插入数据(增)
def insert_user(username, email, age):"""插入单条记录"""conn = create_connection(DATABASE_NAME)if not conn:return Nonecursor = conn.cursor()try:sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"cursor.execute(sql, (username, email, age))conn.commit()print(f"插入成功,ID: {cursor.lastrowid}")return cursor.lastrowidexcept Error as e:print(f"插入失败: {e}")conn.rollback() # 出错回滚return Nonefinally:cursor.close()conn.close()
def insert_many_users(users_list):"""批量插入(效率更高)"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"# users_list: [('alice', 'alice@example.com', 25), ('bob', 'bob@example.com', 30)]cursor.executemany(sql, users_list)conn.commit()print(f"批量插入成功,共 {cursor.rowcount} 条记录")except Error as e:print(f"批量插入失败: {e}")conn.rollback()finally:cursor.close()conn.close()
调用示例
# 单条插入
insert_user("alice", "alice@example.com", 25)# 批量插入
users_data = [("charlie", "charlie@example.com", 35),("diana", "diana@example.com", 28),
]
insert_many_users(users_data)
- 必须使用
%s占位符:防止 SQL 注入 - 禁止字符串拼接 SQL:
f"INSERT ... VALUES ('{username}')"错误 - 出错时
rollback():保证数据一致性
删除数据(删)
def delete_user(user_id):"""根据 ID 删除"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "DELETE FROM users WHERE id = %s"cursor.execute(sql, (user_id,))conn.commit()if cursor.rowcount > 0:print(f"删除成功,影响 {cursor.rowcount} 行")else:print("未找到该用户")except Error as e:print(f"删除失败: {e}")conn.rollback()finally:cursor.close()conn.close()
def delete_users_by_age(max_age):"""删除年龄大于指定值的用户"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "DELETE FROM users WHERE age > %s"cursor.execute(sql, (max_age,))conn.commit()print(f"删除了 {cursor.rowcount} 条年龄大于 {max_age} 的记录")except Error as e:print(f"删除失败: {e}")conn.rollback()finally:cursor.close()conn.close()
| 场景 | rowcount 值 |
|---|---|
| 删除成功 | 实际删除的行数 |
| 条件不匹配 | 0 |
| 执行失败 | -1 |
使用示例:
print("\n--- 删除 ---")
delete_user(2)
查询数据(查)
def get_user_by_id(user_id):"""根据 ID 查询单条"""conn = create_connection(DATABASE_NAME)if not conn:return Nonecursor = conn.cursor(dictionary=True) # 返回字典格式try:sql = "SELECT * FROM users WHERE id = %s"cursor.execute(sql, (user_id,))result = cursor.fetchone()print(f"查询结果: {result}")return resultexcept Error as e:print(f"查询失败: {e}")return Nonefinally:cursor.close()conn.close()
def get_all_users(age_min=None, keyword=None):"""多条件查询"""conn = create_connection(DATABASE_NAME)if not conn:return []cursor = conn.cursor(dictionary=True)try:conditions = []params = []sql = "SELECT * FROM users WHERE 1=1"if age_min is not None:sql += " AND age >= %s"params.append(age_min)if keyword:sql += " AND (username LIKE %s OR email LIKE %s)"params.extend([f"%{keyword}%", f"%{keyword}%"])sql += " ORDER BY created_at DESC"cursor.execute(sql, params)results = cursor.fetchall()print(f"共查询到 {len(results)} 条记录")for row in results:print(row)return resultsexcept Error as e:print(f"查询失败: {e}")return []finally:cursor.close()conn.close()
| 方法 | 返回 | 适用场景 |
|---|---|---|
fetchone() |
单条记录 / None | 按唯一键查询 |
fetchall() |
列表(可能为空) | 多条记录 |
fetchmany(size) |
指定数量 | 分页查询 |
使用示例
# 4. 查
print("\n--- 查询单条 ---")
get_user_by_id(1)print("\n--- 条件查询 ---")
get_all_users(age_min=25, keyword="a")
更新数据(改)
def update_user(user_id, **kwargs):"""动态更新字段"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:# 构建动态 SQLfields = []values = []for key, value in kwargs.items():if value is not None:fields.append(f"{key} = %s")values.append(value)if not fields:print("没有要更新的字段")returnvalues.append(user_id)sql = f"UPDATE users SET {', '.join(fields)} WHERE id = %s"cursor.execute(sql, values)conn.commit()if cursor.rowcount > 0:print(f"更新成功,影响 {cursor.rowcount} 行")else:print("未找到该用户或无变化")except Error as e:print(f"更新失败: {e}")conn.rollback()finally:cursor.close()conn.close()
使用示例
# 5. 改
print("\n--- 更新 ---")
update_user(1, email="alice_new@example.com", age=26)
# 6. 查验证更新
print("\n--- 更新后查询 ---")
get_user_by_id(1)
事务处理
def transfer_data():"""事务处理示例"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:conn.start_transaction()# 一系列操作cursor.execute("INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",('tx_user', 'tx@test.com', 20))cursor.execute("UPDATE users SET age = age + 1 WHERE username = %s", ('tx_user',))conn.commit()print("事务提交成功")except Error as e:print(f"事务失败,回滚: {e}")conn.rollback()finally:cursor.close()conn.close()
完整代码
import mysql.connector
from mysql.connector import Error# ==================== 配置信息 ====================
DB_CONFIG = {'host': 'localhost', # MySQL 服务器地址'port': 3306, # 端口'user': 'root', # 用户名'password': 'your_password', # 密码'charset': 'utf8mb4'
}DATABASE_NAME = 'test_db'# ==================== 连接工具函数 ====================
def create_connection(database=None):"""创建数据库连接"""config = DB_CONFIG.copy()if database:config['database'] = databasetry:conn = mysql.connector.connect(**config)return connexcept Error as e:print(f"连接失败: {e}")return None# ==================== 1. 建库 ====================
def create_database():"""创建数据库"""conn = create_connection()if not conn:returncursor = conn.cursor()try:# 创建数据库(如果不存在)cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")conn.commit()print(f"数据库 '{DATABASE_NAME}' 创建成功(或已存在)")except Error as e:print(f"建库失败: {e}")finally:cursor.close()conn.close()# ==================== 2. 建表 ====================
def create_table():"""创建数据表"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:cursor.execute("""CREATE TABLE IF NOT EXISTS users (id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',username VARCHAR(50) NOT NULL UNIQUE COMMENT '用户名',email VARCHAR(100) COMMENT '邮箱',age INT DEFAULT 0 COMMENT '年龄',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表'""")conn.commit()print("数据表 'users' 创建成功(或已存在)")except Error as e:print(f"建表失败: {e}")finally:cursor.close()conn.close()# ==================== 3. 增(INSERT)====================
def insert_user(username, email, age):"""插入单条记录"""conn = create_connection(DATABASE_NAME)if not conn:return Nonecursor = conn.cursor()try:sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"cursor.execute(sql, (username, email, age))conn.commit()print(f"插入成功,ID: {cursor.lastrowid}")return cursor.lastrowidexcept Error as e:print(f"插入失败: {e}")conn.rollback()return Nonefinally:cursor.close()conn.close()def insert_many_users(users_list):"""批量插入(效率更高)"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"# users_list: [('alice', 'alice@example.com', 25), ('bob', 'bob@example.com', 30)]cursor.executemany(sql, users_list)conn.commit()print(f"批量插入成功,共 {cursor.rowcount} 条记录")except Error as e:print(f"批量插入失败: {e}")conn.rollback()finally:cursor.close()conn.close()# ==================== 4. 删(DELETE)====================
def delete_user(user_id):"""根据ID删除"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "DELETE FROM users WHERE id = %s"cursor.execute(sql, (user_id,))conn.commit()if cursor.rowcount > 0:print(f"删除成功,影响 {cursor.rowcount} 行")else:print("未找到该用户")except Error as e:print(f"删除失败: {e}")conn.rollback()finally:cursor.close()conn.close()def delete_users_by_age(max_age):"""条件删除"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:sql = "DELETE FROM users WHERE age > %s"cursor.execute(sql, (max_age,))conn.commit()print(f"删除了 {cursor.rowcount} 条年龄大于 {max_age} 的记录")except Error as e:print(f"删除失败: {e}")conn.rollback()finally:cursor.close()conn.close()# ==================== 5. 查(SELECT)====================
def get_user_by_id(user_id):"""根据ID查询单条"""conn = create_connection(DATABASE_NAME)if not conn:return Nonecursor = conn.cursor(dictionary=True) # 返回字典格式try:sql = "SELECT * FROM users WHERE id = %s"cursor.execute(sql, (user_id,))result = cursor.fetchone()print(f"查询结果: {result}")return resultexcept Error as e:print(f"查询失败: {e}")return Nonefinally:cursor.close()conn.close()def get_all_users(age_min=None, keyword=None):"""多条件查询"""conn = create_connection(DATABASE_NAME)if not conn:return []cursor = conn.cursor(dictionary=True)try:conditions = []params = []sql = "SELECT * FROM users WHERE 1=1"if age_min is not None:sql += " AND age >= %s"params.append(age_min)if keyword:sql += " AND (username LIKE %s OR email LIKE %s)"params.extend([f"%{keyword}%", f"%{keyword}%"])sql += " ORDER BY created_at DESC"cursor.execute(sql, params)results = cursor.fetchall()print(f"共查询到 {len(results)} 条记录")for row in results:print(row)return resultsexcept Error as e:print(f"查询失败: {e}")return []finally:cursor.close()conn.close()# ==================== 6. 改(UPDATE)====================
def update_user(user_id, **kwargs):"""动态更新字段"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:# 构建动态SQLfields = []values = []for key, value in kwargs.items():if value is not None:fields.append(f"{key} = %s")values.append(value)if not fields:print("没有要更新的字段")returnvalues.append(user_id)sql = f"UPDATE users SET {', '.join(fields)} WHERE id = %s"cursor.execute(sql, values)conn.commit()if cursor.rowcount > 0:print(f"更新成功,影响 {cursor.rowcount} 行")else:print("未找到该用户或无变化")except Error as e:print(f"更新失败: {e}")conn.rollback()finally:cursor.close()conn.close()# ==================== 7. 事务示例 ====================
def transfer_data():"""事务处理示例"""conn = create_connection(DATABASE_NAME)if not conn:returncursor = conn.cursor()try:conn.start_transaction()# 一系列操作cursor.execute("INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",('tx_user', 'tx@test.com', 20))cursor.execute("UPDATE users SET age = age + 1 WHERE username = %s", ('tx_user',))conn.commit()print("事务提交成功")except Error as e:print(f"事务失败,回滚: {e}")conn.rollback()finally:cursor.close()conn.close()# ==================== 主程序 ====================
if __name__ == "__main__":# 1. 建库create_database()# 2. 建表create_table()# 3. 增insert_user("alice", "alice@example.com", 25)insert_user("bob", "bob@example.com", 30)# 批量插入users_data = [("charlie", "charlie@example.com", 35),("diana", "diana@example.com", 28),]insert_many_users(users_data)# 4. 查print("\n--- 查询单条 ---")get_user_by_id(1)print("\n--- 条件查询 ---")get_all_users(age_min=25, keyword="a")# 5. 改print("\n--- 更新 ---")update_user(1, email="alice_new@example.com", age=26)# 6. 查验证更新print("\n--- 更新后查询 ---")get_user_by_id(1)# 7. 删print("\n--- 删除 ---")delete_user(2)
执行结果:
数据库 'test_db' 创建成功(或已存在)
数据表 'users' 创建成功(或已存在)
插入成功,ID: 1
插入成功,ID: 2
批量插入成功,共 2 条记录--- 查询单条 ---
查询结果: {'id': 1, 'username': 'alice', 'email': 'alice@example.com', 'age': 25, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}--- 条件查询 ---
共查询到 4 条记录
{'id': 1, 'username': 'alice', 'email': 'alice@example.com', 'age': 25, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}
{'id': 2, 'username': 'bob', 'email': 'bob@example.com', 'age': 30, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}
{'id': 3, 'username': 'charlie', 'email': 'charlie@example.com', 'age': 35, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}
{'id': 4, 'username': 'diana', 'email': 'diana@example.com', 'age': 28, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}--- 更新 ---
更新成功,影响 1 行--- 更新后查询 ---
查询结果: {'id': 1, 'username': 'alice', 'email': 'alice_new@example.com', 'age': 26, 'created_at': datetime.datetime(2026, 5, 3, 18, 21, 34), 'updated_at': datetime.datetime(2026, 5, 3, 18, 21, 34)}--- 删除 ---
删除成功,影响 1 行Process finished with exit code 0
