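# Convert published GitHub issues into Hugo page bundles.
# Usage (the script filename below is illustrative; adjust to this file's name):
#   python issue_to_hugo.py --repo owner/repo --token $GITHUB_TOKEN [--output content/posts] [--debug]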
import os
import argparse
import re
import requests
import json
import logging
from urllib.parse import unquote
from github import Github, Auth
from bs4 import BeautifulSoup
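
# Hugo categories recognized on the repo: the first of an issue's labels found in
# this list is used as the category, so the values must match the GitHub label
# names exactly. PUBLISH_LABEL ("发布" = publish) marks an issue as ready to publish.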
CATEGORY_MAP = ["生活", "技术", "法律", "瞬间", "社会"]
PUBLISH_LABEL = "发布"

def setup_logger(debug=False):
    logger = logging.getLogger('issue-to-hugo')
    level = logging.DEBUG if debug else logging.INFO
    logger.setLevel(level)
    # Avoid attaching duplicate handlers if setup_logger is called more than once.
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

def extract_cover_image(body):
    """Extract the cover image (the first image in the body) and remove it from the body."""
    img_pattern = r"!\[.*?\]\((https?:\/\/[^\)]+)\)"
    match = re.search(img_pattern, body)
    if match:
        img_url = match.group(1)
        # Remove only the matched occurrence, not every identical image in the body.
        body = body[:match.start()] + body[match.end():]
        return img_url, body
    return None, body
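
# Example: for a body beginning with "![cover](https://example.com/a.png)\n...",
# extract_cover_image returns ("https://example.com/a.png", <body without that image>).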

def safe_filename(filename):
    """Build a safe local filename from a URL, keeping a known image extension if present."""
    clean_url = re.sub(r"\?.*$", "", filename)  # strip the query string
    basename = os.path.basename(clean_url)
    decoded_name = unquote(basename)
    name, ext = os.path.splitext(decoded_name)
    safe_name = re.sub(r"[^a-zA-Z0-9\-_]", "_", name)
    if ext.lower() not in [".jpg", ".jpeg", ".png", ".gif", ".webp"]:
        ext = ""
    # Keep the full name (with extension) within 100 characters.
    if len(safe_name) > 100 - len(ext):
        safe_name = safe_name[:100 - len(ext)]
    return safe_name + ext
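
# Example: safe_filename("https://example.com/photo%20name.png?size=large")
# -> "photo_name.png". An unrecognized extension is dropped here and later
# re-derived by download_image from the response's Content-Type header.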

def download_image(url, output_path, token=None):
    """Download an image, deriving the extension from the Content-Type header; sends a GitHub auth header."""
    logger = logging.getLogger('issue-to-hugo')
    try:
        headers = {}
        if token:
            # GitHub-hosted attachment URLs may require authentication.
            headers['Authorization'] = f'token {token}'
        response = requests.get(url, stream=True, headers=headers, timeout=30)
        if response.status_code == 200:
            content_type = response.headers.get("content-type", "").lower()
            ext = ".jpg"
            if "image/png" in content_type:
                ext = ".png"
            elif "image/jpeg" in content_type or "image/jpg" in content_type:
                ext = ".jpg"
            elif "image/gif" in content_type:
                ext = ".gif"
            elif "image/webp" in content_type:
                ext = ".webp"
            # Trust an existing known extension; otherwise use the one derived above.
            base, current_ext = os.path.splitext(output_path)
            if current_ext.lower() not in [".jpg", ".jpeg", ".png", ".gif", ".webp"]:
                output_path = base + ext
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            logger.info(f"Image downloaded: {url} -> {output_path}")
            return output_path
        logger.error(f"Image download failed with status {response.status_code}: {url}")
    except Exception as e:
        logger.error(f"Image download failed: {url} - {e}")
    return None
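
# Note: download_image may return a different path than the one requested (the
# extension can change based on Content-Type), so callers must use the returned
# path rather than the argument they passed in.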

def replace_image_urls(body, issue_number, output_dir, token=None):
    """Replace remote images in the body with locally downloaded copies."""
    img_pattern = r"!\[(.*?)\]\((https?:\/\/[^\)]+)\)"
    def replacer(match):
        alt_text = match.group(1)
        img_url = match.group(2)
        filename = f"{issue_number}_{safe_filename(img_url)}"
        output_path = os.path.join(output_dir, filename)
        final_path = download_image(img_url, output_path, token)
        if final_path:
            final_filename = os.path.basename(final_path)
            return f"![{alt_text}]({final_filename})"
        # Keep the original remote reference if the download failed.
        return match.group(0)
    return re.sub(img_pattern, replacer, body, flags=re.IGNORECASE)
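
# Example: in issue #123, "![diagram](https://example.com/d.png)" becomes
# "![diagram](123_d.png)", and d.png is saved into the post's bundle directory.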

def sanitize_markdown(content):
    """Strip raw HTML tags outside an allow-list from the Markdown (tag contents are kept)."""
    if not content:
        return ""
    soup = BeautifulSoup(content, "html.parser")
    allowed_tags = ["p", "a", "code", "pre", "blockquote", "ul", "ol", "li", "strong", "em", "img", "h1", "h2", "h3", "h4", "h5", "h6"]
    for tag in soup.find_all(True):
        if tag.name not in allowed_tags:
            # unwrap() removes the tag itself but keeps its children.
            tag.unwrap()
    # formatter=None keeps '&', '<' and '>' unescaped so Markdown text survives intact.
    return soup.decode(formatter=None)
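
# Example: "<script>alert(1)</script>**bold**" -> "alert(1)**bold**". Raw HTML
# tags outside the allow-list are dropped (their text is kept), while Markdown
# syntax passes through html.parser untouched.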

def extract_tags_from_body(body, logger):
    """Extract $tag$-style tags from the last line of the body; return (tags, cleaned body)."""
    if not body:
        logger.debug("Body is empty, no tags to extract")
        return [], body
    body = body.replace('\r\n', '\n').rstrip()
    lines = body.split('\n')
    if not lines:
        logger.debug("No lines in body, no tags to extract")
        return [], body
    last_line = lines[-1].strip()
    logger.debug(f"Last line for tag extraction: '{last_line}'")
    tags = re.findall(r'\$(.+?)\$', last_line)
    tags = [tag.strip() for tag in tags if tag.strip()]
    if tags:
        logger.debug(f"Extracted tags: {tags}")
        # Drop the tag line from the body once the tags are captured.
        body = '\n'.join(lines[:-1]).rstrip()
    else:
        logger.debug("No tags found in last line")
    return tags, body
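
# Example: a body whose last line is "$hugo$ $automation$" yields
# tags ["hugo", "automation"], and that line is removed from the body.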

def convert_issue(issue, output_dir, token, logger):
    """Convert a single issue into a Hugo page bundle."""
    try:
        labels = [label.name for label in issue.labels]
        if PUBLISH_LABEL not in labels or issue.state != "open":
            logger.debug(f"Skipping issue #{issue.number} - not labeled for publishing")
            return False
        pub_date = issue.created_at.strftime("%Y%m%d")
        slug = f"{pub_date}_{issue.number}"
        post_dir = os.path.join(output_dir, slug)
        if os.path.exists(post_dir):
            logger.info(f"Skipping issue #{issue.number} - directory {post_dir} already exists")
            return False
        os.makedirs(post_dir, exist_ok=True)
        body = issue.body or ""
        logger.debug(f"Raw issue body: '{body}'")
        cover_url, body = extract_cover_image(body)
        tags, body = extract_tags_from_body(body, logger)
        body = sanitize_markdown(body)
        body = replace_image_urls(body, issue.number, post_dir, token)
        logger.info(f"Image processing finished for issue #{issue.number}")
        categories = [tag for tag in labels if tag in CATEGORY_MAP]
        category = categories[0] if categories else "生活"
        cover_name = None
        if cover_url:
            try:
                cover_filename = f"cover_{safe_filename(cover_url)}"
                cover_path = os.path.join(post_dir, cover_filename)
                final_cover_path = download_image(cover_url, cover_path, token)
                if final_cover_path:
                    cover_name = os.path.basename(final_cover_path)
                    logger.info(f"Cover image downloaded: {cover_url} -> {cover_name}")
                else:
                    logger.error(f"Cover image download failed: {cover_url}")
            except Exception as e:
                logger.error(f"Cover image download failed: {cover_url} - {e}")
        # json.dumps emits valid YAML scalars/lists and escapes quotes and backslashes,
        # which manual quote-replacement missed.
        frontmatter_lines = [
            "---",
            f"title: {json.dumps(issue.title, ensure_ascii=False)}",
            f"date: \"{issue.created_at.strftime('%Y-%m-%d')}\"",
            f"slug: \"{slug}\"",
            f"categories: [{json.dumps(category, ensure_ascii=False)}]",
            f"tags: {json.dumps(tags, ensure_ascii=False)}"
        ]
        if cover_name:
            frontmatter_lines.append(f"image: \"{cover_name}\"")
        frontmatter_lines.append("---\n")
        frontmatter = "\n".join(frontmatter_lines)
        md_file = os.path.join(post_dir, "index.md")
        with open(md_file, "w", encoding="utf-8") as f:
            f.write(frontmatter + body)
        logger.info(f"Converted issue #{issue.number} to {md_file}")
        return True
    except Exception as e:
        logger.exception(f"Fatal error while converting issue #{issue.number}")
        error_file = os.path.join(output_dir, f"ERROR_{issue.number}.tmp")
        with open(error_file, "w", encoding="utf-8") as f:
            f.write(f"Conversion failed: {str(e)}")
        return False
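
# Example front matter for issue #42 (created 2024-01-05, label "技术",
# tag line "$hugo$", with a cover image):
# ---
# title: "My post"
# date: "2024-01-05"
# slug: "20240105_42"
# categories: ["技术"]
# tags: ["hugo"]
# image: "cover_photo.png"
# ---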

def main():
    args = parse_arguments()
    logger = setup_logger(args.debug)
    token = args.token or os.getenv("GITHUB_TOKEN")
    if not token:
        logger.error("Missing GitHub token")
        return
    try:
        auth = Auth.Token(token)
        g = Github(auth=auth)
        repo = g.get_repo(args.repo)
        logger.info(f"Connected to GitHub repository: {args.repo}")
    except Exception as e:
        logger.error(f"Failed to connect to GitHub: {str(e)}")
        return
    os.makedirs(args.output, exist_ok=True)
    logger.info(f"Output directory: {os.path.abspath(args.output)}")
    processed_count = 0
    error_count = 0
    try:
        issues = repo.get_issues(state="open")
        # Note: the GitHub API counts pull requests as issues; they are skipped below.
        total_issues = issues.totalCount
        logger.info(f"Processing {total_issues} open issues")
        for issue in issues:
            if issue.pull_request:
                continue
            try:
                if convert_issue(issue, args.output, token, logger):
                    processed_count += 1
            except Exception as e:
                error_count += 1
                logger.error(f"Error while processing issue #{issue.number}: {str(e)}")
                try:
                    error_comment = f"⚠️ Conversion to Hugo content failed, please check for formatting errors:\n\n```\n{str(e)}\n```"
                    # GitHub comment bodies are limited to 65536 characters.
                    if len(error_comment) > 65536:
                        error_comment = error_comment[:65000] + "\n```\n... (truncated)"
                    issue.create_comment(error_comment)
                    try:
                        error_label = repo.get_label("conversion-error")
                    except Exception:
                        error_label = repo.create_label("conversion-error", "ff0000")
                    issue.add_to_labels(error_label)
                except Exception as inner_e:
                    logger.error(f"Error while commenting or labeling the issue: {inner_e}")
    except Exception as e:
        logger.exception(f"Error while fetching issues: {e}")
    summary = f"Done: converted {processed_count} issues with {error_count} errors"
    if processed_count == 0:
        logger.info(summary + " - no content changes to process")
    else:
        logger.info(summary)
    if args.debug:
        logger.debug("Content directory listing:")
        logger.debug(os.listdir(args.output))

def parse_arguments():
    parser = argparse.ArgumentParser(description='Convert GitHub issues to Hugo content')
    parser.add_argument('--token', type=str, default=None, help='GitHub access token (falls back to the GITHUB_TOKEN environment variable)')
    parser.add_argument('--repo', type=str, required=True, help='GitHub repository in the form owner/repo')
    parser.add_argument('--output', type=str, default='content/posts', help='Output directory')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    return parser.parse_args()

if __name__ == "__main__":
    main()