XSRF

XSRF(Cross-site request forgery,跨站请求伪造),也被简写为 CSRF,发音为”sea surf”,这是一个常见的安全漏洞(这里有一篇写的不错的文章《浅谈CSRF攻击方式》,有兴趣的可以简单了解下)。通常 Synchronizer token pattern 是一种常见的解决方案,该方案利用了第三方站点无法访问 cookie 的限制,为每个客户端设置一个不同的 token,并将其存储在 cookie 中。当用户发起有副作用的 HTTP 请求时,则必须携带一个包含该 token 的参数(也可以通过 Http Header 传递),服务端将对存储在 cookie 和请求参数中的 token 进行比较,以防止潜在的跨站请求伪造。

生成 xsrf_token

tornado.web.RequestHandler 中与生成跨站请求伪造 token 直接相关的是 xsrf_token 属性和 xsrf_form_html 方法。

xsrf_token 属性在首次访问时会为客户端设置一个名为 _xsrf 的 cookie,其值变为前面所说的 token。token 有两个版本,版本号分别为 1 和 2,若没有在应用中设置 xsrf_cookie_version 参数则默认使用版本 2。版本 2 为每次请求都生成一个随机的掩码,相比较版本 1 而言安全性大大增强。

xsrf_form_html 就是返回一个隐藏的 HTML < input/> 元素,用于包含在页面的 Form 元素中以便在 POST 请求时将 token 发送给服务端验证。

详细代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@property
def xsrf_token(self):
    if not hasattr(self, "_xsrf_token"):
        version, token, timestamp = self._get_raw_xsrf_token()
        output_version = self.settings.get("xsrf_cookie_version", 2)
        if output_version == 1:
            self._xsrf_token = binascii.b2a_hex(token)
        elif output_version == 2:
            mask = os.urandom(4)
            self._xsrf_token = b"|".join([
                b"2",
                binascii.b2a_hex(mask),
                binascii.b2a_hex(_websocket_mask(mask, token)),
                utf8(str(int(timestamp)))])
        else:
            raise ValueError("unknown xsrf cookie version %d",
                             output_version)
        if version is None:
            expires_days = 30 if self.current_user else None
            self.set_cookie("_xsrf", self._xsrf_token,
                            expires_days=expires_days)
    return self._xsrf_token

def xsrf_form_html(self):
    return '<input type="hidden" name="_xsrf" value="' + \
        escape.xhtml_escape(self.xsrf_token) + '"/>'

上述两个方法关联的另外几个方法是 _get_raw_xsrf_token_decode_xsrf_token。首次访问 _get_raw_xsrf_token 方法时,将尝试为当前用户请求生成 token(若已经生成,则直接从 cookie “_xsrf” 中获取),并赋值给 handler 的 _raw_xsrf_token 字段。_decode_xsrf_token 方法将 token 解析为 (version, token, timestamp) 元组返回(兼容版本 1 ,版本 1 中没有 version 和 timestamp 字段)。代码很简单,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def _get_raw_xsrf_token(self):
    if not hasattr(self, '_raw_xsrf_token'):
        cookie = self.get_cookie("_xsrf")
        if cookie:
            version, token, timestamp = self._decode_xsrf_token(cookie)
        else:
            version, token, timestamp = None, None, None
        if token is None:
            version = None
            token = os.urandom(16)
            timestamp = time.time()
        self._raw_xsrf_token = (version, token, timestamp)
    return self._raw_xsrf_token

def _decode_xsrf_token(self, cookie):
    m = _signed_value_version_re.match(utf8(cookie))
    if m:
        version = int(m.group(1))
        if version == 2:
            _, mask, masked_token, timestamp = cookie.split("|")
            mask = binascii.a2b_hex(utf8(mask))
            token = _websocket_mask(
                mask, binascii.a2b_hex(utf8(masked_token)))
            timestamp = int(timestamp)
            return version, token, timestamp
        else:
            # Treat unknown versions as not present instead of failing.
            return None, None, None
    else:
        version = 1
        try:
            token = binascii.a2b_hex(utf8(cookie))
        except (binascii.Error, TypeError):
            token = utf8(cookie)
        # We don't have a usable timestamp in older versions.
        timestamp = int(time.time())
        return (version, token, timestamp)

检查 xsrf_token

对 xsrf_token 的检查在 _execute 方法中委托 check_xsrf_cookie 方法进行,代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def _execute(
        self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
    ) -> None:
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.path_args = [self.decode_argument(arg) for arg in args]
            self.path_kwargs = dict(
                (k, self.decode_argument(v, name=k)) for (k, v) in kwargs.items()
            )
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in (
                "GET",
                "HEAD",
                "OPTIONS",
            ) and self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()

            result = self.prepare()
            if result is not None:
                result = await result
            if self._prepared_future is not None:
                # Tell the Application we've finished with prepare()
                # and are ready for the body to arrive.
                future_set_result_unless_cancelled(self._prepared_future, None)
            if self._finished:
                return

            if _has_stream_request_body(self.__class__):
                # In streaming mode request.body is a Future that signals
                # the body has been completely received.  The Future has no
                # result; the data has been passed to self.data_received
                # instead.
                try:
                    await self.request._body_future
                except iostream.StreamClosedError:
                    return

            method = getattr(self, self.request.method.lower())
            result = method(*self.path_args, **self.path_kwargs)
            if result is not None:
                result = await result
            if self._auto_finish and not self._finished:
                self.finish()
        except Exception as e:
            try:
                self._handle_request_exception(e)
            except Exception:
                app_log.error("Exception in exception handler", exc_info=True)
            finally:
                # Unset result to avoid circular references
                result = None
            if self._prepared_future is not None and not self._prepared_future.done():
                # In case we failed before setting _prepared_future, do it
                # now (to unblock the HTTP server).  Note that this is not
                # in a finally block to avoid GC issues prior to Python 3.4.
                self._prepared_future.set_result(None)

def check_xsrf_cookie(self):
    token = (self.get_argument("_xsrf", None) or
             self.request.headers.get("X-Xsrftoken") or
             self.request.headers.get("X-Csrftoken"))
    if not token:
        raise HTTPError(403, "'_xsrf' argument missing from POST")
    _, token, _ = self._decode_xsrf_token(token)
    _, expected_token, _ = self._get_raw_xsrf_token()
    if not _time_independent_equals(utf8(token), utf8(expected_token)):
        raise HTTPError(403, "XSRF cookie does not match POST argument")

check_xsrf_cookie 方法代码显示与 cookie 中的 token 进行比较的 token 来源于请求参数 _xsrf 或者 HTTP 头域(X-Xsrftoken 或者 X-Csrftoken)。目前仅比较 token 值,对其中的 timestamp 和 version 字段不做比较验证。

由上可见,xsrf cookies 的生成仅与是否访问 xsrf_token 属性相关,要进行验证则需要为应用设置 xsrf_cookies 为 True。