scoket用法

科技资讯 投稿 6100 0 评论

scoket用法

一.scoket基本介绍

1.scoket简介(以下是来自chatgpt回答

      使得应用程序可以利用网络传输层提供的服务(如TCP或UDP)进行通信。

      到网络上的另一个应用程序,并从网络上的另一个应用程序接收数据。

     常见的网络应用程序,如Web服务器、邮件服务器、FTP服务器等都使用Socket技术。此外,许多P2P

      程序,如网络游戏、聊天程序等。

2.建立TCP/IP连接

socket_stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM

第一个参数表示通信协议类型

AF_UNIX:适用于本地进程间通信

SOCK_STREAM:表示tcp/ip连线

2)与服务端建立连接

# 建立连接 ip_port = ("ip", "port" # 连接的服务端ip和端口 socket_stream.connect(ip_port # 建立连接,出错不返回错误码 socket_stream.connect_ex(ip_port # 出错时返回错误码,不抛异常

connect(和connect_ex(选一个即可

# 发送数据
data = 'hello'
socket_stream.send(data.encode("utf-8"  # 发送TCP数据,当send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完
# or
socket_stream.sendall(data.encode("utf-8"  # 发送完整的TCP数据(循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完

当发送的数据较大时建议使用sendall发送

# 接收TCP数据。1024(变量表示每次最多接受1M字节的数据,recv(函数是一个阻塞函数,没有要接收的数据时,会一直等待,直到接收到数据或出现错误才会返回
response_data = socket_stream.recv(1024

5)关闭连接

# 关闭连接 socket_stream.close(

3.建立UDP连接

1)创建scoket对象

client_dgram = socket.socket(socket.AF_INET, socket.SOCK_DGRAM

2)发送数据

# 发送数据 ip_port = ('127.0.0.1', 8080 data = 'Hello' client_dgram.sendto(data.encode("utf-8", ip_port

这里发送用的是sendto(方法

# 接收UDP数据数据
data = client_dgram.recvfrom(4096

4)关闭连接

# 关闭连接 client_dgram.close(

二.使用scoket编写聊天程序

以下部分代码参考:https://www.yuque.com/imhelloworld/nov9az/bffcea259d3c96fb17a130acebc12801

1)server

import socket import threading # 服务器IP地址和端口号 SERVER_IP = 'IP地址' SERVER_PORT = 端口号 # 创建一个socket对象 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM # 绑定IP地址和端口号 server_socket.bind((SERVER_IP, SERVER_PORT # 设置最大连接数,超过10后拒绝。0的话表示不接受连接,直接拒绝 server_socket.listen(10 # 客户端列表 clients = [] def handle_client(client_socket: while True: try: # 接收客户端发送的消息 message = client_socket.recv(10240.decode('utf-8' print("打印", message if message == 'quert': client_socket.send(message.encode('utf-8' else: # 广播消息给所有客户端 for c in clients: if c != client_socket: c.send(message.encode('utf-8' except: # 发生异常时关闭连接 client_socket.close( clients.remove(client_socket print("发送异常了" break # 循环监听客户端连接 while True: print('Waiting for client connections...' # 接受客户端连接请求,新的客户端请求时创建一个新的socket,用于处理客户端的请求,原有的socket继续监听其他客户端的连接请求; # 函数是一个阻塞函数,当没有客户端连接请求时,会一直等待,直到有客户端连接请求到达 client_socket, client_address = server_socket.accept( print('客户socket', client_socket, 'ip和端口', client_address clients.append(client_socket # 创建一个新的线程处理客户端连接 client_thread = threading.Thread(target=handle_client, args=(client_socket, client_thread.start(

2)click

import socket import threading import time # 服务器IP地址和端口号 SERVER_IP = 'IP地址' SERVER_PORT = 端口号 # 创建一个socket对象 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM # 连接服务器 client_socket.connect((SERVER_IP, SERVER_PORT def receive(: while True: try: # 接收服务器发送的消息 message = client_socket.recv(1024.decode('utf-8' if message == 'quert': client_socket.close( break print('\n对方回答:', message except: # 发生异常时关闭连接 client_socket.close( break def send(: while True: # 获取用户输入的消息 message = input( if message == 'quert': client_socket.close( print('连接断开,通信结束' break else: # 发送消息给服务器 client_socket.send(message.encode('utf-8' # 创建两个线程,分别用于接收和发送消息 receive_thread = threading.Thread(target=receive send_thread = threading.Thread(target=send # 启动线程 receive_thread.start( send_thread.start(

这里需要注意的是,既然是聊天程序,那么肯定涉及两个click,两个click代码一致即可,运行时先运行server,后运行click,之后就可以在Terminal下建立对话了。

三.scoket+chatgpt建立搜索对话框

def api(params, tokened="token":
    url = "https://api.aigcfun.com/api/v1/text?key=" + tokened
    data = {"messages": [{"role": "system", "content": "请以markdown的形式返回答案"},
                         {"role": "user", "content": params}], "tokensLength": 32, "model": "gpt-3.5-turbo"}
    response_data = requests.post(url=url, json=data
    return response_data.json(["choices"][0]["text"]

params为用户问的问题,return返回的是chatgpt回答的答案

def receive(:
    while True:
        try:
            # 接收服务器发送的消息
            message = client_socket.recv(1024.decode('utf-8'
            data_message = api(message
            while data_message == None:  # 由于chatgpt有时响应的比较慢,这里需添加个循环判断
                continue
            # if data_message == '您今日使用次数已达上限,请明日再试!':
            #     data_message = api(params=message, tokened=token(

            client_socket.send(data_message.encode('utf-8'  # 发送chatgpt回答的问题给服务端
        except:
            # 发生异常时关闭连接
            client_socket.close(
            break

3)目前chatpgt虽是免费的,但每天都有次数限制,所以可以请求获取token的接口来获取新的token。这里就不做演示了(hhhhhh.........)

 

编程笔记 » scoket用法

赞同 (31) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽