DDNS 动态域名系统原理浅析
编辑DDNS(Dynamic Domain Name System)在需要远程访问家庭网络设备或搭建服务器等场景中非常有用。今天我们来讨论一下 DDNS 的实现过程。我最近将域名 DNS 解析迁移到了火山引擎 TrafficRoute DNS 套件(以下简称TRDNS),发现市面上没有针对 TRDNS 的动态域名解析工具,因此我决定自己动手实现一个,并在这里记录下这个过程。
必备材料
购买一个域名(可在阿里云、腾讯云、华为云等服务商购买,注意长期在国内使用需进行工信部ICP备案和网安备案)
使用DNS解析API(以下简称【设置接口】,由DNS服务商提供,默认情况下在购买域名的服务商提供。在购买域名之前,确保服务器提供此服务,大多数服务商都有)
家庭宽带具有动态IP(没有动态IP无法使用DDNS,IPv4 和 IPv6 的 DDNS 解析过程都是一样的,除了服务商解析API可能会有区别,下文我依IPv4为例)
熟练掌握一门编程语言(本文使用Python和Groovy(Java),读者可自行选择其他编程语言)
使用免费IP信息查询API接口(以下简称【查询接口】,可在此GitHub仓库中查找:https://github.com/ihmily/ip-info-api)
DDNS 实现原理
我直接用通俗易懂的语言来解释整个代码的实现过程,相信看到这里的同学都能够听得明白。
我们会使用定时任务不断地自动查询接口,获取当前的公网IP地址,并将其记录在本地。每次查询后,我们会将新的IP地址与上一次保存的IP地址进行比对,如果不一致,就说明IP发生了变化,这时候我们需要通过DNS解析API将新的IP地址解析到对应的域名上。
整个DDNS的运行过程其实非常简单,我们常用的DDNS客户端ddns-go和ALIDDNS都是基于这个原理实现的。
下文将根据以下几点来说明实现过程:
获取当前的公网IP地址
使用定时任务
判断IP是否发生变化
将新的IP地址解析到域名上
获取当前的公网 IP 地址
大多数查询接口都是使用 GET 请求。为了确定接口返回的是公网 IPv4 还是 IPv6,我们可以直接在浏览器中打开接口并查看返回的 IP。但是,有些特殊情况,比如 https://ipinfo.io,使用浏览器打开是可视化的网页,使用接口方式访问直接返回 IPv4 地址。因此,最好使用编程发起请求。
以下是一个 Python 实例,用于获取当前公网 IP 地址:
import requests
def get_pub_ip():
try:
response = requests.get('https://ipinfo.io', timeout=2)
response.raise_for_status()
return response.json().get('ip')
except Exception as e:
return None
这样我们就成功获取了公网 IP 地址。为了确保稳定性,建议在实际使用中从多个查询接口中获取公网 IP,以免受某个查询接口临时故障的影响。
使用定时任务
定时任务的主要目的是为了定时循环监控公网IP的变化。一般来说,我们会设置时间间隔,建议以分钟为单位进行设置。
实现定时任务的方式有很多种,可以使用编程框架提供的定时任务。但一般情况下,这些定时任务需要持续在后台运行。个人认为即用即走的方案更为便捷,例如Linux系统自带的crontab或Windows系统自带的任务计划程序等。我的DDNS实现只有一个Python脚本,正好也是在Linux系统上,因此我选择使用crontab。大家可以根据自己的情况选择适合自己的方式进行设置。
# 每三分钟检查一次IP变化
*/3 * * * * sudo -u root /usr/bin/python3 /ddns/ddns.py
判断 IP 是否发生变化
需要将查询到的公网IP保存起来,最简单的方式是将其存储在本地文件中。下次执行任务时,可以将查询到的IP与之前保存的IP进行比对,以判断是否发生了变化。这个步骤非常简单:
import requests
import sys
def get_pub_ip():
try:
response = requests.get('https://ipinfo.io', timeout=2)
response.raise_for_status()
return response.json().get('ip')
except Exception as e:
return None
# IPV4地址保存文件
ipv4_file = '/ddns/ipv4.txt'
# 读取IPV4文件
def read_ipv4_file():
try:
with open(ipv4_file, 'r') as file:
return file.read()
except FileNotFoundError:
return None
# 写入IPV4文件
def save_ipv4_file(ip):
try:
with open(ipv4_file, 'w') as file:
file.write(ip)
except Exception as e:
print(f"写入文件时发生错误: {e}")
if __name__ == '__main__':
# 获取当前IP
ipv4 = get_pub_ip()
# 获取旧IP
old_ipv4 = read_ipv4_file()
# 比对IP是否变更
if old_ipv4 == ipv4:
sys.exit()
# 保存变更后的IP
save_ipv4_file(ipv4)
将新的IP地址解析到域名上
不同DNS服务商的设置接口可能不同,因此我的示例代码可能无法适用于所有人。然而,基本思路是一致的,需要大家灵活运用。我们需要查阅服务商的云解析DNS文档,了解其API列表和调用方式。需要关注的API包括:获取域名信息、添加解析记录、删除解析记录和更新解析记录。按照我的步骤,只需关注这几项API。当然,更推荐通读整个API文档,以获取更全面有效的信息。
TrafficRoute DNS 套件文档地址:https://www.volcengine.com/docs/6758/155086
我使用 Groovy 脚本编写了这部分代码,因为我对 Java 更熟悉。由于需要与上面的 Python 代码结合使用,而且我习惯即用即走的设计方式,因此这部分代码被设计成可独立使用的部分。Python 通过命令行 subprocess 的方式进行调用与结合。
Groovy 部分代码:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.security.MessageDigest
import java.time.ZoneId
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.logging.Level
import java.util.logging.Logger
import jp.co.osstech.regdom4j.RegDomain
//校验参数数量
if(args.length < 2) {
Logger.global.log(Level.SEVERE, "Usage: script.groovy <domain_name> <new_ip>")
System.exit(0)
}
//获取参数
def DOMAIN_NAME = args[0]
def DOMAIN = new RegDomain().getRegisteredDomain(DOMAIN_NAME);
def HOST = DOMAIN_NAME.substring(0, DOMAIN_NAME.lastIndexOf(DOMAIN)-1)
def NEW_IP = args[1]
//获取域名记录ZID
def zid = getZID(DOMAIN);
//获取域名解析记录
def recordId = getRecordID(zid, DOMAIN_NAME);
if(recordId==null || recordId.isBlank()) {
//创建IP解析
createDNS(zid, NEW_IP, HOST);
}else{
//更新IP解析
updateDNS(recordId, NEW_IP, HOST);
}
//=======================================================
static void updateDNS(String recordId, String ip, String host) throws URISyntaxException, IOException, InterruptedException {
def body = JsonOutput.toJson([
Host: host,
Line: "default",
RecordID: recordId,
TTL: 600,
Type: "A",
Value: ip,
Weight: 1
]).toString().bytes
var result = request("POST", [:], [:], "UpdateRecord", body)
if(result.RecordID == recordId){
Logger.global.log(Level.WARNING, "Update DNS successfully")
}else{
Logger.global.log(Level.SEVERE, "Update DNS failed")
}
}
static void createDNS(Long zid, String ip, String host) {
def body = JsonOutput.toJson([
Host: host,
Line: "default",
TTL: 600,
Type: "A",
Value: ip,
ZID: zid
]).bytes
var result = request("POST", [:], [:], "CreateRecord", body);
if(result.Host == host){
Logger.global.log(Level.WARNING, "Create DNS successfully");
}else{
Logger.global.log(Level.WARNING, "Create DNS failed");
}
}
static String getRecordID(Long zid, String DOMAIN_NAME) {
var query = [ZID: zid, PageSize: 500];
var result = request("GET", query, [:], "ListRecords",[] as byte[]);
def record = result.Records?.find { r -> DOMAIN_NAME == (r as Map).PQDN } as Map
return record ? record.RecordID.toString() : null
}
static Long getZID(String domain) {
def query = [PageSize: 500];
def result = request("GET", query, [:], "ListZones", [] as byte[])
def zone = result.Zones?.find { z -> domain == (z as Map).ZoneName } as Map
return zone?.ZID as Long;
}
//=====================================================================
/**
* sha256非对称加密
*/
static byte[] hmacSHA256(byte[] key, String content) {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(key, "HmacSHA256"));
return mac.doFinal(content.getBytes());
}
/**
* sha256 hash算法
*/
static String hashSHA256(byte[] content) {
MessageDigest md = MessageDigest.getInstance("SHA-256");
return encodeHexStr(md.digest(content));
}
/**
* 将字节数组转换为十六进制字符串
* @param byteArray 字节数组
* @return 十六进制字符串
*/
static String encodeHexStr(byte[] byteArray) {
StringBuilder hexString = new StringBuilder();
for (byte b : byteArray) {
hexString.append(String.format("%02x", b));
}
return hexString.toString();
}
/**
* 发送请求
*/
static Map<?, ?> request(String method, Map<String, Object> query, Map<String, String> header, String action, byte[] body) {
def contentType = "application/json"
def host = "open.volcengineapi.com"
def path = "/"
// 初始化身份证明
var credential = [
"accessKeyId": "AK",
"secretKeyId": "SK",
"service": "DNS",
"region": "cn-north-1"
]
//计算签名
def queryList = new ArrayList<>(query.entrySet())
queryList.sort { it.key }
def pairs = queryList.collect { "${it.key}=${it.value}" } as List<String>
pairs.add("Action=${action}")
pairs.add("Version=2018-08-01")
//按参数名称对查询参数进行升序排序
pairs.sort { it.split('=')[0] }
// 初始化签名结构
var uri = new URI("https", host, path, String.join("&", pairs), null)
// 接下来开始计算签名
var formatter = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneId.of("UTC"))
var zonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
var xDate = formatter.format(zonedDateTime)
var shortXDate = xDate.substring(0, 8)
var xContentSha256 = hashSHA256(body)
//计算
def headStr = ["content-type", "host", "x-content-sha256", "x-date"]
var signedHeadersStr = headStr.join(';')
def headStrSecond = ["content-type:$contentType", "host:$host", "x-content-sha256:$xContentSha256", "x-date:$xDate"]
var preRequestStr = headStrSecond.join('\n')
var preCanonicalRequestStr = [method, path, uri.getRawQuery(), preRequestStr, "", signedHeadersStr, xContentSha256]
var canonicalRequestStr = preCanonicalRequestStr.join('\n')
var hashedCanonicalRequest = hashSHA256(canonicalRequestStr.getBytes())
var credentialStr = [shortXDate, credential.get("region"), credential.get("service"), "request"]
var credentialScope = credentialStr.join("/")
var preStringToSign = ["HMAC-SHA256", xDate, credentialScope, hashedCanonicalRequest]
var stringToSign = preStringToSign.join("\n")
var kDate = hmacSHA256(credential.get("secretKeyId").getBytes(), shortXDate)
var kRegion = hmacSHA256(kDate, credential.get("region"))
var kService = hmacSHA256(kRegion, credential.get("service"))
var kSigning = hmacSHA256(kService, "request")
var signature = encodeHexStr(Objects.requireNonNull(hmacSHA256(kSigning, stringToSign)))
def authorization = "HMAC-SHA256 Credential=${credential.get("accessKeyId")}/${credentialScope}, SignedHeaders=${signedHeadersStr}, Signature=${signature}"
//构建请求头
var requestBuilder = HttpRequest.newBuilder().uri(uri)
.header("Content-Type", contentType)
.header("X-Date", xDate)
.header("X-Content-Sha256", xContentSha256)
.header("Authorization", authorization);
header.forEach(requestBuilder::header);
//构建请求
var request = switch (method.toUpperCase()) {
case "POST" -> requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(body)).build()
case "GET" -> requestBuilder.GET().build()
default -> throw new UnsupportedOperationException()
}
//发送请求
var response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
//解析结果
def result = new JsonSlurper().parseText(response.body()) as Map<String, ?>
if(!result.containsKey("Result")){
def error = (result.ResponseMetadata as Map).Error as Map
//直接抛出错误
throw new RuntimeException("$error.MessageCN.$error.Message")
}
return result.Result as Map<String, ?>
}
Python部分代码:
# 更新DNS
result = subprocess.run(['groovy', '/ddns/script.groovy', 'www.xyz.cn', ipv4], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
logging.info(f"更新DDNS成功:{ipv4}")
else:
error = result.stderr
logging.error(f"更新DDNS失败:{ipv4} - {error}")
if result.stdout:
logging.error(f"命令标准输出: {result.stdout}")
结语
这样就实现了一个简单的DDNS客户端工具,整个步骤也比较简单,主要复杂在研究DNS服务商云解析API上。当然,以上步骤中的代码也有很多不合理的地方或者存在BUG,主要是为了说明流程。你可以根据自己的想法进行补充修改,比如增加当IP发生变化时通过微信发送通知等功能。
需要补充一点,定时任务的间隔直接影响了IP发生变化时更新DNS解析的速度,但也不是唯一的变量,因为DNS解析需要一定的生效时间。
感谢大家观看,如果不妥支持清谅解,有喜欢的部分可以进行讨论,我会考虑展开细说。
- 0
- 0
-
分享