匯入需要的python包
import pymysql
import traceback
import pandas as pd
from sqlalchemy import create_engine
import os
import numpy as np
將sql查詢結果匯出檔案
def mysql_to_file(path_file,sql, host, user, password, db_name, port=3306):
"""將sql查詢結果匯出檔案
param path:檔案儲存路徑+檔名+檔案字尾
param sql:sql查詢語句
param host:mysql連線ip
param user:使用者名稱
param password:密碼
param db_name:需要使用的資料庫
param port:埠,預設3306
return df:查詢結果,txt或csv格式
"""# 開啟資料庫連線
conn = pymysql.connect(host=host, port=port, user=user,password=password,db=db_name)
try:
results = pd.read_sql(sql, con=conn)
print(results.head(10))
conn.close()
results.to_csv(path_file, sep='\t', index=false)
return results
except:
print("*****錯誤,請檢查sql*******")
# 列印錯誤資訊
traceback.print_exc()
# 如果發生錯誤則回滾
conn.rollback()
將sql查詢結果轉成dataframe
def mysql_to_df(sql, host, user, password, db_name, port=3306):
"""將sql查詢結果轉成dataframe
param sql:sql查詢語句
param host:mysql連線ip
param user:使用者名稱
param password:密碼
param db_name:需要使用的資料庫
param port:埠,預設3306
return df:查詢結果,dateframe格式
"""# 開啟資料庫連線
conn = pymysql.connect(host=host, port=port, user=user,password=password,db=db_name)
try:
results = pd.read_sql(sql, con=conn)
print(results.head(10))
conn.close()
return results
except:
print("*****錯誤,請檢查sql*******")
# 列印錯誤資訊
traceback.print_exc()
# 如果發生錯誤則回滾
conn.rollback()
檔案寫入mysql
def file_to_mysql(file_name, tab_name, host, user, password, db_name, port=3306):
"""檔案寫入mysql
param file_name:寫入mysql的檔名
param tab_name:寫入資料的表
param host:mysql連線ip
param user:使用者名稱
param password:密碼
param db_name:需要使用的資料庫
param port:埠,預設3306
return :none
"""# 需要確定執行環境上有pymysql
sql_connect = create_engine(r'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user,password,host,port,db_name))
# 讀取檔案
df = pd.read_csv(file_name, sep='\t', dtype=np.str)
# 寫入dataframe
print("*****檔案開始寫入.*****".format(file_name, db_name, tab_name))
print("*****寫入完成*****")
dataframe寫入mysql
def df_to_mysql(df, tab_name, host, user, password, db_name, port=3306):
"""dataframe寫入mysql
param df:需要寫入mysql的dataframe
param tab_name:寫入資料的表
param host:mysql連線ip
param user:使用者名稱
param password:密碼
param db_name:需要使用的資料庫
param port:埠,預設3306
return :none
"""# 需要確定執行環境上有pymysql
sql_connect = create_engine(r'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user,password,host,port,db_name))
# 寫入dataframe
print("*****dataframe開始寫入.*****".format(df, db_name, tab_name))
print("*****寫入完成*****")
執行樣例
if __name__=="__main__":
rootpath = 工作路徑
os.chdir(rootpath)
host = ip
port = oprt
user = user
password = password
db_name = db_name
file_name = file_name
tab_name = tab_name tab_name
# 檔案寫入mysql
#file_to_mysql(file_name, tab_name, host, user, password, db_name)
# 查詢
sql = """select * from tab_name;"""
df1 = mysql_to_df(sql, host, user, password, db_name)
#df2 = mysql_to_file(rootpath+'test_1.txt', sql, host, user, password, db_name)
Python實現 Kafka批量匯入匯出
kafka connect非常強大,但是也有侷限性,不能個性化的定製,如果需要參考我的另外乙個部落格部落格位址 python實現起來其實也很簡單,就是利用消費者匯出,生產者匯入。而且我效率也很不錯 下面是乙個從某個topic某個分割槽讀資料,然後寫到另外乙個topic的完整 usr bin env ...
使用T SQL實現資料匯出 匯入
今天嘗試使用transact sql進行資料的匯出匯入,收穫頗豐。與使用dts相比,效率要高很多!一 開啟opendatasource功能 開始 所有程式 microsoft sql server 2005 配置工具 sql server外圍應用配置器 功能的外圍應用配置器 例項名 database...
匯入匯出資料
從檔案中裝載資料 hive load data local inpath overwrite into table t2 partition province beijing local linux本地的檔案。無local 是hdfs的檔案 注意 從本地檔案系統中將資料匯入到hive表的過程中,其實...