Python实现数据库数据录入的多元途径(涵盖单行与批量,适用于SQL Server、MySQL)

Python实现数据库数据导入的多种方式(含单行与批量,适用于SQL Server、MySQL)

几种将数据插入数据库的办法(单行插入、批量插入,针对SQL Server、MySQL,使用insert into)

常见的四种插入途径:
一、单行数据插入(构建insert into语句)
二、批量数据插入(构建insert into语句)
三、大量数据分批次插入(构建insert into语句)
四、借助pandas.DataFrame插入(支持单行与批量)

示例数据

columnsName = [
    "SKU", "endCategoryName", "endCategoryID",
    "rootCategoryName", "rootCategoryID", "CategoryTreeName", "CategoryTreeID"
]

# 定义值的列表
valueList = [
    [19417978, "Nail Art Tools", 107876, "Health & Beauty", 26395,
     "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"],
    [19418353, "Other Fitness, Running & Yoga", 13362, "Sporting Goods", 888,
     "Sporting Goods>>Fitness, Running & Yoga>>Other Fitness, Running & Yoga", "888>>15273>>13362"],
    [19418070, "Flags", 43533, "Garden & Patio", 159912, "Garden & Patio>>Décor>>Flags", "159912>>20498>>43533"],
    [19417996, "Knitting Needles", 71215, "Crafts", 14339,
     "Crafts>>Needlecrafts & Yarn>>Crocheting & Knitting>>Knitting Needles", "14339>>160706>>3094>>71215"],
    [19418048, "Binders & Notebooks", 102950, "Home, Furniture & DIY", 11700,
     "Home, Furniture & DIY>>Stationery & School Equipment>>Binders & Notebooks", "11700>>16092>>102950"]
]

零、构建插入引擎的方式

from sqlalchemy import create_engine
# 将特殊字符进行URL编码。若密码包含特殊字符,需先编码再传入。无特殊字符可省略。
from urllib.parse import quote_plus
# pandas.DataFrame会用到
import pandas as pd
# 批量插入需要用到text
from sqlalchemy import text


def createMyEngine():
    # driver='mysql'    # MySQL的插入驱动
    driver='mssql'      # SQL Server的插入驱动
    host='10.10.13.11'
    port=1433
    user='testUser'
    password='testUserPassword'
    database='testDatabase'
    charset='UTF-8'
    if driver == 'mysql':
        conn_str = (
            f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset='utf8mb4'"
        )
    elif driver == 'mssql':
        conn_str = (
            f"mssql+pymssql://{user}:{password}@{host}:{port}/{database}?charset='UTF-8'"
        )
    else:
        raise ValueError("不支持的驱动类型")
    engine = create_engine(conn_str, pool_pre_ping=True)
    return engine

一、单行数据插入(构建insert into语句)

通过构造insert into语句来插入数据,使用 %s 作为占位符,在execute方法中传入列表形式的数值作为参数。这种方式比较灵活,仅适用于单行数据插入

def insertData_OneByOne():
    engine = createMyEngine()
    tableName = 'test_table'
    # values = [1478549, "Nail Art Tools", 107876, "Health & Beauty", 26395
    #     , "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"]
    values = valueList[0]
    sql = f"""
                INSERT INTO {tableName} (
                    SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
    # 执行数据插入操作
    try:
        with engine.connect() as connection:
            connection.execute(sql, values)
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")

二、批量数据插入(构建insert into语句)

通过构造insert into语句来插入数据,使用 :parameter 作为占位符,在execute方法中用 text 装饰插入语句,传入列表形式的数值作为参数。这种方式比较灵活,适用于小批量数据插入。构造过程相对复杂

def insertData_ManyByOne():
    # 小批量数据插入
    engine = createMyEngine()
    tableName = 'test_table'
    sql = f"""
        INSERT INTO {tableName} (
           SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
        ) VALUES (
            :sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
        )
    """
    try:
        with engine.connect() as connection:
            connection.execute(text(sql), [
                {
                    "sku": row[0],
                    "end_name": row[1],
                    "end_id": row[2],
                    "root_name": row[3],
                    "root_id": row[4],
                    "tree_name": row[5],
                    "tree_id": row[6]
                }
                for row in valueList  # values 应为包含元组的可迭代对象
            ])
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")

三、大量数据分批次插入(构建insert into语句)

基本思路是将大量数据分成多个小批次进行插入。这种方式比较灵活,适用于小批量数据插入。构造过程相对复杂

def insertData_SoManyByOne(longValueList, batchSize=100):
    # 大量数据的分批插入
    # 注意:占位符和插入的参数名需要一一对应。
    engine = createMyEngine()
    tableName = 'test_table'
    sql = f"""
        INSERT INTO {tableName} (
           SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
        ) VALUES (
            :sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
        )
    """
    for i in range(0, len(longValueList), batchSize):
        batchList = longValueList[i:i + batchSize]
        try:
            with engine.connect() as connection:
                connection.execute(text(sql), [
                    {
                        "sku": row[0],
                        "end_name": row[1],
                        "end_id": row[2],
                        "root_name": row[3],
                        "root_id": row[4],
                        "tree_name": row[5],
                        "tree_id": row[6]
                    }
                    for row in batchList  # values 应为包含元组的可迭代对象
                ])
            print(f"已提交批次 {i // batchSize + 1}/{(len(longValueList) + batchSize-1) // batchSize}")
        except Exception as e:
            print(f"插入失败:{str(e)}")

四、借助pandas.DataFrame插入(支持单行与批量)

基本思路是将数据组织成DataFrame后进行插入。这种方式适用于小批量插入,构造相对简单。但整批插入时,若出错会导致整个批次失败

def insertData_ByDataFrame():
    # 定义列名
    dataDF = pd.DataFrame(valueList, columns=columnsName)
    engine = createMyEngine()
    tableName = 'test_table'
    try:
        dataDF.to_sql(tableName, con=engine, if_exists='append', index=False)
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")

附录:代码汇总

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:Windshield
# Date: 2023/2/6 15:52
# fileName: test.py
from sqlalchemy import create_engine
# 将特殊字符转成URL编码。若密码中存在特殊字符,则需要先进行URL编码再传入。没有可以不需要。
from urllib.parse import quote_plus
import pandas as pd
# 批量插入需要用到text
from sqlalchemy import text


def createMyEngine():
    # driver='mysql'    # MySQL的插入驱动
    driver='mssql'      # SQL Server的插入驱动
    host='10.10.13.11'
    port=1433
    user='testUser'
    password='testUserPassword'
    database='testDatabase'
    charset='UTF-8'
    if driver == 'mysql':
        conn_str = (
            f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset='utf8mb4'"
        )
    elif driver == 'mssql':
        conn_str = (
            f"mssql+pymssql://{user}:{password}@{host}:{port}/{database}?charset='UTF-8'"
        )
    else:
        raise ValueError("Unsupported driver")
    engine = create_engine(conn_str, pool_pre_ping=True)
    return engine


def insertData_OneByOne():
    engine = createMyEngine()
    tableName = 'test_table'
    # values = [1478549, "Nail Art Tools", 107876, "Health & Beauty", 26395
    #     , "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"]
    values = valueList[0]
    sql = f"""
                INSERT INTO {tableName} (
                    SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
    # 执行插入操作
    try:
        with engine.connect() as connection:
            connection.execute(sql, values)
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")


def insertData_ManyByOne():
    # 小批量插入
    engine = createMyEngine()
    tableName = 'test_table'
    sql = f"""
        INSERT INTO {tableName} (
           SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
        ) VALUES (
            :sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
        )
    """
    try:
        with engine.connect() as connection:
            connection.execute(text(sql), [
                {
                    "sku": row[0],
                    "end_name": row[1],
                    "end_id": row[2],
                    "root_name": row[3],
                    "root_id": row[4],
                    "tree_name": row[5],
                    "tree_id": row[6]
                }
                for row in valueList  # values 应为包含元组的可迭代对象
            ])
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")


def insertData_SoManyByOne(longValueList, batchSize=100):
    # 大量数据的分批插入
    # 注:占位符和插入的参数名需要一一对应。
    engine = createMyEngine()
    tableName = 'test_table'
    sql = f"""
        INSERT INTO {tableName} (
           SKU, endCategoryName, endCategoryID, rootCategoryName, rootCategoryID, 
                    CategoryTreeName, CategoryTreeID
        ) VALUES (
            :sku, :end_name, :end_id, :root_name, :root_id, :tree_name, :tree_id
        )
    """
    for i in range(0, len(longValueList), batchSize):
        batchList = longValueList[i:i + batchSize]
        try:
            with engine.connect() as connection:
                connection.execute(text(sql), [
                    {
                        "sku": row[0],
                        "end_name": row[1],
                        "end_id": row[2],
                        "root_name": row[3],
                        "root_id": row[4],
                        "tree_name": row[5],
                        "tree_id": row[6]
                    }
                    for row in batchList  # values 应为包含元组的可迭代对象
                ])
            print(f"已提交批次 {i // batchSize + 1}/{(len(longValueList) + batchSize-1) // batchSize}")
        except Exception as e:
            print(f"插入失败:{str(e)}")


def insertData_ByDataFrame():
    # 定义列名
    dataDF = pd.DataFrame(valueList, columns=columnsName)
    engine = createMyEngine()
    tableName = 'test_table'
    try:
        dataDF.to_sql(tableName, con=engine, if_exists='append', index=False)
        print("数据插入成功!")
    except Exception as e:
        print(f"插入失败:{str(e)}")


columnsName = [
    "SKU", "endCategoryName", "endCategoryID",
    "rootCategoryName", "rootCategoryID", "CategoryTreeName", "CategoryTreeID"
]

# 定义值列表
valueList = [
    [19417978, "Nail Art Tools", 107876, "Health & Beauty", 26395,
     "Health & Beauty>>Nail Care, Manicure & Pedicure>>Nail Art>>Nail Art Tools", "26395>>47945>>260764>>107876"],
    [19418353, "Other Fitness, Running & Yoga", 13362, "Sporting Goods", 888,
     "Sporting Goods>>Fitness, Running & Yoga>>Other Fitness, Running & Yoga", "888>>15273>>13362"],
    [19418070, "Flags", 43533, "Garden & Patio", 159912, "Garden & Patio>>Décor>>Flags", "159912>>20498>>43533"],
    [19417996, "Knitting Needles", 71215, "Crafts", 14339,
     "Crafts>>Needlecrafts & Yarn>>Crocheting & Knitting>>Knitting Needles", "14339>>160706>>3094>>71215"],
    [19418048, "Binders & Notebooks", 102950, "Home, Furniture & DIY", 11700,
     "Home, Furniture & DIY>>Stationery & School Equipment>>Binders & Notebooks", "11700>>16092>>102950"]
]


if __name__ == '__main__':
    pass

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13630.html

(0)
LomuLomu
上一篇 2025 年 9 月 18 日
下一篇 2025 年 9 月 18 日

相关推荐

  • 详细解读2025最新clion激活码与新手友好clion破解教程

    声明:以下破解补丁与激活码均源自网络公开分享,仅限个人学习研究,禁止商业用途。若条件允许,请支持正版 JetBrains! CLion 是 JetBrains 专为 C/C++ 打造的跨平台 IDE,支持 Windows、macOS 与 Linux。下文将以图文形式演示如何借助破解补丁实现“永久激活”,解锁全部高级特性。 无论您使用哪个版本、哪种系统,步骤都…

    2025 年 10 月 20 日
    30700
  • IDEA激活码管理与破解授权详解

    本教程适用于IDEA、PyCharm、DataGrip、Goland等,支持Jetbrains全家桶! 废话不多说,先上最新 IDEA 版本破解成功的截图,如下,可以看到已经成功破解到 2099 年辣,舒服! 接下来,我就将通过图文的方式, 来详细讲解如何激活 IDEA至 2099 年。 当然这个激活方法,同样适用于之前的旧版本! 不管你是什么操作系统,什么…

    IDEA破解教程 2025 年 12 月 9 日
    18100
  • pycharm破解教程合集+激活码资源同步

    声明:本教程所涉及的 PyCharm 破解补丁与激活码均源自网络收集,仅供个人学习参考,禁止商业用途。如有版权争议,请及时联系作者删除。若经济条件允许,强烈建议购买官方正版授权! 废话少说,先放一张 PyCharm 2025.2.1 成功激活到 2099 年的截图镇楼,爽到飞起! 下面用图文结合的方式,手把手教你搞定最新版 PyCharm 的破解流程。 嫌折…

    PyCharm激活码 2025 年 11 月 25 日
    17100
  • 基于源码分析 SHOW GLOBAL STATUS 的实现原理

    问题 在 MySQL 中,查询全局状态变量的方式一般有两种:SHOW GLOBAL STATUS和performance_schema.global_status。 但不知道大家注意到没有,performance_schema.global_status 返回的状态变量数要远远少于 SHOW GLOBAL STATUS 。 具体来说, 在 MySQL 8.4…

    未分类 2025 年 1 月 12 日
    61400
  • 永久pycharm激活码离线教程搭配最新破解使用

    本方法适用于JetBrains全系列开发工具,包括PyCharm、IntelliJ IDEA、DataGrip、Goland等所有产品! 话不多说,先来看最新版PyCharm破解成功的界面截图,可以看到许可证有效期已经激活到2099年,非常稳定! 接下来,我将通过图文详解的方式,手把手教你如何将PyCharm激活到2099年。这个激活方案同样兼容历史旧版本!…

    PyCharm激活码 2026 年 2 月 12 日
    19900

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信