【コピペでOK】9割の機能を網羅!PytonからSQLServerを扱うクラスを作ってみました。

当ページのリンクには広告が含まれています。

こちらの記事では、Python から SQLServerに接続するための方法について解説しました。

今回は、以前 SQLite 向けに作成した、データの抽出や更新が簡単に出来る Python用のクラスを SQLServer向けに修正しましたので、ソースコードの公開と解説をしたいと思います。

コピー&ペーストで簡単に使えますので、用途に合わせて追加&修正の上、ご利用下さい。

単にSQLServerへの接続や切断だけではなく、テーブルの存在チェック、テーブル作成、トランザクション付きの複数SQLの実行、テーブル一覧やカラム一覧の取得機能も盛り込んでいます。

目次

クラスの使い方

今回は SQLServerConnect というクラス名にしています。

使う際には、下記の通り pyodbcのパッケージをインポートしておいて下さい。

import pyodbc

環境によっては pyodbc ドライバをインストールする必要がありますので、もしうまく動かない場合は、こちらの記事も合わせてご覧下さい。

リファレンス(メソッドの名前と仕様)

今回のクラスに実装されているメソッドの一覧と仕様は以下の通りです。

機能メソッド名と引数戻り値補足
コンストラクタ SQLServerConnect(
host,
dbname,
user,
password,
port=1433
インスタンス接続先ホスト名、データベース名、ユーザー名、パスワード、ポートを指定する。
任意のSQLを実行execute(sql)なし任意のSQLを1つだけ実行
複数のSQLをまとめて実行execute_all(sqls)なしリストに格納された複数のSQLをトランザクション付きで実行。
エラーが発生するとロールバックされる。
問い合わせSQLの実行execute_query(sql)リストselect等の問い合わせSQLを実行し、全ての結果をリストで返す。
例: [ (aa , 123 , True) , (bb , 567 , False) ]
1つの値を返すSQLの実行execute_scalor(sql)単一の値1個の値を返す問い合わせSQLを実行し、結果を取得
テーブルの作成create(
tablename,
columns,
primarykey = '',
isdrop=False
)
なしテーブル名、カラム名、主キーを指定してテーブルを作成する。
既にテーブルがあればエラーとなるが、
isdrop=Trueにすることで、既にテーブルが存在していれば削除を実行する。
テーブルの削除drop(tablename)なしテーブルが存在していれば削除する。
テーブルの存在チェックexists(tablename)True/Falseテーブルが存在していれば True、存在しなければ Falseを返す。
テーブル名の変更rename
(
old_tablename,
new_tablename
)
なしold_tablename を new_table_nameに変更する
カラムの追加add_column
(
tablename,
columns
)
なしtablenameで指定したテーブルにcolumnsで指定した列を追加する。
columnsはカラム名をリスト形式で指定する。
カラム名の後に型を指定することも可能。
例:[ 'col1 varchar2(10)' , 'col2 datetime' ]
Create文の取得get_create_statment
(
tablename
)
strtablenameで指定したテーブルのCreate文を文字列で返す。
テーブル一覧の取得get_table_list
(
table_type=’’
)
リストデータベースに登録されているテーブル及びビューの名前をリストで返す。
例:[ 'table1' , 'table2' , 'table3' ]
table_type に 'view' を指定するとビューのみ、'table'を指定するとテーブルのみの一覧が取得できる。
カラムの名前と型の一覧を取得get_column_type(
tablename
)
リストtablenameで指定したテーブルにおいて、カラム名と型をタプルに格納し、リストで返す。
例:[ ('col1' , 'text') , ('col2' , 'real') ]
カラムの名前の一覧を取得get_column_list(
tablename
)
リストtablenameで指定したテーブルにおいて、カラム名をリストで返す。
例:[ 'col1' , 'col2' , 'col3' ]

簡単な使い方のサンプル

test といテーブルを作成し、データの追加、検索、カラム追加など、一通りの処理を実行するサンプルです。

'host name' , 'db name' , 'user name' , 'password' は、ご自身の環境に合わせて修正をお願いします。

#DBへの接続
db = SQLServerConnect('host name','db name','user name','password')

#指定したテーブルが存在すれば、テーブルを削除
db.drop('test2')

#テーブルの新規作成
db.create('test','name nvarchar(10),num numeric(5,1)','name,num',isdrop=True)

#Insert文の作成
sqls = []
for n in range(10):
    sqls.append("insert into test values('data{0}',{0})".format(n))

#1つのトランザクション内で複数SQLの実行
db.execute_all(sqls)

#指定テーブルのデータを取得
data = db.execute_query('select * from test')
print(data)

#指定テーブルへの列追加
db.add_column('test',['add1 nvarchar(10)','add2 numeric(10,3)','add3 timestamp'])

#テーブルの存在チェック
print(db.exists('test'))
print(db.exists('test9'))

#テーブルとViewの一覧の取得
print(db.get_table_list())

#View の一覧を取得
print(db.get_table_list('view'))

#テーブルの一覧を取得
print(db.get_table_list('table'))

#指定したテーブルのカラム名の一覧を取得
print(db.get_column_list('test'))

#指定したテーブルのカラム名とデータ型の取得
print(db.get_column_type('test'))

#スカラ(結果が1つだけのSelect文)の実行
print(db.execute_scalor("select sum(num) as num from test"))

#データベース名の変更
db.rename('test','test2')

クラスのソースコード

以下が今回のクラスのソースコードです。

そのままコピー&ペーストで張り付けて利用できます。

あとは、自由に変更してお使いください。

一応、Pythonで推奨される命名規則とコメントの書き方を使っています。

import pyodbc

class SQLServerConnect:
    '''
    SQLServerのヘルパークラス
    '''
    GET_TABLE_LIST_QUERY = "SELECT * FROM (SELECT name,CASE type WHEN 'U' THEN 'table' ELSE 'view' END AS type FROM sys.objects WHERE type IN ('U','V')) t WHERE name LIKE '{0}' AND type LIKE '{1}'"
    GET_COLUMN_LIST_QUERY = "SELECT t.name AS TABLENAME,c.name AS COLUMNNAME,type_name(c.user_type_id) AS TYPE FROM sys.objects t INNER JOIN sys.columns c ON c.object_id=t.object_id WHERE t.name LIKE '{0}' ORDER BY c.column_id"
    GET_ALTER_TABLE_QUERY = "ALTER TABLE {0} ADD {1}"
    GET_RENAME_TABLE_QUERY = "EXEC sp_rename '{0}','{1}';"
    
    def __init__(self,host,dbname,user,password,port=1433):
        '''
        DBの接続情報を保持する
        Parameters
        ----------
        host : str
           ホスト名
        dbname : str
            DB名
        user : str
            ユーザー名
        password : str
            パスワード
        port : integer
            ポート
        '''
        self.host = host
        self.dbname = dbname
        self.user = user
        self.password = password
        self.port = port
     
    def __connect(self):
        return pyodbc.connect("DRIVER={{SQL Server}};SERVER={0};PORT={1};DATABASE={2};UID={3};PWD={4}".format(self.host,self.port,self.dbname,self.user,self.password))
    
    def execute(self,sql):
        '''
        SQLを実行し、結果を取得する
        Parameters
        ----------
        columns : str
            実行したいSQL
        '''
        conn = self.__connect()
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur.close()
        conn.close()
     
    def execute_all(self,sqls):
        '''
        複数のSQLをトランザクション配下で実行する
        Parameters
        ----------
        columns : strs
            実行したいSQLのリスト
        '''
        conn = self.__connect()
        cur = conn.cursor()
        try:
            for sql in sqls:
                cur.execute(sql)
            conn.commit()
        except pyodbc.Error as e:
            conn.rollback()
        cur.close()
        conn.close()
    
    def execute_query(self,sql):
        '''
        select 系のSQLを実行し、結果を全て取得する
        Parameters
        ----------
        columns : str
            実行したいSQL
        Returns
        ----------
        data: list
            1行分をタプルとし、複数行をリストとして返す
            <例> [('RX100','Sony',35000),('RX200','Sony',42000)]
        '''
        conn = self.__connect()
        cur = conn.cursor()
        cur.execute(sql)
        res = cur.fetchall()
        cur.close()
        conn.close()
        return res
    
    def execute_scalor(self,sql):
        '''
        結果の値が1つしかないSQLを実行し、結果を取得する
        Parameters
        ----------
        columns : str
            実行したいSQL
        Returns
        ----------
        res:
            実行結果により返された値
        '''
        conn = self.__connect()
        cur = conn.cursor()
        cur.execute(sql)
        res = cur.fetchone()
        cur.close()
        conn.close()
        return res[0] if res != None else None
    
    def create(self,tablename,columns,primarykey = '',isdrop=False):
        '''
        テーブルを作成する
        Parameters
        ----------
        columns : str
            「列名」又は「列名+型」をカンマ区切りで指定
            <例> 'product text,price int,maker,year'
        primarykey: str
            プライマリーキーをカンマ区切りで指定
            <例> 'product,year'
        '''
        if isdrop :
            self.drop(tablename)
        pkey = ',primary key({0})'.format(primarykey) if primarykey != '' else ''
        sql = 'create table {0}({1} {2})'.format(tablename,columns,pkey)
        self.execute(sql)
    
    def drop(self,tablename):
        '''
        指定されたテーブルが存在すれば削除、無ければ何もしない
        Parameters
        ----------
        tablename : str
            削除したいテーブル名
        '''
        res = self.execute_query(self.GET_TABLE_LIST_QUERY.format(tablename,'%%'))
        if res != []:
            self.execute("drop {0} {1}".format(res[0][1],tablename))
  
    def exists(self,tablename):
        '''
        指定したテーブル、又はビューの有無を判定する
        Parameters
        ----------
        columns : str
            実行したいSQL
        Returns
        ----------
            テーブル又はビューが存在すればTrue 存在しなければ False
        '''
        res = self.execute_scalor(self.GET_TABLE_LIST_QUERY.format(tablename,'%%'))
        return False if res == None else True
    
    def rename(self,old_tablename,new_tablename):
        '''
        テーブル名を変更する
        Parameters
        ----------
        old_tablename : str
            変更前のテーブル名
        new_tablename : str
            変更後のテーブル名
        '''
        self.execute(self.GET_RENAME_TABLE_QUERY.format(old_tablename,new_tablename))
    
    def add_column(self,tablename,columns):
        '''
        テーブル名を変更する
        Parameters
        ----------
        tablename : str
            テーブル名
        columns : str
            「列名」又は「列名+型」をリスト形式で指定
            <例> ['product text','price numeric(5,2)','maker integer']
        '''
        for column in columns:
            self.execute(self.GET_ALTER_TABLE_QUERY.format(tablename,column))
    
    def get_table_list(self,table_type=""):
        '''
        登録されているテーブルの一覧を取得する
        Parameters
        ----------
        table_type : str
            'table' => テーブルのみ、'view' => ビューのみ、'' => テーブルとビューの両方
        Returns
        ----------
        res:str
            リスト形式のテーブル名一覧
            <例>  ['talbe1','table2','table3']
        '''
        res = self.execute_query(self.GET_TABLE_LIST_QUERY.format('%%','%' + table_type + '%'))
        return [name[0] for name in res]
    
    def get_column_type(self,tablename):
        '''
        指定したテーブルのカラムと型を一覧で取得する
        Parameters
        ----------
        tablename : str
            テーブル名
        Returns
        ----------
        res:str
            リスト形式でカラム名と型のタプルを返す
            <例>  [('column1','int'),('column2','text'),('column3',real)]
        '''
        res = self.execute_query(self.GET_COLUMN_LIST_QUERY.format(tablename))
        return [(name[1],name[2]) for name in res]
    
    def get_column_list(self,tablename):
        '''
        指定したテーブルのカラム名を一覧で取得する
        Parameters
        ----------
        tablename : str
            テーブル名
        Returns
        ----------
        res:str
            リスト形式のカラム名一覧
            <例>  ['column1','column2','column3']
        '''
        res = self.execute_query(self.GET_COLUMN_LIST_QUERY.format(tablename))
        return [name[1] for name in res]

補足説明

この章では、本クラスについての改良ポイント、課題について解説します。

もし再利用されるのであれば、この点についてご留意ください。

データベース固有のSQL定義

テーブルやカラムの一覧を取得するSQL はデータベースごとに異なる可能性があるため、クラスの先頭に定数(実際にはクラス変数)として定義しています。

定数名内容
GET_TABLE_LIST_QUERYテーブルとViewの一覧を取得するためのSQLを記述
GET_COLUMN_LIST_QUERYカラムの一覧を取得するためのSQLを記述
GET_ALTER_TABLE_QUERYカラムの追加を行うためのSQLを記述
GET_RENAME_TABLE_QUERYテーブル名の変更を行うためのSQLを記述

テーブル一覧を取得するSQL

SQLServer の場合、テーブルとビューは sys.objects というテーブルで管理されており、type の値でテーブル('U')かビュー('V')が判断できます。

SELECT * FROM 
(
    SELECT name,CASE type WHEN 'U' THEN 'table' ELSE 'view' END AS type 
    FROM sys.objects
    WHERE type IN ('U','V')
) t
WHERE name LIKE '{0}' AND type LIKE '{1}'

カラム一覧を取得するSQL

SQLServer の場合、 カラム一覧は sys.columns で管理されていますが、ここにはテーブル名がありません。

その代わり、object_id があるので、これを sys.objects と結合しています。

SELECT t.name AS TABLENAME,c.name AS COLUMNNAME,type_name(c.user_type_id) AS TYPE
FROM sys.objects t
INNER JOIN sys.columns c ON c.object_id=t.object_id
WHERE t.name LIKE '{0}'
ORDER BY c.column_id

カラムを追加するSQL

SQLServer の場合、 カラム名は ALTER TABLE テーブル名 ADD カラム名 データ型 で追加可能です。

ALTER TABLE {0} ADD {1}

テーブル名を変更するSQL

SQLServer の場合、ALTER TABLE 文が用意されておらず、代わりに EXEC sp_rename 旧テーブル名,新テーブル名 で変更します。

EXEC sp_rename '{0}','{1}';

まとめ

今回は Python で SQLite を扱うヘルパークラスを、 SQLServer 用に移植してみました。

テーブル一覧の取得やカラム一覧の取得、create文の取得など、たまに使いたくなる機能も実装しています。

このクラスがあれば、おおよその事が出来ると思いますので、後は皆さんの利用方法に合わせて拡張して下さい。

この記事が皆さんのお役に立てれば幸いです。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次