# 重新梳理了一下 mysql 方法 (reworked the MySQL helper methods)
import logging
import pymysql
class MYSQL_DB(object):
    """Small convenience wrapper around pymysql for one-shot statements.

    Every call to ``execute`` opens a new connection from the stored
    settings, runs a single statement, and then releases both the cursor
    and the connection.
    """

    def __init__(self, mysql_da_data):
        """Remember the connection settings; nothing is opened here.

        Args:
            mysql_da_data: Mapping unpacked as keyword arguments into
                ``pymysql.connect`` when a statement is executed.
        """
        self.cursor = None
        self.con = None
        self.mysql_da_data = mysql_da_data

    @staticmethod
    def connect(database):
        """Open a pymysql connection.

        Args:
            database: Mapping unpacked as keyword arguments into
                ``pymysql.connect``.

        Returns:
            The connection object, or ``False`` if connecting failed (the
            failure is logged together with its traceback).
        """
        try:
            return pymysql.connect(**database)
        except Exception:
            # Keep the historical "return False" contract for callers, but
            # record the full traceback instead of only printing the message.
            logging.exception('mysql connect failed')
            return False

    def _select(self, perform_sql, args=None):
        """Run a row-returning statement and return its rows as dicts."""
        self.cursor.execute(perform_sql, args)
        return self.sql_fetch_json()

    def _update_or_insert(self, perform_sql, args=None):
        """Run a data-modifying statement, commit it, and release resources."""
        self.cursor.execute(perform_sql, args)
        self.con.commit()
        self.closed()

    def _cursor(self):
        """Open a connection and a cursor on it.

        Returns:
            A ``(connection, cursor)`` tuple.

        Raises:
            ConnectionError: If ``connect`` could not open a connection.
        """
        con = self.connect(self.mysql_da_data)
        # connect() signals failure with False; check it before touching the
        # object so the caller sees ConnectionError rather than AttributeError.
        if con is False or con is None:
            raise ConnectionError('unable to connect to mysql')
        try:
            cursor = con.cursor()
        except Exception:
            # Do not leave a half-initialised connection open.
            con.close()
            raise
        return con, cursor

    @staticmethod
    def to_json(result):
        """Convert an iterable of row sequences into a list of lists."""
        return [list(row) for row in result]

    def sql_fetch_json(self):
        """Fetch all rows from the current cursor as a list of dicts.

        Column names are taken from ``cursor.description``. The rows are
        printed, the cursor and connection are released via ``closed``, and
        the list is returned.

        Returns:
            A list with one ``{column_name: value}`` dict per row.
        """
        keys = [column[0] for column in self.cursor.description]
        json_data = [
            # Rows that are already mappings (e.g. from a dict-style cursor)
            # are copied as-is; sequence rows are paired with column names.
            dict(row) if isinstance(row, dict) else dict(zip(keys, row))
            for row in self.cursor.fetchall()
        ]
        print(json_data)
        self.closed()
        return json_data

    def _release(self):
        """Close the cursor and the connection if still open (idempotent)."""
        cursor, con = self.cursor, self.con
        # Clear the attributes first so a failure while closing one resource
        # cannot cause a second close attempt on a later call.
        self.cursor = None
        self.con = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if con is not None:
                con.close()

    def closed(self):
        """Announce completion and release the cursor and the connection."""
        print("sql 执行完毕")
        self._release()

    def execute(self, perform_sql, args=None):
        """Execute one SQL statement on a fresh connection.

        Args:
            perform_sql: The SQL text. Statements whose first keyword is
                ``select`` (case-insensitive, leading whitespace ignored)
                are treated as queries; everything else is executed and
                committed.
            args: Optional parameters forwarded to ``cursor.execute`` so
                values can be bound by the driver instead of being
                formatted into the SQL string.

        Returns:
            A list of ``{column_name: value}`` dicts for ``select``
            statements, otherwise ``None``.

        Raises:
            ConnectionError: If no connection could be opened.
        """
        print('执行sql:%s' % perform_sql)
        logging.info('执行sql:%s', perform_sql)
        self.con, self.cursor = self._cursor()
        # Ignore leading whitespace/newlines when detecting the statement type.
        way = perform_sql.lstrip()[:6].lower()
        try:
            if way == 'select':
                return self._select(perform_sql, args)
            self._update_or_insert(perform_sql, args)
            return None
        finally:
            # No-op after a successful run (closed() already released);
            # guarantees cleanup when the statement raised.
            self._release()
if __name__ == '__main__':
    # Demo run against a local MySQL server: build the connection settings,
    # then execute a single query through MYSQL_DB.
    MYSQL_DB_DATA1 = dict(
        host="localhost",
        user="root",
        password="123456",
        db="django1",
        port=3306,
    )
    sql = 'select * from test\n'
    MYSQL_DB(MYSQL_DB_DATA1).execute(sql)