0x01queue
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递
基本用法
q = queue.Queue()
初始化队列
q.put(item[, block[, timeout]])
将参数item加入队列中,如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常。
q.get([block[, timeout]])
取出队列中的排第一的参数,block跟timeout参数同put(),其非阻塞方法为`get_nowait()`相当与get(False)
q.empty()
检测队列是否为空
0x02socket
什么是TCP/IP、UDP?
TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,是一个工业标准的协议集,它是为广域网(WANs)设计的。
UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是属于TCP/IP协议族中的一种。
TCP三次握手协议
- 第一次握手:客户端发送syn包(seq=x)到服务器,并进入SYN_SEND状态,等待服务器确认
- 第二次握手:服务器收到syn包,必须确认客户的客户SYN(ack=x+1),同时自己也发送一个SYN包(seq=y),即SYM+ACK包,此时服务器进入SYN_RECV状态。
- 第三次握手客户端收到服务器的SYN+ACK包,向服务器发送确认包ACK(ack=y+1),此包发送完毕,客户端和服务端进入ESTABLISHED状态
一个简单的单个socket服务
TCP协议
server端
import socket
sockets = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.bind(address) # 使用监听地址
sockets.listen(5) # 设置最大连接数
while True:
conn,addres = sockets.accept() # 接受连接并返回,其中conn是新的套接字对象,可以用来接受和发送数据
while True:
data = conn.recv(1024)
conn.send(data)
print(data.decode('utf-8'))
if data == '':
conn.close()
break
sockets.close()
client
import socket
sockets = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.connect(address)
while True:
sens = input('>>>')
sockets.send(sens.encode('gbk'))
data = sockets.recv(1024)
print(data.decode('utf-8'))
sockets.close()
UDP协议
server
host = 'localhost'
port = 4668
address = (host,port)
sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sockets.bind(address) # 使用监听地址
while True:
data,addres = sockets.recvfrom(1024) # 接受连接并返回,其中conn是新的套接字对象,可以用来接受和发送数据
print(data)
sockets.sendto(data,address)
sockets.close()
client
import socket
sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.connect(address)
while True:
sens = input('>>>')
sockets.sendall(sens.encode('gbk'))
data = sockets.recv(1024)
print(data.decode('utf-8'))
sockets.close()
0x03多路复用
select
IO多路复用是IO模式的一种,是一种单线程处理多并发的IO操作的方案,其他IO操作方案分别有:
- 阻塞I/O(blocking IO)
- 非阻塞I/O(noblocking IO)
- I/O多路复用(IO multiplexing)
- 异步I/O(asynchronous IO)
socket server 监听
import select
import socket
import queue
# 创建socket连接
server = socket.socket()
server.bind(('localhost',9005))
server.listen(10)
# 设置非阻塞模式
server.setblocking(False)
# key 为接受消息对象实例,
mes_dic = {}
# 检测连接的列表
inputs = [server,]
print('waiting ...')
# 返回上依此数据列表
outputs = []
print(select.select(inputs,outputs, inputs))
while True:
readable,writeable,exceptional = select.select(inputs, outputs, inputs)
# 把inputs, outputs, exceptional(跟inputs公用)传给select后返回3个新的list:readable,writeable,exceptional.
# 所有在readable list中的socket连接代表有数据可接收(recv)
# 所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接
# 当连接通信出现error时会把error写到exceptional列表中
for r in readable:
# 一个r为一个socket
if r is server:
conn,addr = server.accept()
inputs.append(conn)
# 因为新建立的连接还没发数据过来,现在就接受的话程序就报错
# 所以要想实现这个客户端发数据时server端能知道,就需要让select再监测
# 初始化一个队列,后面存要返回给这个客户端的数据
mes_dic[conn] = queue.Queue()
else:
# 接受数据报表
data = r.recv(1024)
mes_dic[r].put(data)
outputs.append(r)
# 要返回给客户端得连接列表
# 如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端
# 否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接
for w in writeable:
data_to_client = mes_dic[w].get()
print(data_to_client)
data_to_client+b'''
Hello word!
IO test!
'''
w.send(data_to_client)
outputs.remove(w)
# 如果在跟某个socket端出现错误,那么把这个连接端
# 把这个连接对象在inputs\outputs\message_queue中删除,并关闭这个连接
for e in exceptional:
if e in outputs:
outputs.remove(e)
e.close()
inputs.remove(e)
del mes_dic[e]
select相对于原生的socket连接相比可以同时接收多个消息,单个socket一次只能处理一个socket通信,而select可以处理多个,但必须在一个端口下
缺点:
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多的时候会很大
- 单个进程能够监视的fd数量存在最大限制,在linux上默认为1024(可以通过修改宏定义或者重新编译内核的方式提升这个限制)
- 并且由于select的fd是放在数组中,并且每次都要线性遍历整个数组,当fd很多的时候,开销也很大
poll
poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或主动超时,被唤醒后它又要再次遍历fd。
因为poll是基于链表进行储存,所以没有最大的连接数的限制
setsockopt(level,optname,value)
这篇博客说得很详细
level定义了哪个选项将被使用。通常情况下是SOL_SOCKET,意思是正在使用的socket选项。它还可以通过设置一个特殊协议号码来设置协议选项,然而对于一个给定的操作系统,大多数协议选项都是明确的,所以为了简便,它们很少用于为移动设备设计的应用程序。optname参数提供使用的特殊选项。关于可用选项的设置,会因为操作系统的不同而有少许不同
如果level选定了SOL_SOCKET,那么一些常用的选项见下表:
选项 |
意义 |
期望值 |
SO_BINDTODEVICE |
可以使socket只在某个特殊的网络接口(网卡)有效。也许不能是移动便携设备 |
一个字符串给出设备的名称或者一个空字符串返回默认值 |
SO_BROADCAST |
允许广播地址发送和接收信息包。只对UDP有效。如何发送和接收广播信息包 |
布尔型整数 |
SO_DONTROUTE |
禁止通过路由器和网关往外发送信息包。这主要是为了安全而用在以太网上UDP通信的一种方法。不管目的地址使用什么IP地址,都可以防止数据离开本地网络 |
布尔型整数 |
SO_KEEPALIVE |
可以使TCP通信的信息包保持连续性。这些信息包可以在没有信息传输的时候,使通信的双方确定连接是保持的 |
布尔型整数 |
SO_OOBINLINE |
可以把收到的不正常数据看成是正常的数据,也就是说会通过一个标准的对recv()的调用来接收这些数据 |
布尔型整数 |
SO_REUSEADDR |
当socket关闭后,本地端用于该socket的端口号立刻就可以被重用。通常来说,只有经过系统定义一段时间后,才能被重用。 |
布尔型整数 |
poll.register(fd,[,mask])
中,mask的种类为
参数 | 意义 |
---|---|
POLLIN | 有数据读取 |
POLLPRT | 有数据紧急读取 |
POLLOUT | 准备输出:输出不会阻塞 |
POLLERR | 某些错误情况出现 |
POLLHUP | 挂起 |
POLLNVAL | 无效请求:描述无法打开 |
server端的代码如下
import socket
import select
response = b"io test"
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置堵塞
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('127.0.0.1', 4669))
serversocket.listen(5)
serversocket.setblocking(False)
# 返回一个poll的对象,支持注册和注销文件描述符。
poll = select.poll()
# 注册一个文件描述符,注册后可以通过poll()方法来检查是否有对应的I/O事件发生。
# fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。
# 创建数据读取的连接
poll.register(serversocket.fileno(), select.POLLIN)
inputs = {}
while True:
for fd, event in poll.poll():
if event == select.POLLIN:
if fd == serversocket.fileno():
# 建立新的连接数
con, addr = serversocket.accept()
poll.register(con.fileno(), select.POLLIN)
inputs[con.fileno()] = con
else:
con = inputs[fd]
data = con.recv(1024)
if data:
poll.modify(con.fileno(), select.POLLOUT)
elif event == select.POLLOUT:
con = inputs[fd]
con.send(response)
poll.unregister(con.fileno())
con.close()
此代码需要在linux下运行
epoll
epoll是在2.6内核提出,是之前的select和poll的增强版本,相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个问题描述符管理多个描述符,将用户关系的文件描述符事件存放到内核的一个事件中,这样在用户空间和内核空间空间的copy只需要一次即可
优点
- 没有最大并发连接数的限制,能打开的fd的上线远大于2014(1G的内存能监听约10万个端口)
- 效率提升,不是轮询的方式,不会随着fd的数目增加效率下降,只有活跃可用的fd才会调用callback函数
select.epoll(sizeint = -1,flags=0)
返回一个epoll对象,evenmask变量
事件常量 | 意义 |
---|---|
EPOLLIN | 读就绪 |
EPOLLOUT | 写就绪 |
EPOLLPRI | 有数据紧急读取 |
EPOLLERR | assoc. fd有错误情况发生 |
EPOLLHUP | assoc. fd发生挂起 |
EPOLLRT | 设置边缘触发(ET)(默认的是水平触发) |
EPOLLONESHOT | 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用 |
EPOLLRDNORM | 和 EPOLLIN 相等 |
EPOLLRDBAND | 优先读取的数据带(data band) |
EPOLLWRNORM | 和 EPOLLOUT 相等 |
EPOLLWRBAND | 优先写的数据带(data band) |
EPOLLMSG | 忽视 |
epoll.
close
() 关闭一个epoll的fd
epoll.
closed
检测epoll对象是否关闭
epoll.
fileno
() 返回epoll的fd
epoll.
fromfd
(fd) 通过fd创建一个epoll
epoll.
register
(fd[, eventmask]) 使用fd注册一个epoll对象
epoll.
modify
(fd, eventmask) 修改一个注册epoll的fd
epoll.
unregister
(fd) 移除该fd的epoll对象
import socket
import select
import Queue
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ("127.0.0.1", 9000)
serversocket.bind(address)
serversocket.listen(5)
#服务端设置非阻塞
serversocket.setblocking(False)
#超时时间
timeout = 10
#创建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#注册服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
#保存连接客户端消息的字典,格式为{}
message_queues = {}
#文件句柄到所对应对象的字典,格式为{句柄:对象}
fd_to_socket = {serversocket.fileno():serversocket,}
while True:
#轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
events = epoll.poll(timeout)
if not events:
continue
for fd, event in events:
socket = fd_to_socket[fd]
#如果活动socket为当前服务器socket,表示有新连接
if socket == serversocket:
# 进行新的连接
connection, address = serversocket.accept()
connection.setblocking(False)
#注册新连接fd到待读事件集合
epoll.register(connection.fileno(), select.EPOLLIN)
#把新连接的文件句柄以及对象保存到字典
fd_to_socket[connection.fileno()] = connection
message_queues[connection] = Queue.Queue()
#关闭事件
elif event & select.EPOLLHUP:
epoll.unregister(fd)
fd_to_socket[fd].close()
del fd_to_socket[fd]
#可读事件
elif event & select.EPOLLIN:
#接收数据
data = socket.recv(1024)
if data:
#将数据放入对应客户端的字典
message_queues[socket].put(data)
#修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
epoll.modify(fd, select.EPOLLOUT)
#可写事件
elif event & select.EPOLLOUT:
try:
#从字典中获取对应客户端的信息
msg = message_queues[socket].get_nowait()
except Queue.Empty:
#修改文件句柄为读事件
epoll.modify(fd, select.EPOLLIN)
else :
socket.send(msg)
#在epoll中注销服务端文件句柄
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
FD:(File Discriptor) 针对socket通信,在使用网卡时,网卡需要处理N多连接,每个连接都需要一个对应的描述,也就是唯一ID,即对应文件描述符