python自定義資料庫工具類

2021-09-27 09:16:24 字數 4063 閱讀 9803

# -*- coding:utf-8 -*-

import pymysql

import pandas as pd

import pymysql.cursors

from sqlalchemy import create_engine

import io

class mysql:

def __init__(self, host, port, dbuser, dbpwd, dbname):

self.host = host

self.port = port

self.dbuser = dbuser

self.dbpwd = dbpwd

self.dbname = dbname

self._conn = none

self.connect() # 預設鏈結資料庫, 最後要呼叫關閉鏈結的函式con_close()函式

def create_engine(self):

db_url = ('://:@:/?charset=utf8'.format(driver='mysql+pymysql', user=self.dbuser, port=self.port, pwd=self.dbpwd, host=self.host, name=self.dbname))

self.engine = create_engine(db_url)

def connect(self):

if not self._conn:

# print('\033[33m 已建立資料庫鏈結.......... \033[5m')

self._conn = pymysql.connect(host=self.host,

port=self.port,

user=self.dbuser,

passwd=self.dbpwd,

db=self.dbname,

use_unicode=true,

charset='utf8',

cursorclass=pymysql.cursors.dictcursor) # cursorclass=pymysql.cursors.dictcursor

# mysqldb預設查詢結果都是返回tuple,輸出時候不是很方便,必須按照0,1這樣讀取,

# 無意中在網上找到簡單的修改方法,就是傳遞乙個cursors.dictcursor就行。

else:

print('\033[33m 鏈結資料庫已存在,請直接操作!\033[5m')

pass

def con_close(self):

if self._conn:

# print('\033[33m 資料庫鏈結已關閉!!! \033[5m')

self._conn.close()

self._conn = none

pass

pass

def newcursor(self):

cur = self._conn.cursor()

if cur:

return cur

else:

print('\033[33m #error# get new cursor failed \033[5m')

return none

pass

def delcursor(self, cur):

if cur:

cur.close()

pass

pass

def export(self, sql, file_name, colfg='\t\t'):

"""將查詢結果匯出到外部檔案中

:param sql:

:param file_name:

:param colfg:

:return:

"""rt = self.query(sql)

if rt:

with open(file_name, 'w') as fd:

for row in rt:

in_info = ''

for col in row:

in_info += str(col) + colfg

in_info += '\n'

fd.write(in_info)

def query(self, sql, debug=0):

"""對資料庫執行查詢操作,sql為所需要執行的查詢語句,debug控制是否顯示查詢的表結構(預設不查詢即debug=0)

:param sql:

:param debug:

:return:

"""rt =

cur = self.newcursor()

if not cur:

return rt

try:

cur.execute(sql)

while 1:

ts = cur.fetchone()

if ts is none:

break

if debug:

filename =

for field in cur.description:

print(filename)

# print('\033[33m query success!!! \033[5m')

except exception as err:

print(err)

print('\033[33m exec sql failed: "" \033[5m'.format(sql))

finally:

self.delcursor(cur)

return rt

def exec(self, sql):

"""對資料庫執行增、刪、改操作,傳入需要執行的sql語句

:param sql:

:return:

"""rt = none

cur = self.newcursor()

if not cur:

return rt

try:

cur.execute(sql)

# print('\033[33m update/delete/insert success! \033[5m')

self._conn.commit()

except exception as err:

print('\033[33m exec sql failed: "" \033[5m'.format(sql))

print(err)

cur.rollback()

finally:

self.delcursor(cur)

return rt

def dataframe(self,sql):

"""將sql語句讀進pandas的資料框中

:param sql:

:return:

"""if self._conn:

pass

else:

self.connect()

data=pd.read_sql(sql,self._conn)

self.con_close()

return data

if dbname !="":

self.dbname = dbname

else:

pass

engine = self.create_engine()

# create_engine("mysql+mysqldb://root:123@localhost:3306/"+self.dbname +"?charset=utf8")

output = io.stringio()

df.to_csv(output, index=none, encoding='utf_8_sig')

output.seek(0)

df1 = pd.read_csv(output)

df1.to_sql(name=table, con=engine, if_exists=style, index=false, index_label=false,chunksize=10000000)

Python 自定義資料庫連線類

建立乙個配置檔案 e mypy mycnf.txt mssql host hzc user kk pwd kk dbname demodb資料庫連線類 e mypy mssql.py coding utf 8 python 3.5 import sys import pymssql import c...

自定義日誌工具類

import android.util.log public class logutil public static void d string tag,string msg public static void i string tag,string msg public static void ...

python自定義類

設計乙個person類的3種方式 1 使用內建型別list person mike 23,male 0 姓名,1 年紀,2 性別 print person 0 person 1 person 2 2 使用字典型別dic person1 person2 print person1 name perso...