python select模块学习

  python

0x01queue

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递

基本用法

q = queue.Queue() 初始化队列

q.put(item[, block[, timeout]]) 将参数item加入队列中,如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常。

q.get([block[, timeout]])  取出队列中的排第一的参数,block跟timeout参数同put(),其非阻塞方法为`get_nowait()`相当与get(False)

q.empty() 检测队列是否为空

0x02socket

什么是TCP/IP、UDP?

   TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,是一个工业标准的协议集,它是为广域网(WANs)设计的。
   UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是属于TCP/IP协议族中的一种。

TCP三次握手协议

  • 第一次握手:客户端发送syn包(seq=x)到服务器,并进入SYN_SEND状态,等待服务器确认
  • 第二次握手:服务器收到syn包,必须确认客户的客户SYN(ack=x+1),同时自己也发送一个SYN包(seq=y),即SYM+ACK包,此时服务器进入SYN_RECV状态。
  • 第三次握手客户端收到服务器的SYN+ACK包,向服务器发送确认包ACK(ack=y+1),此包发送完毕,客户端和服务端进入ESTABLISHED状态

一个简单的单个socket服务

TCP协议

server端

import socket

sockets = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.bind(address)  # 使用监听地址
sockets.listen(5)      # 设置最大连接数

while True:
	conn,addres = sockets.accept()   # 接受连接并返回,其中conn是新的套接字对象,可以用来接受和发送数据
	while True:
		data = conn.recv(1024)
		conn.send(data)
		print(data.decode('utf-8'))
		if data == '':
			conn.close()
			break

sockets.close()

client

import socket

sockets = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.connect(address)

while True:
	sens = input('>>>')
	sockets.send(sens.encode('gbk'))
	data = sockets.recv(1024)
	print(data.decode('utf-8'))

sockets.close()

UDP协议

server

host = 'localhost'
port = 4668
address = (host,port)
sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sockets.bind(address)  # 使用监听地址


while True:
	data,addres = sockets.recvfrom(1024)   # 接受连接并返回,其中conn是新的套接字对象,可以用来接受和发送数据
	print(data)
	sockets.sendto(data,address)

sockets.close()

client

import socket

sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
host = 'localhost'
port = 4668
address = (host,port)
sockets.connect(address)

while True:
	sens = input('>>>')
	sockets.sendall(sens.encode('gbk'))
	data = sockets.recv(1024)
	print(data.decode('utf-8'))

sockets.close()

 

0x03多路复用

select

IO多路复用是IO模式的一种,是一种单线程处理多并发的IO操作的方案,其他IO操作方案分别有:

  • 阻塞I/O(blocking IO)
  • 非阻塞I/O(noblocking IO)
  • I/O多路复用(IO multiplexing)
  • 异步I/O(asynchronous IO)

socket server 监听

import select
import socket
import queue

# 创建socket连接
server = socket.socket()
server.bind(('localhost',9005))
server.listen(10)

# 设置非阻塞模式
server.setblocking(False)


# key 为接受消息对象实例,
mes_dic = {}

# 检测连接的列表
inputs = [server,]
print('waiting ...')

# 返回上依此数据列表

outputs = []
print(select.select(inputs,outputs, inputs))
while  True:
	readable,writeable,exceptional = select.select(inputs, outputs, inputs)
	# 把inputs, outputs, exceptional(跟inputs公用)传给select后返回3个新的list:readable,writeable,exceptional.
	# 所有在readable list中的socket连接代表有数据可接收(recv)
	# 所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接
	# 当连接通信出现error时会把error写到exceptional列表中
	for r in readable:
		# 一个r为一个socket
		if r is server:
			conn,addr = server.accept()
			inputs.append(conn)
			# 因为新建立的连接还没发数据过来,现在就接受的话程序就报错
            # 所以要想实现这个客户端发数据时server端能知道,就需要让select再监测
            # 初始化一个队列,后面存要返回给这个客户端的数据
			mes_dic[conn] = queue.Queue()

		else:
			# 接受数据报表
			data = r.recv(1024)
			mes_dic[r].put(data)
			outputs.append(r)
    # 要返回给客户端得连接列表
    # 如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端
    # 否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接
	for w in writeable:
		data_to_client = mes_dic[w].get()
		print(data_to_client)
		data_to_client+b'''
		Hello word! 
		IO test!
		'''
		w.send(data_to_client)
		outputs.remove(w)
	# 如果在跟某个socket端出现错误,那么把这个连接端
	# 把这个连接对象在inputs\outputs\message_queue中删除,并关闭这个连接
	for e in exceptional:
		if e in outputs:
			outputs.remove(e)
		e.close()
		inputs.remove(e)
		del mes_dic[e]

select相对于原生的socket连接相比可以同时接收多个消息,单个socket一次只能处理一个socket通信,而select可以处理多个,但必须在一个端口下

缺点:

  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多的时候会很大
  • 单个进程能够监视的fd数量存在最大限制,在linux上默认为1024(可以通过修改宏定义或者重新编译内核的方式提升这个限制)
  • 并且由于select的fd是放在数组中,并且每次都要线性遍历整个数组,当fd很多的时候,开销也很大

poll

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或主动超时,被唤醒后它又要再次遍历fd。

因为poll是基于链表进行储存,所以没有最大的连接数的限制

setsockopt(level,optname,value)

这篇博客说得很详细

level定义了哪个选项将被使用。通常情况下是SOL_SOCKET,意思是正在使用的socket选项。它还可以通过设置一个特殊协议号码来设置协议选项,然而对于一个给定的操作系统,大多数协议选项都是明确的,所以为了简便,它们很少用于为移动设备设计的应用程序。optname参数提供使用的特殊选项。关于可用选项的设置,会因为操作系统的不同而有少许不同

如果level选定了SOL_SOCKET,那么一些常用的选项见下表:

 

选项

意义

期望值

SO_BINDTODEVICE

可以使socket只在某个特殊的网络接口(网卡)有效。也许不能是移动便携设备

一个字符串给出设备的名称或者一个空字符串返回默认值

SO_BROADCAST

允许广播地址发送和接收信息包。只对UDP有效。如何发送和接收广播信息包

布尔型整数

SO_DONTROUTE

禁止通过路由器和网关往外发送信息包。这主要是为了安全而用在以太网上UDP通信的一种方法。不管目的地址使用什么IP地址,都可以防止数据离开本地网络

布尔型整数

SO_KEEPALIVE

可以使TCP通信的信息包保持连续性。这些信息包可以在没有信息传输的时候,使通信的双方确定连接是保持的

布尔型整数

SO_OOBINLINE

可以把收到的不正常数据看成是正常的数据,也就是说会通过一个标准的对recv()的调用来接收这些数据

布尔型整数

SO_REUSEADDR

当socket关闭后,本地端用于该socket的端口号立刻就可以被重用。通常来说,只有经过系统定义一段时间后,才能被重用。

布尔型整数

 

poll.register(fd,[,mask]) 中,mask的种类为

参数 意义
POLLIN 有数据读取
POLLPRT 有数据紧急读取
POLLOUT 准备输出:输出不会阻塞
POLLERR 某些错误情况出现
POLLHUP 挂起
POLLNVAL 无效请求:描述无法打开

server端的代码如下

import socket
import select

response = b"io test"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置堵塞
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('127.0.0.1', 4669))
serversocket.listen(5)
serversocket.setblocking(False)

# 返回一个poll的对象,支持注册和注销文件描述符。
poll = select.poll()

# 注册一个文件描述符,注册后可以通过poll()方法来检查是否有对应的I/O事件发生。
# fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。
# 创建数据读取的连接
poll.register(serversocket.fileno(), select.POLLIN)

inputs = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                # 建立新的连接数
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                inputs[con.fileno()] = con
            else:
                con = inputs[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = inputs[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()

此代码需要在linux下运行

epoll

epoll是在2.6内核提出,是之前的select和poll的增强版本,相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个问题描述符管理多个描述符,将用户关系的文件描述符事件存放到内核的一个事件中,这样在用户空间和内核空间空间的copy只需要一次即可

优点

  1. 没有最大并发连接数的限制,能打开的fd的上线远大于2014(1G的内存能监听约10万个端口)
  2. 效率提升,不是轮询的方式,不会随着fd的数目增加效率下降,只有活跃可用的fd才会调用callback函数

select.epoll(sizeint = -1,flags=0) 返回一个epoll对象,evenmask变量

事件常量 意义
EPOLLIN 读就绪
EPOLLOUT 写就绪
EPOLLPRI 有数据紧急读取
EPOLLERR assoc. fd有错误情况发生
EPOLLHUP assoc. fd发生挂起
EPOLLRT 设置边缘触发(ET)(默认的是水平触发)
EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用
EPOLLRDNORM 和 EPOLLIN 相等
EPOLLRDBAND 优先读取的数据带(data band)
EPOLLWRNORM 和 EPOLLOUT 相等
EPOLLWRBAND 优先写的数据带(data band)
EPOLLMSG 忽视

epoll.close()  关闭一个epoll的fd

epoll.closed 检测epoll对象是否关闭

epoll.fileno() 返回epoll的fd

epoll.fromfd(fd) 通过fd创建一个epoll

epoll.register(fd[, eventmask])  使用fd注册一个epoll对象

epoll.modify(fdeventmask) 修改一个注册epoll的fd

epoll.unregister(fd) 移除该fd的epoll对象

import socket
import select
import Queue


serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ("127.0.0.1", 9000)
serversocket.bind(address)
serversocket.listen(5)

#服务端设置非阻塞
serversocket.setblocking(False)  

#超时时间
timeout = 10
#创建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#注册服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
#保存连接客户端消息的字典,格式为{}
message_queues = {}
#文件句柄到所对应对象的字典,格式为{句柄:对象}
fd_to_socket = {serversocket.fileno():serversocket,}

while True:
  #轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
  events = epoll.poll(timeout)
  if not events:
     continue
  for fd, event in events:
     socket = fd_to_socket[fd]
     #如果活动socket为当前服务器socket,表示有新连接
     if socket == serversocket:
            # 进行新的连接
            connection, address = serversocket.accept()
            connection.setblocking(False)
            #注册新连接fd到待读事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            #把新连接的文件句柄以及对象保存到字典
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection]  = Queue.Queue()
     #关闭事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
     #可读事件
     elif event & select.EPOLLIN:
        #接收数据
        data = socket.recv(1024)
        if data:
           #将数据放入对应客户端的字典
           message_queues[socket].put(data)
           #修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
           epoll.modify(fd, select.EPOLLOUT)
     #可写事件
     elif event & select.EPOLLOUT:
        try:
           #从字典中获取对应客户端的信息
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           #修改文件句柄为读事件
           epoll.modify(fd, select.EPOLLIN)
        else :
           socket.send(msg)


#在epoll中注销服务端文件句柄
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

 

FD:(File Discriptor) 针对socket通信,在使用网卡时,网卡需要处理N多连接,每个连接都需要一个对应的描述,也就是唯一ID,即对应文件描述符

Reference