登录
  • 人们都希望被别人需要 却往往事与愿违
  • 不必为自己的独特看法而害怕, 因为我们现在所接受的常识都曾是独特看法@《自由思想的十诫》罗素 (哲学家 数学家)

使用50行Python代码实现一个Azure OpenAI Proxy

Azure Benny小土豆 154次浏览 4606字 2个评论
文章目录[显示]
不要使用 blacksheep,否则你的人生会变得不幸

Azure OpenAI和OpenAI提供的服务基本一致的,除了了Azure 更新会慢一点之外,最大的区别是请求路径不同。

对于Azure而言,需要去Azure AI Foundry 里创建部署

使用50行Python代码实现一个Azure OpenAI Proxy

部署名称就是请求时的URL参数的一部分,举例如下

https://xxx.openai.azure.com/openai/deployments/{your_deployment_name}/chat/completions?api-version=2024-12-01-preview

api-version 需要根据个人情况选择,新版本的包含更多的功能。

那么问题就简单了,如果要做代理转发,需要做的事情就是创建一个模型名称(如gpt-4-mini)和部署名称的映射表,写一个简单的程序读取body,然后动态拼接为正确的URL,然后请求返回即可。

如果你会写lua,那么直接搭配下openresty就可以了。然而……

选择Web框架

为了图简单,语言就选Python吧。在网上找到了一个 Python Web framework的性能对比图,还有一个比较新的对比图

使用50行Python代码实现一个Azure OpenAI Proxy

可惜tornado已经廉颇老矣了,在没有asyncio的那个时代,tornado是王者🥲

信我的,不要选择blacksheep(后面会讲)。这次我们就选择第二名的sanic,文档很好,语法很像flask,原生支持asyncio,星标也很多

非流实现

非流式响应直接用httpx发请求,然后返回就行了,非常简单,记得跟着把状态码也设置了

代码如下,非常简单,应该不用解释就能看懂

import os

import httpx
from sanic import Sanic
from sanic import json as json_response
from sanic.request import Request

client = httpx.AsyncClient(
    http2=True,
    timeout=httpx.Timeout(
        connect=15.0,
        read=300.0,
        write=300.0,
        pool=10.0,
    ),
)

app = Sanic(__name__)

url = os.getenv("URL")
api_key = os.getenv("API_KEY")


@app.route("/v1/chat/completions", methods=["POST"])
async def chat_completions(request: Request):
    body = request.json
    if body.get("stream"):
        pass
    else:
        return await non_stream(body)


async def non_stream(body):
    response = await client.post(url, json=body, headers={"api-key": api_key})
    return json_response(response.json(), status=response.status_code)


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000, debug=True, dev=True, auto_reload=True)

流式实现

Sanic的流式也很简单,我们直接使用 httpx的stream就可以。

需要注意的是,如果请求参数错误,那么Azure会给返回400类错误,此时不能返回SSE,而且普通的json。这里可以通过response.aread() 来读取响应体。

也就是说代码大概长这样

async with client.stream("POST", url, json=body, headers={"api-key": api_key}) as response:
    if response.status_code != 200:
        error = await response.aread()
        # 由于 aread() 直接返回了bytes,所以就用raw方法返回,设置content-type为application json,没必要在反序列化一次用json返回
        return raw(error, content_type="application/json", status=response.status_code)

如果请求正常开始返回,那么就先设置设置content-type

server = await request.respond(content_type="text/event-stream")

然后去迭代

async for chunk in response.aiter_text():
    await server.send(chunk)

使用50行Python代码实现一个Azure OpenAI Proxy

恭喜你,用50行代码实现了代理服务!至于 text-embedding 这种根本不支持流式的模型,甚至可以写一个通用的函数,反正就是原样发送、原样返回

进一步……

当然, 你可以根据自己的需求进一步扩展。比如说……

  • 创建一个yaml配置文件,配置Azure OpenAI区域
  • 配置好deployment和OpenAI model名字的映射表
  • 加上权重
  • 根据一定的算法,如轮询,加权轮询,最少使用等选择最佳区域
  • 过滤响应内容中的字段(比如content_filter_results 之类的)
  • 内容审查

用Python操作json可比用Go方便多了!不用两眼一发黑的写type真的是太幸福了!

具体操作空间,那留给自己想象啦!

为什么不要使用 blacksheep

最开始我选择了blacksheep,因为这个最快嘛,文档看起来也不错。

后来发现踩了很多坑,直接整个一天时间没了🫠……

SSE的序列化

Blacksheep用SSE是这样子滴:

import asyncio
from collections.abc import AsyncIterable

from blacksheep import Application, get
from blacksheep.server.sse import ServerSentEvent, ServerSentEventsResponse

app = Application()


# An AsyncGenerator yielding ServerSentEvent...
async def events_provider() -> AsyncIterable[ServerSentEvent]:
    for i in range(3):
        yield ServerSentEvent({"message": f"Hello World {i}"})
        await asyncio.sleep(1)


# A request handler returning a streaming response bound to the generator...
@get("/events")
def events_handler():
    return ServerSentEventsResponse(events_provider)

ServerSentEvent 会自动json序列化你的传入的参数,正常OpenAI最后一个响应是[DONE]

使用50行Python代码实现一个Azure OpenAI Proxy

然而用它你发现……你永远无法正确返回 [DONE],比如

yield ServerSentEvent("[DONE]")
yield ServerSentEvent(["DONE"])

你会发现这引号是怎么回事🤡

使用50行Python代码实现一个Azure OpenAI Proxy

解决方案是自定义他的json dumps,硬编码一下,如果是[DONE]的时候直接返回

from blacksheep.settings.json import default_json_dumps, json_settings


def custom_dumps(value):
    if value == "[DONE]":
        return value
    else:
        return default_json_dumps(value)


json_settings.use(dumps=custom_dumps)

异步生成器锁死

Blacksheep是使用的异步生成器,看yieldasync就知道。但是在流式请求的时候,如果azure返回了错误json,我们也要返回错误json给客户端,而不是返回SSE。

然而……一旦你用了yield+async,这个函数就是异步生成器函数了,你可以使用return结束生成器,但是却不能使用 return 123这样的表达式。

所以试图在 as response后判断状态码,然后试图返回一个json的操作,比如

return json({"message": "Hello, World!"})

都是不行滴!

使用50行Python代码实现一个Azure OpenAI Proxy

SSE锁死

实际上,当你路由中调用return ServerSentEventsResponse(events_provider)后,整个请求只可能以SSE的格式返回了🫠

聪明的你可能会想着既然不能return,那我yield一下

if response.status_code != HTTPStatus.OK:
    content = await response.aread()
    yield content
    return

IDE没报错,但是运行时……

TypeError: Argument 'event' has incorrect type (expected blacksheep.contents.ServerSentEvent, got bytes)

别想着改type annotation了,不管用的🤣

提升 httpx.Client 也没用

我也想到了这个办法,先在路由后调用 client.stream() 然后看status code是不是200,如果是,那么走 return ServerSentEventsResponse 否则就是 return json

恭喜你!发现了新的坑!你会发现……

raise StreamClosed() httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.

那尝试手动进入,不用async with

stream_ctx = client.stream(   )
stream_response = await stream_ctx.__aenter__()
....
return ServerSentEventsResponse(partial(stream_provider, stream_ctx, stream_response))

很好

line 155, in stream_provider
async for c in stream_response.aiter_text():
httpx.ReadTimeout

那就闭包,用前朝的剑指挥今朝的兵!这样的话,其实上面错误差不多😂

唯一可能的解决方案……

给Azure返回的chunks都缓存起来,等都返回完了,把全部chunksstream_provider,流式直接变非流👍真有你的

抛出一个自定义异常的办法也许管用,但是我一直没接住……

所以,不要使用 blacksheep,否则你的人生会变得不幸

参考

完整代码可见 https://gist.github.com/BennyThink/94ac6e088feb1cec829cf7c280c56783


文章版权归原作者所有丨本站默认采用CC-BY-NC-SA 4.0协议进行授权|
转载必须包含本声明,并以超链接形式注明原作者和本文原始地址:
https://dmesg.app/py-azure-oai-proxy.html
喜欢 (0)
分享:-)
关于作者:
If you have any further questions, feel free to contact me in English or Chinese.
发表我的评论
取消评论

                     

去你妹的实名制!

  • 昵称 (必填)
  • 邮箱 (必填,不要邮件提醒可以随便写)
  • 网址 (选填)
(2)个小伙伴在吐槽
  1. SSE 的问题不是blacksheep的锅,事实上现在大多数*人性化*的web框架都用generator表示stream 然而http stream 一旦开始,就不能再返回40x的异常响应,你只能中断stream,让客户端收到一个connection reset 归根结底是因为sanic的API更*raw*,能让你控制该send 什么东西。 用生成器风格的API也有办法解决,就是在生成器的上一层去执行azure的请求(也就是你代码中events_handler函数),然后检查状态码抛异常,一旦逻辑进入generator,就由不得你抛异常了
    Frost2025-03-18 10:27 (6 天前)回复
    • 用你说的那个办法似乎也没法解决,那个 response 如果正常,在stream的函数里调用会是 StreamClosed 或 httpx.ReadTimeout,闭包也一样,也不知道为啥可能是真的太菜了,序列化也挺头疼的。 所以还是 sanic 好!👍
      Benny小土豆2025-03-18 16:49 (6 天前)回复