こんちには。
データアナリティクス事業本部 インテグレーション部 機械学習チームの中村です。
今回はBacklogの課題とwikiとファイルをPythonでええ感じに出力(エクスポート)する方法を紹介します。
なお、本記事と似たようなことは過去に以下の記事で投稿していますが、今回はwikiやファイルを含めて取り扱いました。
本記事の方法でできること・できないこと
本記事の方法では以下ができます
- 課題
- 課題の記載内容とコメント(テキストファイルとして、コメントには日時・作成者を包含)の出力
- 課題に添付されたファイルの出力
- 添付ファイルもダウンロード
- wiki
- wikiの記載内容の出力
- wikiの階層構造はディレクトリ構造として維持
- wikiの添付ファイルもダウンロード
- ファイル
- ファイルにアップロードされているもの出力
- ディレクトリ階層構造は維持
- その他
- プロジェクトIDはプロジェクト名から取得
- Backlogはテキスト記載フォーマットにmarkdownとbacklogの二つ形式があるが自動で認識して拡張子を変更
- Backlog APIへのリトライ処理
以下の対応はできておりませんので、あくまでサンプルとしてお使いください。
- Backlog APIへのリクエスト時のエラーハンドリング
- テストコードの実装
出力形式
エクスポート結果は例えば以下のような形式で出力されます。
output/{プロジェクト名}
├ issues/
│ ├ {プロジェクト名}-001-{チケットのタイトル}/
│ │ ├ body.backlog
│ │ └ attachment/
│ │ ├ hoge.png
│ │ └ fuga.csv
│ └ {プロジェクト名}-002-{チケットのタイトル}/
│ ├ body.backlog
│ └ attachment/
│ ├ hogehoge.png
│ └ fugafuga.csv
├ wikis/
│ └ Home/
│ ├ body.backlog
│ ├ 子ページ1/
│ │ ├ body.backlog
│ │ └ attachment/
│ │ ├ foo.png
│ │ └ foo.csv
│ └ 子ページ2/
│ ├ body.backlog
│ └ attachment/
│ ├ bar.png
│ └ bar.csv
└ files/
├ dir1/
│ ├ file1.png
│ └ file1.csv
├ file2.png
└ file2.csv
使い方
準備
Pythonのバージョンやライブラリのバージョンは以下です。必要に応じてpipなどでインストールしてください。
(私はpipenv環境でやっていますが環境に合わせてお好みで実施ください)
- Python : 3.10.2
- requests : 2.31.0
- python-dotenv : 1.0.0
あらかじめ以下の環境変数を.env
に記述しておきます。
(環境変数の設定方法もお好みでOKです)
BACKLOG_API_KEY={APIキー}
BACKLOG_BASE_URL={ベースURL}
BACKLOG_BASE_URLはBacklogのページのFQDNを設定します。
- 例 :
www.example.co.jp
BACKLOG_API_KEYキーは、ページ右上から「個人設定」を開き、
「API」から新しいAPIキーを発行するか、既存のAPIキーをコピーしてください。
プロジェクト名は、課題番号に必ず付くプレフィックスと同じです。
コード構成
コードは以下のようになっています。
├ main.py
├ backlog_exporter.py
└ backlog_api_handler.py
main.py
main.py
は実際にコンソールから実行するプログラムで、名前は何でもOKで必要に応じてカスタマイズします。
例えば以下のように使用します。
import os
from dotenv import load_dotenv
load_dotenv(verbose=True)
from backlog_api_handler import BacklogApiHandler
from backlog_exporter import BacklogExporter
def main(output_dir="./output"):
backlog_api_handler = BacklogApiHandler(
api_key=os.getenv("BACKLOG_API_KEY"),
base_url=os.getenv("BACKLOG_BASE_URL"),
)
backlog_exporter = BacklogExporter(
backlog_api_handler,
)
backlog_exporter.export_issues(project_name='<プロジェクト名>', output_dir=output_dir)
backlog_exporter.export_wikis(project_name='<プロジェクト名>', output_dir=output_dir)
backlog_exporter.export_files(project_name='<プロジェクト名>', output_dir=output_dir)
return
if __name__ == "__main__":
main()
コード内のプロジェクト名
は取得したいプロジェクトによって変更します。
課題番号に付与されるキーと同じものを指定してください。
backlog_exporter.py
次にbacklog_exporter.py
の中身は以下のようになっています。
import pathlib
import re
from backlog_api_handler import BacklogApiHandler
COMMENT_TEMPLATE = \
"""
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
comment created: {created}
author: {createdUserName}<{createdUserMailAddress}>
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
{content}
"""
def clean_text_of_name(target):
cleaned = re.sub(r'[\\|/|:|?|"|<|>|\|\*]', '-', target)
cleaned = cleaned.strip(" ")
return cleaned
def clean_text_keep_tree(target):
cleaned = re.sub(r'[\\|:|?|"|<|>|\||\*]', '-', target)
cleaned = cleaned.strip(" ")
return cleaned
def comments_to_text(comments):
text = "\n\n"
for comment in comments:
comment_data = {
"content": comment['content'],
"createdUserName": comment['createdUser']['name'],
"createdUserMailAddress": comment['createdUser']['mailAddress'],
"created": comment['created'],
}
if comment_data["content"] is None:
continue
text = text + COMMENT_TEMPLATE.format(**comment_data)
return text
class BacklogExporter:
def __init__(self, backlog_api_handler: BacklogApiHandler):
self.backlog_api_handler = backlog_api_handler
def export_issues(self, project_name, output_dir):
project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)
formatting_rule = self.backlog_api_handler.get_formatting_rule_by_project_name(project_name)
issues_of_all = self.backlog_api_handler.get_issues_by_project_id(project_id)
for issue in issues_of_all:
self.__export_issue(issue, project_name, output_dir, formatting_rule)
def export_wikis(self, project_name, output_dir):
project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)
formatting_rule = self.backlog_api_handler.get_formatting_rule_by_project_name(project_name)
wikis_of_all = self.backlog_api_handler.get_wikis_by_project_id(project_id)
for wiki in wikis_of_all:
self.__export_wiki(wiki, project_name, output_dir, formatting_rule)
def export_files(self, project_name, output_dir):
project_id = self.backlog_api_handler.get_project_id_by_project_name(project_name)
files = self.backlog_api_handler.get_files_by_project_id(project_id)
for file in files:
self.__export_file(project_name, project_id, file, pathlib.Path(output_dir))
def __export_issue(self, issue, project_name, output_dir, formatting_rule):
issue_id = issue['id']
issue_number = issue['keyId']
issue_title = clean_text_of_name(issue['summary'])
issue_body = issue['description']
output_path = pathlib.Path(output_dir).joinpath(
project_name,
"issues",
f"{project_name}-{issue_number:03d}-{issue_title}",
"body.md" if formatting_rule == "markdown" else "body.backlog"
)
output_path.parent.mkdir(parents=True, exist_ok=True)
print(output_path)
with open(output_path, "wt", encoding='utf-8') as f:
f.writelines(issue_body)
comments = self.backlog_api_handler.get_comments_by_issue_id(issue_id)
text_of_comments = comments_to_text(comments)
f.writelines(text_of_comments)
attachments = self.backlog_api_handler.get_attachments_by_issue_id(issue_id)
for attachment in attachments:
self.__export_attachment_of_issue(issue_id, attachment, output_path.parent)
def __export_attachment_of_issue(self, issue_id, attachment, output_base_path: pathlib.Path):
attachment_id = attachment['id']
attachment_name = attachment['name']
attachment_name = clean_text_of_name(attachment_name)
output_path = output_base_path.joinpath("attachment", f"{attachment_name}")
output_path.parent.mkdir(parents=True, exist_ok=True)
attachment_body = self.backlog_api_handler.get_attachment_body_of_issue(issue_id, attachment_id)
print(output_path)
with open(output_path, "wb") as f:
f.write(attachment_body)
def __export_wiki(self, wiki, project_name, output_dir, formatting_rule):
wiki_id = wiki["id"]
wiki_name_tree = clean_text_keep_tree(wiki["name"])
output_path = pathlib.Path(output_dir).joinpath(
project_name, "wikis", f"{wiki_name_tree}",
"body.md" if formatting_rule == "markdown" else "body.backlog"
)
output_path.parent.mkdir(parents=True, exist_ok=True)
wiki_body = self.backlog_api_handler.get_wiki_body_by_wiki_id(wiki_id)
print(output_path)
with open(output_path, "wt", encoding='utf-8') as f:
f.writelines(wiki_body)
attachments = wiki["attachments"]
for attachment in attachments:
self.__export_attachment_of_wiki(wiki_id, attachment, output_path.parent)
def __export_attachment_of_wiki(self, wiki_id, attachment, output_base_path: pathlib.Path):
attachment_id = attachment['id']
attachment_name = attachment['name']
attachment_name = clean_text_of_name(attachment_name)
output_path = output_base_path.joinpath("attachment", f"{attachment_name}")
output_path.parent.mkdir(parents=True, exist_ok=True)
attachment_body = self.backlog_api_handler.get_attachment_body_of_wiki(wiki_id, attachment_id)
print(output_path)
with open(output_path, "wb") as f:
f.write(attachment_body)
def __export_file(self, project_name, project_id, file, output_base_path: pathlib.Path):
file_id = file['id']
file_name = clean_text_of_name(file['name'])
file_dir = clean_text_keep_tree(file['dir'])
output_path = output_base_path.joinpath(project_name, f"files{file_dir}{file_name}")
output_path.parent.mkdir(parents=True, exist_ok=True)
file_body = self.backlog_api_handler.get_file_body(project_id, file_id)
print(output_path)
with open(output_path, "wb") as f:
f.write(file_body)
BacklogExporter
の実装がメインで、エクスポートする処理を担っています。
main.py
からはこのBacklogExporter
の処理を呼び出します。
BacklogExporter
はおおきな工夫点はないのですが、前回記事と同様にファイル名に使えない文字をすべて-
に置き換えています。
またツリーを維持したい場合は、/
を置き換えると困るので、以下のようにcleanする関数を2つ準備しています。
def clean_text_of_name(target):
cleaned = re.sub(r'[\\|/|:|?|"|<|>|\|\*]', '-', target)
cleaned = cleaned.strip(" ")
return cleaned
def clean_text_keep_tree(target):
cleaned = re.sub(r'[\\|:|?|"|<|>|\||\*]', '-', target)
cleaned = cleaned.strip(" ")
return cleaned
backlog_api_handler.py
backlog_api_handler.py
の中身は以下のようになっています。
import json
import requests
from requests import Session
from urllib3.util import Retry
from requests import Session
from requests.adapters import HTTPAdapter
GET_ISSUES_PAGE_SIZE = 100
GET_COMMENTS_PAGE_SIZE = 100
GET_FILES_PAGE_SIZE = 1000
API_REQUEST_RETRY_COUNT = 3
API_REQUEST_RETRY_BACKOFF_FACTOR = 60
API_REQUEST_RETRY_BACKOFF_MAX = 60
class LogRetry(Retry):
def __init__(self, *args, **kwargs):
history = kwargs.get("history")
if history is not None:
retry_count = len(history)
sleep_time = kwargs["backoff_factor"] * (2 ** (retry_count-1))
sleep_time = sleep_time if sleep_time < kwargs['backoff_max'] else kwargs['backoff_max']
print(f'retry={retry_count-1}/{retry_count+kwargs["total"]}, sleep_to_next={sleep_time}s, result={history[-1]}')
super().__init__(*args, **kwargs)
class BacklogApiHandler:
def __init__(self, api_key, base_url):
self.api_key = api_key
self.base_url = base_url
self.session = Session()
retries = LogRetry(
total=API_REQUEST_RETRY_COUNT,
backoff_factor=API_REQUEST_RETRY_BACKOFF_FACTOR,
backoff_max=API_REQUEST_RETRY_BACKOFF_MAX,
status_forcelist=[429],
allowed_methods={'GET'},
)
self.session.mount('https://', HTTPAdapter(max_retries=retries))
def get_project_id_by_project_name(self, project_name):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-project/#
payload = {
'apiKey': f'{self.api_key}'
}
response = self.session.get(f"{self.base_url}/api/v2/projects/{project_name}", params=payload)
return json.loads(response.content)['id']
def get_formatting_rule_by_project_name(self, project_name):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-project/#
payload = {
'apiKey': f'{self.api_key}'
}
response = self.session.get(f"{self.base_url}/api/v2/projects/{project_name}", params=payload)
return json.loads(response.content)['textFormattingRule']
def get_issues_by_project_id(self, project_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-issue-list/#
payload = {
'apiKey': f'{self.api_key}',
'projectId[]': f'{project_id}',
'count': f'{GET_ISSUES_PAGE_SIZE}'
}
issues_of_page = None
issues_of_all = []
page_count = 0
while issues_of_page is None or len(issues_of_page) > 0:
payload = {
**payload,
'offset': f'{GET_ISSUES_PAGE_SIZE*page_count}',
}
response = self.session.get(f'{self.base_url}/api/v2/issues', params=payload)
issues_of_page = json.loads(response.text)
issues_of_all.extend(issues_of_page)
page_count += 1
issues_of_all = sorted(issues_of_all, key=lambda v: v["keyId"])
return issues_of_all
def get_comments_by_issue_id(self, issue_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-comment-list/#
payload = {
'apiKey': f'{self.api_key}',
'count': f'{GET_COMMENTS_PAGE_SIZE}',
'order': 'asc'
}
comments_of_page = None
comments_of_all = []
min_id = 0
while comments_of_page is None or len(comments_of_page) > 0:
payload = {
**payload,
'minId': min_id
}
response = self.session.get(f"{self.base_url}/api/v2/issues/{issue_id}/comments", params=payload)
comments_of_page = json.loads(response.text)
if len(comments_of_page) > 0:
min_id = comments_of_page[-1]['id']
comments_of_all.extend(comments_of_page)
return comments_of_all
def get_attachments_by_issue_id(self, issue_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-list-of-issue-attachments/#
payload = {
'apiKey': f'{self.api_key}',
}
response = self.session.get(f"{self.base_url}/api/v2/issues/{issue_id}/attachments", params=payload)
attachments = json.loads(response.text)
return attachments
def get_attachment_body_of_issue(self, issue_id, attachment_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-issue-attachment/#
payload = {
'apiKey': f'{self.api_key}',
}
response = self.session.get(
f"{self.base_url}/api/v2/issues/{issue_id}/attachments/{attachment_id}",
params=payload)
return response.content
def get_wikis_by_project_id(self, project_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page-list/#
payload = {
'projectIdOrKey': f'{project_id}'
, 'apiKey': f'{self.api_key}'
}
response = self.session.get(f"{self.base_url}/api/v2/wikis", params=payload)
wikis = json.loads(response.text)
wikis = sorted(wikis, key=lambda v: v["name"])
return wikis
def get_wiki_body_by_wiki_id(self, wiki_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page/#
payload = {
'apiKey': f'{self.api_key}'
}
response = self.session.get(f"{self.base_url}/api/v2/wikis/{wiki_id}", params=payload)
body = json.loads(response.text)["content"]
return body
def get_attachment_body_of_wiki(self, wiki_id, attachment_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-wiki-page-attachment/#
payload = {
'apiKey': f'{self.api_key}',
}
response = self.session.get(
f"{self.base_url}/api/v2/issues/{wiki_id}/attachments/{attachment_id}",
params=payload)
return response.content
def get_files_by_project_id(self, project_id, path="/"):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-list-of-shared-files/#
payload = {
'apiKey': f'{self.api_key}',
'count': f'{GET_FILES_PAGE_SIZE}'
}
files_of_page = None
files_of_all = []
page_count = 0
while files_of_page is None or len(files_of_page) > 0:
payload = {
**payload,
'offset': f'{GET_FILES_PAGE_SIZE*page_count}',
}
response = self.session.get(f'{self.base_url}/api/v2/projects/{project_id}/files/metadata{path}', params=payload)
files_of_page = json.loads(response.text)
files_of_all.extend(files_of_page)
page_count += 1
dirs = [ f for f in files_of_all if f['type'] == 'directory' ]
files_of_all = [ f for f in files_of_all if f['type'] == 'file' ]
for d in dirs:
files_of_child = self.get_files_by_project_id(project_id, path='{}{}'.format(d['dir'], d['name']))
files_of_all.extend(files_of_child)
return files_of_all
def get_file_body(self, project_id, file_id):
# https://developer.nulab.com/ja/docs/backlog/api/2/get-file/#
payload = {
'apiKey': f'{self.api_key}',
}
response = self.session.get(
f"{self.base_url}/api/v2/projects/{project_id}/files/{file_id}",
params=payload)
return response.content
BacklogApiHandler
の実装がメインで、BacklogのAPIを実際に呼び出します。
BacklogExporter
からはこのBacklogApiHandler
の処理を呼び出します。
BacklogApiHandler
で使用しているAPIは以下となります。
APIドキュメント | 実装時の考慮事項 |
---|---|
プロジェクト情報の取得 | |
課題一覧の取得 | ページングを考慮して繰り返しリクエスト |
課題コメントの取得 | ページングを考慮して繰り返しリクエスト |
課題添付ファイル一覧の取得 | 一覧だがこのAPIはページングがない |
課題添付ファイルのダウンロード | |
Wikiページ一覧の取得 | wikiは一覧取得時にツリー構造ごと取得できる 一覧だがこのAPIはページングがない |
Wikiページ情報の取得 | |
Wiki添付ファイルのダウンロード | |
共有ファイル一覧の取得 | あるパスを指定した一覧取得となるため、typeを見ながらdirectoryは再帰的に実行 その際、ページングを考慮して繰り返しリクエスト |
共有ファイルのダウンロード |
要所に各APIに合わせた処理を実装しており、ページングを考慮したリクエスト、全件を取得するための再帰的に実行するリクエストなどを工夫しています。
また、ステータスコードが429(Too many requests)の場合は、urllib3
のRetry
を使ってリトライを以下のように実装しています。
self.session = Session()
retries = LogRetry(
total=API_REQUEST_RETRY_COUNT,
backoff_factor=API_REQUEST_RETRY_BACKOFF_FACTOR,
backoff_max=API_REQUEST_RETRY_BACKOFF_MAX,
status_forcelist=[429],
allowed_methods={'GET'},
)
self.session.mount('https://', HTTPAdapter(max_retries=retries))
このself.session
経由でget
を実行することでリトライを実現しています。
定数は以下のように設定し、毎回60秒待つ形にしています。
API_REQUEST_RETRY_COUNT = 3
API_REQUEST_RETRY_BACKOFF_FACTOR = 60
API_REQUEST_RETRY_BACKOFF_MAX = 60
この仕様は以下に沿って固定で60秒待ってリトライするように設定していますが、用途に応じて変更することは可能です。
- https://developer.nulab.com/ja/docs/backlog/rate-limit/
- https://dev.classmethod.jp/articles/backilogapi_get_issue_attached_files
またリトライ時のログが出力されるよう、以下のLogRetry
クラスを定義しています。
class LogRetry(Retry):
def __init__(self, *args, **kwargs):
history = kwargs.get("history")
if history is not None:
retry_count = len(history)
sleep_time = kwargs["backoff_factor"] * (2 ** (retry_count-1))
sleep_time = sleep_time if sleep_time < kwargs['backoff_max'] else kwargs['backoff_max']
print(f'retry={retry_count-1}/{retry_count+kwargs["total"]}, sleep_to_next={sleep_time}s, result={history[-1]}')
super().__init__(*args, **kwargs)
その他大きく難しい点はないのですが、ご興味があれば実装を確認して頂ければと思います。
まとめ
いかがでしたでしょうか。
本記事が、Backlogをお使いになられている方の参考になれば幸いです。