Python实现数据库数据导入的多种方式(含单行与批量,适用于SQL Server、MySQL)
几种将数据插入数据库的办法(单行插入、批量插入,针对SQL Server、MySQL,使用insert into)
常见的四种插入途径:
一、单行数据插入(构建insert into语句)
二、批量数据插入(构建insert into语句)
三、大量数据分批次插入(构建insert into语句)
四、借助pandas.DataFrame插入(支持单行与批量)
示例数据
columnsName = [
"SKU", "endCategoryName", "endCategoryID",
"rootCategoryName", "rootCategoryID", "CategoryTreeName", "CategoryTreeID"
]
# 定义值的列表
valueList = [
[19417978, "Nail Art Tools", 107876, "Health & Beauty", 26395,
"Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"],
[19418353, "Other Fitness, Running & Yoga", 13362, "Sporting Goods", 888,
"Sporting Goods>>Fitness, Running & Yoga>>Other Fitness, Running & Yoga", "888>>15273>>13362"],
[19418070, "Flags", 43533, "Garden & Patio", 159912, "Garden & Patio>>Décor>>Flags", "159912>>20498>>43533"],
[19417996, "Knitting Needles", 71215, "Crafts", 14339,
"Crafts>>Needlecrafts & Yarn>>Crocheting & Knitting>>Knitting Needles", "14339>>160706>>3094>>71215"],
[19418048, "Binders & Notebooks", 102950, "Home, Furniture & DIY", 11700,
"Home, Furniture & DIY>>Stationery & School Equipment>>Binders & Notebooks", "11700>>16092>>102950"]
]
零、构建插入引擎的方式
from sqlalchemy import create_engine
# 将特殊字符进行URL编码。若密码包含特殊字符,需先编码再传入。无特殊字符可省略。
from urllib.parse import quote_plus
# pandas.DataFrame会用到
import pandas as pd
# 批量插入需要用到text
from sqlalchemy import text
def createMyEngine():
# driver='mysql' # MySQL的插入驱动
driver='mssql' # SQL Server的插入驱动
host='10.10.13.11'
port=1433
user='testUser'
password='testUserPassword'
database='testDatabase'
charset='UTF-8'
if driver == 'mysql':
conn_str = (
f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset='utf8mb4'"
)
elif driver == 'mssql':
conn_str = (
f"mssql+pymssql://{user}:{password}@{host}:{port}/{database}?charset='UTF-8'"
)
else:
raise ValueError("不支持的驱动类型")
engine = create_engine(conn_str, pool_pre_ping=True)
return engine
一、单行数据插入(构建insert into语句)
通过构造insert into语句来插入数据,使用 %s 作为占位符,在execute方法中传入列表形式的数值作为参数。这种方式比较灵活,仅适用于单行数据插入
def insertData_OneByOne():
engine = createMyEngine()
tableName = 'test_table'
# values = [1478549, "Nail Art Tools", 107876, "Health & Beauty", 26395
# , "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"]
values = valueList[0]
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
# 执行数据插入操作
try:
with engine.connect() as connection:
connection.execute(sql, values)
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
二、批量数据插入(构建insert into语句)
通过构造insert into语句来插入数据,使用 :parameter 作为占位符,在execute方法中用 text 装饰插入语句,传入列表形式的数值作为参数。这种方式比较灵活,适用于小批量数据插入。构造过程相对复杂
def insertData_ManyByOne():
# 小批量数据插入
engine = createMyEngine()
tableName = 'test_table'
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (
:sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
)
"""
try:
with engine.connect() as connection:
connection.execute(text(sql), [
{
"sku": row[0],
"end_name": row[1],
"end_id": row[2],
"root_name": row[3],
"root_id": row[4],
"tree_name": row[5],
"tree_id": row[6]
}
for row in valueList # values 应为包含元组的可迭代对象
])
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
三、大量数据分批次插入(构建insert into语句)
基本思路是将大量数据分成多个小批次进行插入。这种方式比较灵活,适用于小批量数据插入。构造过程相对复杂
def insertData_SoManyByOne(longValueList, batchSize=100):
# 大量数据的分批插入
# 注意:占位符和插入的参数名需要一一对应。
engine = createMyEngine()
tableName = 'test_table'
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (
:sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
)
"""
for i in range(0, len(longValueList), batchSize):
batchList = longValueList[i:i + batchSize]
try:
with engine.connect() as connection:
connection.execute(text(sql), [
{
"sku": row[0],
"end_name": row[1],
"end_id": row[2],
"root_name": row[3],
"root_id": row[4],
"tree_name": row[5],
"tree_id": row[6]
}
for row in batchList # values 应为包含元组的可迭代对象
])
print(f"已提交批次 {i // batchSize + 1}/{(len(longValueList) + batchSize-1) // batchSize}")
except Exception as e:
print(f"插入失败:{str(e)}")
四、借助pandas.DataFrame插入(支持单行与批量)
基本思路是将数据组织成DataFrame后进行插入。这种方式适用于小批量插入,构造相对简单。但整批插入时,若出错会导致整个批次失败
def insertData_ByDataFrame():
# 定义列名
dataDF = pd.DataFrame(valueList, columns=columnsName)
engine = createMyEngine()
tableName = 'test_table'
try:
dataDF.to_sql(tableName, con=engine, if_exists='append', index=False)
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
附录:代码汇总
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:Windshield
# Date: 2023/2/6 15:52
# fileName: test.py
from sqlalchemy import create_engine
# 将特殊字符转成URL编码。若密码中存在特殊字符,则需要先进行URL编码再传入。没有可以不需要。
from urllib.parse import quote_plus
import pandas as pd
# 批量插入需要用到text
from sqlalchemy import text
def createMyEngine():
# driver='mysql' # MySQL的插入驱动
driver='mssql' # SQL Server的插入驱动
host='10.10.13.11'
port=1433
user='testUser'
password='testUserPassword'
database='testDatabase'
charset='UTF-8'
if driver == 'mysql':
conn_str = (
f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset='utf8mb4'"
)
elif driver == 'mssql':
conn_str = (
f"mssql+pymssql://{user}:{password}@{host}:{port}/{database}?charset='UTF-8'"
)
else:
raise ValueError("Unsupported driver")
engine = create_engine(conn_str, pool_pre_ping=True)
return engine
def insertData_OneByOne():
engine = createMyEngine()
tableName = 'test_table'
# values = [1478549, "Nail Art Tools", 107876, "Health & Beauty", 26395
# , "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"]
values = valueList[0]
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
# 执行插入操作
try:
with engine.connect() as connection:
connection.execute(sql, values)
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
def insertData_ManyByOne():
# 小批量插入
engine = createMyEngine()
tableName = 'test_table'
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (
:sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
)
"""
try:
with engine.connect() as connection:
connection.execute(text(sql), [
{
"sku": row[0],
"end_name": row[1],
"end_id": row[2],
"root_name": row[3],
"root_id": row[4],
"tree_name": row[5],
"tree_id": row[6]
}
for row in valueList # values 应为包含元组的可迭代对象
])
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
def insertData_SoManyByOne(longValueList, batchSize=100):
# 大量数据的分批插入
# 注:占位符和插入的参数名需要一一对应。
engine = createMyEngine()
tableName = 'test_table'
sql = f"""
INSERT INTO {tableName} (
SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID,
CategoryTreeName, CategoryTreeID
) VALUES (
:sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
)
"""
for i in range(0, len(longValueList), batchSize):
batchList = longValueList[i:i + batchSize]
try:
with engine.connect() as connection:
connection.execute(text(sql), [
{
"sku": row[0],
"end_name": row[1],
"end_id": row[2],
"root_name": row[3],
"root_id": row[4],
"tree_name": row[5],
"tree_id": row[6]
}
for row in batchList # values 应为包含元组的可迭代对象
])
print(f"已提交批次 {i // batchSize + 1}/{(len(longValueList) + batchSize-1) // batchSize}")
except Exception as e:
print(f"插入失败:{str(e)}")
def insertData_ByDataFrame():
# 定义列名
dataDF = pd.DataFrame(valueList, columns=columnsName)
engine = createMyEngine()
tableName = 'test_table'
try:
dataDF.to_sql(tableName, con=engine, if_exists='append', index=False)
print("数据插入成功!")
except Exception as e:
print(f"插入失败:{str(e)}")
columnsName = [
"SKU", "endCategoryName", "endCategoryID",
"rootCategoryName", "rootCategoryID", "CategoryTreeName", "CategoryTreeID"
]
# 定义值列表
valueList = [
[19417978, "Nail Art Tools", 107876, "Health & Beauty", 26395,
"Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"],
[19418353, "Other Fitness, Running & Yoga", 13362, "Sporting Goods", 888,
"Sporting Goods>>Fitness, Running & Yoga>>Other Fitness, Running & Yoga", "888>>15273>>13362"],
[19418070, "Flags", 43533, "Garden & Patio", 159912, "Garden & Patio>>Décor>>Flags", "159912>>20498>>43533"],
[19417996, "Knitting Needles", 71215, "Crafts", 14339,
"Crafts>>Needlecrafts & Yarn>>Crocheting & Knitting>>Knitting Needles", "14339>>160706>>3094>>71215"],
[19418048, "Binders & Notebooks", 102950, "Home, Furniture & DIY", 11700,
"Home, Furniture & DIY>>Stationery & School Equipment>>Binders & Notebooks", "11700>>16092>>102950"]
]
if __name__ == '__main__':
pass
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13630.html