webhook编写教程
目录
1 使用场景
编写一个web接口用于服务调用。
2 步骤
- 定义web接口。可以使用任何语言。这里使用的是python的fastapi,编写起来比较快。直接把你的需求和AI说让他给你写就行。这里以我的需求为例。通过访问
/trigger将本地文件使用scp命令传输到远端服务器。
## webhook_scp_server.py
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import subprocess
import os
from datetime import datetime
import logging
## 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scp-webhook")
app = FastAPI(title="SCP Trigger Webhook")
## 配置:请根据你的实际路径修改!
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOCAL_FILE_PATH = os.path.join(SCRIPT_DIR, "output", "all.yaml")
REMOTE_TARGET = "newserver:/path/to/location/"
@app.get("/trigger")
async def trigger_scp():
try:
now = datetime.now()
logger.info(f"[{now}] SCP Trigger received.")
## 检查本地文件是否存在
if not os.path.exists(LOCAL_FILE_PATH):
error_msg = f"Local file not found: {LOCAL_FILE_PATH}"
logger.error(error_msg)
return JSONResponse(
status_code=400,
content={"error": error_msg, "time": now.isoformat()}
)
## 构造 scp 命令
cmd = [
"scp",
LOCAL_FILE_PATH,
REMOTE_TARGET
]
## 执行命令
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False ## 不自动抛异常,我们手动处理
)
## 记录输出
if result.returncode == 0:
success_msg = f"SCP succeeded at {now}"
logger.info(success_msg)
logger.info(f"STDOUT: {result.stdout}")
return {
"status": "success",
"message": "File transferred successfully",
"time": now.isoformat(),
"stdout": result.stdout.strip()
}
else:
error_msg = f"SCP failed at {now} (exit code {result.returncode})"
logger.error(error_msg)
logger.error(f"STDERR: {result.stderr}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"message": "SCP command failed",
"time": now.isoformat(),
"stderr": result.stderr.strip(),
"exit_code": result.returncode
}
)
except Exception as e:
now = datetime.now()
logger.exception(f"[{now}] Unexpected error")
return JSONResponse(
status_code=500,
content={
"status": "error",
"message": str(e),
"time": now.isoformat()
}
)
## 用于测试服务是否启动
@app.get("/")
async def root():
return {"message": "SCP Webhook Server is running. Try GET /trigger"}
- 安装相关依赖。
pip install fastapi uvicorn
- 手动启动测试功能是否正常。
uvicorn webhook_scp_server:app --host 0.0.0.0 --port 8888 --reload
curl -X GET http://yourip:8888/trigger
- 制作成服务方便使用。
## /etc/systemd/system/scp-webhook.service
[Unit]
Description=SCP Webhook Server
After=network.target
[Service]
User=rui
WorkingDirectory=/home/rui/scripts
ExecStart=/usr/bin/uvicorn webhook_scp_server:app --host 0.0.0.0 --port 8888
Restart=always
[Install]
WantedBy=multi-user.target
- 设置开机启动。
sudo systemctl daemon-reload
sudo systemctl enable scp-webhook
sudo systemctl start scp-webhook
sudo systemctl status scp-webhook
- 查看日志。
sudo journalctl -fu scp-webhook