python 3.5+ aiohttp 2.3.2+
- Fixed original library to use in 2021 onwards
- SOCKS4, SOCKS4a and SOCKS5 version
- ProxyConnector for aiohttp
- SOCKS "CONNECT" command
- UDP associate
- TCP port binding
You can install it using Pip:
pip install aiosocks2
If you want the latest development version, you can install it from source:
git clone git@github.com:ultrafunkamsterdam/aiosocks2.git cd aiosocks2 python setup.py install
import asyncio
import aiosocks2
async def connect():
socks5_addr = aiosocks2.Socks5Addr('127.0.0.1', 1080)
socks4_addr = aiosocks2.Socks4Addr('127.0.0.1', 1080)
socks5_auth = aiosocks2.Socks5Auth('login', 'pwd')
socks4_auth = aiosocks2.Socks4Auth('ident')
dst = ('github.com', 80)
# socks5 connect
transport, protocol = await aiosocks2.create_connection(
lambda: Protocol, proxy=socks5_addr, proxy_auth=socks5_auth, dst=dst)
# socks4 connect
transport, protocol = await aiosocks2.create_connection(
lambda: Protocol, proxy=socks4_addr, proxy_auth=socks4_auth, dst=dst)
# socks4 without auth and local domain name resolving
transport, protocol = await aiosocks2.create_connection(
lambda: Protocol, proxy=socks4_addr, proxy_auth=None, dst=dst, remote_resolve=False)
# use socks protocol
transport, protocol = await aiosocks2.create_connection(
None, proxy=socks4_addr, proxy_auth=None, dst=dst)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.close()
A wrapper for create_connection() returning a (reader, writer) pair
# StreamReader, StreamWriter
reader, writer = await aiosocks2.open_connection(
proxy=socks5_addr, proxy_auth=socks5_auth, dst=dst, remote_resolve=True)
data = await reader.read(10)
writer.write('data')
- SocksError is a base class for:
- NoAcceptableAuthMethods
- LoginAuthenticationFailed
- InvalidServerVersion
- InvalidServerReply
try:
transport, protocol = await aiosocks2.create_connection(
lambda: Protocol, proxy=socks5_addr, proxy_auth=socks5_auth, dst=dst)
except aiosocks2.SocksConnectionError:
# connection error
except aiosocks2.LoginAuthenticationFailed:
# auth failed
except aiosocks2.NoAcceptableAuthMethods:
# All offered SOCKS5 authentication methods were rejected
except (aiosocks2.InvalidServerVersion, aiosocks2.InvalidServerReply):
# something wrong
except aiosocks2.SocksError:
# something other
or
try:
transport, protocol = await aiosocks2.create_connection(
lambda: Protocol, proxy=socks5_addr, proxy_auth=socks5_auth, dst=dst)
except aiosocks2.SocksConnectionError:
# connection error
except aiosocks2.SocksError:
# socks error
import asyncio
import aiohttp
import aiosocks2
from aiosocks2.connector import ProxyConnector, ProxyClientRequest
async def load_github_main():
auth5 = aiosocks2.Socks5Auth('proxyuser1', password='pwd')
auth4 = aiosocks2.Socks4Auth('proxyuser1')
ba = aiohttp.BasicAuth('login')
# remote resolve
conn = ProxyConnector(remote_resolve=True)
# or locale resolve
conn = ProxyConnector(remote_resolve=False)
try:
with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
# socks5 proxy
async with session.get('http://github.com/', proxy='socks5://127.0.0.1:1080',
proxy_auth=auth5) as resp:
if resp.status == 200:
print(await resp.text())
# socks4 proxy
async with session.get('http://github.com/', proxy='socks4://127.0.0.1:1081',
proxy_auth=auth4) as resp:
if resp.status == 200:
print(await resp.text())
# http proxy
async with session.get('http://github.com/', proxy='http://127.0.0.1:8080',
proxy_auth=ba) as resp:
if resp.status == 200:
print(await resp.text())
except aiohttp.ClientProxyConnectionError:
# connection problem
except aiohttp.ClientConnectorError:
# ssl error, certificate error, etc
except aiosocks2.SocksError:
# communication problem
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(load_github_main())
loop.close()