Python通过ssh代理连接MySQL
Admin
2022-08-01 17:44:19
Python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Connect to MySQL with PyMySQL, optionally through an SSH tunnel."""

import pymysql
import sshtunnel

# Sample connection settings used by main().
# NOTE(review): these look like placeholder credentials; do not commit real
# secrets here.
db_host = "127.0.0.1"
db_user = "root"
db_password = "123123"
db_port = 3306
db_database = "test"
ssh_keyFile = "/Users/test/.ssh/id_rsa"


class NewClient(object):
    """Small MySQL client that can reach the server through an SSH tunnel.

    The connection is opened as soon as the object is constructed and is
    available as ``self.conn``.  When ``ssh_forward`` is given, the started
    ``sshtunnel.SSHTunnelForwarder`` is kept in ``self.forward`` so that
    ``close()`` can shut it down again.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, host, username, password, port, database,
                 ssh_forward=None):
        """Store the settings and connect immediately.

        Args:
            host: MySQL host.  With a tunnel this is the address the SSH
                server forwards to (``remote_bind_address``).
            username: MySQL user name.
            password: MySQL password.
            port: MySQL port.
            database: Name of the database to select.
            ssh_forward: Optional dict with the keys ``"host"``, ``"port"``,
                ``"user"`` and ``"keyFile"`` describing the SSH server to
                tunnel through.  ``None`` connects directly.
        """
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.database = database
        self.ssh_forward = ssh_forward
        # Must exist before connect() runs; stays None for direct connections.
        self.forward = None
        self.conn = self.connect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def connect(self):
        """Open and return a new PyMySQL connection.

        With ``ssh_forward`` set, an SSH tunnel is started first and the
        connection goes to the tunnel's local port on 127.0.0.1.  The tunnel
        is remembered in ``self.forward``; calling this method again replaces
        that reference, so ``close()`` only stops the most recent tunnel.

        Raises:
            Exception: whatever ``sshtunnel`` or ``pymysql`` raise.  If the
                MySQL connection fails, the tunnel that was just started is
                stopped before the error is re-raised.
        """
        if self.ssh_forward is None:
            return pymysql.connect(
                host=self.host,
                user=self.username,
                password=self.password,
                port=self.port,
                database=self.database,
                charset="utf8",
            )

        settings = self.ssh_forward
        forward = sshtunnel.SSHTunnelForwarder(
            ssh_address_or_host=(settings["host"], settings["port"]),
            ssh_username=settings["user"],
            ssh_pkey=settings["keyFile"],
            remote_bind_address=(self.host, self.port),
        )
        forward.start()
        try:
            conn = pymysql.connect(
                host="127.0.0.1",
                user=self.username,
                password=self.password,
                port=forward.local_bind_port,
                database=self.database,
                charset="utf8",
                # 31536000 seconds is 365 days.
                # NOTE(review): presumably chosen so a slow tunnel never hits
                # the connect timeout -- confirm this is intended.
                connect_timeout=31536000,
                max_allowed_packet=36777216,
            )
        except Exception:
            # Do not leave an orphaned tunnel behind on a failed connect.
            forward.stop()
            raise
        self.forward = forward
        return conn

    def close(self):
        """Close the MySQL connection and stop the SSH tunnel, if any.

        Safe to call when the connection was already closed through
        ``self.conn.close()``; the tunnel is stopped even if closing the
        connection raises.
        """
        try:
            if self.conn is not None and self.conn.open:
                self.conn.close()
        finally:
            if self.forward is not None:
                self.forward.stop()
                self.forward = None

    def query(self, sql, args=None):
        """Run ``sql`` and lazily yield the result rows one at a time.

        Args:
            sql: Statement to execute.
            args: Optional parameters that PyMySQL binds into ``sql``; prefer
                this over building the SQL string by hand.

        Yields:
            One row per iteration, as returned by ``cursor.fetchone()``.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, args)
            # Stop on the first None instead of trusting the affected-row
            # count, which is not a row count for non-SELECT statements.
            while True:
                row = cursor.fetchone()
                if row is None:
                    break
                yield row
        finally:
            # Also runs when the generator is abandoned or an error occurs.
            cursor.close()

    def query_one(self, sql, args=None):
        """Run ``sql`` and return the first row, or ``None`` if there is none.

        Args:
            sql: Statement to execute.
            args: Optional parameters that PyMySQL binds into ``sql``.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, args)
            return cursor.fetchone()
        finally:
            cursor.close()

    def update_one(self, sql, args=None):
        """Execute a modifying statement and commit it.

        Failures are reported on stdout and rolled back rather than raised,
        so a caller running many statements can carry on (best effort).

        Args:
            sql: Statement to execute.
            args: Optional parameters that PyMySQL binds into ``sql``.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, args)
            self.conn.commit()
            print(sql, "ok")
        except Exception as e:
            print(sql, "no", e)
            self.conn.rollback()
        finally:
            cursor.close()


def main():
    """Connect through the SSH proxy and print every row of ``asset``."""
    ssh_forward = {
        "host": "proxy.ywfuns.com",
        "port": 22,
        "keyFile": ssh_keyFile,
        "user": "proxy",
    }
    # The with-block closes both the connection and the tunnel on exit.
    with NewClient(
        host=db_host,
        username=db_user,
        password=db_password,
        port=db_port,
        database=db_database,
        ssh_forward=ssh_forward,
    ) as client:
        # Iterate over the result set.
        sql = "select id, name from asset;"
        for line in client.query(sql):
            print(line)


if __name__ == "__main__":
    main()