# -*- coding:utf-8 -*-
import pymysql
import pandas as pd
import pymysql.cursors
from sqlalchemy import create_engine
import io
class mysql:
def __init__(self, host, port, dbuser, dbpwd, dbname):
self.host = host
self.port = port
self.dbuser = dbuser
self.dbpwd = dbpwd
self.dbname = dbname
self._conn = none
self.connect() # 預設鏈結資料庫, 最後要呼叫關閉鏈結的函式con_close()函式
def create_engine(self):
db_url = ('://:@:/?charset=utf8'.format(driver='mysql+pymysql', user=self.dbuser, port=self.port, pwd=self.dbpwd, host=self.host, name=self.dbname))
self.engine = create_engine(db_url)
def connect(self):
if not self._conn:
# print('\033[33m 已建立資料庫鏈結.......... \033[5m')
self._conn = pymysql.connect(host=self.host,
port=self.port,
user=self.dbuser,
passwd=self.dbpwd,
db=self.dbname,
use_unicode=true,
charset='utf8',
cursorclass=pymysql.cursors.dictcursor) # cursorclass=pymysql.cursors.dictcursor
# mysqldb預設查詢結果都是返回tuple,輸出時候不是很方便,必須按照0,1這樣讀取,
# 無意中在網上找到簡單的修改方法,就是傳遞乙個cursors.dictcursor就行。
else:
print('\033[33m 鏈結資料庫已存在,請直接操作!\033[5m')
pass
def con_close(self):
if self._conn:
# print('\033[33m 資料庫鏈結已關閉!!! \033[5m')
self._conn.close()
self._conn = none
pass
pass
def newcursor(self):
cur = self._conn.cursor()
if cur:
return cur
else:
print('\033[33m #error# get new cursor failed \033[5m')
return none
pass
def delcursor(self, cur):
if cur:
cur.close()
pass
pass
def export(self, sql, file_name, colfg='\t\t'):
"""將查詢結果匯出到外部檔案中
:param sql:
:param file_name:
:param colfg:
:return:
"""rt = self.query(sql)
if rt:
with open(file_name, 'w') as fd:
for row in rt:
in_info = ''
for col in row:
in_info += str(col) + colfg
in_info += '\n'
fd.write(in_info)
def query(self, sql, debug=0):
"""對資料庫執行查詢操作,sql為所需要執行的查詢語句,debug控制是否顯示查詢的表結構(預設不查詢即debug=0)
:param sql:
:param debug:
:return:
"""rt =
cur = self.newcursor()
if not cur:
return rt
try:
cur.execute(sql)
while 1:
ts = cur.fetchone()
if ts is none:
break
if debug:
filename =
for field in cur.description:
print(filename)
# print('\033[33m query success!!! \033[5m')
except exception as err:
print(err)
print('\033[33m exec sql failed: "" \033[5m'.format(sql))
finally:
self.delcursor(cur)
return rt
def exec(self, sql):
"""對資料庫執行增、刪、改操作,傳入需要執行的sql語句
:param sql:
:return:
"""rt = none
cur = self.newcursor()
if not cur:
return rt
try:
cur.execute(sql)
# print('\033[33m update/delete/insert success! \033[5m')
self._conn.commit()
except exception as err:
print('\033[33m exec sql failed: "" \033[5m'.format(sql))
print(err)
cur.rollback()
finally:
self.delcursor(cur)
return rt
def dataframe(self,sql):
"""將sql語句讀進pandas的資料框中
:param sql:
:return:
"""if self._conn:
pass
else:
self.connect()
data=pd.read_sql(sql,self._conn)
self.con_close()
return data
if dbname !="":
self.dbname = dbname
else:
pass
engine = self.create_engine()
# create_engine("mysql+mysqldb://root:123@localhost:3306/"+self.dbname +"?charset=utf8")
output = io.stringio()
df.to_csv(output, index=none, encoding='utf_8_sig')
output.seek(0)
df1 = pd.read_csv(output)
df1.to_sql(name=table, con=engine, if_exists=style, index=false, index_label=false,chunksize=10000000)
Python 自定義資料庫連線類
建立乙個配置檔案 e mypy mycnf.txt mssql host hzc user kk pwd kk dbname demodb資料庫連線類 e mypy mssql.py coding utf 8 python 3.5 import sys import pymssql import c...
自定義日誌工具類
import android.util.log public class logutil public static void d string tag,string msg public static void i string tag,string msg public static void ...
python自定義類
設計乙個person類的3種方式 1 使用內建型別list person mike 23,male 0 姓名,1 年紀,2 性別 print person 0 person 1 person 2 2 使用字典型別dic person1 person2 print person1 name perso...