DLL をまとめて整理 (python)

データベースのDDL(データ定義言語)ファイルを読み込んで、テーブルとカラムに関する情報を抽出し、それをJSONとExcel形式で出力するプログラムです。データベースの設計情報を整理したり、ドキュメント化したりする際に非常に便利です。

import os
import re
import json
import sys
import importlib.util
from datetime import datetime
from pathlib import Path
import openpyxl
from openpyxl.styles import Font

# 必要モジュールの確認
required_modules = ['openpyxl']
missing_modules = [m for m in required_modules if importlib.util.find_spec(m) is None]
if missing_modules:
    for m in missing_modules:
        print(f"pip install {m}")
    sys.exit(1)

def parse_column_line(line):
    comment = ''
    if '--' in line:
        line, comment = line.split('--', 1)
    elif '//' in line:
        line, comment = line.split('//', 1)
    comment = comment.strip()
    line = line.strip().rstrip(',')
    if not line:
        return None

    match = re.match(r'"?([\w\d_]+)"?\s+([A-Z0-9_]+(?:\([^)]*\))?)\s*(NOT NULL|NULL)?', line, re.IGNORECASE)
    if not match:
        return None

    name, datatype, constraint = match.groups()
    attributes = []
    if constraint and 'NOT NULL' in constraint.upper():
        attributes.append("NOT NULL")
    return {
        "name": name,
        "type": datatype,
        "comment": comment,
        "attributes": attributes
    }

def extract_tables_from_text(text, filename):
    tables = {}
    pattern = re.compile(r'CREATE\s+TABLE\s+("?[\w\d_]+"?)\s*\((.*?)\)\s*;', re.DOTALL | re.IGNORECASE)
    matches = pattern.findall(text)

    table_comment_map = {}
    lines = text.splitlines()
    last_comment = ""
    for i, line in enumerate(lines):
        if line.strip().startswith('------------------------------'):
            if i > 0 and lines[i-1].strip():
                last_comment = lines[i-1].strip()
        if line.strip().upper().startswith("CREATE TABLE"):
            match_name = re.match(r'CREATE\s+TABLE\s+("?[\w\d_]+"?)', line.strip(), re.IGNORECASE)
            if match_name:
                table_comment_map[match_name.group(1).strip('"')] = last_comment
                last_comment = ""

    for match in matches:
        table_name = match[0].strip('"')
        column_block = match[1]
        columns = []
        for col_line in column_block.splitlines():
            col = parse_column_line(col_line)
            if col:
                col["source_file"] = filename
                columns.append(col)
        tables[table_name] = {
            "table_name": table_name,
            "table_comment": table_comment_map.get(table_name, ""),
            "columns": columns
        }
    return tables

def apply_constraints(tables, ddl_text, source_filename):
    index_matches = re.findall(r'CREATE\s+INDEX\s+(\S+)\s+ON\s+("?[\w\d_]+"?)\s*\((.*?)\)', ddl_text, re.IGNORECASE)
    for index_name, table_raw, cols_raw in index_matches:
        table_name = table_raw.strip('"')
        cols = re.findall(r'"?([\w\d_]+)"?', cols_raw)
        for col in cols:
            apply_attribute(tables, table_name, col, f"INDEX:{index_name}", source_filename)

    alter_matches = re.findall(
        r'ALTER\s+TABLE\s+("?[\w\d_]+"?)\s+ADD(?:\s*\(\s*)?(CONSTRAINT\s+(\S+)\s+)?(PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE)\s*\((.*?)\)',
        ddl_text, re.IGNORECASE
    )
    for table_raw, _, constraint_name, constraint_type, cols_raw in alter_matches:
        table_name = table_raw.strip('"')
        cols = re.findall(r'"?([\w\d_]+)"?', cols_raw)
        attr = f"{constraint_type.upper()}"
        if constraint_name:
            attr += f":{constraint_name}"
        for col in cols:
            apply_attribute(tables, table_name, col, attr, source_filename)

def apply_sequence_attributes(tables, ddl_text, source_filename, skipped_sequence_files):
    sequence_matches = re.findall(r'CREATE\s+SEQUENCE\s+(\S+)', ddl_text, re.IGNORECASE)
    for seq in sequence_matches:
        matched = False
        candidate_col = seq.lower().replace('_seq', '')
        for table in tables.values():
            for col in table["columns"]:
                if col["name"].lower() == candidate_col:
                    col["attributes"].append("SEQUENCE")
                    if "source_file" not in col or not col["source_file"]:
                        col["source_file"] = source_filename
                    matched = True
        if not matched:
            skipped_sequence_files.append(f"{source_filename}: CREATE SEQUENCE skipped → {seq} (対象カラムなし)")

def apply_add_column_statements(tables, ddl_text, source_filename, log_file):
    matches = re.findall(
        r'ALTER\s+TABLE\s+("?[\w\d_]+"?)\s+ADD\s*\(\s*(.*?)\s*\)\s*;', ddl_text, re.IGNORECASE | re.DOTALL
    )
    for table_raw, col_block in matches:
        table_name = table_raw.strip('"')
        col_lines = col_block.splitlines()
        for col_line in col_lines:
            parsed = parse_column_line(col_line)
            if parsed:
                parsed["source_file"] = source_filename
                found = False
                for tname, table in tables.items():
                    if tname.lower() == table_name.lower():
                        table["columns"].append(parsed)
                        log(f"➕ {table_name} に {parsed['name']}(型: {parsed['type']})を追加", log_file)
                        found = True
                        break
                if not found:
                    log(f"⚠️ {table_name} が存在しないため {parsed['name']} の追加をスキップ", log_file)

def apply_attribute(tables, table_name, col_name, attribute, source_filename):
    for tname, table in tables.items():
        if tname.lower() == table_name.lower():
            for col in table["columns"]:
                if col["name"] == col_name:
                    if attribute not in col["attributes"]:
                        col["attributes"].append(attribute)
                    if "source_file" not in col or not col["source_file"]:
                        col["source_file"] = source_filename

def save_to_excel(tables, output_path):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "DDL解析結果"
    headers = ["No.", "テーブル名", "項目", "形式", "説明", "属性", "ブロック", "ファイル名"]
    ws.append(headers)
    
    # ヘッダーのフォントを太字に設定
    for cell in ws[1]:
        cell.font = Font(bold=True)
    
    row_count = 1  # 最初の行はヘッダーなので、データは2行目からスタート
    for table in tables.values():
        for col in table["columns"]:
            row_count += 1
            ws.append([
                row_count - 1,  # No.を1からカウント
                table["table_name"],
                col["name"],
                col["type"],
                col["comment"],
                ' '.join(col["attributes"]),
                table.get("table_comment", ""),
                col.get("source_file", "")
            ])
    
    # セルの幅を自動調整
    for col in ws.columns:
        max_length = 0
        column = col[0].column_letter  # 列のアルファベット(例: "A", "B", "C" など)
        for cell in col:
            try:
                if len(str(cell.value)) > max_length:
                    max_length = len(cell.value)
            except:
                pass
        adjusted_width = (max_length + 2)
        ws.column_dimensions[column].width = adjusted_width

    wb.save(output_path)


def log(message, logfile):
    logfile.write(message + '\n')

def main():
    input_dir = Path(".")
    timestamp = datetime.now().strftime('%Y%m%d%H%M')
    output_dir = input_dir / timestamp
    output_dir.mkdir(exist_ok=True)

    log_path = output_dir / "parse_log.txt"
    with open(log_path, 'w', encoding='utf-8') as log_file:

        all_tables = {}
        failed_files = []
        all_ddl_files = {}
        skipped_sequence_files = []

        for file in input_dir.glob("*"):
            if file.suffix.lower() not in [".txt", ".dll"]:
                continue
            try:
                log(f"📄 処理開始: {file.name}", log_file)
                content = file.read_text(encoding="utf-8")
                all_ddl_files[file.name] = content
                tables = extract_tables_from_text(content, file.name)

                if not tables:
                    log(f"⚠️ テーブル抽出なし: {file.name}", log_file)

                for tname, tdata in tables.items():
                    log(f"  📌 抽出テーブル: {tname}", log_file)
                    for col in tdata["columns"]:
                        attr_text = ', '.join(col['attributes']) if col['attributes'] else 'なし'
                        log(f"     └─ カラム: {col['name']}(型: {col['type']}、属性: {attr_text})", log_file)
                    if tname not in all_tables:
                        all_tables[tname] = tdata
            except Exception as e:
                log(f"❌ {file.name} failed: {e}", log_file)
                failed_files.append(file.name)

        for fname, ddl_text in all_ddl_files.items():
            log(f"🔧 属性付与: {fname}", log_file)
            apply_constraints(all_tables, ddl_text, fname)
            apply_sequence_attributes(all_tables, ddl_text, fname, skipped_sequence_files)
            apply_add_column_statements(all_tables, ddl_text, fname, log_file)

        result_json = output_dir / "parsed_result.json"
        with open(result_json, 'w', encoding='utf-8') as f:
            json.dump(list(all_tables.values()), f, ensure_ascii=False, indent=2)

        result_excel = output_dir / "parsed_result.xlsx"
        save_to_excel(all_tables, result_excel)

        if failed_files or skipped_sequence_files:
            fail_path = output_dir / "parse_failed.txt"
            with open(fail_path, 'w', encoding='utf-8') as f:
                for name in failed_files:
                    f.write(name + "\n")
                for line in skipped_sequence_files:
                    f.write(line + "\n")

        log(f"✅ JSON出力: {result_json}", log_file)
        log(f"✅ Excel出力: {result_excel}", log_file)
        log(f"📊 処理済みファイル数: {len(all_ddl_files)} 件", log_file)
        if failed_files or skipped_sequence_files:
            log(f"⚠️ パース失敗・スキップ → parse_failed.txt", log_file)

if __name__ == "__main__":
    main()

1. 必要なモジュールの確認

プログラムが実行される前に、必要なPythonモジュール(openpyxl)がインストールされているか確認します。もしインストールされていなければ、pip install コマンドを表示して終了します。これにより、必要な環境が整っていることを確認できます。

2. カラム行の解析

DDLファイル内のテーブル定義から、各カラムの名前やデータ型、制約(例えば、NOT NULL)を抽出します。また、コメントも取り出し、カラムの説明として利用できるようにします。この処理によって、テーブル定義を正確に把握できます。

3. テーブル定義の抽出

DDLファイルからCREATE TABLE文を探し出し、そのテーブルに含まれるカラム情報やテーブルのコメントを抽出します。コメントはテーブル全体の説明として後で出力できます。

4. 制約情報の適用

CREATE INDEXALTER TABLE文を解析して、インデックスや制約(主キーや外部キーなど)がどのテーブル、カラムに適用されるのかを反映します。これにより、テーブルの詳細な制約情報も手に入ります。

5. シーケンス属性の追加

CREATE SEQUENCE文があれば、それに関連するカラムに「SEQUENCE」属性を追加します。シーケンスとは、通常、IDの自動インクリメントなどで使用されるため、これを適切に処理します。

6. カラム追加文の処理

ALTER TABLE ... ADD COLUMN文があれば、既存のテーブルに新しいカラムが追加されたことを反映します。これにより、DDLファイル内で定義された変更が正確に出力されます。

7. 結果のExcelファイル出力

解析したテーブルとカラムの情報をExcel形式で保存します。Excelでは、テーブル名、カラム名、型、コメント、属性などが表形式で表示され、整理されたデータを簡単に確認できます。

8. ログの記録

処理中の進行状況やエラーメッセージはログファイルに記録されます。例えば、ファイルの読み込みが失敗した場合や、シーケンスの適用対象となるカラムが見つからなかった場合などです。これにより、後から処理の状態を確認できます。

9. メインの処理の流れ

メイン関数では、指定したディレクトリ内のDDLファイルを一つずつ処理します。ファイルを読み込み、テーブル情報を抽出し、必要な属性を追加して、最終的に結果をJSONとExcelファイルとして保存します。失敗したファイルやシーケンス情報が適用されなかったカラムも、別ファイルにリストとして出力されます。

出力結果

  • JSONファイル: 各テーブルのカラム情報や制約を含むJSONファイルとして保存されます。
  • Excelファイル: テーブルの詳細な情報をExcel形式で保存します。カラム情報、データ型、属性などが表に整理されており、視覚的に分かりやすくなっています。
  • ログファイル: プログラムの実行状況やエラー、警告が記録されたログファイルが出力されます。

このプログラムは、DDLファイルを効率的に解析して、データベースの設計情報を整理したい時に非常に役立ちます。特に、データベースの設計が複雑な場合や、たくさんのテーブルを持つシステムでは、このツールを使うことで作業が大幅に効率化できます。