ShadowSocks-libev 源码分析 - ss local


1.入口

  • 在 local.c 文件中可以找到两个启动入口:一个是作为可执行文件运行时,直接进入 main 函数;另一个是作为 library 使用时,可以通过 int start_ss_local_server(profile_t profile) 或者 int start_ss_local_server_with_callback(profile_t profile, ss_local_callback callback, void *udata) 启动。实际上这两个函数最终都是调用 local.c 中的 int _start_ss_local_server(profile_t profile, ss_local_callback callback, void *udata) 函数。
  • int _start_ss_local_server(profile_t profile, ss_local_callback callback, void *udata) 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
 * Start the local proxy described by `profile` and run its event loop.
 *
 * profile  - settings read here: remote_host, remote_port, local_addr
 *            (falls back to "127.0.0.1" when NULL), local_port, method,
 *            password, log, timeout, mode, fast_open, verbose, mtu, mptcp
 *            and acl (optional).
 * callback - optional; when non-NULL it is invoked once, right before the
 *            event loop starts, with the TCP listening fd, the UDP fd and
 *            `udata`. The TCP fd is -1 when mode is UDP_ONLY.
 * udata    - opaque pointer handed back to `callback`.
 *
 * Returns -1 when the remote address cannot be resolved or the local
 * socket cannot be bound/listened on, and 0 after ev_run() has returned
 * and the listener/connections have been torn down.
 * NOTE(review): FATAL() is presumed not to return - confirm.
 */
int _start_ss_local_server(profile_t profile, ss_local_callback callback, void *udata)
{
    // Reseed the PRNG
    srand(time(NULL));

    char *remote_host = profile.remote_host;
    char *local_addr  = profile.local_addr;
    char *method      = profile.method;
    char *password    = profile.password;
    char *log         = profile.log;
    int remote_port   = profile.remote_port;
    int local_port    = profile.local_port;
    int timeout       = profile.timeout;
    int mtu           = profile.mtu;
    int mptcp         = profile.mptcp;

    mode      = profile.mode;
    fast_open = profile.fast_open;
    verbose   = profile.verbose;

    // Ports are needed as strings by the resolver / bind helpers.
    // snprintf bounds the write to the destination buffer.
    char local_port_str[16];
    char remote_port_str[16];
    snprintf(local_port_str, sizeof(local_port_str), "%d", local_port);
    snprintf(remote_port_str, sizeof(remote_port_str), "%d", remote_port);

    // Set up logging
    USE_LOGFILE(log);

    if (profile.acl != NULL) {
        // Initialise the ACL; `acl` becomes non-zero when init_acl() returns 0
        acl = !init_acl(profile.acl);
    }

    if (local_addr == NULL) {
        local_addr = "127.0.0.1";
    }

    // ignore SIGPIPE
    signal(SIGPIPE, SIG_IGN);
    signal(SIGABRT, SIG_IGN);

    // Register libev watchers for the process signals handled by signal_cb
    ev_signal_init(&sigint_watcher, signal_cb, SIGINT);
    ev_signal_init(&sigterm_watcher, signal_cb, SIGTERM);
    ev_signal_start(EV_DEFAULT, &sigint_watcher);
    ev_signal_start(EV_DEFAULT, &sigterm_watcher);
    ev_signal_init(&sigusr1_watcher, signal_cb, SIGUSR1);
    ev_signal_init(&sigchld_watcher, signal_cb, SIGCHLD);
    ev_signal_start(EV_DEFAULT, &sigusr1_watcher);
    ev_signal_start(EV_DEFAULT, &sigchld_watcher);

    // Setup keys: initialise the cipher selected by the configuration
    LOGI("initializing ciphers... %s", method);
    crypto = crypto_init(password, NULL, method);
    if (crypto == NULL)
        FATAL("failed to init ciphers");

    struct sockaddr_storage storage;
    memset(&storage, 0, sizeof(struct sockaddr_storage));
    if (get_sockaddr(remote_host, remote_port_str, &storage, 0, ipv6first) == -1) {
        return -1;
    }

    // Setup proxy context
    struct ev_loop *loop = EV_DEFAULT;

    struct sockaddr *remote_addr_tmp[MAX_REMOTE_NUM];
    listen_ctx_t listen_ctx;
    // Start from a fully defined state: in UDP_ONLY mode the TCP branch
    // below is skipped, and `fd` is still handed to `callback`.
    memset(&listen_ctx, 0, sizeof(listen_ctx));
    listen_ctx.fd             = -1;
    listen_ctx.remote_num     = 1;
    listen_ctx.remote_addr    = remote_addr_tmp;
    listen_ctx.remote_addr[0] = (struct sockaddr *)(&storage);
    listen_ctx.timeout        = timeout;
    listen_ctx.iface          = NULL;
    listen_ctx.mptcp          = mptcp;

    // Bracket the address in the log only when it contains ':' (IPv6 literal)
    if (strchr(local_addr, ':') != NULL)
        LOGI("listening at [%s]:%s", local_addr, local_port_str);
    else
        LOGI("listening at %s:%s", local_addr, local_port_str);

    if (mode != UDP_ONLY) {
        // Setup socket
        int listenfd = create_and_bind(local_addr, local_port_str);
        if (listenfd == -1) {
            ERROR("bind()");
            return -1;
        }
        if (listen(listenfd, SOMAXCONN) == -1) {
            // Report first (ERROR may read errno), then release the socket
            ERROR("listen()");
            close(listenfd);
            return -1;
        }
        // Put the listening socket into non-blocking mode
        setnonblocking(listenfd);

        listen_ctx.fd = listenfd;
        // accept_cb is invoked whenever listenfd becomes readable
        ev_io_init(&listen_ctx.io, accept_cb, listenfd, EV_READ);
        // Register the watcher with the event loop
        ev_io_start(loop, &listen_ctx.io);
    }

    // Setup UDP
    if (mode != TCP_ONLY) {
        // UDP relay is enabled: initialise it
        LOGI("udprelay enabled");
        struct sockaddr *addr = (struct sockaddr *)(&storage);
        udp_fd = init_udprelay(local_addr, local_port_str, addr,
                               get_sockaddr_len(addr), mtu, crypto, timeout, NULL);
    }

    // Init connections: list used to track live connections
    cork_dllist_init(&connections);

    if (callback) {
        callback(listen_ctx.fd, udp_fd, udata);
    }

    // Enter the loop
    ev_run(loop, 0);

    if (verbose) {
        LOGI("closed gracefully");
    }

    // Clean up
    if (mode != UDP_ONLY) {
        ev_io_stop(loop, &listen_ctx.io);
        free_connections(loop);
        close(listen_ctx.fd);
    }

    if (mode != TCP_ONLY) {
        free_udprelay();
    }

    return 0;
}

2.处理连接,当收到client 连接时,会回调void accept_cb(EV_P_ ev_io *w, int revents)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * Read callback of the listening socket: accepts one client connection,
 * configures the new socket, wraps it in a server_t and starts watching it
 * for readable events.
 *
 * w is cast to listen_ctx_t *, i.e. the watcher is expected to be the one
 * embedded in a listen_ctx_t.
 */
void
accept_cb(EV_P_ ev_io *w, int revents)
{
    listen_ctx_t *listen_ctx = (listen_ctx_t *)w;

    int client_fd = accept(listen_ctx->fd, NULL, NULL);
    if (client_fd == -1) {
        ERROR("accept");
        return;
    }

    // Make the accepted socket non-blocking
    setnonblocking(client_fd);

    int on = 1;
    setsockopt(client_fd, SOL_TCP, TCP_NODELAY, &on, sizeof(on));
#ifdef SO_NOSIGPIPE
    setsockopt(client_fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
#endif

    // Per-connection state for this client
    server_t *conn = new_server(client_fd);
    conn->listener = listen_ctx;

    // Start watching the client socket for incoming data
    ev_io_start(EV_A_ & conn->recv_ctx->io);
}
  • static server_t * new_server(int fd)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static server_t *
new_server(int fd)
{
server_t *server;
server = ss_malloc(sizeof(server_t));

memset(server, 0, sizeof(server_t));
// 初始化server
server->recv_ctx = ss_malloc(sizeof(server_ctx_t));
server->send_ctx = ss_malloc(sizeof(server_ctx_t));
server->buf = ss_malloc(sizeof(buffer_t));
server->abuf = ss_malloc(sizeof(buffer_t));
balloc(server->buf, BUF_SIZE);
balloc(server->abuf, BUF_SIZE);
memset(server->recv_ctx, 0, sizeof(server_ctx_t));
memset(server->send_ctx, 0, sizeof(server_ctx_t));
server->stage = STAGE_INIT;
server->recv_ctx->connected = 0;
server->send_ctx->connected = 0;
server->fd = fd;
server->recv_ctx->server = server;
server->send_ctx->server = server;

// 初始化每一个连接对应的加解密相关组件
server->e_ctx = ss_align(sizeof(cipher_ctx_t));
server->d_ctx = ss_align(sizeof(cipher_ctx_t));
crypto->ctx_init(crypto->cipher, server->e_ctx, 1);
crypto->ctx_init(crypto->cipher, server->d_ctx, 0);
// 配置server io 相关处理函数
ev_io_init(&server->recv_ctx->io, server_recv_cb, fd, EV_READ);
ev_io_init(&server->send_ctx->io, server_send_cb, fd, EV_WRITE);

ev_timer_init(&server->delayed_connect_watcher,
delayed_connect_cb, 0.05, 0);
// 将当前server 加入到 链表中
cork_dllist_add(&connections, &server->entries);

return server;
}

3.读取数据 static void server_recv_cb(EV_P_ ev_io *w, int revents)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*
 * Read handler for the client socket (as excerpted here: only the receive
 * step is shown).
 *
 * Stops the delayed-connect timer, picks the destination buffer
 * (server->buf while no remote exists, remote->buf afterwards) and, unless
 * the invocation came from a timer event, appends freshly received bytes
 * to that buffer. A closed connection or a hard recv error tears down both
 * the remote and the server object.
 */
static void
server_recv_cb(EV_P_ ev_io *w, int revents)
{
    server_ctx_t *server_recv_ctx = (server_ctx_t *)w;
    server_t *server              = server_recv_ctx->server;
    remote_t *remote              = server->remote;

    ev_timer_stop(EV_A_ & server->delayed_connect_watcher);

    buffer_t *buf = (remote == NULL) ? server->buf : remote->buf;

    if (revents != EV_TIMER) {
        // Read into the free space after the data already buffered
        ssize_t r = recv(server->fd, buf->data + buf->len, BUF_SIZE - buf->len, 0);

        if (r == 0) {
            // connection closed
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
            return;
        }

        if (r == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // no data yet, keep waiting for recv
                return;
            }
            if (verbose)
                ERROR("server_recv_cb_recv");
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
            return;
        }

        buf->len += r;
    }
}

4.处理socks5 认证协商阶段

local.c 455 ~ 498
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// SOCKS5 method-negotiation stage. This is an excerpt from inside a larger
// function body: `buf`, `server`, `remote` and the loop targeted by the
// `continue` below are defined outside the lines shown here.
if (server->stage == STAGE_INIT) {
if (buf->len < 1)
return;
// Check the SOCKS version byte first; if it is not SVERSION, close the
// connection and release the associated resources
if (buf->data[0] != SVERSION) {
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
// Not enough bytes yet for the fixed part of the method-selection request:
// return and wait for the next read
if (buf->len < sizeof(struct method_select_request)) {
return;
}
struct method_select_request *method = (struct method_select_request *)buf->data;
// Total request length = fixed header + one byte per offered method
int method_len = method->nmethods + sizeof(struct method_select_request);
if (buf->len < method_len) {
return;
}
// ss only supports the "no authentication" method
struct method_select_response response;
response.ver = SVERSION;
response.method = METHOD_UNACCEPTABLE;
for (int i = 0; i < method->nmethods; i++)
if (method->methods[i] == METHOD_NOAUTH) {
response.method = METHOD_NOAUTH;
break;
}
char *send_buf = (char *)&response;
send(server->fd, send_buf, sizeof(response), 0);
// No acceptable authentication method was offered: close the connection
// and release the associated resources
if (response.method == METHOD_UNACCEPTABLE) {
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
// Next stage: handle the SOCKS handshake (request)
server->stage = STAGE_HANDSHAKE;

// Bytes beyond the negotiation message are already buffered: shift them
// to the front and keep processing them in the enclosing loop
if (method_len < (int)(buf->len)) {
memmove(buf->data, buf->data + method_len, buf->len - method_len);
buf->len -= method_len;
continue;
}

// Everything consumed; wait for the next read
buf->len = 0;
return;
}

5.处理socks5握手

local.c 498 ~ 572
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// SOCKS5 request handling (excerpt from inside a larger function body;
// `buf`, `server` and `remote` come from the enclosing scope).
// Read the request
struct socks5_request *request = (struct socks5_request *)buf->data;
size_t request_len = sizeof(struct socks5_request);
struct sockaddr_in sock_addr;
memset(&sock_addr, 0, sizeof(sock_addr));

// Wait until the fixed-size request header has fully arrived
if (buf->len < request_len) {
return;
}

int udp_assc = 0;

if (request->cmd == 3) {
// cmd 3 is treated as a UDP associate request: the reply below then
// carries the local address/port of udp_fd
udp_assc = 1;
socklen_t addr_len = sizeof(sock_addr);
getsockname(udp_fd, (struct sockaddr *)&sock_addr,
&addr_len);
if (verbose) {
LOGI("udp assc request accepted");
}
} else if (request->cmd != 1) {
// If cmd is not CONNECT, reply with CMD_NOT_SUPPORTED and close the connection
LOGE("unsupported cmd: %d", request->cmd);
struct socks5_response response;
response.ver = SVERSION;
response.rep = CMD_NOT_SUPPORTED;
response.rsv = 0;
response.atyp = 1;
char *send_buf = (char *)&response;
send(server->fd, send_buf, 4, 0);
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}

// Fake reply
if (server->stage == STAGE_HANDSHAKE) {
// Answer the handshake: response header followed by sock_addr's
// address and port (all zero unless filled in by getsockname above)
struct socks5_response response;
response.ver = SVERSION;
response.rep = 0;
response.rsv = 0;
response.atyp = 1;

buffer_t resp_to_send;
buffer_t *resp_buf = &resp_to_send;
balloc(resp_buf, BUF_SIZE);

memcpy(resp_buf->data, &response, sizeof(struct socks5_response));
memcpy(resp_buf->data + sizeof(struct socks5_response),
&sock_addr.sin_addr, sizeof(sock_addr.sin_addr));
memcpy(resp_buf->data + sizeof(struct socks5_response) +
sizeof(sock_addr.sin_addr),
&sock_addr.sin_port, sizeof(sock_addr.sin_port));

int reply_size = sizeof(struct socks5_response) +
sizeof(sock_addr.sin_addr) + sizeof(sock_addr.sin_port);

int s = send(server->fd, resp_buf->data, reply_size, 0);

bfree(resp_buf);
// If sending failed (or was only partial), close the connection
if (s < reply_size) {
LOGE("failed to send fake reply");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
if (udp_assc) {
// Wait until client closes the connection
return;
}
// Next stage: STAGE_PARSE
server->stage = STAGE_PARSE;
}

6.Parse阶段, 读取需要访问的真实地址和端口

local.c 574 ~ 636
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Parse stage (excerpt from inside a larger function body; `request`,
// `request_len`, `buf`, `server` and `remote` come from the enclosing scope).
// Copies the destination address (type byte, address, 2-byte port) from the
// SOCKS5 request into server->abuf, and, when ACL or verbose logging is
// active, also renders it as text into host/ip/port.
char host[257], ip[INET6_ADDRSTRLEN], port[16];

buffer_t *abuf = server->abuf;
abuf->idx = 0;
abuf->len = 0;

// First byte of abuf is the address type
abuf->data[abuf->len++] = request->atyp;
int atyp = request->atyp;

// get remote addr and port
if (atyp == 1) {
// IP V4
size_t in_addr_len = sizeof(struct in_addr);
// Need the full address plus the 2-byte port before continuing
if (buf->len < request_len + in_addr_len + 2) {
return;
}
memcpy(abuf->data + abuf->len, buf->data + request_len, in_addr_len + 2);
abuf->len += in_addr_len + 2;

if (acl || verbose) {
uint16_t p = ntohs(*(uint16_t *)(buf->data + request_len + in_addr_len));
inet_ntop(AF_INET, (const void *)(buf->data + request_len),
ip, INET_ADDRSTRLEN);
sprintf(port, "%d", p);
}
} else if (atyp == 3) {
// Domain name
// One length byte precedes the name
uint8_t name_len = *(uint8_t *)(buf->data + request_len);
if (buf->len < request_len + 1 + name_len + 2) {
return;
}
abuf->data[abuf->len++] = name_len;
memcpy(abuf->data + abuf->len, buf->data + request_len + 1, name_len + 2);
abuf->len += name_len + 2;

if (acl || verbose) {
uint16_t p =
ntohs(*(uint16_t *)(buf->data + request_len + 1 + name_len));
// name_len is at most 255, so the name plus terminator fits in host[257]
memcpy(host, buf->data + request_len + 1, name_len);
host[name_len] = '\0';
sprintf(port, "%d", p);
}
} else if (atyp == 4) {
// IP V6
size_t in6_addr_len = sizeof(struct in6_addr);
if (buf->len < request_len + in6_addr_len + 2) {
return;
}
memcpy(abuf->data + abuf->len, buf->data + request_len, in6_addr_len + 2);
abuf->len += in6_addr_len + 2;

if (acl || verbose) {
uint16_t p = ntohs(*(uint16_t *)(buf->data + request_len + in6_addr_len));
inet_ntop(AF_INET6, (const void *)(buf->data + request_len),
ip, INET6_ADDRSTRLEN);
sprintf(port, "%d", p);
}
} else {
// Unknown address type: log it and drop the connection
LOGE("unsupported addrtype: %d", request->atyp);
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}

7.SNI 阶段,Parse HTTP/SNI header

local.c 638 ~ 664
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// HTTP/SNI sniffing (excerpt from inside a larger function body; `abuf`,
// `buf`, `atyp`, `host` and `server` come from the enclosing scope).
// For IP-literal destinations on the HTTP or TLS default port, try to
// extract a hostname from the payload that follows the SOCKS5 request.
size_t abuf_len  = abuf->len;
int sni_detected = 0;
int ret = 0;

char *hostname;
// Destination port = last two bytes of the address copied into abuf
uint16_t dst_port = ntohs(*(uint16_t *)(abuf->data + abuf->len - 2));

if (atyp == 1 || atyp == 4) {
// NOTE(review): the payload offset "3 + abuf->len" presumably skips the
// three request bytes preceding atyp plus the address itself - confirm
if (dst_port == http_protocol->default_port)
ret = http_protocol->parse_packet(buf->data + 3 + abuf->len,
buf->len - 3 - abuf->len, &hostname);
else if (dst_port == tls_protocol->default_port)
ret = tls_protocol->parse_packet(buf->data + 3 + abuf->len,
buf->len - 3 - abuf->len, &hostname);
// Parser returned -1 and there is still buffer space: switch to
// STAGE_SNI once and wait (with the delayed-connect timer armed)
if (ret == -1 && buf->len < BUF_SIZE && server->stage != STAGE_SNI) {
server->stage = STAGE_SNI;
ev_timer_start(EV_A_ & server->delayed_connect_watcher);
return;
} else if (ret > 0) {
// A hostname of `ret` bytes was extracted
sni_detected = 1;
if (acl || verbose) {
memcpy(host, hostname, ret);
host[ret] = '\0';
}
ss_free(hostname);
}
}

8.建立ss remote连接,如果设置了ACL,需要先处理ACL

local.c 682 ~ 798
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// ACL evaluation and remote creation (excerpt from inside a larger function
// body; `host`, `ip`, `port`, `atyp`, `sni_detected`, `abuf`, `buf`,
// `remote`, `server` and the loop targeted by `continue` come from the
// enclosing scope).
if (acl
#ifdef __ANDROID__
&& !(vpn && strcmp(port, "53") == 0)
#endif
) {
int bypass = 0;
int resolved = 0;
struct sockaddr_storage storage;
memset(&storage, 0, sizeof(struct sockaddr_storage));
int err;

// Match by hostname first when one is known (domain request or sniffed)
int host_match = 0;
if (sni_detected || atyp == 3)
host_match = acl_match_host(host);

if (host_match > 0)
bypass = 1; // bypass hostnames in black list
else if (host_match < 0)
bypass = 0; // proxy hostnames in white list
else {
#ifndef __ANDROID__
if (atyp == 3) { // resolve domain so we can bypass domain with geoip
err = get_sockaddr(host, port, &storage, 0, ipv6first);
if (err != -1) {
resolved = 1;
// Render the resolved address as text into `ip` for the IP match below
switch (((struct sockaddr *)&storage)->sa_family) {
case AF_INET:
{
struct sockaddr_in *addr_in = (struct sockaddr_in *)&storage;
inet_ntop(AF_INET, &(addr_in->sin_addr), ip, INET_ADDRSTRLEN);
break;
}
case AF_INET6:
{
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *)&storage;
inet_ntop(AF_INET6, &(addr_in6->sin6_addr), ip, INET6_ADDRSTRLEN);
break;
}
default:
break;
}
}
}
#endif
// No hostname verdict: decide by IP according to the ACL mode
int ip_match = acl_match_host(ip);
switch (get_acl_mode()) {
case BLACK_LIST:
if (ip_match > 0)
bypass = 1; // bypass IPs in black list
break;
case WHITE_LIST:
bypass = 1;
if (ip_match < 0)
bypass = 0; // proxy IPs in white list
break;
}
}

if (bypass) {
if (verbose) {
if (sni_detected || atyp == 3)
LOGI("bypass %s:%s", host, port);
else if (atyp == 1)
LOGI("bypass %s:%s", ip, port);
else if (atyp == 4)
LOGI("bypass [%s]:%s", ip, port);
}
#ifndef __ANDROID__
if (atyp == 3 && resolved != 1)
err = get_sockaddr(host, port, &storage, 0, ipv6first);
else
#endif
err = get_sockaddr(ip, port, &storage, 0, ipv6first);
if (err != -1) {
// Reaching this point means this request does not go through the proxy
remote = create_remote(server->listener, (struct sockaddr *)&storage);
if (remote != NULL)
// Mark as a direct connection
remote->direct = 1;
}
}
}
// Not bypass
if (remote == NULL) {
// Create the connection to the ss remote
remote = create_remote(server->listener, NULL);
}

if (remote == NULL) {
LOGE("invalid remote addr");
close_and_free_server(EV_A_ server);
return;
}
// Not a direct connection: encrypt the address header in abuf
if (!remote->direct) {
int err = crypto->encrypt(abuf, server->e_ctx, BUF_SIZE);
if (err) {
LOGE("invalid password or cipher");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
}

// Hand any bytes already buffered over to the remote's buffer
if (buf->len > 0) {
memcpy(remote->buf->data, buf->data, buf->len);
remote->buf->len = buf->len;
}

// Link the two sides of the connection to each other
server->remote = remote;
remote->server = server;

// Data pending or a hostname sniffed: continue in the enclosing loop;
// otherwise arm the delayed-connect timer and wait
if (buf->len > 0 || sni_detected) {
continue;
} else {
ev_timer_start(EV_A_ & server->delayed_connect_watcher);
}

return;

9.转发数据

local.c 303 ~ 454
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Forwarding step (excerpt from inside a larger function body; `remote`,
// `server` and `server_recv_ctx` come from the enclosing scope).
// Encrypts the pending data unless the remote is direct, then either starts
// connecting the remote or, if it is already connected, sends the data.
if (remote == NULL) {
LOGE("invalid remote");
close_and_free_server(EV_A_ server);
return;
}

// insert shadowsocks header
if (!remote->direct) {
#ifdef __ANDROID__
tx += remote->buf->len;
#endif
int err = crypto->encrypt(remote->buf, server->e_ctx, BUF_SIZE);

if (err) {
LOGE("invalid password or cipher");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}

// abuf is prepended to the outgoing data once, then released
if (server->abuf) {
bprepend(remote->buf, server->abuf, BUF_SIZE);
bfree(server->abuf);
ss_free(server->abuf);
server->abuf = NULL;
}
}
// The remote is not connected yet: connect it
if (!remote->send_ctx->connected) {
#ifdef __ANDROID__
if (vpn) {
// Sockets to 127.0.0.1 are not passed to protect_socket()
int not_protect = 0;
if (remote->addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&remote->addr;
if (s->sin_addr.s_addr == inet_addr("127.0.0.1"))
not_protect = 1;
}
if (!not_protect) {
if (protect_socket(remote->fd) == -1) {
ERROR("protect_socket");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
}
}
#endif

remote->buf->idx = 0;

if (!fast_open || remote->direct) {
// connecting, wait until connected
int r = connect(remote->fd, (struct sockaddr *)&(remote->addr), remote->addr_len);

if (r == -1 && errno != CONNECT_IN_PROGRESS) {
ERROR("connect");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}

// wait on remote connected event
ev_io_stop(EV_A_ & server_recv_ctx->io);
ev_io_start(EV_A_ & remote->send_ctx->io);
ev_timer_start(EV_A_ & remote->send_ctx->watcher);
} else {
// TCP fast open path; the mechanism used depends on what the
// platform headers define
int s = -1;
#if defined(MSG_FASTOPEN) && !defined(TCP_FASTOPEN_CONNECT)
s = sendto(remote->fd, remote->buf->data, remote->buf->len, MSG_FASTOPEN,
(struct sockaddr *)&(remote->addr), remote->addr_len);
#else
#if defined(CONNECT_DATA_IDEMPOTENT)
((struct sockaddr_in *)&(remote->addr))->sin_len = sizeof(struct sockaddr_in);
sa_endpoints_t endpoints;
memset((char *)&endpoints, 0, sizeof(endpoints));
endpoints.sae_dstaddr = (struct sockaddr *)&(remote->addr);
endpoints.sae_dstaddrlen = remote->addr_len;

s = connectx(remote->fd, &endpoints, SAE_ASSOCID_ANY,
CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT,
NULL, 0, NULL, NULL);
#elif defined(TCP_FASTOPEN_CONNECT)
int optval = 1;
if(setsockopt(remote->fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT,
(void *)&optval, sizeof(optval)) < 0)
FATAL("failed to set TCP_FASTOPEN_CONNECT");
s = connect(remote->fd, (struct sockaddr *)&(remote->addr), remote->addr_len);
#else
FATAL("fast open is not enabled in this build");
#endif
// The connect call returned 0: send the buffered data now
if (s == 0)
s = send(remote->fd, remote->buf->data, remote->buf->len, 0);
#endif
if (s == -1) {
if (errno == CONNECT_IN_PROGRESS) {
// in progress, wait until connected
remote->buf->idx = 0;
ev_io_stop(EV_A_ & server_recv_ctx->io);
ev_io_start(EV_A_ & remote->send_ctx->io);
return;
} else {
if (errno == EOPNOTSUPP || errno == EPROTONOSUPPORT ||
errno == ENOPROTOOPT) {
LOGE("fast open is not supported on this platform");
// just turn it off
fast_open = 0;
} else {
ERROR("fast_open_connect");
}
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
} else {
// s bytes went out; the remainder is sent from the write callback
remote->buf->len -= s;
remote->buf->idx = s;

ev_io_stop(EV_A_ & server_recv_ctx->io);
ev_io_start(EV_A_ & remote->send_ctx->io);
ev_timer_start(EV_A_ & remote->send_ctx->watcher);
return;
}
}
} else { // Already connected: forward the data directly
int s = send(remote->fd, remote->buf->data, remote->buf->len, 0);
if (s == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// no data, wait for send
remote->buf->idx = 0;
ev_io_stop(EV_A_ & server_recv_ctx->io);
ev_io_start(EV_A_ & remote->send_ctx->io);
return;
} else {
ERROR("server_recv_cb_send");
close_and_free_remote(EV_A_ remote);
close_and_free_server(EV_A_ server);
return;
}
} else if (s < (int)(remote->buf->len)) {
// Partial send: stop reading from the client until the rest is written
remote->buf->len -= s;
remote->buf->idx = s;
ev_io_stop(EV_A_ & server_recv_ctx->io);
ev_io_start(EV_A_ & remote->send_ctx->io);
return;
} else {
// Everything was sent; reset the buffer
remote->buf->idx = 0;
remote->buf->len = 0;
}
}

// all processed
return;
local.c 948 ~ 1010
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/*
 * Write handler for the remote socket: once the remote becomes writable,
 * flush the pending data in remote->buf to it.
 *
 * On the first call after a non-blocking connect, getpeername() is used to
 * check whether the connection was established. When all pending data has
 * been written, the watcher is stopped and reading from the client resumes.
 */
static void
remote_send_cb(EV_P_ ev_io *w, int revents)
{
    remote_ctx_t *send_ctx = (remote_ctx_t *)w;
    remote_t *remote       = send_ctx->remote;
    server_t *server       = remote->server;

    if (!send_ctx->connected) {
        struct sockaddr_storage peer;
        socklen_t peer_len = sizeof peer;

        if (getpeername(remote->fd, (struct sockaddr *)&peer, &peer_len) != 0) {
            // not connected
            ERROR("getpeername");
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
            return;
        }

        // Connected: swap the connect timer for the receive side
        send_ctx->connected = 1;
        ev_timer_stop(EV_A_ & send_ctx->watcher);
        ev_timer_start(EV_A_ & remote->recv_ctx->watcher);
        ev_io_start(EV_A_ & remote->recv_ctx->io);

        if (remote->buf->len == 0) {
            // no need to send any data
            ev_io_stop(EV_A_ & send_ctx->io);
            ev_io_start(EV_A_ & server->recv_ctx->io);
            return;
        }
    }

    if (remote->buf->len == 0) {
        // close and free
        close_and_free_remote(EV_A_ remote);
        close_and_free_server(EV_A_ server);
        return;
    }

    // has data to send
    ssize_t sent = send(remote->fd, remote->buf->data + remote->buf->idx,
                        remote->buf->len, 0);

    if (sent == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            ERROR("remote_send_cb_send");
            // close and free
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
        }
        return;
    }

    if (sent < (ssize_t)(remote->buf->len)) {
        // partly sent: advance the offset and wait for the next writable event
        remote->buf->len -= sent;
        remote->buf->idx += sent;
        return;
    }

    // all sent out, wait for reading
    remote->buf->len = 0;
    remote->buf->idx = 0;
    ev_io_stop(EV_A_ & send_ctx->io);
    ev_io_start(EV_A_ & server->recv_ctx->io);
}
local.c 871 ~ 946
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
 * Read handler for the remote socket: receives data from the remote into
 * server->buf, decrypts it unless the remote is a direct connection, and
 * writes it to the client socket. If the client cannot take everything at
 * once, reading from the remote is paused and the client's write watcher
 * takes over.
 */
static void
remote_recv_cb(EV_P_ ev_io *w, int revents)
{
    remote_ctx_t *recv_ctx = (remote_ctx_t *)w;
    remote_t *remote       = recv_ctx->remote;
    server_t *server       = remote->server;

    ev_timer_again(EV_A_ & remote->recv_ctx->watcher);

    ssize_t nread = recv(remote->fd, server->buf->data, BUF_SIZE, 0);

    if (nread == 0) {
        // connection closed
        close_and_free_remote(EV_A_ remote);
        close_and_free_server(EV_A_ server);
        return;
    }

    if (nread == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // no data, continue to wait for recv
            return;
        }
        ERROR("remote_recv_cb_recv");
        close_and_free_remote(EV_A_ remote);
        close_and_free_server(EV_A_ server);
        return;
    }

    server->buf->len = nread;

    if (!remote->direct) {
#ifdef __ANDROID__
        rx += server->buf->len;
        stat_update_cb();
#endif
        int rc = crypto->decrypt(server->buf, server->d_ctx, BUF_SIZE);
        if (rc == CRYPTO_ERROR) {
            LOGE("invalid password or cipher");
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
            return;
        }
        if (rc == CRYPTO_NEED_MORE) {
            return; // Wait for more
        }
    }

    int sent = send(server->fd, server->buf->data, server->buf->len, 0);

    if (sent == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            ERROR("remote_recv_cb_send");
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
            return;
        }
        // client not writable yet: retry the whole buffer from the write watcher
        server->buf->idx = 0;
        ev_io_stop(EV_A_ & recv_ctx->io);
        ev_io_start(EV_A_ & server->send_ctx->io);
    } else if (sent < (int)(server->buf->len)) {
        // partial write: the write watcher sends the remainder
        server->buf->len -= sent;
        server->buf->idx = sent;
        ev_io_stop(EV_A_ & recv_ctx->io);
        ev_io_start(EV_A_ & server->send_ctx->io);
    }

    // Disable TCP_NODELAY after the first response are sent
    if (!remote->recv_ctx->connected && !no_delay) {
        int off = 0;
        setsockopt(server->fd, SOL_TCP, TCP_NODELAY, &off, sizeof(off));
        setsockopt(remote->fd, SOL_TCP, TCP_NODELAY, &off, sizeof(off));
    }
    remote->recv_ctx->connected = 1;
}
local.c 803 ~ 839
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
 * Write handler for the client socket: once the local connection becomes
 * writable, send it the data pending in server->buf (received from the
 * remote). When the buffer has been fully written, the watcher is stopped
 * and reading from the remote resumes.
 */
static void
server_send_cb(EV_P_ ev_io *w, int revents)
{
    server_ctx_t *send_ctx = (server_ctx_t *)w;
    server_t *server       = send_ctx->server;
    remote_t *remote       = server->remote;

    if (server->buf->len == 0) {
        // close and free
        close_and_free_remote(EV_A_ remote);
        close_and_free_server(EV_A_ server);
        return;
    }

    // has data to send
    ssize_t sent = send(server->fd, server->buf->data + server->buf->idx,
                        server->buf->len, 0);

    if (sent == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            ERROR("server_send_cb_send");
            close_and_free_remote(EV_A_ remote);
            close_and_free_server(EV_A_ server);
        }
        return;
    }

    if (sent < (ssize_t)(server->buf->len)) {
        // partly sent: advance the offset and wait for the next writable event
        server->buf->len -= sent;
        server->buf->idx += sent;
        return;
    }

    // all sent out, wait for reading
    server->buf->len = 0;
    server->buf->idx = 0;
    ev_io_stop(EV_A_ & send_ctx->io);
    ev_io_start(EV_A_ & remote->recv_ctx->io);
}
-------------本文结束感谢您的阅读-------------