Azure OpenAI和OpenAI提供的服务基本一致的,除了了Azure 更新会慢一点之外,最大的区别是请求路径不同。
对于Azure而言,需要去Azure AI Foundry 里创建部署
部署名称就是请求时的URL参数的一部分,举例如下
https://xxx.openai.azure.com/openai/deployments/{your_deployment_name}/chat/completions?api-version=2024-12-01-preview
api-version
需要根据个人情况选择,新版本的包含更多的功能。
那么问题就简单了,如果要做代理转发,需要做的事情就是创建一个模型名称(如gpt-4-mini)和部署名称的映射表,写一个简单的程序读取body,然后动态拼接为正确的URL,然后请求返回即可。
如果你会写lua,那么直接搭配下openresty就可以了。然而……
选择Web框架
为了图简单,语言就选Python吧。在网上找到了一个 Python Web framework的性能对比图,还有一个比较新的对比图
信我的,不要选择blacksheep(后面会讲)。这次我们就选择第二名的sanic,文档很好,语法很像flask,原生支持asyncio,星标也很多
非流实现
非流式响应直接用httpx发请求,然后返回就行了,非常简单,记得跟着把状态码也设置了
代码如下,非常简单,应该不用解释就能看懂
import os import httpx from sanic import Sanic from sanic import json as json_response from sanic.request import Request client = httpx.AsyncClient( http2=True, timeout=httpx.Timeout( connect=15.0, read=300.0, write=300.0, pool=10.0, ), ) app = Sanic(__name__) url = os.getenv("URL") api_key = os.getenv("API_KEY") @app.route("/v1/chat/completions", methods=["POST"]) async def chat_completions(request: Request): body = request.json if body.get("stream"): pass else: return await non_stream(body) async def non_stream(body): response = await client.post(url, json=body, headers={"api-key": api_key}) return json_response(response.json(), status=response.status_code) if __name__ == "__main__": app.run(host="127.0.0.1", port=8000, debug=True, dev=True, auto_reload=True)
流式实现
Sanic的流式也很简单,我们直接使用 httpx的stream
就可以。
需要注意的是,如果请求参数错误,那么Azure会给返回400类错误,此时不能返回SSE,而且普通的json。这里可以通过response.aread()
来读取响应体。
也就是说代码大概长这样
async with client.stream("POST", url, json=body, headers={"api-key": api_key}) as response: if response.status_code != 200: error = await response.aread() # 由于 aread() 直接返回了bytes,所以就用raw方法返回,设置content-type为application json,没必要在反序列化一次用json返回 return raw(error, content_type="application/json", status=response.status_code)
如果请求正常开始返回,那么就先设置设置content-type
server = await request.respond(content_type="text/event-stream")
然后去迭代
async for chunk in response.aiter_text(): await server.send(chunk)
恭喜你,用50行代码实现了代理服务!至于 text-embedding 这种根本不支持流式的模型,甚至可以写一个通用的函数,反正就是原样发送、原样返回
进一步……
当然, 你可以根据自己的需求进一步扩展。比如说……
- 创建一个yaml配置文件,配置Azure OpenAI区域
- 配置好deployment和OpenAI model名字的映射表
- 加上权重
- 根据一定的算法,如轮询,加权轮询,最少使用等选择最佳区域
- 过滤响应内容中的字段(比如
content_filter_results
之类的) - 内容审查
用Python操作json可比用Go方便多了!不用两眼一发黑的写type真的是太幸福了!
具体操作空间,那留给自己想象啦!
为什么不要使用 blacksheep
最开始我选择了blacksheep,因为这个最快嘛,文档看起来也不错。
后来发现踩了很多坑,直接整个一天时间没了🫠……
SSE的序列化
Blacksheep用SSE是这样子滴:
import asyncio from collections.abc import AsyncIterable from blacksheep import Application, get from blacksheep.server.sse import ServerSentEvent, ServerSentEventsResponse app = Application() # An AsyncGenerator yielding ServerSentEvent... async def events_provider() -> AsyncIterable[ServerSentEvent]: for i in range(3): yield ServerSentEvent({"message": f"Hello World {i}"}) await asyncio.sleep(1) # A request handler returning a streaming response bound to the generator... @get("/events") def events_handler(): return ServerSentEventsResponse(events_provider)
ServerSentEvent
会自动json序列化你的传入的参数,正常OpenAI最后一个响应是[DONE]
然而用它你发现……你永远无法正确返回 [DONE]
,比如
yield ServerSentEvent("[DONE]") yield ServerSentEvent(["DONE"])
你会发现这引号是怎么回事🤡
解决方案是自定义他的json dumps,硬编码一下,如果是[DONE]
的时候直接返回
from blacksheep.settings.json import default_json_dumps, json_settings def custom_dumps(value): if value == "[DONE]": return value else: return default_json_dumps(value) json_settings.use(dumps=custom_dumps)
异步生成器锁死
Blacksheep是使用的异步生成器,看yield
和async
就知道。但是在流式请求的时候,如果azure返回了错误json,我们也要返回错误json给客户端,而不是返回SSE。
然而……一旦你用了yield+async,这个函数就是异步生成器函数了,你可以使用return
结束生成器,但是却不能使用 return 123
这样的表达式。
所以试图在 as response
后判断状态码,然后试图返回一个json的操作,比如
return json({"message": "Hello, World!"})
都是不行滴!
SSE锁死
实际上,当你路由中调用return ServerSentEventsResponse(events_provider)
后,整个请求只可能以SSE的格式返回了🫠
聪明的你可能会想着既然不能return
,那我yield
一下
if response.status_code != HTTPStatus.OK: content = await response.aread() yield content return
IDE没报错,但是运行时……
TypeError: Argument 'event' has incorrect type (expected blacksheep.contents.ServerSentEvent, got bytes)
别想着改type annotation了,不管用的🤣
提升 httpx.Client 也没用
我也想到了这个办法,先在路由后调用 client.stream()
然后看status code是不是200,如果是,那么走 return ServerSentEventsResponse
否则就是 return json
恭喜你!发现了新的坑!你会发现……
raise StreamClosed() httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.
那尝试手动进入,不用async with
了
stream_ctx = client.stream( ) stream_response = await stream_ctx.__aenter__() .... return ServerSentEventsResponse(partial(stream_provider, stream_ctx, stream_response))
很好
line 155, in stream_provider async for c in stream_response.aiter_text(): httpx.ReadTimeout
那就闭包,用前朝的剑指挥今朝的兵!这样的话,其实上面错误差不多😂
唯一可能的解决方案……
给Azure返回的chunks
都缓存起来,等都返回完了,把全部chunks
交stream_provider
,流式直接变非流👍真有你的
抛出一个自定义异常的办法也许管用,但是我一直没接住……
所以,不要使用 blacksheep,否则你的人生会变得不幸
参考
完整代码可见 https://gist.github.com/BennyThink/94ac6e088feb1cec829cf7c280c56783