博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
「旁门右道」CURL持久连接技巧
阅读量:7211 次
发布时间:2019-06-29

本文共 14642 字,大约阅读时间需要 48 分钟。

背景

对于同一服务可能存在多次调用的情况,然而每次调用都需要建立一次tcp连接导致大量重复工作的同时还增加了连接超时或连接错误的概率,为了减少tcp连接次数最大限度的提高连接利用率,需要能够重复利用每个tcp连接。

原理

  • HTTP1.1与HTTP2.0支持对于一次TCP连接建立的通道重复使用。
  • HTTP2.0支持多路复用
  • CURL支持对HTTP1.1和HTTP2.0已建立连接的复用,如果旧连接已失效则主动关闭旧连接,如果连接有效则尝试使用已有连接传输数据。关键代码如下:
// php/ext/url/interface.c/* {
{
{ proto bool curl_exec(resource ch) Perform a cURL session */PHP_FUNCTION(curl_exec){ CURLcode error; zval *zid; php_curl *ch; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &zid) == FAILURE) { return; } ZEND_FETCH_RESOURCE(ch, php_curl *, &zid, -1, le_curl_name, le_curl); _php_curl_verify_handlers(ch, 1 TSRMLS_CC); _php_curl_cleanup_handle(ch); // 调用CURL方法 error = curl_easy_perform(ch->cp); SAVE_CURL_ERROR(ch, error); /* CURLE_PARTIAL_FILE is returned by HEAD requests */ if (error != CURLE_OK && error != CURLE_PARTIAL_FILE) { if (ch->handlers->write->buf.len > 0) { smart_str_free(&ch->handlers->write->buf); } RETURN_FALSE; } if (ch->handlers->std_err) { php_stream *stream; stream = (php_stream*)zend_fetch_resource(&ch->handlers->std_err TSRMLS_CC, -1, NULL, NULL, 2, php_file_le_stream(), php_file_le_pstream()); if (stream) { php_stream_flush(stream); } } if (ch->handlers->write->method == PHP_CURL_RETURN && ch->handlers->write->buf.len > 0) { smart_str_0(&ch->handlers->write->buf); RETURN_STRINGL(ch->handlers->write->buf.c, ch->handlers->write->buf.len, 1); } /* flush the file handle, so any remaining data is synched to disk */ if (ch->handlers->write->method == PHP_CURL_FILE && ch->handlers->write->fp) { fflush(ch->handlers->write->fp); } if (ch->handlers->write_header->method == PHP_CURL_FILE && ch->handlers->write_header->fp) { fflush(ch->handlers->write_header->fp); } if (ch->handlers->write->method == PHP_CURL_RETURN) { RETURN_EMPTY_STRING(); } else { RETURN_TRUE; }}/* }}} */// curl/lib/url.c line 4328 // 主动关闭已失效的连接 prune_dead_connections(data); /************************************************************* * Check the current list of connections to see if we can * re-use an already existing one or if we have to create a * new one. *************************************************************/ /* reuse_fresh is TRUE if we are told to use a new connection by force, but we only acknowledge this option if this is not a re-used connection already (which happens due to follow-location or during a HTTP authentication phase). */ if(data->set.reuse_fresh && !data->state.this_is_a_follow) reuse = FALSE; else // 从已存在的链接中查找出可以复用的连接(如果是不支持多路复用且正在使用中的连接会被忽略) reuse = ConnectionExists(data, conn, &conn_temp, &force_reuse, &waitpipe); /* If we found a reusable connection, we may still want to open a new connection if we are pipelining. */ if(reuse && !force_reuse && IsPipeliningPossible(data, conn_temp)) { size_t pipelen = conn_temp->send_pipe.size + conn_temp->recv_pipe.size; if(pipelen > 0) { infof(data, "Found connection %ld, with requests in the pipe (%zu)\n", conn_temp->connection_id, pipelen); if(conn_temp->bundle->num_connections < max_host_connections && data->state.conn_cache->num_connections < max_total_connections) { /* We want a new connection anyway */ reuse = FALSE; infof(data, "We can reuse, but we want a new connection anyway\n"); } } } if(reuse) { /* * We already have a connection for this, we got the former connection * in the conn_temp variable and thus we need to cleanup the one we * just allocated before we can move along and use the previously * existing one. */ conn_temp->inuse = TRUE; /* mark this as being in use so that no other handle in a multi stack may nick it */ reuse_conn(conn, conn_temp); free(conn); /* we don't need this anymore */ conn = conn_temp; *in_connect = conn; infof(data, "Re-using existing connection! (#%ld) with %s %s\n", conn->connection_id, conn->bits.proxy?"proxy":"host", conn->socks_proxy.host.name ? conn->socks_proxy.host.dispname : conn->http_proxy.host.name ? conn->http_proxy.host.dispname : conn->host.dispname); } else { /* We have decided that we want a new connection. However, we may not be able to do that if we have reached the limit of how many connections we are allowed to open. */ struct connectbundle *bundle = NULL; if(conn->handler->flags & PROTOPT_ALPN_NPN) { /* The protocol wants it, so set the bits if enabled in the easy handle (default) */ if(data->set.ssl_enable_alpn) conn->bits.tls_enable_alpn = TRUE; if(data->set.ssl_enable_npn) conn->bits.tls_enable_npn = TRUE; } if(waitpipe) /* There is a connection that *might* become usable for pipelining "soon", and we wait for that */ connections_available = FALSE; else bundle = Curl_conncache_find_bundle(conn, data->state.conn_cache); if(max_host_connections > 0 && bundle && (bundle->num_connections >= max_host_connections)) { struct connectdata *conn_candidate; /* The bundle is full. Let's see if we can kill a connection. */ conn_candidate = find_oldest_idle_connection_in_bundle(data, bundle); if(conn_candidate) { /* Set the connection's owner correctly, then kill it */ conn_candidate->data = data; (void)Curl_disconnect(conn_candidate, /* dead_connection */ FALSE); } else { infof(data, "No more connections allowed to host: %d\n", max_host_connections); connections_available = FALSE; } } if(connections_available && (max_total_connections > 0) && (data->state.conn_cache->num_connections >= max_total_connections)) { struct connectdata *conn_candidate; /* The cache is full. Let's see if we can kill a connection. */ conn_candidate = Curl_conncache_oldest_idle(data); if(conn_candidate) { /* Set the connection's owner correctly, then kill it */ conn_candidate->data = data; (void)Curl_disconnect(conn_candidate, /* dead_connection */ FALSE); } else { infof(data, "No connections available in cache\n"); connections_available = FALSE; } } if(!connections_available) { infof(data, "No connections available.\n"); conn_free(conn); *in_connect = NULL; result = CURLE_NO_CONNECTION_AVAILABLE; goto out; } else { /* * This is a brand new connection, so let's store it in the connection * cache of ours! */ Curl_conncache_add_conn(data->state.conn_cache, conn); }#if defined(USE_NTLM) /* If NTLM is requested in a part of this connection, make sure we don't assume the state is fine as this is a fresh connection and NTLM is connection based. */ if((data->state.authhost.picked & (CURLAUTH_NTLM | CURLAUTH_NTLM_WB)) && data->state.authhost.done) { infof(data, "NTLM picked AND auth done set, clear picked!\n"); data->state.authhost.picked = CURLAUTH_NONE; data->state.authhost.done = FALSE; } if((data->state.authproxy.picked & (CURLAUTH_NTLM | CURLAUTH_NTLM_WB)) && data->state.authproxy.done) { infof(data, "NTLM-proxy picked AND auth done set, clear picked!\n"); data->state.authproxy.picked = CURLAUTH_NONE; data->state.authproxy.done = FALSE; }#endif }// curl/lib/multi.c/* * This function scans the connection cache for half-open/dead connections, * closes and removes them. * The cleanup is done at most once per second. */static void prune_dead_connections(struct Curl_easy *data){ struct curltime now = Curl_now(); time_t elapsed = Curl_timediff(now, data->state.conn_cache->last_cleanup); if(elapsed >= 1000L) { Curl_conncache_foreach(data, data->state.conn_cache, data, call_disconnect_if_dead); data->state.conn_cache->last_cleanup = now; }}

PHP实现

class Curl{    protected $ch = null;    protected $errorCode = 0;    protected $errorMsg = '';    protected $curlInfo = array();    protected $verbose = null;    private static $instance = null;    public function getLastErrorCode()    {        return $this->errorCode;    }    public function getLastErrorMsg()    {        return $this->errorMsg;    }    public function getLastCurlInfo()    {        return $this->curlInfo;    }    private function __construct()    {        $this->ch = curl_init();    }    /*     * 单例模式防止被clone     */    private function __clone(){        throw new CurlException('The Curl library can\'t be cloned');    }    /*     * 使用单例模式调用     */    public static function getInstance(){        if(!self::$instance instanceof self){            self::$instance = new self();        }        return self::$instance;    }    /**     * curl以get的方式访问     * @param $url     * @param int $timeout     * @param array $params get请求的参数,可以在url中直接带参数,也可以在这里传     * @param array $headers 支持['Accept' => 'application/json']和['Accept: application/json']两种方式     * @return mixed     */    public function get($url, $timeout = 3, $params = [], $headers = [])    {        $url = $this->buildQuery($url,$params);        $this->setGeneralOption($url,$timeout,$headers);        $result = $this->execute();        return $result;    }    /**     * curl以post的方式访问     * @param $url     * @param array $params     * @param array $headers 支持['Accept' => 'application/json']和['Accept: application/json']两种方式     * @param bool $withHttpBuildQuery     * @param int $timeout     * @return mixed     */    public function post($url, $params = [], $headers = [], $withHttpBuildQuery = true, $timeout=3)    {        if ($withHttpBuildQuery) {            if (!is_array($params)) {                $params = [$params];            }            $params = http_build_query($params);        }                curl_setopt($this->ch, CURLOPT_POST, 1);        curl_setopt($this->ch, CURLOPT_POSTFIELDS, $params);        $this->setGeneralOption($url,$timeout,$headers);        $result = $this->execute();        return $result;    }    /**     * curl以HTTP2.0 get的方式访问     * @param string $url 请求URL     * @param int $timeout 超时时间,单位秒     * @param array $params get请求的参数,可以在url中直接带参数,也可以在这里传     * @param array $headers 支持['Accept' => 'application/json']和['Accept: application/json']两种方式     * @return mixed     */    public function get2($url, $timeout = 3, $params = [], $headers = [])    {        $url = $this->buildQuery($url,$params);        $this->setGeneralOption($url,$timeout,$headers,CURL_HTTP_VERSION_2_0);        $result = $this->execute();        return $result;    }    /**     * curl以HTTP2.0 post的方式访问     * @param string $url 请求URL     * @param array $params     * @param array $headers 支持['Accept' => 'application/json']和['Accept: application/json']两种方式     * @param bool $withHttpBuildQuery     * @param int $timeout 超时时间,单位秒     * @return mixed     */    public function post2($url, $params = [], $headers = [], $withHttpBuildQuery = true, $timeout=3)    {        if ($withHttpBuildQuery) {            if (!is_array($params)) {                $params = [$params];            }            $params = http_build_query($params);        }        curl_setopt($this->ch, CURLOPT_POST, 1);        curl_setopt($this->ch, CURLOPT_POSTFIELDS, $params);        $this->setGeneralOption($url,$timeout,$headers,CURL_HTTP_VERSION_2_0);        $result = $this->execute();        return $result;    }    /**     * 实例销毁前主动关闭所有连接     */    public function __destruct()    {        $this->close();    }    /**     * 关闭所有连接     * Description: 这一步在php-fpm中可以省略,实例结束后php-fpm的垃圾回收机制会关闭     */    public function close()    {        if (is_resource($this->ch)) {            curl_close($this->ch);            $this->ch = null;        }    }    /**     * 拼接请求URL     * @param string $url 请求URL     * @param array $params 待拼接参数     * @return string     */    protected function buildQuery($url,$params)    {        if (!$params) {            return $url;        }        if (strpos($url, '?') === false) {            $url .= '?';        } else {            $url .= '&';        }        $url .= http_build_query($params);        return $url;    }    /**     * 设置通用curl配置     * @param string $url 请求URL     * @param int $timeout 超时时间,单位秒     * @param array $headers 请求header     * @param int $httpVersion 使用的http协议,默认为1.1     */    protected function setGeneralOption($url,$timeout,$headers=array(),$httpVersion=CURL_HTTP_VERSION_1_1)    {        curl_setopt($this->ch, CURLOPT_URL, $url);        curl_setopt($this->ch, CURLOPT_SSL_VERIFYPEER, true); //让CURL支持HTTPS访问        curl_setopt($this->ch, CURLOPT_SSL_VERIFYHOST, 2);        curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, 1);        curl_setopt($this->ch, CURLOPT_TIMEOUT, $timeout);        curl_setopt($this->ch, CURLOPT_HTTP_VERSION, $httpVersion);        // 启用debug获取更详细的连接信息,与CURLOPT_HEADER互斥        curl_setopt($this->ch, CURLOPT_VERBOSE, 1);        $this->verbose = fopen('php://temp', 'w+');        curl_setopt($this->ch, CURLOPT_STDERR, $this->verbose);        if ($headers && is_array($headers)) {            $realHeader = [];            foreach ($headers as $key => $val) {                if (is_string($key)) {                    $realHeader[] = $key. ': '. $val;                } else {                    $realHeader[] = $val;                }            }            curl_setopt($this->ch, CURLOPT_HTTPHEADER, $realHeader);        }    }    /**     * 执行请求     * @return mixed     */    protected function execute()    {        $result = curl_exec($this->ch);        // 记录详细的debug信息        $this->curlInfo = curl_getinfo($this->ch);        rewind($this->verbose);        $this->curlInfo['verbose'] = stream_get_contents($this->verbose);        $this->verbose = null;        if ($result === false) {            $this->errorCode = curl_errno($this->ch);            $this->errorMsg = curl_error($this->ch);            $this->curlInfo['error_code'] = $this->errorCode;            $this->curlInfo['error_message'] = $this->errorMsg;        }        curl_reset($this->ch);        return $result;    }}class CurlException extends \Exception {}

拓展

  • 由于PHP-FPM的回收机制,一次请求结束后CURL的资源将会被回收,这意味着这次请求建立的TCP连接将会被关闭,在这种情况下就无法达到垮请求复用的目的。因此可以利用独立进程的方式来维护已建立的TCP连接专门负责CURL的请求。
  • 对于HTTP2.0而言,由于支持多路复用,因此对于一个域名的请求建立一次tcp连接后可以支持同时多个请求的处理(HTTP1.1一个tcp连接同时只支持一个请求,如果第二个请求同时到达则CURL将建立新的tcp连接以便完成请求),利用这一特性使用独立进程配合协程可以达到对于单一场景的curl高并发的支撑。
  • 同理除PHP外可扩展到其他语言。

 By佐柱

转载请注明出处,也欢迎偶尔逛逛我的,谢谢 :)

你可能感兴趣的文章
时序数据库连载系列: 时序数据库一哥InfluxDB之存储机制解析
查看>>
sorl实现商品快速搜索
查看>>
Webpack4 学习笔记 - 01:webpack的安装和简单配置
查看>>
236. Lowest Common Ancestor of a Binary Tree
查看>>
300. Longest Increasing Subsequence
查看>>
GO基础编程-自定义函数
查看>>
你真的懂switch吗?聊聊switch语句中的块级作用域
查看>>
从0到1,了解NLP中的文本相似度
查看>>
HTML5新特性总结
查看>>
超越时代的天才——图灵
查看>>
使用 ale.js 制作一个小而美的表格编辑器(2)
查看>>
mybatis常用标签和动态查询
查看>>
以太坊交易源码分析
查看>>
React组件常用设计模式之Render Props
查看>>
多多客DOODOOKE更新插件&模块及下载附件教程
查看>>
js简单倒计时
查看>>
手把手教你React(一)JSX与虚拟DOM
查看>>
snabbdom源码解析(七) 事件处理
查看>>
在北京做Java开发如何月薪达到两万,需要技术水平达到什么程度?
查看>>
移动端适配之二:visual viewport、layout viewport和ideal viewport介绍
查看>>