Backlogの課題とwikiとファイルをPythonでええ感じに出力(エクスポート)する方法

2023.07.25

こんちには。

データアナリティクス事業本部 インテグレーション部 機械学習チームの中村です。

今回はBacklogの課題とwikiとファイルをPythonでええ感じに出力(エクスポート)する方法を紹介します。

なお、本記事と似たようなことは過去に以下の記事で投稿していますが、今回はwikiやファイルを含めて取り扱いました。

本記事の方法でできること・できないこと

本記事の方法では以下ができます

  • 課題
    • 課題の記載内容とコメント(テキストファイルとして、コメントには日時・作成者を包含)の出力
    • 課題に添付されたファイルの出力
    • 添付ファイルもダウンロード
  • wiki
    • wikiの記載内容の出力
    • wikiの階層構造はディレクトリ構造として維持
    • wikiの添付ファイルもダウンロード
  • ファイル
    • ファイルにアップロードされているもの出力
    • ディレクトリ階層構造は維持
  • その他
    • プロジェクトIDはプロジェクト名から取得
    • Backlogはテキスト記載フォーマットにmarkdownとbacklogの二つ形式があるが自動で認識して拡張子を変更
    • Backlog APIへのリトライ処理

以下の対応はできておりませんので、あくまでサンプルとしてお使いください。

  • Backlog APIへのリクエスト時のエラーハンドリング
  • テストコードの実装

出力形式

エクスポート結果は例えば以下のような形式で出力されます。

output/{プロジェクト名}
├ issues/
│  ├ {プロジェクト名}-001-{チケットのタイトル}/
│  │  ├ body.backlog
│  │  └ attachment/
│  │      ├ hoge.png
│  │      └ fuga.csv
│  └ {プロジェクト名}-002-{チケットのタイトル}/
│      ├ body.backlog
│      └ attachment/
│          ├ hogehoge.png
│          └ fugafuga.csv
├ wikis/
│  └ Home/
│      ├ body.backlog
│      ├ 子ページ1/
│      │  ├ body.backlog
│      │  └ attachment/
│      │      ├ foo.png
│      │      └ foo.csv
│      └ 子ページ2/
│          ├ body.backlog
│          └ attachment/
│              ├ bar.png
│              └ bar.csv
└ files/
    ├ dir1/
    │  ├ file1.png
    │  └ file1.csv
    ├ file2.png
    └ file2.csv

使い方

準備

Pythonのバージョンやライブラリのバージョンは以下です。必要に応じてpipなどでインストールしてください。

(私はpipenv環境でやっていますが環境に合わせてお好みで実施ください)

  • Python : 3.10.2
  • requests : 2.31.0
  • python-dotenv : 1.0.0

あらかじめ以下の環境変数を.envに記述しておきます。

(環境変数の設定方法もお好みでOKです)

BACKLOG_API_KEY={APIキー}
BACKLOG_BASE_URL={ベースURL}

BACKLOG_BASE_URLはBacklogのページのFQDNを設定します。

  • 例 : www.example.co.jp

BACKLOG_API_KEYキーは、ページ右上から「個人設定」を開き、

「API」から新しいAPIキーを発行するか、既存のAPIキーをコピーしてください。

プロジェクト名は、課題番号に必ず付くプレフィックスと同じです。

コード構成

コードは以下のようになっています。

├ main.py
├ backlog_exporter.py
└ backlog_api_handler.py

main.py

main.pyは実際にコンソールから実行するプログラムで、名前は何でもOKで必要に応じてカスタマイズします。

例えば以下のように使用します。

import os

from dotenv import load_dotenv
load_dotenv(verbose=True)

from backlog_api_handler import BacklogApiHandler
from backlog_exporter import BacklogExporter

def main(output_dir="./output"):

    backlog_api_handler = BacklogApiHandler(
        api_key=os.getenv("BACKLOG_API_KEY"),
        base_url=os.getenv("BACKLOG_BASE_URL"),
    )

    backlog_exporter = BacklogExporter(
        backlog_api_handler,
    )

    backlog_exporter.export_issues(project_name='<プロジェクト名>', output_dir=output_dir)
    backlog_exporter.export_wikis(project_name='<プロジェクト名>', output_dir=output_dir)
    backlog_exporter.export_files(project_name='<プロジェクト名>', output_dir=output_dir)

    return

if __name__ == "__main__":
    main()

コード内のプロジェクト名は取得したいプロジェクトによって変更します。

課題番号に付与されるキーと同じものを指定してください。

backlog_exporter.py

次にbacklog_exporter.pyの中身は以下のようになっています。

import pathlib
import re

from backlog_api_handler import BacklogApiHandler

COMMENT_TEMPLATE = \
"""
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
comment created: {created}
author: {createdUserName}<{createdUserMailAddress}>
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

{content}

"""

def clean_text_of_name(target):
    cleaned = re.sub(r'[\\|/|:|?|"|<|>|\|\*]', '-', target)
    cleaned = cleaned.strip(" ")
    return cleaned

def clean_text_keep_tree(target):
    cleaned = re.sub(r'[\\|:|?|"|<|>|\||\*]', '-', target)
    cleaned = cleaned.strip(" ")
    return cleaned

def comments_to_text(comments):

    text = "\n\n"
    for comment in comments:
        comment_data = {
            "content": comment['content'],
            "createdUserName": comment['createdUser']['name'],
            "createdUserMailAddress": comment['createdUser']['mailAddress'],
            "created": comment['created'],
        }
        if comment_data["content"] is None:
            continue
        text = text + COMMENT_TEMPLATE.format(**comment_data)

    return text

class BacklogExporter:

    def __init__(self, backlog_api_handler: BacklogApiHandler):
        self.backlog_api_handler = backlog_api_handler

    def export_issues(self, project_name, output_dir):

        project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)

        formatting_rule = self.backlog_api_handler.get_formatting_rule_by_project_name(project_name)

        issues_of_all = self.backlog_api_handler.get_issues_by_project_id(project_id)

        for issue in issues_of_all:
            self.__export_issue(issue, project_name, output_dir, formatting_rule)

    def export_wikis(self, project_name, output_dir):

        project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)

        formatting_rule = self.backlog_api_handler.get_formatting_rule_by_project_name(project_name)

        wikis_of_all = self.backlog_api_handler.get_wikis_by_project_id(project_id)

        for wiki in wikis_of_all:
            self.__export_wiki(wiki, project_name, output_dir, formatting_rule)

    def export_files(self, project_name, output_dir):

        project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)

        files = self.backlog_api_handler.get_files_by_project_id(project_id)

        for file in files:
            self.__export_file(project_name, project_id, file, pathlib.Path(output_dir))

    def __export_issue(self, issue, project_name, output_dir, formatting_rule):

        issue_id = issue['id']
        issue_number = issue['keyId']
        issue_title = clean_text_of_name(issue['summary'])
        issue_body = issue['description']

        output_path = pathlib.Path(output_dir).joinpath(
            project_name,
            "issues",
            f"{project_name}-{issue_number:03d}-{issue_title}",
            "body.md" if formatting_rule == "markdown" else "body.backlog"
        )
        output_path.parent.mkdir(parents=True, exist_ok=True)

        print(output_path)
        with open(output_path, "wt", encoding='utf-8') as f:
            f.writelines(issue_body)

            comments = self.backlog_api_handler.get_comments_by_issue_id(issue_id)
            text_of_comments = comments_to_text(comments)
            f.writelines(text_of_comments)

        attachments = self.backlog_api_handler.get_attachments_by_issue_id(issue_id)

        for attachment in attachments:
            self.__export_attachment_of_issue(issue_id, attachment, output_path.parent)

    def __export_attachment_of_issue(self, issue_id, attachment, output_base_path: pathlib.Path):

        attachment_id = attachment['id']
        attachment_name = attachment['name']
        attachment_name = clean_text_of_name(attachment_name)

        output_path = output_base_path.joinpath("attachment", f"{attachment_name}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        attachment_body = self.backlog_api_handler.get_attachment_body_of_issue(issue_id, attachment_id)

        print(output_path)
        with open(output_path, "wb") as f:
            f.write(attachment_body)

    def __export_wiki(self, wiki, project_name, output_dir, formatting_rule):
        wiki_id = wiki["id"]
        wiki_name_tree = clean_text_keep_tree(wiki["name"])

        output_path = pathlib.Path(output_dir).joinpath(
            project_name, "wikis", f"{wiki_name_tree}",
            "body.md" if formatting_rule == "markdown" else "body.backlog"
        )
        output_path.parent.mkdir(parents=True, exist_ok=True)

        wiki_body = self.backlog_api_handler.get_wiki_body_by_wiki_id(wiki_id)

        print(output_path)
        with open(output_path, "wt", encoding='utf-8') as f:
            f.writelines(wiki_body)

        attachments = wiki["attachments"]
        for attachment in attachments:
            self.__export_attachment_of_wiki(wiki_id, attachment, output_path.parent)

    def __export_attachment_of_wiki(self, wiki_id, attachment, output_base_path: pathlib.Path):

        attachment_id = attachment['id']
        attachment_name = attachment['name']
        attachment_name = clean_text_of_name(attachment_name)

        output_path = output_base_path.joinpath("attachment", f"{attachment_name}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        attachment_body = self.backlog_api_handler.get_attachment_body_of_wiki(wiki_id, attachment_id)

        print(output_path)
        with open(output_path, "wb") as f:
            f.write(attachment_body)

    def __export_file(self, project_name, project_id, file, output_base_path: pathlib.Path):

        file_id = file['id']
        file_name = clean_text_of_name(file['name'])
        file_dir = clean_text_keep_tree(file['dir'])

        output_path = output_base_path.joinpath(project_name, f"files{file_dir}{file_name}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        file_body = self.backlog_api_handler.get_file_body(project_id, file_id)

        print(output_path)
        with open(output_path, "wb") as f:
            f.write(file_body)

BacklogExporterの実装がメインで、エクスポートする処理を担っています。

main.pyからはこのBacklogExporterの処理を呼び出します。

BacklogExporterはおおきな工夫点はないのですが、前回記事と同様にファイル名に使えない文字をすべて-に置き換えています。

またツリーを維持したい場合は、/を置き換えると困るので、以下のようにcleanする関数を2つ準備しています。

def clean_text_of_name(target):
    cleaned = re.sub(r'[\\|/|:|?|"|<|>|\|\*]', '-', target)
    cleaned = cleaned.strip(" ")
    return cleaned

def clean_text_keep_tree(target):
    cleaned = re.sub(r'[\\|:|?|"|<|>|\||\*]', '-', target)
    cleaned = cleaned.strip(" ")
    return cleaned

backlog_api_handler.py

backlog_api_handler.pyの中身は以下のようになっています。

import json
import requests
from requests import Session

from urllib3.util import Retry
from requests import Session
from requests.adapters import HTTPAdapter

GET_ISSUES_PAGE_SIZE = 100
GET_COMMENTS_PAGE_SIZE = 100
GET_FILES_PAGE_SIZE = 1000

API_REQUEST_RETRY_COUNT = 3
API_REQUEST_RETRY_BACKOFF_FACTOR = 60
API_REQUEST_RETRY_BACKOFF_MAX = 60

class LogRetry(Retry):
    def __init__(self, *args, **kwargs):
        history = kwargs.get("history")
        if history is not None:
            retry_count = len(history)
            sleep_time = kwargs["backoff_factor"] * (2 ** (retry_count-1))
            sleep_time = sleep_time if sleep_time < kwargs['backoff_max'] else kwargs['backoff_max']
            print(f'retry={retry_count-1}/{retry_count+kwargs["total"]}, sleep_to_next={sleep_time}s, result={history[-1]}')
        super().__init__(*args, **kwargs)

class BacklogApiHandler:

    def __init__(self, api_key, base_url):
        self.api_key = api_key
        self.base_url = base_url

        self.session = Session()
        retries = LogRetry(
            total=API_REQUEST_RETRY_COUNT,
            backoff_factor=API_REQUEST_RETRY_BACKOFF_FACTOR,
            backoff_max=API_REQUEST_RETRY_BACKOFF_MAX,
            status_forcelist=[429],
            allowed_methods={'GET'},
        )
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    def get_project_id_by_project_name(self, project_name):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-project/#
        payload = {
            'apiKey': f'{self.api_key}'
        }
        response = self.session.get(f"{self.base_url}/api/v2/projects/{project_name}", params=payload)
        return json.loads(response.content)['id']

    def get_formatting_rule_by_project_name(self, project_name):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-project/#
        payload = {
            'apiKey': f'{self.api_key}'
        }
        response = self.session.get(f"{self.base_url}/api/v2/projects/{project_name}", params=payload)
        return json.loads(response.content)['textFormattingRule']

    def get_issues_by_project_id(self, project_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-issue-list/#
        payload = {
            'apiKey': f'{self.api_key}',
            'projectId[]': f'{project_id}',
            'count': f'{GET_ISSUES_PAGE_SIZE}'
        }

        issues_of_page = None
        issues_of_all = []
        page_count = 0
        while issues_of_page is None or len(issues_of_page) > 0:
            payload = {
                **payload,
                'offset': f'{GET_ISSUES_PAGE_SIZE*page_count}',
            }
            response = self.session.get(f'{self.base_url}/api/v2/issues', params=payload)
            issues_of_page = json.loads(response.text)
            issues_of_all.extend(issues_of_page)
            page_count += 1

        issues_of_all = sorted(issues_of_all, key=lambda v: v["keyId"])

        return issues_of_all

    def get_comments_by_issue_id(self, issue_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-comment-list/#
        payload = {
            'apiKey': f'{self.api_key}',
            'count': f'{GET_COMMENTS_PAGE_SIZE}',
            'order': 'asc'
        }

        comments_of_page = None
        comments_of_all = []
        min_id = 0
        while comments_of_page is None or len(comments_of_page) > 0:
            payload = {
                **payload,
                'minId': min_id
            }
            response = self.session.get(f"{self.base_url}/api/v2/issues/{issue_id}/comments", params=payload)
            comments_of_page = json.loads(response.text)

            if len(comments_of_page) > 0:
                min_id = comments_of_page[-1]['id']
            comments_of_all.extend(comments_of_page)

        return comments_of_all

    def get_attachments_by_issue_id(self, issue_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-list-of-issue-attachments/#
        payload = {
            'apiKey': f'{self.api_key}',
        }
        response = self.session.get(f"{self.base_url}/api/v2/issues/{issue_id}/attachments", params=payload)
        attachments = json.loads(response.text)
        return attachments

    def get_attachment_body_of_issue(self, issue_id, attachment_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-issue-attachment/#
        payload = {
            'apiKey': f'{self.api_key}',
        }
        response = self.session.get(
            f"{self.base_url}/api/v2/issues/{issue_id}/attachments/{attachment_id}",
            params=payload)
        return response.content

    def get_wikis_by_project_id(self, project_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page-list/#
        payload = {
            'projectIdOrKey': f'{project_id}'
            , 'apiKey': f'{self.api_key}'
        }
        response = self.session.get(f"{self.base_url}/api/v2/wikis", params=payload)
        wikis = json.loads(response.text)
        wikis = sorted(wikis, key=lambda v: v["name"])
        return wikis

    def get_wiki_body_by_wiki_id(self, wiki_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page/#
        payload = {
            'apiKey': f'{self.api_key}'
        }
        response = self.session.get(f"{self.base_url}/api/v2/wikis/{wiki_id}", params=payload)
        body = json.loads(response.text)["content"]
        return body


    def get_attachment_body_of_wiki(self, wiki_id, attachment_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page-attachment/#
        payload = {
            'apiKey': f'{self.api_key}',
        }
        response = self.session.get(
            f"{self.base_url}/api/v2/issues/{wiki_id}/attachments/{attachment_id}",
            params=payload)
        return response.content

    def get_files_by_project_id(self, project_id, path="/"):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-list-of-shared-files/#
        payload = {
            'apiKey': f'{self.api_key}',
            'count': f'{GET_FILES_PAGE_SIZE}'
        }

        files_of_page = None
        files_of_all = []
        page_count = 0
        while files_of_page is None or len(files_of_page) > 0:
            payload = {
                **payload,
                'offset': f'{GET_FILES_PAGE_SIZE*page_count}',
            }
            response = self.session.get(f'{self.base_url}/api/v2/projects/{project_id}/files/metadata{path}', params=payload)
            files_of_page = json.loads(response.text)
            files_of_all.extend(files_of_page)
            page_count += 1

        dirs = [ f for f in files_of_all if f['type'] == 'directory' ]
        files_of_all = [ f for f in files_of_all if f['type'] == 'file' ]

        for d in dirs:
            files_of_child = self.get_files_by_project_id(project_id, path='{}{}'.format(d['dir'], d['name']))
            files_of_all.extend(files_of_child)

        return files_of_all

    def get_file_body(self, project_id, file_id):
        # https://developer.nulab.com/ja/docs/backlog/api/2/get-file/#
        payload = {
            'apiKey': f'{self.api_key}',
        }
        response = self.session.get(
            f"{self.base_url}/api/v2/projects/{project_id}/files/{file_id}",
            params=payload)
        return response.content

BacklogApiHandlerの実装がメインで、BacklogのAPIを実際に呼び出します。

BacklogExporterからはこのBacklogApiHandlerの処理を呼び出します。

BacklogApiHandlerで使用しているAPIは以下となります。

APIドキュメント 実装時の考慮事項
プロジェクト情報の取得
課題一覧の取得 ページングを考慮して繰り返しリクエスト
課題コメントの取得 ページングを考慮して繰り返しリクエスト
課題添付ファイル一覧の取得 一覧だがこのAPIはページングがない
課題添付ファイルのダウンロード
Wikiページ一覧の取得 wikiは一覧取得時にツリー構造ごと取得できる
一覧だがこのAPIはページングがない
Wikiページ情報の取得
Wiki添付ファイルのダウンロード
共有ファイル一覧の取得 あるパスを指定した一覧取得となるため、typeを見ながらdirectoryは再帰的に実行
その際、ページングを考慮して繰り返しリクエスト
共有ファイルのダウンロード

要所に各APIに合わせた処理を実装しており、ページングを考慮したリクエスト、全件を取得するための再帰的に実行するリクエストなどを工夫しています。

また、ステータスコードが429(Too many requests)の場合は、urllib3Retryを使ってリトライを以下のように実装しています。

        self.session = Session()
        retries = LogRetry(
            total=API_REQUEST_RETRY_COUNT,
            backoff_factor=API_REQUEST_RETRY_BACKOFF_FACTOR,
            backoff_max=API_REQUEST_RETRY_BACKOFF_MAX,
            status_forcelist=[429],
            allowed_methods={'GET'},
        )
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

このself.session経由でgetを実行することでリトライを実現しています。

定数は以下のように設定し、毎回60秒待つ形にしています。

API_REQUEST_RETRY_COUNT = 3
API_REQUEST_RETRY_BACKOFF_FACTOR = 60
API_REQUEST_RETRY_BACKOFF_MAX = 60

この仕様は以下に沿って固定で60秒待ってリトライするように設定していますが、用途に応じて変更することは可能です。

またリトライ時のログが出力されるよう、以下のLogRetryクラスを定義しています。

class LogRetry(Retry):
    def __init__(self, *args, **kwargs):
        history = kwargs.get("history")
        if history is not None:
            retry_count = len(history)
            sleep_time = kwargs["backoff_factor"] * (2 ** (retry_count-1))
            sleep_time = sleep_time if sleep_time < kwargs['backoff_max'] else kwargs['backoff_max']
            print(f'retry={retry_count-1}/{retry_count+kwargs["total"]}, sleep_to_next={sleep_time}s, result={history[-1]}')
        super().__init__(*args, **kwargs)

その他大きく難しい点はないのですが、ご興味があれば実装を確認して頂ければと思います。

まとめ

いかがでしたでしょうか。

本記事が、Backlogをお使いになられている方の参考になれば幸いです。