From 3fec661adb0e95460113b5f70ac90b0fe7056dbf Mon Sep 17 00:00:00 2001 From: sbwml Date: Sat, 23 Sep 2023 15:17:07 +0800 Subject: [PATCH] mosdns: pacthes: sync upstream latest commit fixed #129 --- .../117-pool-simplify-PackBuffer.patch | 222 ++++++++++++ ...esToStringUnsafe-remove-SplitLineReg.patch | 39 +++ ...lify-Handler-interface-add-more-meta.patch | 254 ++++++++++++++ .../patches/120-server-add-doq-server.patch | 321 ++++++++++++++++++ .../121-query_context-add-QueryMeta.patch | 266 +++++++++++++++ .../122-add-new-string_exp-matcher.patch | 291 ++++++++++++++++ .../patches/123-add-plugin-rate_limiter.patch | 297 ++++++++++++++++ ...-fixed-eol-pipelineConn-was-t-closed.patch | 158 +++++++++ ...-send-STREAM-FIN-after-query-is-sent.patch | 32 ++ ...patch => 203-compatible-with-go1.20.patch} | 0 10 files changed, 1880 insertions(+) create mode 100644 mosdns/patches/117-pool-simplify-PackBuffer.patch create mode 100644 mosdns/patches/118-utils-update-BytesToStringUnsafe-remove-SplitLineReg.patch create mode 100644 mosdns/patches/119-server-simplify-Handler-interface-add-more-meta.patch create mode 100644 mosdns/patches/120-server-add-doq-server.patch create mode 100644 mosdns/patches/121-query_context-add-QueryMeta.patch create mode 100644 mosdns/patches/122-add-new-string_exp-matcher.patch create mode 100644 mosdns/patches/123-add-plugin-rate_limiter.patch create mode 100644 mosdns/patches/124-transport-fixed-eol-pipelineConn-was-t-closed.patch create mode 100644 mosdns/patches/125-upstream-doq-send-STREAM-FIN-after-query-is-sent.patch rename mosdns/patches/{120-compatible-with-go1.20.patch => 203-compatible-with-go1.20.patch} (100%) diff --git a/mosdns/patches/117-pool-simplify-PackBuffer.patch b/mosdns/patches/117-pool-simplify-PackBuffer.patch new file mode 100644 index 0000000..9922c6b --- /dev/null +++ b/mosdns/patches/117-pool-simplify-PackBuffer.patch @@ -0,0 +1,222 @@ +From 64a83b8e28b3988df9eec4425130b57a09b15032 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Thu, 21 Sep 2023 22:06:49 +0800 +Subject: [PATCH 1/9] pool: simplify PackBuffer + +--- + pkg/dnsutils/net_io.go | 7 +++--- + pkg/pool/msg_buf.go | 41 ++++++++++++++++++------------------ + pkg/pool/msg_buf_test.go | 6 +----- + pkg/server/http_handler.go | 6 +++--- + pkg/server/tcp.go | 6 +++--- + pkg/server/udp.go | 6 +++--- + pkg/upstream/doh/upstream.go | 5 +++-- + 7 files changed, 37 insertions(+), 40 deletions(-) + +diff --git a/pkg/dnsutils/net_io.go b/pkg/dnsutils/net_io.go +index f165446..26e6efb 100644 +--- a/pkg/dnsutils/net_io.go ++++ b/pkg/dnsutils/net_io.go +@@ -101,13 +101,12 @@ func WriteRawMsgToTCP(c io.Writer, b []byte) (n int, err error) { + } + + func WriteMsgToUDP(c io.Writer, m *dns.Msg) (int, error) { +- b, buf, err := pool.PackBuffer(m) ++ b, err := pool.PackBuffer(m) + if err != nil { + return 0, err + } +- defer pool.ReleaseBuf(buf) +- +- return c.Write(b) ++ defer pool.ReleaseBuf(b) ++ return c.Write(*b) + } + + func ReadMsgFromUDP(c io.Reader, bufSize int) (*dns.Msg, int, error) { +diff --git a/pkg/pool/msg_buf.go b/pkg/pool/msg_buf.go +index 11faf7d..b5f861c 100644 +--- a/pkg/pool/msg_buf.go ++++ b/pkg/pool/msg_buf.go +@@ -26,47 +26,48 @@ import ( + "github.com/miekg/dns" + ) + +-// There is no such way to give dns.Msg.PackBuffer() a buffer +-// with a proper size. +-// Just give it a big buf and hope the buf will be reused in most scenes. +-const packBufSize = 4096 ++// dns.Msg.PackBuffer requires a buffer with length of m.Len() + 1. ++// Don't know why it needs one more byte. ++func getPackBuffer(m *dns.Msg) int { ++ return m.Len() + 1 ++} + + // PackBuffer packs the dns msg m to wire format. + // Callers should release the buf by calling ReleaseBuf after they have done + // with the wire []byte. +-func PackBuffer(m *dns.Msg) (wire []byte, buf *[]byte, err error) { +- buf = GetBuf(packBufSize) +- wire, err = m.PackBuffer(*buf) ++func PackBuffer(m *dns.Msg) (*[]byte, error) { ++ b := GetBuf(getPackBuffer(m)) ++ wire, err := m.PackBuffer(*b) + if err != nil { +- ReleaseBuf(buf) +- return nil, nil, err ++ ReleaseBuf(b) ++ return nil, err + } +- return wire, buf, nil ++ if &((*b)[0]) != &wire[0] { // reallocated ++ ReleaseBuf(b) ++ return nil, dns.ErrBuf ++ } ++ return b, nil + } + + // PackBuffer packs the dns msg m to wire format, with to bytes length header. + // Callers should release the buf by calling ReleaseBuf. +-func PackTCPBuffer(m *dns.Msg) (buf *[]byte, err error) { +- b := GetBuf(packBufSize) ++func PackTCPBuffer(m *dns.Msg) (*[]byte, error) { ++ b := GetBuf(2 + getPackBuffer(m)) + wire, err := m.PackBuffer((*b)[2:]) + if err != nil { + ReleaseBuf(b) + return nil, err + } ++ if &((*b)[2]) != &wire[0] { // reallocated ++ ReleaseBuf(b) ++ return nil, dns.ErrBuf ++ } + + l := len(wire) + if l > dns.MaxMsgSize { + ReleaseBuf(b) + return nil, fmt.Errorf("dns payload size %d is too large", l) + } +- +- if &((*b)[2]) != &wire[0] { // reallocated +- ReleaseBuf(b) +- b = GetBuf(l + 2) +- binary.BigEndian.PutUint16((*b)[:2], uint16(l)) +- copy((*b)[2:], wire) +- return b, nil +- } + binary.BigEndian.PutUint16((*b)[:2], uint16(l)) + *b = (*b)[:2+l] + return b, nil +diff --git a/pkg/pool/msg_buf_test.go b/pkg/pool/msg_buf_test.go +index 97d9a76..bfd98d1 100644 +--- a/pkg/pool/msg_buf_test.go ++++ b/pkg/pool/msg_buf_test.go +@@ -28,12 +28,8 @@ import ( + func TestPackBuffer_No_Allocation(t *testing.T) { + m := new(dns.Msg) + m.SetQuestion("123.", dns.TypeAAAA) +- wire, buf, err := PackBuffer(m) ++ _, err := PackBuffer(m) + if err != nil { + t.Fatal(err) + } +- +- if cap(wire) != cap(*buf) { +- t.Fatalf("wire and buf have different cap, wire %d, buf %d", cap(wire), cap(*buf)) +- } + } +diff --git a/pkg/server/http_handler.go b/pkg/server/http_handler.go +index 58f5811..3e671e3 100644 +--- a/pkg/server/http_handler.go ++++ b/pkg/server/http_handler.go +@@ -103,17 +103,17 @@ func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + panic(err) // Force http server to close connection. + } + +- b, buf, err := pool.PackBuffer(r) ++ b, err := pool.PackBuffer(r) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + h.warnErr(req, "failed to unpack handler's response", err) + return + } +- defer pool.ReleaseBuf(buf) ++ defer pool.ReleaseBuf(b) + + w.Header().Set("Content-Type", "application/dns-message") + w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d", dnsutils.GetMinimalTTL(r))) +- if _, err := w.Write(b); err != nil { ++ if _, err := w.Write(*b); err != nil { + h.warnErr(req, "failed to write response", err) + return + } +diff --git a/pkg/server/tcp.go b/pkg/server/tcp.go +index 5f479b1..ddc4846 100644 +--- a/pkg/server/tcp.go ++++ b/pkg/server/tcp.go +@@ -101,14 +101,14 @@ func ServeTCP(l net.Listener, h Handler, opts TCPServerOpts) error { + c.Close() // abort the connection + return + } +- b, buf, err := pool.PackBuffer(r) ++ b, err := pool.PackTCPBuffer(r) + if err != nil { + logger.Error("failed to unpack handler's response", zap.Error(err), zap.Stringer("msg", r)) + return + } +- defer pool.ReleaseBuf(buf) ++ defer pool.ReleaseBuf(b) + +- if _, err := dnsutils.WriteRawMsgToTCP(c, b); err != nil { ++ if _, err := c.Write(*b); err != nil { + logger.Warn("failed to write response", zap.Stringer("client", c.RemoteAddr()), zap.Error(err)) + return + } +diff --git a/pkg/server/udp.go b/pkg/server/udp.go +index 4dc1087..22e8d2b 100644 +--- a/pkg/server/udp.go ++++ b/pkg/server/udp.go +@@ -95,18 +95,18 @@ func ServeUDP(c *net.UDPConn, h Handler, opts UDPServerOpts) error { + } + if r != nil { + r.Truncate(getUDPSize(q)) +- b, buf, err := pool.PackBuffer(r) ++ b, err := pool.PackBuffer(r) + if err != nil { + logger.Error("failed to unpack handler's response", zap.Error(err), zap.Stringer("msg", r)) + return + } +- defer pool.ReleaseBuf(buf) ++ defer pool.ReleaseBuf(b) + + var oob []byte + if oobWriter != nil && dstIpFromCm != nil { + oob = oobWriter(dstIpFromCm) + } +- if _, _, err := c.WriteMsgUDPAddrPort(b, oob, remoteAddr); err != nil { ++ if _, _, err := c.WriteMsgUDPAddrPort(*b, oob, remoteAddr); err != nil { + logger.Warn("failed to write response", zap.Stringer("client", remoteAddr), zap.Error(err)) + } + } +diff --git a/pkg/upstream/doh/upstream.go b/pkg/upstream/doh/upstream.go +index abc124b..9cc72c4 100644 +--- a/pkg/upstream/doh/upstream.go ++++ b/pkg/upstream/doh/upstream.go +@@ -54,11 +54,12 @@ var ( + ) + + func (u *Upstream) ExchangeContext(ctx context.Context, q *dns.Msg) (*dns.Msg, error) { +- wire, buf, err := pool.PackBuffer(q) ++ bp, err := pool.PackBuffer(q) + if err != nil { + return nil, fmt.Errorf("failed to pack query msg, %w", err) + } +- defer pool.ReleaseBuf(buf) ++ defer pool.ReleaseBuf(bp) ++ wire := *bp + + // In order to maximize HTTP cache friendliness, DoH clients using media + // formats that include the ID field from the DNS message header, such +-- +2.34.8 + diff --git a/mosdns/patches/118-utils-update-BytesToStringUnsafe-remove-SplitLineReg.patch b/mosdns/patches/118-utils-update-BytesToStringUnsafe-remove-SplitLineReg.patch new file mode 100644 index 0000000..b4f8dc5 --- /dev/null +++ b/mosdns/patches/118-utils-update-BytesToStringUnsafe-remove-SplitLineReg.patch @@ -0,0 +1,39 @@ +From 4c1a7967a9367a8cce2b37fa6c81de1b50b9fa42 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Thu, 21 Sep 2023 22:30:15 +0800 +Subject: [PATCH 2/9] utils: update BytesToStringUnsafe, remove SplitLineReg + +--- + pkg/utils/strings.go | 10 +--------- + 1 file changed, 1 insertion(+), 9 deletions(-) + +diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go +index 632aadb..23471c2 100644 +--- a/pkg/utils/strings.go ++++ b/pkg/utils/strings.go +@@ -20,21 +20,13 @@ + package utils + + import ( +- "regexp" + "strings" + "unsafe" + ) + + // BytesToStringUnsafe converts bytes to string. + func BytesToStringUnsafe(b []byte) string { +- return *(*string)(unsafe.Pointer(&b)) +-} +- +-var charBlockExpr = regexp.MustCompile("\\S+") +- +-// SplitLineReg extracts words from s by using regexp "\S+". +-func SplitLineReg(s string) []string { +- return charBlockExpr.FindAllString(s, -1) ++ return unsafe.String(unsafe.SliceData(b), len(b)) + } + + // RemoveComment removes comment after "symbol". +-- +2.34.8 + diff --git a/mosdns/patches/119-server-simplify-Handler-interface-add-more-meta.patch b/mosdns/patches/119-server-simplify-Handler-interface-add-more-meta.patch new file mode 100644 index 0000000..cde19fc --- /dev/null +++ b/mosdns/patches/119-server-simplify-Handler-interface-add-more-meta.patch @@ -0,0 +1,254 @@ +From c0af4b587311766650c8c103656dcb595bcfef34 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Fri, 22 Sep 2023 09:24:05 +0800 +Subject: [PATCH 3/9] server: simplify Handler interface, add more meta + +--- + pkg/server/http_handler.go | 25 +++++++++-------- + pkg/server/iface.go | 18 ++++++++----- + pkg/server/tcp.go | 21 ++++++++------- + pkg/server/udp.go | 42 ++++++++--------------------- + pkg/server_handler/entry_handler.go | 29 +++++++++++++++++--- + 5 files changed, 71 insertions(+), 64 deletions(-) + +diff --git a/pkg/server/http_handler.go b/pkg/server/http_handler.go +index 3e671e3..5a41314 100644 +--- a/pkg/server/http_handler.go ++++ b/pkg/server/http_handler.go +@@ -28,7 +28,6 @@ import ( + "net/netip" + "strings" + +- "github.com/IrineSistiana/mosdns/v5/pkg/dnsutils" + "github.com/IrineSistiana/mosdns/v5/pkg/pool" + "github.com/miekg/dns" + "go.uber.org/zap" +@@ -97,23 +96,23 @@ func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + return + } + +- r, err := h.dnsHandler.Handle(req.Context(), q, QueryMeta{ClientAddr: clientAddr}) +- if err != nil { +- h.warnErr(req, "handler err", err) +- panic(err) // Force http server to close connection. ++ queryMeta := QueryMeta{ ++ ClientAddr: clientAddr, + } +- +- b, err := pool.PackBuffer(r) +- if err != nil { ++ if u := req.URL; u != nil { ++ queryMeta.UrlPath = u.Path ++ } ++ if tlsStat := req.TLS; tlsStat != nil { ++ queryMeta.ServerName = tlsStat.ServerName ++ } ++ resp := h.dnsHandler.Handle(req.Context(), q, queryMeta, pool.PackBuffer) ++ if resp == nil { + w.WriteHeader(http.StatusInternalServerError) +- h.warnErr(req, "failed to unpack handler's response", err) + return + } +- defer pool.ReleaseBuf(b) +- ++ defer pool.ReleaseBuf(resp) + w.Header().Set("Content-Type", "application/dns-message") +- w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d", dnsutils.GetMinimalTTL(r))) +- if _, err := w.Write(*b); err != nil { ++ if _, err := w.Write(*resp); err != nil { + h.warnErr(req, "failed to write response", err) + return + } +diff --git a/pkg/server/iface.go b/pkg/server/iface.go +index 2f15be1..c45b502 100644 +--- a/pkg/server/iface.go ++++ b/pkg/server/iface.go +@@ -10,14 +10,20 @@ import ( + // Handler handles incoming request q and MUST ALWAYS return a response. + // Handler MUST handle dns errors by itself and return a proper error responses. + // e.g. Return a SERVFAIL if something goes wrong. +-// If Handle() returns an error, caller considers that the error is associated +-// with the downstream connection and will close the downstream connection +-// immediately. ++// If Handle() returns a nil resp, caller will ++// udp: do nothing. ++// tcp/dot: close the connection immediately. ++// doh: send a 500 response. ++// doq: close the stream immediately. + type Handler interface { +- Handle(ctx context.Context, q *dns.Msg, meta QueryMeta) (resp *dns.Msg, err error) ++ Handle(ctx context.Context, q *dns.Msg, meta QueryMeta, packMsgPayload func(m *dns.Msg) (*[]byte, error)) (respPayload *[]byte) + } + + type QueryMeta struct { +- ClientAddr netip.Addr // Maybe invalid +- FromUDP bool ++ FromUDP bool ++ ++ // Optional ++ ClientAddr netip.Addr ++ ServerName string ++ UrlPath string + } +diff --git a/pkg/server/tcp.go b/pkg/server/tcp.go +index ddc4846..6faba76 100644 +--- a/pkg/server/tcp.go ++++ b/pkg/server/tcp.go +@@ -21,6 +21,7 @@ package server + + import ( + "context" ++ "crypto/tls" + "fmt" + "net" + "net/netip" +@@ -93,22 +94,22 @@ func ServeTCP(l net.Listener, h Handler, opts TCPServerOpts) error { + return // read err, close the connection + } + ++ // Try to get server name from tls conn. ++ var serverName string ++ if tlsConn, ok := c.(*tls.Conn); ok { ++ serverName = tlsConn.ConnectionState().ServerName ++ } ++ + // handle query + go func() { +- r, err := h.Handle(tcpConnCtx, req, QueryMeta{ClientAddr: clientAddr}) +- if err != nil { +- logger.Warn("handler err", zap.Error(err)) ++ r := h.Handle(tcpConnCtx, req, QueryMeta{ClientAddr: clientAddr, ServerName: serverName}, pool.PackTCPBuffer) ++ if r == nil { + c.Close() // abort the connection + return + } +- b, err := pool.PackTCPBuffer(r) +- if err != nil { +- logger.Error("failed to unpack handler's response", zap.Error(err), zap.Stringer("msg", r)) +- return +- } +- defer pool.ReleaseBuf(b) ++ defer pool.ReleaseBuf(r) + +- if _, err := c.Write(*b); err != nil { ++ if _, err := c.Write(*r); err != nil { + logger.Warn("failed to write response", zap.Stringer("client", c.RemoteAddr()), zap.Error(err)) + return + } +diff --git a/pkg/server/udp.go b/pkg/server/udp.go +index 22e8d2b..89e57e2 100644 +--- a/pkg/server/udp.go ++++ b/pkg/server/udp.go +@@ -63,10 +63,10 @@ func ServeUDP(c *net.UDPConn, h Handler, opts UDPServerOpts) error { + n, oobn, _, remoteAddr, err := c.ReadMsgUDPAddrPort(*rb, ob) + if err != nil { + if n == 0 { +- // err with zero read. Most likely becasue c was closed. ++ // Err with zero read. Most likely because c was closed. + return fmt.Errorf("unexpected read err: %w", err) + } +- // err with some read. Tempory err. ++ // Temporary err. + logger.Warn("read err", zap.Error(err)) + continue + } +@@ -88,42 +88,22 @@ func ServeUDP(c *net.UDPConn, h Handler, opts UDPServerOpts) error { + + // handle query + go func() { +- r, err := h.Handle(listenerCtx, q, QueryMeta{ClientAddr: remoteAddr.Addr(), FromUDP: true}) +- if err != nil { +- logger.Warn("handler err", zap.Error(err)) ++ payload := h.Handle(listenerCtx, q, QueryMeta{ClientAddr: remoteAddr.Addr(), FromUDP: true}, pool.PackBuffer) ++ if payload == nil { + return + } +- if r != nil { +- r.Truncate(getUDPSize(q)) +- b, err := pool.PackBuffer(r) +- if err != nil { +- logger.Error("failed to unpack handler's response", zap.Error(err), zap.Stringer("msg", r)) +- return +- } +- defer pool.ReleaseBuf(b) ++ defer pool.ReleaseBuf(payload) + +- var oob []byte +- if oobWriter != nil && dstIpFromCm != nil { +- oob = oobWriter(dstIpFromCm) +- } +- if _, _, err := c.WriteMsgUDPAddrPort(*b, oob, remoteAddr); err != nil { +- logger.Warn("failed to write response", zap.Stringer("client", remoteAddr), zap.Error(err)) +- } ++ var oob []byte ++ if oobWriter != nil && dstIpFromCm != nil { ++ oob = oobWriter(dstIpFromCm) ++ } ++ if _, _, err := c.WriteMsgUDPAddrPort(*payload, oob, remoteAddr); err != nil { ++ logger.Warn("failed to write response", zap.Stringer("client", remoteAddr), zap.Error(err)) + } + }() + } + } + +-func getUDPSize(m *dns.Msg) int { +- var s uint16 +- if opt := m.IsEdns0(); opt != nil { +- s = opt.UDPSize() +- } +- if s < dns.MinMsgSize { +- s = dns.MinMsgSize +- } +- return int(s) +-} +- + type getSrcAddrFromOOB func(oob []byte) (net.IP, error) + type writeSrcAddrToOOB func(a net.IP) []byte +diff --git a/pkg/server_handler/entry_handler.go b/pkg/server_handler/entry_handler.go +index 520e3d2..9e3a386 100644 +--- a/pkg/server_handler/entry_handler.go ++++ b/pkg/server_handler/entry_handler.go +@@ -71,9 +71,9 @@ func NewEntryHandler(opts EntryHandlerOpts) *EntryHandler { + } + + // ServeDNS implements server.Handler. +-// If entry returns an error, a SERVFAIL response will be set. +-// If entry returns without a response, a REFUSED response will be set. +-func (h *EntryHandler) Handle(ctx context.Context, q *dns.Msg, qInfo server.QueryMeta) (*dns.Msg, error) { ++// If entry returns an error, a SERVFAIL response will be returned. ++// If entry returns without a response, a REFUSED response will be returned. ++func (h *EntryHandler) Handle(ctx context.Context, q *dns.Msg, qInfo server.QueryMeta, packMsgPayload func(m *dns.Msg) (*[]byte, error)) *[]byte { + ddl := time.Now().Add(h.opts.QueryTimeout) + ctx, cancel := context.WithDeadline(ctx, ddl) + defer cancel() +@@ -100,5 +100,26 @@ func (h *EntryHandler) Handle(ctx context.Context, q *dns.Msg, qInfo server.Quer + respMsg.Rcode = dns.RcodeServerFailure + } + respMsg.RecursionAvailable = true +- return respMsg, nil ++ ++ if qInfo.FromUDP { ++ respMsg.Truncate(getUDPSize(q)) ++ } ++ ++ payload, err := packMsgPayload(respMsg) ++ if err != nil { ++ h.opts.Logger.Error("internal err: failed to pack resp msg", zap.Error(err)) ++ return nil ++ } ++ return payload ++} ++ ++func getUDPSize(m *dns.Msg) int { ++ var s uint16 ++ if opt := m.IsEdns0(); opt != nil { ++ s = opt.UDPSize() ++ } ++ if s < dns.MinMsgSize { ++ s = dns.MinMsgSize ++ } ++ return int(s) + } +-- +2.34.8 + diff --git a/mosdns/patches/120-server-add-doq-server.patch b/mosdns/patches/120-server-add-doq-server.patch new file mode 100644 index 0000000..70afd98 --- /dev/null +++ b/mosdns/patches/120-server-add-doq-server.patch @@ -0,0 +1,321 @@ +From df0762ce550c33e1cfd423fef95020c41ca770da Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Fri, 22 Sep 2023 10:39:07 +0800 +Subject: [PATCH 4/9] server: add doq server + +--- + pkg/server/doq.go | 121 +++++++++++++++++++++++ + plugin/enabled_plugins.go | 20 ++-- + plugin/server/quic_server/quic_server.go | 120 ++++++++++++++++++++++ + 3 files changed, 248 insertions(+), 13 deletions(-) + create mode 100644 pkg/server/doq.go + create mode 100644 plugin/server/quic_server/quic_server.go + +diff --git a/pkg/server/doq.go b/pkg/server/doq.go +new file mode 100644 +index 0000000..8fb5f81 +--- /dev/null ++++ b/pkg/server/doq.go +@@ -0,0 +1,121 @@ ++/* ++ * Copyright (C) 2020-2022, IrineSistiana ++ * ++ * This file is part of mosdns. ++ * ++ * mosdns is free software: you can redistribute it and/or modify ++ * it under the terms of the GNU General Public License as published by ++ * the Free Software Foundation, either version 3 of the License, or ++ * (at your option) any later version. ++ * ++ * mosdns is distributed in the hope that it will be useful, ++ * but WITHOUT ANY WARRANTY; without even the implied warranty of ++ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ++ * GNU General Public License for more details. ++ * ++ * You should have received a copy of the GNU General Public License ++ * along with this program. If not, see . ++ */ ++ ++package server ++ ++import ( ++ "context" ++ "fmt" ++ "net" ++ "net/netip" ++ "time" ++ ++ "github.com/IrineSistiana/mosdns/v5/pkg/dnsutils" ++ "github.com/IrineSistiana/mosdns/v5/pkg/pool" ++ "github.com/quic-go/quic-go" ++ "go.uber.org/zap" ++) ++ ++const ( ++ defaultQuicIdleTimeout = time.Second * 30 ++ streamReadTimeout = time.Second * 1 ++ quicFirstReadTimeout = time.Millisecond * 500 ++) ++ ++type DoQServerOpts struct { ++ Logger *zap.Logger ++ IdleTimeout time.Duration ++} ++ ++// ServeDoQ starts a server at l. It returns if l had an Accept() error. ++// It always returns a non-nil error. ++func ServeDoQ(l *quic.Listener, h Handler, opts DoQServerOpts) error { ++ logger := opts.Logger ++ if logger == nil { ++ logger = nopLogger ++ } ++ idleTimeout := opts.IdleTimeout ++ if idleTimeout <= 0 { ++ idleTimeout = defaultQuicIdleTimeout ++ } ++ ++ listenerCtx, cancel := context.WithCancel(context.Background()) ++ defer cancel() ++ for { ++ c, err := l.Accept(listenerCtx) ++ if err != nil { ++ return fmt.Errorf("unexpected listener err: %w", err) ++ } ++ ++ // handle connection ++ connCtx, cancelConn := context.WithCancel(listenerCtx) ++ go func() { ++ defer c.CloseWithError(0, "") ++ defer cancelConn() ++ ++ var clientAddr netip.Addr ++ ta, ok := c.RemoteAddr().(*net.UDPAddr) ++ if ok { ++ clientAddr = ta.AddrPort().Addr() ++ } ++ ++ firstRead := true ++ for { ++ var streamAcceptTimeout time.Duration ++ if firstRead { ++ firstRead = false ++ streamAcceptTimeout = quicFirstReadTimeout ++ } else { ++ streamAcceptTimeout = idleTimeout ++ } ++ streamAcceptCtx, cancelStreamAccept := context.WithTimeout(connCtx, streamAcceptTimeout) ++ stream, err := c.AcceptStream(streamAcceptCtx) ++ cancelStreamAccept() ++ if err != nil { ++ return ++ } ++ ++ // Handle stream. ++ // For doq, one stream, one query. ++ go func() { ++ defer stream.Close() ++ ++ // Avoid fragmentation attack. ++ stream.SetReadDeadline(time.Now().Add(streamReadTimeout)) ++ req, _, err := dnsutils.ReadMsgFromTCP(stream) ++ if err != nil { ++ return ++ } ++ queryMeta := QueryMeta{ ++ ClientAddr: clientAddr, ++ ServerName: c.ConnectionState().TLS.ServerName, ++ } ++ ++ resp := h.Handle(connCtx, req, queryMeta, pool.PackTCPBuffer) ++ if resp == nil { ++ return ++ } ++ if _, err := stream.Write(*resp); err != nil { ++ logger.Warn("failed to write response", zap.Stringer("client", c.RemoteAddr()), zap.Error(err)) ++ } ++ }() ++ } ++ }() ++ } ++} +diff --git a/plugin/enabled_plugins.go b/plugin/enabled_plugins.go +index 199587c..0f7531b 100644 +--- a/plugin/enabled_plugins.go ++++ b/plugin/enabled_plugins.go +@@ -21,12 +21,11 @@ package plugin + + // data providers + import ( ++ // data provider + _ "github.com/IrineSistiana/mosdns/v5/plugin/data_provider/domain_set" + _ "github.com/IrineSistiana/mosdns/v5/plugin/data_provider/ip_set" +-) + +-// matches +-import ( ++ // matcher + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/client_ip" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/cname" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/env" +@@ -39,10 +38,8 @@ import ( + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/random" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/rcode" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/resp_ip" +-) + +-// executables +-import ( ++ // executable + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/arbitrary" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/black_hole" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/cache" +@@ -62,16 +59,13 @@ import ( + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence/fallback" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sleep" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/ttl" +-) + +-// other +-import ( +- _ "github.com/IrineSistiana/mosdns/v5/plugin/mark" // executable and matcher +-) ++ // executable and matcher ++ _ "github.com/IrineSistiana/mosdns/v5/plugin/mark" + +-// servers +-import ( ++ // server + _ "github.com/IrineSistiana/mosdns/v5/plugin/server/http_server" ++ _ "github.com/IrineSistiana/mosdns/v5/plugin/server/quic_server" + _ "github.com/IrineSistiana/mosdns/v5/plugin/server/tcp_server" + _ "github.com/IrineSistiana/mosdns/v5/plugin/server/udp_server" + ) +diff --git a/plugin/server/quic_server/quic_server.go b/plugin/server/quic_server/quic_server.go +new file mode 100644 +index 0000000..8a5a4c1 +--- /dev/null ++++ b/plugin/server/quic_server/quic_server.go +@@ -0,0 +1,120 @@ ++/* ++ * Copyright (C) 2020-2022, IrineSistiana ++ * ++ * This file is part of mosdns. ++ * ++ * mosdns is free software: you can redistribute it and/or modify ++ * it under the terms of the GNU General Public License as published by ++ * the Free Software Foundation, either version 3 of the License, or ++ * (at your option) any later version. ++ * ++ * mosdns is distributed in the hope that it will be useful, ++ * but WITHOUT ANY WARRANTY; without even the implied warranty of ++ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ++ * GNU General Public License for more details. ++ * ++ * You should have received a copy of the GNU General Public License ++ * along with this program. If not, see . ++ */ ++ ++package quic_server ++ ++import ( ++ "crypto/tls" ++ "errors" ++ "fmt" ++ "net" ++ "time" ++ ++ "github.com/IrineSistiana/mosdns/v5/coremain" ++ "github.com/IrineSistiana/mosdns/v5/pkg/server" ++ "github.com/IrineSistiana/mosdns/v5/pkg/utils" ++ "github.com/IrineSistiana/mosdns/v5/plugin/server/server_utils" ++ "github.com/quic-go/quic-go" ++) ++ ++const PluginType = "quic_server" ++ ++func init() { ++ coremain.RegNewPluginFunc(PluginType, Init, func() any { return new(Args) }) ++} ++ ++type Args struct { ++ Entry string `yaml:"entry"` ++ Listen string `yaml:"listen"` ++ Cert string `yaml:"cert"` ++ Key string `yaml:"key"` ++ IdleTimeout int `yaml:"idle_timeout"` ++} ++ ++func (a *Args) init() { ++ utils.SetDefaultNum(&a.IdleTimeout, 30) ++} ++ ++type QuicServer struct { ++ args *Args ++ ++ l *quic.Listener ++} ++ ++func (s *QuicServer) Close() error { ++ return s.l.Close() ++} ++ ++func Init(bp *coremain.BP, args any) (any, error) { ++ return StartServer(bp, args.(*Args)) ++} ++ ++func StartServer(bp *coremain.BP, args *Args) (*QuicServer, error) { ++ dh, err := server_utils.NewHandler(bp, args.Entry) ++ if err != nil { ++ return nil, fmt.Errorf("failed to init dns handler, %w", err) ++ } ++ ++ // Init tls ++ if len(args.Key) == 0 || len(args.Cert) == 0 { ++ return nil, errors.New("quic server requires a tls certificate") ++ } ++ tlsConfig := new(tls.Config) ++ if err := server.LoadCert(tlsConfig, args.Cert, args.Key); err != nil { ++ return nil, fmt.Errorf("failed to read tls cert, %w", err) ++ } ++ tlsConfig.NextProtos = []string{"doq"} ++ ++ uc, err := net.ListenPacket("udp", args.Listen) ++ if err != nil { ++ return nil, fmt.Errorf("failed to listen socket, %w", err) ++ } ++ ++ idleTimeout := time.Duration(args.IdleTimeout) * time.Second ++ ++ quicConfig := &quic.Config{ ++ MaxIdleTimeout: idleTimeout, ++ InitialStreamReceiveWindow: 4 * 1024, ++ MaxStreamReceiveWindow: 4 * 1024, ++ InitialConnectionReceiveWindow: 8 * 1024, ++ MaxConnectionReceiveWindow: 16 * 1024, ++ Allow0RTT: false, ++ } ++ ++ qt := &quic.Transport{ ++ Conn: uc, ++ } ++ ++ quicListener, err := qt.Listen(tlsConfig, quicConfig) ++ if err != nil { ++ qt.Close() ++ return nil, fmt.Errorf("failed to listen quic, %w", err) ++ } ++ ++ go func() { ++ defer quicListener.Close() ++ serverOpts := server.DoQServerOpts{Logger: bp.L(), IdleTimeout: idleTimeout} ++ err := server.ServeDoQ(quicListener, dh, serverOpts) ++ bp.M().GetSafeClose().SendCloseSignal(err) ++ }() ++ return &QuicServer{ ++ args: args, ++ l: quicListener, ++ }, nil ++} +-- +2.34.8 + diff --git a/mosdns/patches/121-query_context-add-QueryMeta.patch b/mosdns/patches/121-query_context-add-QueryMeta.patch new file mode 100644 index 0000000..c288818 --- /dev/null +++ b/mosdns/patches/121-query_context-add-QueryMeta.patch @@ -0,0 +1,266 @@ +From 65bf1f77a56fe481cacf3a1cada155b66949578f Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Fri, 22 Sep 2023 16:10:24 +0800 +Subject: [PATCH 5/9] query_context: add QueryMeta + +--- + pkg/query_context/client_addr.go | 38 ------------------- + pkg/query_context/context.go | 23 ++++++++--- + pkg/server_handler/entry_handler.go | 5 +-- + .../dual_selector/dual_selector_test.go | 9 +++-- + plugin/executable/ipset/ipset_test.go | 11 +++--- + plugin/executable/sequence/sequence_test.go | 5 ++- + plugin/matcher/client_ip/client_ip_matcher.go | 4 +- + 7 files changed, 34 insertions(+), 61 deletions(-) + delete mode 100644 pkg/query_context/client_addr.go + +diff --git a/pkg/query_context/client_addr.go b/pkg/query_context/client_addr.go +deleted file mode 100644 +index 7793fe6..0000000 +--- a/pkg/query_context/client_addr.go ++++ /dev/null +@@ -1,38 +0,0 @@ +-/* +- * Copyright (C) 2020-2022, IrineSistiana +- * +- * This file is part of mosdns. +- * +- * mosdns is free software: you can redistribute it and/or modify +- * it under the terms of the GNU General Public License as published by +- * the Free Software Foundation, either version 3 of the License, or +- * (at your option) any later version. +- * +- * mosdns is distributed in the hope that it will be useful, +- * but WITHOUT ANY WARRANTY; without even the implied warranty of +- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +- * GNU General Public License for more details. +- * +- * You should have received a copy of the GNU General Public License +- * along with this program. If not, see . +- */ +- +-package query_context +- +-import ( +- "net/netip" +-) +- +-var clientAddrKey = RegKey() +- +-func SetClientAddr(qCtx *Context, addr *netip.Addr) { +- qCtx.StoreValue(clientAddrKey, addr) +-} +- +-func GetClientAddr(qCtx *Context) (*netip.Addr, bool) { +- v, ok := qCtx.GetValue(clientAddrKey) +- if !ok { +- return nil, false +- } +- return v.(*netip.Addr), true +-} +diff --git a/pkg/query_context/context.go b/pkg/query_context/context.go +index d3e67ae..9fa3fd7 100644 +--- a/pkg/query_context/context.go ++++ b/pkg/query_context/context.go +@@ -20,11 +20,13 @@ + package query_context + + import ( ++ "sync/atomic" ++ "time" ++ ++ "github.com/IrineSistiana/mosdns/v5/pkg/server" + "github.com/miekg/dns" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +- "sync/atomic" +- "time" + ) + + // Context is a query context that pass through plugins +@@ -34,6 +36,7 @@ import ( + type Context struct { + startTime time.Time // when was this Context created + q *dns.Msg ++ queryMeta QueryMeta + + // id for this Context. Not for the dns query. This id is mainly for logging. + id uint32 +@@ -48,14 +51,17 @@ type Context struct { + + var contextUid atomic.Uint32 + ++type QueryMeta = server.QueryMeta ++ + // NewContext creates a new query Context. + // q is the query dns msg. It cannot be nil, or NewContext will panic. +-func NewContext(q *dns.Msg) *Context { ++func NewContext(q *dns.Msg, qm QueryMeta) *Context { + if q == nil { + panic("handler: query msg is nil") + } + ctx := &Context{ + q: q, ++ queryMeta: qm, + id: contextUid.Add(1), + startTime: time.Now(), + } +@@ -68,6 +74,11 @@ func (ctx *Context) Q() *dns.Msg { + return ctx.q + } + ++// QueryMeta returns the meta data of the query. ++func (ctx *Context) QueryMeta() QueryMeta { ++ return ctx.queryMeta ++} ++ + // R returns the response. It might be nil. + func (ctx *Context) R() *dns.Msg { + return ctx.r +@@ -164,8 +175,8 @@ func (ctx *Context) DeleteMark(m uint32) { + func (ctx *Context) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddUint32("uqid", ctx.id) + +- if addr, ok := GetClientAddr(ctx); ok && addr.IsValid() { +- zap.Stringer("client", addr).AddTo(encoder) ++ if clientAddr := ctx.queryMeta.ClientAddr; clientAddr.IsValid() { ++ zap.Stringer("client", clientAddr).AddTo(encoder) + } + + q := ctx.Q() +@@ -180,7 +191,7 @@ func (ctx *Context) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + if r := ctx.R(); r != nil { + encoder.AddInt("rcode", r.Rcode) + } +- encoder.AddDuration("elapsed", time.Now().Sub(ctx.StartTime())) ++ encoder.AddDuration("elapsed", time.Since(ctx.StartTime())) + return nil + } + +diff --git a/pkg/server_handler/entry_handler.go b/pkg/server_handler/entry_handler.go +index 9e3a386..c12d852 100644 +--- a/pkg/server_handler/entry_handler.go ++++ b/pkg/server_handler/entry_handler.go +@@ -79,10 +79,7 @@ func (h *EntryHandler) Handle(ctx context.Context, q *dns.Msg, qInfo server.Quer + defer cancel() + + // exec entry +- qCtx := query_context.NewContext(q) +- if qInfo.ClientAddr.IsValid() { +- query_context.SetClientAddr(qCtx, &qInfo.ClientAddr) +- } ++ qCtx := query_context.NewContext(q, qInfo) + err := h.opts.Entry.Exec(ctx, qCtx) + respMsg := qCtx.R() + if err != nil { +diff --git a/plugin/executable/dual_selector/dual_selector_test.go b/plugin/executable/dual_selector/dual_selector_test.go +index 6a5ae92..524e739 100644 +--- a/plugin/executable/dual_selector/dual_selector_test.go ++++ b/plugin/executable/dual_selector/dual_selector_test.go +@@ -21,14 +21,15 @@ package dual_selector + + import ( + "context" ++ "net" ++ "testing" ++ "time" ++ + "github.com/IrineSistiana/mosdns/v5/coremain" + "github.com/IrineSistiana/mosdns/v5/pkg/query_context" + "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" + "github.com/miekg/dns" + "go.uber.org/zap" +- "net" +- "testing" +- "time" + ) + + type dummyNext struct { +@@ -158,7 +159,7 @@ func TestSelector_Exec(t *testing.T) { + + q := new(dns.Msg) + q.SetQuestion("example.", tt.qtype) +- qCtx := query_context.NewContext(q) ++ qCtx := query_context.NewContext(q, query_context.QueryMeta{}) + cw := sequence.NewChainWalker([]*sequence.ChainNode{{E: tt.next}}, nil) + if err := s.Exec(context.Background(), qCtx, cw); (err != nil) != tt.wantErr { + t.Errorf("Exec() error = %v, wantErr %v", err, tt.wantErr) +diff --git a/plugin/executable/ipset/ipset_test.go b/plugin/executable/ipset/ipset_test.go +index cb92eb2..c5ad508 100644 +--- a/plugin/executable/ipset/ipset_test.go ++++ b/plugin/executable/ipset/ipset_test.go +@@ -24,15 +24,16 @@ package ipset + import ( + "context" + "fmt" +- "github.com/IrineSistiana/mosdns/v5/pkg/query_context" +- "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" +- "github.com/miekg/dns" +- "github.com/vishvananda/netlink" + "math/rand" + "net" + "os" + "strconv" + "testing" ++ ++ "github.com/IrineSistiana/mosdns/v5/pkg/query_context" ++ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" ++ "github.com/miekg/dns" ++ "github.com/vishvananda/netlink" + ) + + func skipTest(t *testing.T) { +@@ -85,7 +86,7 @@ func Test_ipset(t *testing.T) { + r.Answer = append(r.Answer, &dns.A{A: net.ParseIP("127.0.0.2")}) + r.Answer = append(r.Answer, &dns.AAAA{AAAA: net.ParseIP("::1")}) + r.Answer = append(r.Answer, &dns.AAAA{AAAA: net.ParseIP("::2")}) +- qCtx := query_context.NewContext(q) ++ qCtx := query_context.NewContext(q, query_context.QueryMeta{}) + qCtx.SetResponse(r) + if err := p.Exec(context.Background(), qCtx); err != nil { + t.Fatal(err) +diff --git a/plugin/executable/sequence/sequence_test.go b/plugin/executable/sequence/sequence_test.go +index ea7704d..16b1360 100644 +--- a/plugin/executable/sequence/sequence_test.go ++++ b/plugin/executable/sequence/sequence_test.go +@@ -22,10 +22,11 @@ package sequence + import ( + "context" + "errors" ++ "testing" ++ + "github.com/IrineSistiana/mosdns/v5/coremain" + "github.com/IrineSistiana/mosdns/v5/pkg/query_context" + "github.com/miekg/dns" +- "testing" + ) + + type dummy struct { +@@ -186,7 +187,7 @@ func Test_sequence_Exec(t *testing.T) { + if err != nil { + t.Fatal(err) + } +- qCtx := query_context.NewContext(new(dns.Msg)) ++ qCtx := query_context.NewContext(new(dns.Msg), query_context.QueryMeta{}) + if err := s.Exec(context.Background(), qCtx); (err != nil) != tt.wantErr { + t.Errorf("Exec() error = %v, wantErr %v", err, tt.wantErr) + } +diff --git a/plugin/matcher/client_ip/client_ip_matcher.go b/plugin/matcher/client_ip/client_ip_matcher.go +index 357df9b..b308b5d 100644 +--- a/plugin/matcher/client_ip/client_ip_matcher.go ++++ b/plugin/matcher/client_ip/client_ip_matcher.go +@@ -39,9 +39,9 @@ func QuickSetup(bq sequence.BQ, s string) (sequence.Matcher, error) { + } + + func matchClientAddr(qCtx *query_context.Context, m netlist.Matcher) (bool, error) { +- addr, _ := query_context.GetClientAddr(qCtx) ++ addr := qCtx.QueryMeta().ClientAddr + if !addr.IsValid() { + return false, nil + } +- return m.Match(*addr), nil ++ return m.Match(addr), nil + } +-- +2.34.8 + diff --git a/mosdns/patches/122-add-new-string_exp-matcher.patch b/mosdns/patches/122-add-new-string_exp-matcher.patch new file mode 100644 index 0000000..9c699df --- /dev/null +++ b/mosdns/patches/122-add-new-string_exp-matcher.patch @@ -0,0 +1,291 @@ +From 71145e797f3748b2b608d7f2e0319339fbd41f5b Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Fri, 22 Sep 2023 17:35:01 +0800 +Subject: [PATCH 6/9] add new string_exp matcher + +--- + plugin/enabled_plugins.go | 1 + + plugin/matcher/string_exp/string_exp.go | 184 +++++++++++++++++++ + plugin/matcher/string_exp/string_exp_test.go | 67 +++++++ + 3 files changed, 252 insertions(+) + create mode 100644 plugin/matcher/string_exp/string_exp.go + create mode 100644 plugin/matcher/string_exp/string_exp_test.go + +diff --git a/plugin/enabled_plugins.go b/plugin/enabled_plugins.go +index 0f7531b..dfb311b 100644 +--- a/plugin/enabled_plugins.go ++++ b/plugin/enabled_plugins.go +@@ -38,6 +38,7 @@ import ( + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/random" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/rcode" + _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/resp_ip" ++ _ "github.com/IrineSistiana/mosdns/v5/plugin/matcher/string_exp" + + // executable + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/arbitrary" +diff --git a/plugin/matcher/string_exp/string_exp.go b/plugin/matcher/string_exp/string_exp.go +new file mode 100644 +index 0000000..692f4e3 +--- /dev/null ++++ b/plugin/matcher/string_exp/string_exp.go +@@ -0,0 +1,184 @@ ++/* ++ * Copyright (C) 2020-2022, IrineSistiana ++ * ++ * This file is part of mosdns. ++ * ++ * mosdns is free software: you can redistribute it and/or modify ++ * it under the terms of the GNU General Public License as published by ++ * the Free Software Foundation, either version 3 of the License, or ++ * (at your option) any later version. ++ * ++ * mosdns is distributed in the hope that it will be useful, ++ * but WITHOUT ANY WARRANTY; without even the implied warranty of ++ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ++ * GNU General Public License for more details. ++ * ++ * You should have received a copy of the GNU General Public License ++ * along with this program. If not, see . ++ */ ++ ++package string_exp ++ ++import ( ++ "context" ++ "errors" ++ "fmt" ++ "os" ++ "regexp" ++ "strings" ++ ++ "github.com/IrineSistiana/mosdns/v5/pkg/query_context" ++ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" ++) ++ ++const PluginType = "string_exp" ++ ++func init() { ++ sequence.MustRegMatchQuickSetup(PluginType, QuickSetup) ++} ++ ++var _ sequence.Matcher = (*Matcher)(nil) ++ ++type Matcher struct { ++ getStr GetStrFunc ++ m StringMatcher ++} ++ ++type StringMatcher interface { ++ MatchStr(s string) bool ++} ++ ++type GetStrFunc func(qCtx *query_context.Context) string ++ ++func (m *Matcher) Match(_ context.Context, qCtx *query_context.Context) (bool, error) { ++ return m.match(qCtx), nil ++} ++ ++func (m *Matcher) match(qCtx *query_context.Context) bool { ++ return m.m.MatchStr(m.getStr(qCtx)) ++} ++ ++func NewMatcher(f GetStrFunc, sm StringMatcher) *Matcher { ++ m := &Matcher{ ++ getStr: f, ++ m: sm, ++ } ++ return m ++} ++ ++// Format: "scr_string_name op [string]..." ++// scr_string_name = {url_path|server_name|$env_key} ++// op = {zl|eq|prefix|suffix|contains|regexp} ++func QuickSetupFromStr(s string) (sequence.Matcher, error) { ++ sf := strings.Fields(s) ++ if len(sf) < 2 { ++ return nil, errors.New("not enough args") ++ } ++ srcStrName := sf[0] ++ op := sf[1] ++ args := sf[2:] ++ ++ var sm StringMatcher ++ switch op { ++ case "zl": ++ sm = opZl{} ++ case "eq": ++ m := make(map[string]struct{}) ++ for _, s := range args { ++ m[s] = struct{}{} ++ } ++ sm = &opEq{m: m} ++ case "regexp": ++ var exps []*regexp.Regexp ++ for _, s := range args { ++ exp, err := regexp.Compile(s) ++ if err != nil { ++ return nil, fmt.Errorf("invalid reg expression, %w", err) ++ } ++ exps = append(exps, exp) ++ } ++ sm = &opRegExp{exp: exps} ++ case "prefix": ++ sm = &opF{s: args, f: strings.HasPrefix} ++ case "suffix": ++ sm = &opF{s: args, f: strings.HasSuffix} ++ case "contains": ++ sm = &opF{s: args, f: strings.Contains} ++ default: ++ return nil, fmt.Errorf("invalid operator %s", op) ++ } ++ ++ var gf GetStrFunc ++ if strings.HasPrefix(srcStrName, "$") { ++ // Env ++ envKey := strings.TrimPrefix(srcStrName, "$") ++ gf = func(_ *query_context.Context) string { ++ return os.Getenv(envKey) ++ } ++ } else { ++ switch srcStrName { ++ case "url_path": ++ gf = getUrlPath ++ case "server_name": ++ gf = getServerName ++ default: ++ return nil, fmt.Errorf("invalid src string name %s", srcStrName) ++ } ++ } ++ return NewMatcher(gf, sm), nil ++} ++ ++// QuickSetup returns a sequence.ExecQuickSetupFunc. ++func QuickSetup(_ sequence.BQ, s string) (sequence.Matcher, error) { ++ return QuickSetupFromStr(s) ++} ++ ++type opZl struct{} ++ ++func (op opZl) MatchStr(s string) bool { ++ return len(s) == 0 ++} ++ ++type opEq struct { ++ m map[string]struct{} ++} ++ ++func (op *opEq) MatchStr(s string) bool { ++ _, ok := op.m[s] ++ return ok ++} ++ ++type opF struct { ++ s []string ++ f func(s, arg string) bool ++} ++ ++func (op *opF) MatchStr(s string) bool { ++ for _, sub := range op.s { ++ if op.f(s, sub) { ++ return true ++ } ++ } ++ return false ++} ++ ++type opRegExp struct { ++ exp []*regexp.Regexp ++} ++ ++func (op *opRegExp) MatchStr(s string) bool { ++ for _, exp := range op.exp { ++ if exp.MatchString(s) { ++ return true ++ } ++ } ++ return false ++} ++ ++func getUrlPath(qCtx *query_context.Context) string { ++ return qCtx.QueryMeta().UrlPath ++} ++ ++func getServerName(qCtx *query_context.Context) string { ++ return qCtx.QueryMeta().ServerName ++} +diff --git a/plugin/matcher/string_exp/string_exp_test.go b/plugin/matcher/string_exp/string_exp_test.go +new file mode 100644 +index 0000000..9140191 +--- /dev/null ++++ b/plugin/matcher/string_exp/string_exp_test.go +@@ -0,0 +1,67 @@ ++/* ++ * Copyright (C) 2020-2022, IrineSistiana ++ * ++ * This file is part of mosdns. ++ * ++ * mosdns is free software: you can redistribute it and/or modify ++ * it under the terms of the GNU General Public License as published by ++ * the Free Software Foundation, either version 3 of the License, or ++ * (at your option) any later version. ++ * ++ * mosdns is distributed in the hope that it will be useful, ++ * but WITHOUT ANY WARRANTY; without even the implied warranty of ++ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ++ * GNU General Public License for more details. ++ * ++ * You should have received a copy of the GNU General Public License ++ * along with this program. If not, see . ++ */ ++ ++package string_exp ++ ++import ( ++ "context" ++ "os" ++ "testing" ++ ++ "github.com/IrineSistiana/mosdns/v5/pkg/query_context" ++ "github.com/miekg/dns" ++ "github.com/stretchr/testify/require" ++) ++ ++func TestMatcher_Match(t *testing.T) { ++ r := require.New(t) ++ q := new(dns.Msg) ++ qc := query_context.NewContext(q, query_context.QueryMeta{UrlPath: "/dns-query", ServerName: "a.b.c"}) ++ os.Setenv("STRING_EXP_TEST", "abc") ++ ++ doTest := func(arg string, want bool) { ++ t.Helper() ++ urlMatcher, err := QuickSetupFromStr(arg) ++ r.NoError(err) ++ got, err := urlMatcher.Match(context.Background(), qc) ++ r.NoError(err) ++ r.Equal(want, got) ++ } ++ ++ doTest("url_path zl", false) ++ doTest("url_path eq /dns-query", true) ++ doTest("url_path eq /123 /dns-query /abc", true) ++ doTest("url_path eq /123 /abc", false) ++ doTest("url_path contains abc dns def", true) ++ doTest("url_path contains abc def", false) ++ doTest("url_path prefix abc /dns def", true) ++ doTest("url_path prefix abc def", false) ++ doTest("url_path suffix abc query def", true) ++ doTest("url_path suffix abc def", false) ++ doTest("url_path regexp ^/dns-query$", true) ++ doTest("url_path regexp ^abc", false) ++ ++ doTest("server_name eq abc a.b.c def", true) ++ doTest("server_name eq abc def", false) ++ ++ doTest("$STRING_EXP_TEST eq 123 abc def", true) ++ doTest("$STRING_EXP_TEST eq 123 def", false) ++ doTest("$STRING_EXP_TEST_NOT_EXIST eq 123 abc def", false) ++ doTest("$STRING_EXP_TEST_NOT_EXIST zl", true) ++} +-- +2.34.8 + diff --git a/mosdns/patches/123-add-plugin-rate_limiter.patch b/mosdns/patches/123-add-plugin-rate_limiter.patch new file mode 100644 index 0000000..56c50ac --- /dev/null +++ b/mosdns/patches/123-add-plugin-rate_limiter.patch @@ -0,0 +1,297 @@ +From 11436dd9cde412f83d1bfbd06b4163445c52bb12 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Fri, 22 Sep 2023 20:55:49 +0800 +Subject: [PATCH 7/9] add plugin rate_limiter + +--- + go.mod | 1 + + go.sum | 2 + + pkg/rate_limiter/rate_limiter.go | 145 ++++++++++++++++++ + plugin/enabled_plugins.go | 1 + + .../executable/rate_limiter/rate_limiter.go | 85 ++++++++++ + 5 files changed, 234 insertions(+) + create mode 100644 pkg/rate_limiter/rate_limiter.go + create mode 100644 plugin/executable/rate_limiter/rate_limiter.go + +diff --git a/go.mod b/go.mod +index 7c2b96a..aea0c99 100644 +--- a/go.mod ++++ b/go.mod +@@ -63,6 +63,7 @@ require ( + golang.org/x/crypto v0.13.0 // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect ++ golang.org/x/time v0.3.0 // indirect + golang.org/x/tools v0.13.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +diff --git a/go.sum b/go.sum +index dd20043..d2b393f 100644 +--- a/go.sum ++++ b/go.sum +@@ -413,6 +413,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= + golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= + golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= ++golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= ++golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= + golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= + golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= + golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +diff --git a/pkg/rate_limiter/rate_limiter.go b/pkg/rate_limiter/rate_limiter.go +new file mode 100644 +index 0000000..30fa516 +--- /dev/null ++++ b/pkg/rate_limiter/rate_limiter.go +@@ -0,0 +1,145 @@ ++package rate_limiter ++ ++import ( ++ "io" ++ "net/netip" ++ "sync" ++ "time" ++ ++ "golang.org/x/time/rate" ++) ++ ++type RateLimiter interface { ++ Allow(addr netip.Addr) bool ++ io.Closer ++} ++ ++type limiter struct { ++ limit rate.Limit ++ burst int ++ mask4 int ++ mask6 int ++ ++ closeOnce sync.Once ++ closeNotify chan struct{} ++ m sync.Mutex ++ tables map[netip.Addr]*limiterEntry ++} ++ ++type limiterEntry struct { ++ l *rate.Limiter ++ lastSeen time.Time ++ sync.Once ++} ++ ++// limit and burst should be greater than zero. ++// If gcInterval is <= 0, it will be automatically chosen between 2~10s. ++// In this case, if the token refill time (burst/limit) is greater than 10s, ++// the actual average qps limit may be higher than expected. ++// If mask is zero or greater than 32/128. The default is 32/48. ++// If mask is negative, the masks will be 0. ++func NewRateLimiter(limit rate.Limit, burst int, gcInterval time.Duration, mask4, mask6 int) RateLimiter { ++ if mask4 > 32 || mask4 == 0 { ++ mask4 = 32 ++ } ++ if mask4 < 0 { ++ mask4 = 0 ++ } ++ ++ if mask6 > 128 || mask6 == 0 { ++ mask6 = 48 ++ } ++ if mask6 < 0 { ++ mask6 = 0 ++ } ++ ++ if gcInterval <= 0 { ++ if limit <= 0 || burst <= 0 { ++ gcInterval = time.Second * 2 ++ } else { ++ refillSec := float64(burst) / float64(limit) ++ if refillSec < 2 { ++ refillSec = 2 ++ } ++ if refillSec > 10 { ++ refillSec = 10 ++ } ++ gcInterval = time.Duration(refillSec) * time.Second ++ } ++ } ++ ++ l := &limiter{ ++ limit: limit, ++ burst: burst, ++ mask4: mask4, ++ mask6: mask6, ++ closeNotify: make(chan struct{}), ++ tables: make(map[netip.Addr]*limiterEntry), ++ } ++ go l.gcLoop(gcInterval) ++ return l ++} ++ ++func (l *limiter) Allow(a netip.Addr) bool { ++ a = l.applyMask(a) ++ now := time.Now() ++ l.m.Lock() ++ e, ok := l.tables[a] ++ if !ok { ++ e = &limiterEntry{ ++ l: rate.NewLimiter(l.limit, l.burst), ++ lastSeen: now, ++ } ++ l.tables[a] = e ++ } ++ e.lastSeen = now ++ clientLimiter := e.l ++ l.m.Unlock() ++ return clientLimiter.AllowN(now, 1) ++} ++ ++func (l *limiter) Close() error { ++ l.closeOnce.Do(func() { ++ close(l.closeNotify) ++ }) ++ return nil ++} ++ ++func (l *limiter) gcLoop(gcInterval time.Duration) { ++ ticker := time.NewTicker(gcInterval) ++ defer ticker.Stop() ++ ++ for { ++ select { ++ case <-l.closeNotify: ++ return ++ case now := <-ticker.C: ++ l.doGc(now, gcInterval) ++ } ++ } ++} ++ ++func (l *limiter) doGc(now time.Time, gcInterval time.Duration) { ++ l.m.Lock() ++ defer l.m.Unlock() ++ ++ for a, e := range l.tables { ++ if now.Sub(e.lastSeen) > gcInterval { ++ delete(l.tables, a) ++ } ++ } ++} ++ ++func (l *limiter) applyMask(a netip.Addr) netip.Addr { ++ switch { ++ case a.Is4(): ++ m, _ := a.Prefix(l.mask4) ++ return m.Addr() ++ case a.Is4In6(): ++ m, _ := netip.AddrFrom4(a.As4()).Prefix(l.mask4) ++ return m.Addr() ++ default: ++ m, _ := a.Prefix(l.mask6) ++ return m.Addr() ++ } ++} +diff --git a/plugin/enabled_plugins.go b/plugin/enabled_plugins.go +index dfb311b..d72ed07 100644 +--- a/plugin/enabled_plugins.go ++++ b/plugin/enabled_plugins.go +@@ -54,6 +54,7 @@ import ( + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/metrics_collector" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/nftset" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/query_summary" ++ _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/rate_limiter" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/redirect" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/reverse_lookup" + _ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" +diff --git a/plugin/executable/rate_limiter/rate_limiter.go b/plugin/executable/rate_limiter/rate_limiter.go +new file mode 100644 +index 0000000..241f947 +--- /dev/null ++++ b/plugin/executable/rate_limiter/rate_limiter.go +@@ -0,0 +1,85 @@ ++/* ++ * Copyright (C) 2020-2022, IrineSistiana ++ * ++ * This file is part of mosdns. ++ * ++ * mosdns is free software: you can redistribute it and/or modify ++ * it under the terms of the GNU General Public License as published by ++ * the Free Software Foundation, either version 3 of the License, or ++ * (at your option) any later version. ++ * ++ * mosdns is distributed in the hope that it will be useful, ++ * but WITHOUT ANY WARRANTY; without even the implied warranty of ++ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ++ * GNU General Public License for more details. ++ * ++ * You should have received a copy of the GNU General Public License ++ * along with this program. If not, see . ++ */ ++ ++package rate_limiter ++ ++import ( ++ "context" ++ ++ "github.com/IrineSistiana/mosdns/v5/coremain" ++ "github.com/IrineSistiana/mosdns/v5/pkg/query_context" ++ "github.com/IrineSistiana/mosdns/v5/pkg/rate_limiter" ++ "github.com/IrineSistiana/mosdns/v5/pkg/utils" ++ "github.com/IrineSistiana/mosdns/v5/plugin/executable/sequence" ++ "github.com/miekg/dns" ++ "golang.org/x/time/rate" ++) ++ ++const PluginType = "rate_limiter" ++ ++func init() { ++ coremain.RegNewPluginFunc(PluginType, Init, func() any { return new(Args) }) ++} ++ ++type Args struct { ++ Qps float64 `yaml:"qps"` ++ Burst int `yaml:"burst"` ++ Mask4 int `yaml:"mask4"` ++ Mask6 int `yaml:"mask6"` ++} ++ ++func (args *Args) init() { ++ utils.SetDefaultUnsignNum(&args.Qps, 20) ++ utils.SetDefaultUnsignNum(&args.Burst, 40) ++ utils.SetDefaultUnsignNum(&args.Mask4, 32) ++ utils.SetDefaultUnsignNum(&args.Mask4, 48) ++} ++ ++var _ sequence.Executable = (*RateLimiter)(nil) ++ ++type RateLimiter struct { ++ l rate_limiter.RateLimiter ++} ++ ++func Init(_ *coremain.BP, args any) (any, error) { ++ return New(*(args.(*Args))), nil ++} ++ ++func New(args Args) *RateLimiter { ++ args.init() ++ l := rate_limiter.NewRateLimiter(rate.Limit(args.Qps), args.Burst, 0, args.Mask4, args.Mask6) ++ return &RateLimiter{l: l} ++} ++ ++func (s *RateLimiter) Exec(ctx context.Context, qCtx *query_context.Context) error { ++ clientAddr := qCtx.QueryMeta().ClientAddr ++ if clientAddr.IsValid() { ++ if !s.l.Allow(clientAddr) { ++ qCtx.SetResponse(refuse(qCtx.Q())) ++ } ++ } ++ return nil ++} ++ ++func refuse(q *dns.Msg) *dns.Msg { ++ r := new(dns.Msg) ++ r.SetReply(q) ++ r.Rcode = dns.RcodeRefused ++ return r ++} +-- +2.34.8 + diff --git a/mosdns/patches/124-transport-fixed-eol-pipelineConn-was-t-closed.patch b/mosdns/patches/124-transport-fixed-eol-pipelineConn-was-t-closed.patch new file mode 100644 index 0000000..cfdf38e --- /dev/null +++ b/mosdns/patches/124-transport-fixed-eol-pipelineConn-was-t-closed.patch @@ -0,0 +1,158 @@ +From f81a617d6bc3ad05bd9c9edd343083f4b4c09cd4 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Sat, 23 Sep 2023 08:28:10 +0800 +Subject: [PATCH 8/9] transport: fixed eol pipelineConn was't closed + +when calling PipelineTransport.Close() +--- + pkg/upstream/transport/pipeline.go | 54 ++++++++++++++----------- + pkg/upstream/transport/pipeline_test.go | 2 +- + 2 files changed, 31 insertions(+), 25 deletions(-) + +diff --git a/pkg/upstream/transport/pipeline.go b/pkg/upstream/transport/pipeline.go +index 10c3d23..70f9c8d 100644 +--- a/pkg/upstream/transport/pipeline.go ++++ b/pkg/upstream/transport/pipeline.go +@@ -33,10 +33,11 @@ import ( + type PipelineTransport struct { + PipelineOpts + +- m sync.Mutex // protect following fields +- closed bool +- r *rand.Rand +- conns []*pipelineConn ++ m sync.Mutex // protect following fields ++ closed bool ++ r *rand.Rand ++ activeConns []*pipelineConn ++ conns map[*pipelineConn]struct{} + } + + type PipelineOpts struct { +@@ -66,6 +67,7 @@ func NewPipelineTransport(opt PipelineOpts) *PipelineTransport { + return &PipelineTransport{ + PipelineOpts: opt, + r: rand.New(rand.NewSource(time.Now().Unix())), ++ conns: make(map[*pipelineConn]struct{}), + } + } + +@@ -73,13 +75,13 @@ func (t *PipelineTransport) ExchangeContext(ctx context.Context, m *dns.Msg) (*d + const maxAttempt = 3 + attempt := 0 + for { +- conn, allocatedQid, isNewConn, wg, err := t.getPipelineConn() ++ pc, allocatedQid, isNewConn, err := t.getPipelineConn() + if err != nil { + return nil, err + } + +- r, err := conn.exchangePipeline(ctx, m, allocatedQid) +- wg.Done() ++ r, err := pc.dc.exchangePipeline(ctx, m, allocatedQid) ++ pc.wg.Done() + + if err != nil { + // Reused connection may not stable. +@@ -103,7 +105,7 @@ func (t *PipelineTransport) Close() error { + return nil + } + t.closed = true +- for _, conn := range t.conns { ++ for conn := range t.conns { + conn.dc.closeWithErr(errClosedTransport) + } + return nil +@@ -113,10 +115,9 @@ func (t *PipelineTransport) Close() error { + // Caller must call wg.Done() after dnsConn.exchangePipeline. + // The returned dnsConn is ready to serve queries. + func (t *PipelineTransport) getPipelineConn() ( +- dc *dnsConn, ++ pc *pipelineConn, + allocatedQid uint16, + isNewConn bool, +- wg *sync.WaitGroup, + err error, + ) { + t.m.Lock() +@@ -128,21 +129,19 @@ func (t *PipelineTransport) getPipelineConn() ( + + pci, pc := t.pickPipelineConnLocked() + +- // Dail a new connection if (conn pool is empty), or +- // (the picked conn is busy, and we are allowed to dail more connections). ++ // Dial a new connection if (conn pool is empty), or ++ // (the picked conn is busy, and we are allowed to dial more connections). + maxConn := t.MaxConn + if maxConn <= 0 { + maxConn = defaultPipelineMaxConns + } +- if pc == nil || (pc.dc.queueLen() > pipelineBusyQueueLen && len(t.conns) < maxConn) { +- dc = newDnsConn(t.IOOpts) ++ if pc == nil || (pc.dc.queueLen() > pipelineBusyQueueLen && len(t.activeConns) < maxConn) { ++ dc := newDnsConn(t.IOOpts) + pc = newPipelineConn(dc) + isNewConn = true +- pci = sliceAdd(&t.conns, pc) +- } else { +- dc = pc.dc ++ pci = sliceAdd(&t.activeConns, pc) ++ t.conns[pc] = struct{}{} + } +- wg = &pc.wg + + pc.wg.Add(1) + pc.servedLocked++ +@@ -152,13 +151,20 @@ func (t *PipelineTransport) getPipelineConn() ( + // This connection has served too many queries. + // Note: the connection should be closed only after all its queries finished. + // We can't close it here. Some queries may still on that connection. +- sliceDel(&t.conns, pci) ++ sliceDel(&t.activeConns, pci) // remove from active conns ++ } ++ t.m.Unlock() ++ ++ if eol { ++ // Cleanup when all queries is finished. + go func() { +- wg.Wait() +- dc.closeWithErr(errEOL) ++ pc.wg.Wait() ++ pc.dc.closeWithErr(errEOL) ++ t.m.Lock() ++ delete(t.conns, pc) ++ t.m.Unlock() + }() + } +- t.m.Unlock() + return + } + +@@ -167,9 +173,9 @@ func (t *PipelineTransport) getPipelineConn() ( + // Require holding PipelineTransport.m. + func (t *PipelineTransport) pickPipelineConnLocked() (int, *pipelineConn) { + for { +- pci, pc := sliceRandGet(t.conns, t.r) ++ pci, pc := sliceRandGet(t.activeConns, t.r) + if pc != nil && pc.dc.isClosed() { // closed conn, delete it and retry +- sliceDel(&t.conns, pci) ++ sliceDel(&t.activeConns, pci) + continue + } + return pci, pc // conn pool is empty or we got a pc +diff --git a/pkg/upstream/transport/pipeline_test.go b/pkg/upstream/transport/pipeline_test.go +index 653d779..c595288 100644 +--- a/pkg/upstream/transport/pipeline_test.go ++++ b/pkg/upstream/transport/pipeline_test.go +@@ -86,7 +86,7 @@ func testPipelineTransport(t *testing.T, ioOpts IOOpts) { + wg.Wait() + + pt.m.Lock() +- pl := len(pt.conns) ++ pl := len(pt.activeConns) + pt.m.Unlock() + if pl > po.MaxConn { + t.Fatalf("max %d active conn, but got %d active conn(s)", po.MaxConn, pl) +-- +2.34.8 + diff --git a/mosdns/patches/125-upstream-doq-send-STREAM-FIN-after-query-is-sent.patch b/mosdns/patches/125-upstream-doq-send-STREAM-FIN-after-query-is-sent.patch new file mode 100644 index 0000000..e95e182 --- /dev/null +++ b/mosdns/patches/125-upstream-doq-send-STREAM-FIN-after-query-is-sent.patch @@ -0,0 +1,32 @@ +From 1fa552cbe353e6467d33bd9366712ad150f06af0 Mon Sep 17 00:00:00 2001 +From: Irine Sistiana <49315432+IrineSistiana@users.noreply.github.com> +Date: Sat, 23 Sep 2023 14:29:25 +0800 +Subject: [PATCH 9/9] upstream/doq: send STREAM FIN after query is sent + +fixed #720 +--- + pkg/upstream/doq/upstream.go | 8 ++++++++ + 1 file changed, 8 insertions(+) + +diff --git a/pkg/upstream/doq/upstream.go b/pkg/upstream/doq/upstream.go +index 23d7f1c..78d30db 100644 +--- a/pkg/upstream/doq/upstream.go ++++ b/pkg/upstream/doq/upstream.go +@@ -242,6 +242,14 @@ func (u *Upstream) exchange(s quic.Stream, payload []byte) (*dns.Msg, error) { + return nil, err + } + ++ // RFC 9250 4.2 ++ // The client MUST send the DNS query over the selected stream and MUST ++ // indicate through the STREAM FIN mechanism that no further data will ++ // be sent on that stream. ++ // ++ // Call Close() here will send the STREAM FIN. It won't close Read. ++ s.Close() ++ + resp, _, err := dnsutils.ReadMsgFromTCP(s) + if err != nil { + return nil, err +-- +2.34.8 + diff --git a/mosdns/patches/120-compatible-with-go1.20.patch b/mosdns/patches/203-compatible-with-go1.20.patch similarity index 100% rename from mosdns/patches/120-compatible-with-go1.20.patch rename to mosdns/patches/203-compatible-with-go1.20.patch