wenmo8 发布的文章

一 、 概述

为了弥补代码的遗失,木舟IOT平台正在加班加点进行研发,后面不只是针对于IOT设备接入上报,告警,视频管理,组态数据可视化大屏,后面还会有快速搭建微服务平台,利用surging.cli工具根据数据库表生成微服务,中间服务,能让程序员快速完成BOSS交给的任务,从而在这个内卷的社会能占有一席之地。这些都是没有完成任务的空话,现在发此篇的目的是作者有能力开发出优秀的IOT平台,先介绍一个比较突出的功能,就是可以基于共享或者独立配置添加网络组件, 下面来介绍一下如何添加网络组件。

一键运行打包成品下载:
https://pan.baidu.com/s/11hcf9ieCkJxlGrzvIuxeQA?pwd=ajsr

测试用户:fanly

测试密码:123456

为了让大家节约时间,能尽快运行产品看到效果,上面有 一键运行打包成品可以进行下载测试运行。

二、如何测试运行

以下是目录结构,

IDE:consul 注册中心

kayak.client: 网关

kayak.server:微服务

apache-skywalking-apm:skywalking链路跟踪

以上是目录结构,大家不需要一个个运行,只需要打开运行startup.bat,如果需要测试skywalking ,只需要apache-skywalking-apm\bin\startup.bat  文件就可以了,以下是运行的界面

三、如何添加组件

1.添加http服务组件,

打开平台界面,然后点击设备接入->网络组件,然后可以看到如下界面

再点击新增组件或者编辑组件,完成后注意启动状态是关闭状态,此时并不能对于该组件功能进行访问调用,只有把启动状态打开,才能访问调用

以上是http服务组件,启动完成后,如果设置了webservice和swagger,你可以访问webservice和swagger,看是否可以访问

2.添加/编辑Tcp服务组件

当添加/编辑Tcp组件时,设置Host:127.0.0.1 ,port:248并且还有解析方式选项,选项里面有不处理,固定长度,分隔符,自定义脚本,下面我们就来看自定义脚本

添加脚本如下:

parser.Fixed(4).Handler(function(buffer){var buf = BytesUtils.Slice(buffer,1,4);
parser.Fixed(buffer.ReadableBytes).Result(buf);
}).Handler(
function(buffer){parser.Fixed(8).Result(buffer);}
).Handler(
function(buffer){
parser.Result(
'处理完成','gb2312').Complete();
}
)

而基于TCP服务代码如下,需要继承于TcpBehavior

internal class TcpDeviceDataService : TcpBehavior, ITcpDeviceDataService
{
private readonly IDeviceProvider _deviceProvider;
public TcpDeviceDataService(IDeviceProvider deviceProvider)
{
_deviceProvider
=deviceProvider;
}

public override
voidLoad(string clientId, NetworkProperties tcpServerProperties)
{
var deviceStatus =_deviceProvider.IsConnected(clientId);this.Parser.HandlePayload().Subscribe(async buffer =>await ParserBuffer(buffer));
}

public override
voidDeviceStatusProcess(DeviceStatus status, string clientId, NetworkProperties tcpServerProperties)
{
//throw new NotImplementedException(); }

public async Task ParserBuffer(IByteBuffer buffer)
{
List
<string> result = new List<string>();while (buffer.ReadableBytes > 0)
{
result.Add(buffer.ReadString(
this.Parser.GetNextFixedRecordLength(),
Encoding.GetEncoding(
"gb2312")));
}
//var str= buffer.ReadString(buffer.ReadableBytes, Encoding.UTF8); var byteBuffer =Unpooled.Buffer();
byteBuffer.WriteString(
"\r\n", Encoding.UTF8);
byteBuffer.WriteString(
"处理完成", Encoding.GetEncoding("gb2312"));
await Sender.SendAndFlushAsync(byteBuffer);
//await Sender.SendAndFlushAsync("消息已接收",Encoding.GetEncoding("gb2312")); this.Parser.Close();
}

public Task
<bool>ChangeDeviceStage(string deviceId)
{
throw newNotImplementedException();
}
}

用测试Tcp调试工具结果如下

3.添加/编辑UDP服务组件

当添加/编辑UDP组件时, 设置Host:127.0.0.1 ,port:267 并且可以是否开启组播

而基于udp服务代码如下,需要继承于
UdpBehavior

internal class UdpDeviceDataService : UdpBehavior, IUdpDeviceDataService
{
public Task
<bool>ChangeDeviceStage(string deviceId)
{
throw newNotImplementedException();
}

public override async Task Dispatch(IEnumerable
<byte>bytes)
{
await Sender.SendAndFlushAsync(
"\r\n", Encoding.UTF8);
await Sender.SendAndFlushAsync(
"处理完成", Encoding.GetEncoding("gb2312"));
}
}

测试结果如下:

4.添加/编辑WebSocket服务组件

当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:55

而基于websocket服务代码如下,需要继承于WSBehavior

internal classWSDeviceDataService : WSBehavior, IWSDeviceDataService
{
protected override voidOnMessage(MessageEventArgs e)
{
this.Client.Value.SendTo($"send:{e.Data},\r\n reply:hello,welcome to you!",ID);
}
protected override voidOnOpen()
{

}
}

测试结果如下:

5.添加/编辑UDP服务组件

当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:345

添加greet.proto文件,脚本如下:

syntax = "proto3";

package Greet;

service Greeter {
//Sends a greeting rpc ChangeDeviceStage (DeviceRequest) returns (DeviceReply) {}
}

message DeviceRequest {
string deviceId = 1;
}

message DeviceReply {
bool message = 1;
}

然后再创建GreeterBehavior,继承Greeter.GreeterBase, IServiceBehavior,代码如下

public partial classGreeterBehavior : Greeter.GreeterBase, IServiceBehavior
{
privateServerReceivedDelegate received;public eventServerReceivedDelegate Received
{
add
{
if (value == null)
{
received
+=value;
}
}
remove
{
received
-=value;
}
}
public string MessageId { get; } = Guid.NewGuid().ToString("N");public async Task Write(object result, int statusCode = 200, string exceptionMessage = "")
{
if (received == null)return;var message = new TransportMessage(MessageId, newReactiveResultMessage
{
ExceptionMessage
=exceptionMessage,
StatusCode
=statusCode,
Result
=result

});
awaitreceived(message);
}
public T CreateProxy<T>(string key) where T : class{return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
}
public objectCreateProxy(Type type)
{
return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
}
public object CreateProxy(stringkey, Type type)
{
return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);
}
public T CreateProxy<T>() where T : class{return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();
}
public T GetService<T>(string key) where T : class{if (ServiceLocator.Current.IsRegisteredWithKey<T>(key))return ServiceLocator.GetService<T>(key);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
}
public T GetService<T>() where T : class{if (ServiceLocator.Current.IsRegistered<T>())return ServiceLocator.GetService<T>();else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();

}
public objectGetService(Type type)
{
if(ServiceLocator.Current.IsRegistered(type))returnServiceLocator.GetService(type);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
}
public object GetService(stringkey, Type type)
{
if(ServiceLocator.Current.IsRegisteredWithKey(key, type))returnServiceLocator.GetService(key, type);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);

}
public voidPublish(IntegrationEvent @event)
{
GetService
<IEventBus>().Publish(@event);
}

}

而基于grpc服务代码如下,需要继承于刚刚创建的GreeterBehavior

    public classGrpcDeviceDataService : GreeterBehavior, IGrpcDeviceDataService
{
public override Task<DeviceReply>ChangeDeviceStage(DeviceRequest request, ServerCallContext context)
{
return Task.FromResult(newDeviceReply
{
Message
= true}) ;
}
}

以下是测试结果:

6.添加/编辑MQTT服务组件

当添加/编辑MQTT组件时, 设置Host:127.0.0.1 ,port:425

而基于mqtt服务代码如下,需要继承于MqttBehavior

 public classMQTTDeviceDataService : MqttBehavior, IMQTTDeviceDataService
{
public override async Task<bool> Authorized(string username, stringpassword)
{
bool result = false;if (username == "admin" && password == "123456")
result
= true;return awaitTask.FromResult(result);
}
public async Task<bool> IsOnline(stringdeviceId)
{
return await base.GetDeviceIsOnine(deviceId);
}
public async Task Publish(stringdeviceId, WillMessage message)
{
var willMessage = newMqttWillMessage
{
WillMessage
=message.Message,
Qos
=message.Qos,
Topic
=message.Topic,
WillRetain
=message.WillRetain
};
awaitPublish(deviceId, willMessage);awaitRemotePublish(deviceId, willMessage);
}
}

以下是测试结果:

三、总结

木舟IOT平台会在github开源社区版本,可以自由更改代码,用于商业项目,但不能自营平台,如低代码平台,IOT平台等,如有违反,后果自负,还有最好不要更改命名空间,然后跟公司说是自己研发的,如果知道后,我在博客全网通报此人,以前surging相关的事件就算了,就当没发生过。,如果碰到困难,比较紧急的话,可以联系作者,加群:744677125

本篇是 Python 系列教程第 3 篇,更多内容敬请访问我的 Python 合集

Visual Studio Code的安装非常简单,就不放这里增加文章篇幅了。

相比PyCharm,VSCode更加轻量,启动速度快。并且搭配Python插件就能实现和Pycharm一样的代码提示、高亮效果。

1 安装插件

安装插件也非常简单,打开VSCode->拓展->搜python->install

安装完成后重启一下VSCode

2 创建项目

VSCode中并不能像在PyCharm中直接创建项目就有虚拟环境。VSCode中我们要手动创建一个项目文件夹,然后在此文件夹内创建Python虚拟环境。

2.1 创建文件夹

用VSCode打开这个文件夹

这样我们就有了一个空的项目

创建一个hello.py文件,并打印Hello Python!!!

运行(右键运行或者点右上角运行按钮):

运行结果:

2.2 创建虚拟环境

首先讲一下什么是虚拟环境:虚拟环境的作用是让不同的Python项目使用不同的Python解释器、第三方库等,把项目和项目之间进行隔离,安装的软件包互不冲突。对于我们学习Python来说,用虚拟环境不是必须的。如果道友想再多一点了解虚拟环境可以查询专栏里《Python虚拟环境介绍》一文。

如何创建虚拟环境呢?

步骤:
Ctrl+P打开控制面板->输入
>python:
->选择Create Environment->选择Venv或Conda->选择Python解释器。接下来就开始创建了

2.3 配置debugger并debug

Debug是代码开发过程必备可少的操作,Python也有Debug模式。

step1:打断点。有两种方式,一是直接点击行号左侧,二是将鼠标放到这行然后按F9。

step2:选择调试器。按F5打开调试器列表,选择Python Debugger。

step3:选择调试器配置,第一项为“调试当前选中的文件”

step4:点击文件右上角的运行按钮旁边的向下箭头,选择Python Debugger: Debug Python File

能看到成功进入了Debug模式

使用FastAPI开发项目时,良好的目录结构可以帮助你更好地组织代码,提高可维护性和扩展性。同样,对基类的封装,也可以进一步减少开发代码,提供便利,并减少出错的几率。

下面是一个推荐的目录结构示例:

my_fastapi_project/├── app/│   ├── __init__.py
│ ├── main.py # 入口文件
│ ├── core
/│ │ ├── __init__.py
│ │ ├── config.py # 配置文件
│ │ ├── security.py # 安全相关
│ │ └── ... # 其他核心功能
│ ├── api
/│ │ ├── __init__.py
│ │ ├── v1
/│ │ │ ├── __init__.py
│ │ │ ├── endpoints
/│ │ │ │ ├── __init__.py
│ │ │ │ ├── users.py # 用户相关接口
│ │ │ │ ├── items.py # 其他接口
│ │ │ │ └── ...
│ │ │ └── ... # 其他版本的API
│ ├── models
/│ │ ├── __init__.py
│ │ ├── user.py # 用户模型
│ │ ├── item.py # 其他模型
│ │ └── ...
│ ├── schemas
/│ │ ├── __init__.py
│ │ ├── user.py # 用户数据模型
│ │ ├── item.py # 其他数据模型
│ │ └── ...
│ ├── crud
/│ │ ├── __init__.py
│ │ ├── user.py # 用户CRUD操作
│ │ ├── item.py # 其他CRUD操作
│ │ └── ...
│ ├── db
/│ │ ├── __init__.py
│ │ ├── base.py # 数据库基础设置
│ │ ├── session.py # 数据库会话
│ │ └── ...
│ ├── tests
/│ │ ├── __init__.py
│ │ ├── test_main.py # 测试主文件
│ │ ├── test_users.py # 用户相关测试
│ │ └── ...
│ └── utils
/│ ├── __init__.py
│ ├── utils.py # 工具函数
│ └── ...
├── .
env# 环境变量文件
├── alembic
/# 数据库迁移工具目录
│ ├──
env.py
│ ├── script.py.mako
│ └── versions
/│ └── ...
├── alembic.ini # Alembic 配置文件
├── requirements.txt # 项目依赖
├── Dockerfile # Docker 配置文件
└── README.md # 项目说明文件

目录结构说明:

  • app/
    : 项目的主目录,包含所有应用相关代码。
    • main.py
      : 项目的入口文件,启动FastAPI应用。
    • core/
      : 核心功能,如配置、安全等。
    • api/
      : API路由和视图,分版本管理。
    • models/
      : 数据库模型。
    • schemas/
      : 数据模型,用于请求和响应的验证。
    • crud/
      : 数据库操作(CRUD:创建、读取、更新、删除)。
    • db/
      : 数据库相关设置和会话管理。
    • tests/
      : 测试代码。
    • utils/
      : 工具函数和公用模块。
  • .env
    : 环境变量文件,用于存储敏感信息,如数据库连接字符串。
  • alembic/
    : 数据库迁移工具Alembic的配置目录。
  • requirements.txt
    : 项目依赖列表。
  • Dockerfile
    : Docker配置文件,用于容器化部署。
  • README.md
    : 项目说明文件。

这个结构可以根据项目需求进行调整,但保持清晰和模块化是良好的实践。

python项目总的__init__.py,有意义吗

在Python项目中,
__init__.py
文件的主要作用是将目录标识为一个Python包。它使得目录中的模块可以被导入和使用。在一些情况下,
__init__.py
可以不仅仅是一个空文件,还可以包含一些初始化代码。

__init__.py
的意义:

  1. 将目录标识为包:


    • 任何包含
      __init__.py
      的目录都会被Python解释器认为是一个包,这样你就可以使用包导入语法,如
      import mypackage.module
  2. 初始化代码:

  • 可以在
    __init__.py
    中包含一些初始化代码,如导入包内的子模块、设置包级别的变量或函数、配置日志记录等。例如:
# mypackage/__init__.py
from .submodule1 import func1
from .submodule2 import func2

__all__
= ["func1", "func2"]

3.简化导入

    • 通过在
      __init__.py
      中导入子模块,可以简化包的导入路径,使得用户可以直接从包中导入函数或类,而不必知道具体的模块结构。
# mypackage/__init__.py
from .submodule import MyClass

# Now you can
dofrom mypackage import MyClass

对于Python 3.3及以上版本,
__init__.py
文件不是强制性的,即使没有
__init__.py
文件,Python解释器也可以识别包。然而,添加
__init__.py
文件仍然是一个良好的习惯,可以避免某些情况下的意外行为,并且明确表示该目录是一个包。

2、Fast API项目的开发处理过程

在FastAPI项目中,CRUD操作通常在一个专门的
crud
模块中实现。这个模块会调用SQLAlchemy模型对象来进行数据库操作。

1. 定义模型 (
models/user.py
)

from sqlalchemy importColumn, Integer, Stringfrom app.db.base_class importBaseclassUser(Base):__tablename__ = "users"id= Column(Integer, primary_key=True, index=True)
email
= Column(String, unique=True, index=True, nullable=False)
hashed_password
= Column(String, nullable=False)
full_name
= Column(String, index=True)

2. 创建数据库会话 (
db/session.py
)

from sqlalchemy importcreate_enginefrom sqlalchemy.orm importsessionmaker

DATABASE_URL
= "sqlite:///./test.db" #使用SQLite数据库作为示例 engine=create_engine(DATABASE_URL)
SessionLocal
= sessionmaker(autocommit=False, autoflush=False, bind=engine)

3. 定义CRUD操作 (
crud/user.py
)

from sqlalchemy.orm importSessionfrom app.models.user importUserfrom app.schemas.user importUserCreate, UserUpdatedefget_user(db: Session, user_id: int):return db.query(User).filter(User.id ==user_id).first()defget_user_by_email(db: Session, email: str):return db.query(User).filter(User.email ==email).first()def get_users(db: Session, skip: int = 0, limit: int = 10):returndb.query(User).offset(skip).limit(limit).all()defcreate_user(db: Session, user: UserCreate):
db_user
=User(
email
=user.email,
hashed_password
=user.hashed_password, #在实际应用中应该对密码进行哈希处理 full_name=user.full_name
)
db.add(db_user)
db.commit()
db.refresh(db_user)
returndb_userdefupdate_user(db: Session, user_id: int, user: UserUpdate):
db_user
=get_user(db, user_id)ifdb_user:
db_user.email
=user.email
db_user.full_name
=user.full_name
db.commit()
db.refresh(db_user)
returndb_userdefdelete_user(db: Session, user_id: int):
db_user
=get_user(db, user_id)ifdb_user:
db.delete(db_user)
db.commit()
return db_user

4. 定义数据模型 (
schemas/user.py
)

from pydantic importBaseModelclassUserBase(BaseModel):
email: str
full_name: str
=NoneclassUserCreate(UserBase):
hashed_password: str
classUserUpdate(UserBase):pass classUser(UserBase):
id: int
classConfig:
orm_mode
= True

5. 在API端点中使用CRUD操作 (
api/v1/endpoints/users.py
)

from fastapi importAPIRouter, Depends, HTTPExceptionfrom sqlalchemy.orm importSessionfrom app importcrud, models, schemasfrom app.db.session importSessionLocal

router
=APIRouter()defget_db():
db
=SessionLocal()try:yielddbfinally:
db.close()

@router.post(
"/users/", response_model=schemas.User)def create_user(user: schemas.UserCreate, db: Session =Depends(get_db)):
db_user
= crud.get_user_by_email(db, email=user.email)ifdb_user:raise HTTPException(status_code=400, detail="Email already registered")return crud.create_user(db=db, user=user)

@router.get(
"/users/{user_id}", response_model=schemas.User)def read_user(user_id: int, db: Session =Depends(get_db)):
db_user
= crud.get_user(db, user_id=user_id)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")returndb_user

@router.put(
"/users/{user_id}", response_model=schemas.User)def update_user(user_id: int, user: schemas.UserUpdate, db: Session =Depends(get_db)):
db_user
= crud.update_user(db=db, user_id=user_id, user=user)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")returndb_user

@router.delete(
"/users/{user_id}", response_model=schemas.User)def delete_user(user_id: int, db: Session =Depends(get_db)):
db_user
= crud.delete_user(db=db, user_id=user_id)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")returndb_user

@router.get(
"/users/", response_model=List[schemas.User])def read_users(skip: int = 0, limit: int = 10, db: Session =Depends(get_db)):
users
= crud.get_users(db, skip=skip, limit=limit)return users

6. 注册路由 (
main.py
)

from fastapi importFastAPIfrom app.api.v1.endpoints importusers

app
=FastAPI()

app.include_router(users.router, prefix
="/api/v1", tags=["users"])if __name__ == "__main__":importuvicorn
uvicorn.run(app, host
="0.0.0.0", port=8000)

7. 初始化数据库 (
db/base.py
)

from app.db.session importenginefrom app.models importuser

user.Base.metadata.create_all(bind
=engine)

8. 运行应用

在项目根目录下运行:

uvicorn app.main:app --reload

这样,你的CRUD层就可以调用模型对象来进行数据库操作了。上述代码展示了如何定义模型、数据库会话、CRUD操作、数据模型和API端点,并将它们结合在一起,实现一个简单的用户管理系统。

3、实际FastAPI项目对基类的封装

可以通过创建一个通用的CRUD基类来封装常规的CRUD操作,然后让特定的CRUD类继承这个基类。这样可以减少重复代码,提高代码的可维护性和可复用性。下面是一个实现示例。

1、创建通用CRUD基类 (
crud/base.py
)

rom typing importGeneric, Type, TypeVar, Optional, Listfrom pydantic importBaseModelfrom sqlalchemy.orm importSessionfrom app.db.base_class importBase

ModelType
= TypeVar("ModelType", bound=Base)
CreateSchemaType
= TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType
= TypeVar("UpdateSchemaType", bound=BaseModel)classCRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):def __init__(self, model: Type[ModelType]):
self.model
=modeldef get(self, db: Session, id: int) ->Optional[ModelType]:return db.query(self.model).filter(self.model.id ==id).first()def get_multi(self, db: Session, skip: int = 0, limit: int = 100) ->List[ModelType]:returndb.query(self.model).offset(skip).limit(limit).all()def create(self, db: Session, obj_in: CreateSchemaType) ->ModelType:
obj_in_data
=obj_in.dict()
db_obj
= self.model(**obj_in_data) #type: ignore db.add(db_obj)
db.commit()
db.refresh(db_obj)
returndb_objdef update(self, db: Session, db_obj: ModelType, obj_in: UpdateSchemaType) ->ModelType:
obj_data
=db_obj.dict()
update_data
= obj_in.dict(skip_defaults=True)for field inobj_data:if field inupdate_data:
setattr(db_obj, field, update_data[field])
db.commit()
db.refresh(db_obj)
returndb_objdef remove(self, db: Session, id: int) ->ModelType:
obj
=db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

2、定义用户CRUD操作 (
crud/user.py
)

from typing importAnyfrom sqlalchemy.orm importSessionfrom app.crud.base importCRUDBasefrom app.models.user importUserfrom app.schemas.user importUserCreate, UserUpdateclassCRUDUser(CRUDBase[User, UserCreate, UserUpdate]):def get_by_email(self, db: Session, email: str) ->Any:return db.query(self.model).filter(self.model.email ==email).first()

user
= CRUDUser(User)

3、定义数据模型 (
schemas/user.py
)

from pydantic importBaseModelclassUserBase(BaseModel):
email: str
full_name: str
=NoneclassUserCreate(UserBase):
hashed_password: str
classUserUpdate(UserBase):pass classUser(UserBase):
id: int
classConfig:
orm_mode
= True

4、在API端点中使用CRUD操作 (
api/v1/endpoints/users.py
)

from fastapi importAPIRouter, Depends, HTTPExceptionfrom sqlalchemy.orm importSessionfrom typing importListfrom app importcrud, schemasfrom app.db.session importSessionLocalfrom app.models.user importUser

router
=APIRouter()defget_db():
db
=SessionLocal()try:yielddbfinally:
db.close()

@router.post(
"/users/", response_model=schemas.User)def create_user(user: schemas.UserCreate, db: Session =Depends(get_db)):
db_user
= crud.user.get_by_email(db, email=user.email)ifdb_user:raise HTTPException(status_code=400, detail="Email already registered")return crud.user.create(db=db, obj_in=user)

@router.get(
"/users/{user_id}", response_model=schemas.User)def read_user(user_id: int, db: Session =Depends(get_db)):
db_user
= crud.user.get(db, id=user_id)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")returndb_user

@router.put(
"/users/{user_id}", response_model=schemas.User)def update_user(user_id: int, user: schemas.UserUpdate, db: Session =Depends(get_db)):
db_user
= crud.user.get(db=db, id=user_id)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")return crud.user.update(db=db, db_obj=db_user, obj_in=user)

@router.delete(
"/users/{user_id}", response_model=schemas.User)def delete_user(user_id: int, db: Session =Depends(get_db)):
db_user
= crud.user.get(db=db, id=user_id)if db_user isNone:raise HTTPException(status_code=404, detail="User not found")return crud.user.remove(db=db, id=user_id)

@router.get(
"/users/", response_model=List[schemas.User])def read_users(skip: int = 0, limit: int = 10, db: Session =Depends(get_db)):
users
= crud.user.get_multi(db, skip=skip, limit=limit)return users

其他的就是类似前面的做法了。

通过这种方式,你可以在通用的CRUD基类中封装常规的CRUD操作,而特定的CRUD类(如
CRUDUser
)只需要继承这个基类并添加特定的操作方法。这样不仅减少了重复代码,也提高了代码的可维护性和可复用性。

如果你希望可以通过定义一个通用的API基类来封装常规的CRUD操作方法,然后在具体的端点文件中继承这个基类。这样可以进一步减少重复代码,提高代码的可维护性和可复用性。

创建通用API基类 (
api/deps.py
)

from typing importType, TypeVar, Generic, Listfrom fastapi importAPIRouter, Depends, HTTPExceptionfrom sqlalchemy.orm importSessionfrom pydantic importBaseModelfrom app.crud.base importCRUDBasefrom app.db.session importSessionLocal

ModelType
= TypeVar("ModelType", bound=BaseModel)
CreateSchemaType
= TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType
= TypeVar("UpdateSchemaType", bound=BaseModel)classCRUDRouter(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):def __init__(self, crud: CRUDBase[ModelType, CreateSchemaType, UpdateSchemaType]):
self.crud
=crud
self.router
=APIRouter()

self.router.post(
"/", response_model=ModelType)(self.create_item)
self.router.get(
"/{item_id}", response_model=ModelType)(self.read_item)
self.router.put(
"/{item_id}", response_model=ModelType)(self.update_item)
self.router.delete(
"/{item_id}", response_model=ModelType)(self.delete_item)
self.router.get(
"/", response_model=List[ModelType])(self.read_items)defget_db(self):
db
=SessionLocal()try:yielddbfinally:
db.close()

async
def create_item(self, item_in: CreateSchemaType, db: Session =Depends(self.get_db)):
db_item
= self.crud.create(db=db, obj_in=item_in)returndb_item

async
def read_item(self, item_id: int, db: Session =Depends(self.get_db)):
db_item
= self.crud.get(db=db, id=item_id)if notdb_item:raise HTTPException(status_code=404, detail="Item not found")returndb_item

async
def update_item(self, item_id: int, item_in: UpdateSchemaType, db: Session =Depends(self.get_db)):
db_item
= self.crud.get(db=db, id=item_id)if notdb_item:raise HTTPException(status_code=404, detail="Item not found")return self.crud.update(db=db, db_obj=db_item, obj_in=item_in)

async
def delete_item(self, item_id: int, db: Session =Depends(self.get_db)):
db_item
= self.crud.get(db=db, id=item_id)if notdb_item:raise HTTPException(status_code=404, detail="Item not found")return self.crud.remove(db=db, id=item_id)

async
def read_items(self, skip: int = 0, limit: int = 10, db: Session =Depends(self.get_db)):
items
= self.crud.get_multi(db=db, skip=skip, limit=limit)return items

使用通用API基类定义用户端点(
api/v1/endpoints/users.py
)

from fastapi importAPIRouterfrom app.crud.user importuser as user_crudfrom app.schemas.user importUser, UserCreate, UserUpdatefrom app.api.deps importCRUDRouter

user_router
=CRUDRouter[User, UserCreate, UserUpdate](user_crud)
router
= user_router.router

注册路由 (
main.py
)

rom fastapi importFastAPIfrom app.api.v1.endpoints importusers

app
=FastAPI()

app.include_router(users.router, prefix
="/api/v1/users", tags=["users"])if __name__ == "__main__":importuvicorn
uvicorn.run(app, host
="0.0.0.0", port=8000)

通过这种方式,你可以在
CRUDRouter
基类中封装常规的CRUD操作方法,然后在具体的端点文件中继承这个基类并传递相应的CRUD对象。这样可以进一步减少重复代码,提高代码的可维护性和可复用性。

4、SQLAlchemy模型的基类定义

app.db.base_class
通常是用于定义SQLAlchemy模型基类的文件。在这个文件中,我们会定义一个基本的Base类,这个类是所有SQLAlchemy模型的基类。下面是一个实现示例:

定义
Base
类 (
db/base_class.py
)

from sqlalchemy.ext.declarative importas_declarative, declared_attr

@as_declarative()
classBase:
id: int
__name__: str

@declared_attr
def __tablename__(cls) ->str:return cls.__name__.lower()

详细解释

  1. @as_declarative()
    : 这是SQLAlchemy提供的一个装饰器,它会将类装饰为一个声明性基类。所有继承自这个类的子类都会自动成为声明性类。

  2. id: int
    : 这是一个类型注释,表示每个模型类都会有一个
    id
    属性。具体的字段定义(例如
    Column(Integer, primary_key=True)
    )会在每个具体的模型类中定义。

  3. __name__: str
    : 这是另一个类型注释,表示每个模型类都会有一个
    __name__
    属性。

  4. @declared_attr
    : 这是SQLAlchemy提供的一个装饰器,允许我们为声明性基类定义一些通用的属性。在这个例子中,它用于自动生成
    __tablename__
    属性。这个属性的值是模型类的名称的小写形式。

这样定义的
Base
类可以作为所有SQLAlchemy模型的基类,简化模型的定义。

完整示例项目结构:为了更好地理解,这里展示一个包含
Base
类定义的完整项目结构:

.
├── app
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ └── v1
│ │ ├── __init__.py
│ │ └── endpoints
│ │ ├── __init__.py
│ │ └── users.py
│ ├── crud
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── user.py
│ ├── db
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── base_class.py
│ │ └── session.py
│ ├── models
│ │ ├── __init__.py
│ │ └── user.py
│ ├── schemas
│ │ ├── __init__.py
│ │ └── user.py
│ └── main.py

models/user.py 类文件如下定义

from sqlalchemy importColumn, Integer, Stringfrom app.db.base_class importBaseclassUser(Base):__tablename__ = "users"id= Column(Integer, primary_key=True, index=True)
email
= Column(String, unique=True, index=True, nullable=False)
hashed_password
= Column(String, nullable=False)
full_name
= Column(String, index=True)

通过这种结构和定义,您可以创建一个简洁、可扩展的FastAPI项目,能够快速定义新的数据库模型并生成相应的CRUD操作和API端点。

在光照条件不佳下捕获的图像可能同时包含过曝和欠曝。目前的方法主要集中在调整图像亮度上,这可能会加剧欠曝区域的色调失真,并且无法恢复过曝区域的准确颜色。论文提出通过学习估计和校正这种色调偏移,来增强既有过曝又有欠曝的图像。先通过基于
UNet
的网络推导输入图像的增亮和变暗版本的色彩特征图,然后使用伪正常特征生成器生成伪正常色彩特征图。接着,通过论文提出的
COlor Shift Estimation
(
COSE
) 模块来估计推导的增亮(或变暗)色彩特征图与伪正常色彩特征图之间的色调偏移,分别校正过曝和欠曝区域的估计色调偏移。最后,使用提出的
COlor MOdulation
(
COMO
) 模块来调制过曝和欠曝区域中分别校正后的颜色,以生成增强图像。

来源:晓飞的算法工程笔记 公众号

论文: Color Shift Estimation-and-Correction for Image Enhancement

Introduction


现实世界的场景通常涉及广泛的照明条件,这对摄影构成了重大挑战。尽管相机具有自动曝光模式来根据场景亮度确定“理想”的曝光设置,但是在整个图像范围内均匀调整曝光仍可能导致区域过度明亮和过度昏暗,这种欠曝和过曝的区域可能表现出明显的色调失真。欠曝区域相对较高的噪音水平会改变数据分布,导致色调偏移,而过曝区域则会失去原始的色彩。因此,增强这类图像通常涉及到亮度调整和色调偏移校正。

近年来,已经进行了许多努力来增强不正确曝光的图像。这些方法可以大致分为两类。

  1. 第一类专注于增强过曝或欠曝的图像。一些方法提出学习曝光不变的表示空间,其中不同的曝光水平可以映射到一个标准化和不变的表示中。其他方法则提出将频率信息与空间信息整合,这有助于模拟图像固有的结构特征,从而增强图像的亮度和结构失真。然而,上述方法通常假设过度或欠曝发生在整个图像上,对于同时存在过度曝光和欠曝光的图像(例如,图
    1
    (b)),它们效果不佳。
  2. 第二类工作旨在增强同时存在过度曝光和欠曝光的图像,利用局部颜色分布作为先验来引导增强过程。然而,尽管设计了金字塔式的局部颜色分布先验,仍然倾向于产生在大面积均质区域中出现显著色彩偏移的结果(例如,图
    1
    (c))。

本文旨在校正同时存在过度曝光和欠曝光的图像的亮度和色彩失真问题。为了解决这个问题,首先在图
1
(f)和
1
(g)中展示了从两个相关数据集(
MSEC

LCDP
)中随机抽样的像素的
PCA
结果。
MSEC
数据集中每个场景包含五张不同曝光值(
EV
)的输入图像,而
LCDP
数据集中每个场景只有一张同时包含过度曝光和欠曝光的输入图像。从这个初步研究中,可以得出了两个观察结果。

  1. 在这两个数据集中,欠曝光像素(绿点)倾向于与过度曝光像素(红点)有相反的分布偏移。

  2. MSEC
    数据集包含了
    0 EV
    输入图像作为曝光标准化过程的参考图像不同,
    LCDP
    的图像没有这样的“正常曝光”像素。

第一个观察结果启发我们考虑估计和校正这样的色彩偏移,而第二个观察结果则启发我们创建伪正常曝光特征图,作为色彩偏移估计和矫正的参考。

为此,论文提出了一种新方法,联合调整图像亮度并校正色调失真。首先使用基于
UNet
的网络,从输入图像的增亮和变暗版本中提取过度曝光和欠曝光区域的色彩特征图。接着,伪正常特征生成器基于这些派生的色彩特征图创建伪正常色彩特征图。随后,论文提出了一种新的颜色偏移估计(
COSE
)模块,分别估计和校正派生的增亮(或变暗)色彩特征图与创建的伪正常色彩特征图之间的色彩偏移,通过在颜色特征域中扩展可变形卷积来实现
COSE
模块。进一步,论文提出了一种新的颜色调制(
COMO
)模块,通过定制的交叉注意力机制,在过度曝光和欠曝光区域的分别校正的色彩上进行调制,以生成增强图像。通过在输入图像和估计的变暗/增亮色彩偏移上执行定制的交叉注意力机制来实现
COMO
模块,图
1
(d)显示了我们的方法能够生成视觉上令人愉悦的图像。

论文的主要贡献可以总结如下:

  1. 提出了一种新颖的神经网络方法,通过建模色彩分布的变化来增强同时存在过度曝光和欠曝光的图像。

  2. 提出了一种新颖的神经网络,包括两个新模块:一是用于分别估计和校正过度曝光和欠曝光区域中色彩的新颖颜色偏移估计(
    COSE
    )模块,二是用于调制校正后的颜色以生成增强图像的新颖颜色调制(
    COMO
    )模块。

  3. 广泛的实验证明,论文的网络具有轻量化的特点,并且在流行的基准测试中表现优于现有的图像增强方法。

Proposed Method


论文的方法受到两点观察的启发。首先,与欠曝光像素相比,过曝光像素倾向于具有反向分布偏移,这表明有必要分别捕捉和修正这样的色彩偏移。其次,由于绝大多数(如果不是全部)像素都受到过曝光或欠曝光的影响,因此有必要创建伪正常曝光信息,以指导过曝光或欠曝光像素色彩偏移的估计。基于这两点观察,我们提出了一种新的网络,其中包括两个新模块:新的色彩偏移估计(
COSE
)模块和新的色彩调制(
COMO
)模块,用于增强具有过曝光或欠曝光的图像。

Network Overview

给定一个具有过曝光和欠曝光的输入图像
\(I_x\in \mathcal{R}^{3\times H\times W}\)
,旨在生成一个增强后的图像
\(I_y\in \mathcal{R}^{3\times H\times W}\)
,具有校正的图像亮度以及恢复的图像细节和颜色,模型结构如图
2
所示。给定输入图像
\(I_x\)
,首先通过计算其反向版本
\(\hat{I}_x=1-I_x\)
,然后将两者输入基于
UNet
的网络,以提取两个光照图
\(F_L^U\in \mathcal{R}^{1\times H\times W}\)

\(F_L^O\in \mathcal{R}^{1\times H\times W}\)
,这两个光照图(即
\(F_L^U\)

\(F_L^O\)
)分别表示受欠曝光和过曝光影响的区域。接下来,计算暗化特征图
\(F_D\)
和增亮特征图
\(F_B\)
,具体如下:

\[\begin{align}
F_B = \frac{I_x}{F_L^U} &= \frac{I_x}{f(I_x)}, \\
F_D = 1-\frac{1-I_x}{F_L^O} &= 1 - \frac{1-I_x}{f(1 - I_x)},
\end{align}
\]

其中,
\(f(\cdot)\)
表示基于
UNet
的特征提取器。根据增亮和暗化的特征图
\(F_B, F_D \in \mathbb{R}^{3 \times H \times W}\)
来建模色彩偏移。

给定
\(F_B\)

\(F_D\)
,首先使用伪正常特征生成器将它们与输入图像
\(I_x\)
融合,生成伪正常特征图
\(F_N\)
,具体如下:

\[\begin{align}
F_N = g(F_B, F_D, I_x),
\end{align}
\]

其中,
\(g(\cdot)\)
表示伪正常曝光生成器。然后,将
\(F_N\)
可以作为参考,分别通过两个
COSE
模块引导估计
\(F_B\)

\(F_N\)
以及
\(F_D\)

\(F_N\)
之间的色彩偏移。这两个
COSE
模块产生的暗化偏移
\(O_D\)
和增亮偏移
\(O_B\)
,针对输入图像
\(I_x\)
模拟了亮度和色彩的变化。因此,
\(O_D\)

\(O_B\)

\(I_x\)
被送入提出的
COMO
模块中,用于调整图像的亮度并纠正色彩偏移,生成最终的图像
\(I_y\)

Color Shift Estimation (COSE) Module

与亮度调整不同,色彩偏移校正更具挑战性,因为它本质上要求网络在
RGB
色彩空间中建模像素方向,而不是像素强度的幅度。尽管有一些工作使用余弦相似性正则化来帮助在训练过程中保持图像的颜色,但这样的策略通常在大面积低曝光或过曝区域失败,因为这些区域中的小值或高值像素预期具有不同的颜色。

论文提出基于可变形卷积技术的
COSE
模块来解决这一问题。可变形卷积(
DConv
)通过引入空间偏移
\(\Delta p_n\)
扩展了普通卷积,能够自适应地在任何
\(N\times N\)
像素的任意位置执行卷积,其中
\(N\times N\)
表示卷积核的大小。调制项
\(\Delta m_n\)
被提出来为不同的卷积核位置分配不同的权重,使卷积运算符聚焦于重要的像素。虽然可变形卷积可以预测相对于基础的偏移量,从而捕捉颜色分布的变化,但由于之前的方法只在像素空间域应用了可变形卷积,论文提出将可变形卷积扩展到空间域和色彩空间中,以联合建模亮度变化和色彩偏移。

如图
3
所示,
COSE
模块首先沿通道维度连接伪正常特征图
\(F_N\)
和增亮/暗化特征图
\(F_B\)
/
\(F_D\)
,然后使用三个独立的
\(3\times 3\)
卷积来提取位置偏移
\(\Delta p_n\in \mathcal{R}^{B\times 2N\times H\times W}\)
,颜色偏移
\(\Delta c_n\in \mathcal{R}^{B\times 3N\times H\times W}\)
和调制项
\(\Delta m_n\in \mathcal{R}^{B\times N\times H\times W}\)
。位置偏移
\(\Delta p_n\)
和调制项
\(\Delta m_n\)
在空间域内执行,以聚合卷积操作中变形不规则感受野的空间上下文信息。此外,引入了颜色偏移
\(\Delta c_n\)
,用于表示每个通道在每个卷积核位置上的颜色偏移。学习到的颜色偏移
\(\Delta c_n\)
被设计为具有
\(3N\)
个通道,用于模拟具有
3
个通道的输入
sRGB
图像的颜色偏移。

可变形卷积在空间域和色彩空间中的计算可以写成:

\[\begin{align}
y = \sum_{p_n\in \mathcal{R}} (w_n\cdot x(p_0 + p_n + \Delta p_n) + \Delta c_n) \cdot \Delta m_n,\label{eq:cdc}
\end{align}
\]

其中,
\(x\)
表示卷积操作的输入特征,而
\(p_0\)

\(p_n\)

\(\Delta p_n\)
是表示空间位置的二维变量。
\(y\)
(或
\(y(p_0)\)
)表示输入图像中每个像素
\(p_0\)
的色彩空间可变形卷积的输出。集合
\(\mathcal{R} = \{(-1, -1), (-1, 0), \dots, (1, 1)\}\)
表示常规
\(3\times 3\)
卷积核的网格。
\(n\)

\(\mathcal{R}\)
中元素的枚举器,指示第
\(n\)
个位置,
\(N\)

\(\mathcal{R}\)
的长度(对于常规
\(3\times 3\)
卷积核,
\(N=9\)
)。由于位移
\(\Delta p_n\)
在实践中可能具有小数,采用双线性插值进行计算,这与空间可变形卷积相一致。

Color Modulation (COMO) Module

COMO
模块用于调节输入图像的亮度和颜色,生成最终的输出图像
\(I_y\)
,基于学习到的亮化特征
\(F_B\)
和变暗特征
\(F_D\)
之间的偏移量
\(O_B\)
/
\(O_D\)
,以及伪正常特征
\(F_N\)
。由于在生成具有和谐颜色的校正图像时聚合全局信息至关重要,论文从非局部上下文建模中汲取灵感,并通过将
self-affinity
计算扩展为
cross-affinity
计算来制定
COMO
模块,以便
COMO
能够通过查询
\(O_B\)

\(O_D\)
来增强输入图像。

如图
4
所示,为处理输入图像
\(I_x\)
、变暗偏移量
\(O_D\)
和亮化偏移量
\(O_B\)
分别分配了三个分支,每个分支包含三个
\(1\times 1\)
卷积层(分别表示为
\(Conv\psi\)

\(Conv\phi\)

\(ConvZ\)
)。然后,在每个分支中计算
self-affinity
矩阵
\(A_i\)
,如下所示:

\[\begin{align}
A_i = \psi_i \otimes \phi_i,\ for\ i\in \{I, B, D\},
\end{align}
\]

其中,
\(\otimes\)
表示矩阵乘法,
\(\psi_i\)

\(\phi_i\)
分别是由
\(Conv\psi\)

\(Conv\phi\)
得到的特征图。然后,
\(A_i\)
被对称化并归一化,以确保存在实特征值并稳定反向传播。
\(A_i\)
的每一行用作空间注意力图,而
\(Z_i\)
(通过
\(ConvZ\)
获得)用作注意力图的权重。接下来,通过矩阵乘法建模
\(I_x\)

\(O_B\)
/
\(O_D\)
之间的相关性,并将它们与
self-affinity
特征相加,如下所示:

\[\begin{align}
f_j = w_1 A_j \otimes Z_j + w_2 A_j \otimes Z_I,
\end{align}
\]

其中,
\(j \in \{B, D\}\)
是亮化或变暗分支中的亲和矩阵
\(A_j\)
和特征图
\(Z_j\)
的索引。
\(w_1\)

\(w_2\)
是由
\(1\times 1\)
卷积生成的权重矩阵。在公式
6
中,第一项是为了发现由
COSE
学习到的
\(O_B\)

\(O_D\)
中显著的颜色偏移区域,而第二项旨在利用输入
\(Z_I\)
的学习权重来关注
\(O_B\)

\(O_D\)
的注意力图,以了解输入的显著区域中的偏移情况。

最后,将
\(f_B\)

\(f_D\)
和输入图像
\(I_x\)
结合起来,作为指导输入图像的探索的颜色偏移,生成最终的结果
\(I_y\)
,如下所示:

\[\begin{align}
I_y = w_4(BN(f_B) + BN(f_D) + w_3A_I\otimes Z_I) + I_x,
\end{align}
\]

其中,
\(BN(\cdot)\)
表示批量归一化,
\(w_3\)

\(w_4\)
是由
\(1\times 1\)
卷积生成的权重矩阵。

Loss Function

使用两个损失函数
\(\mathcal{L}_{pesudo}\)

\(\mathcal{L}_{output}\)
来训练。由于需要生成一个伪正常的特征图来帮助识别颜色偏移,使用
\(\mathcal{L}_{pesudo}\)
来为生成过程提供中间监督。

\[\begin{align}
\mathcal{L}_{pesudo} = ||F_N - GT||_1.
\end{align}
\]

\(\mathcal{L}_{output}\)
包含四个项,用于监督网络生成增强图像,即
\(L1\)
损失,余弦相似度
\(\mathcal{L}_{cos}\)
,结构相似性(
SSIM
)损失
\(\mathcal{L}_{ssim}\)

VGG
损失
\(\mathcal{L}_{vgg}\)

\(\mathcal{L}_{output}\)
可以表达为:

\[\begin{align}
\mathcal{L}_{output} = \lambda_1 \mathcal{L}_{L1} + \lambda_2 \mathcal{L}_{cos} + \lambda_3 \mathcal{L}_{ssim} + \lambda_4 \mathcal{L}_{vgg},
\end{align}
\]

其中,
\(\lambda_1\)

\(\lambda_2\)

\(\lambda_3\)

\(\lambda_4\)
是四个平衡超参数。整体损失函数为:

\[\begin{align}
\mathcal{L} = \lambda_p \mathcal{L}_{pesudo} + \lambda_o \mathcal{L}_{output},
\end{align}
\]

其中,
\(\lambda_p\)

\(\lambda_o\)
是两个平衡超参数。

Experiments




如果本文对你有帮助,麻烦点个赞或在看呗~
更多内容请关注 微信公众号【晓飞的算法工程笔记】

work-life balance.

前言

在上一篇文章:
OpenTelemetry 实战:从零实现分布式链路追踪
讲解了链路相关的实战,本次我们继续跟进如何使用 OpenTelemetry 集成 metrics 监控。

建议对指标监控不太熟的朋友可以先查看这篇前菜文章:
从 Prometheus 到 OpenTelemetry:指标监控的演进与实践

名称 作用 语言 版本
java-demo 发送 gRPC 请求的客户端 Java opentelemetry-agent: 2.4.0/SpringBoot: 2.7.14
k8s-combat 提供 gRPC 服务的服务端 Golang go.opentelemetry.io/otel: 1.28/ Go: 1.22
Jaeger trace 存储的服务端以及 TraceUI 展示 Golang jaegertracing/all-in-one:1.56
opentelemetry-collector-contrib OpenTelemetry 的 collector 服务端,用于收集 trace/metrics/logs 然后写入到远端存储 Golang otel/opentelemetry-collector-contrib:0.98.0
Prometheus 作为 metrics 的存储和展示组件,也可以用
VictoriaMetrics
等兼容 Prometheus 的存储替代。
Golang quay.io/prometheus/prometheus:v2.49.1
image.png

快速开始

以上是加入 metrics 之后的流程图,在原有的基础上会新增一个
Prometheus
组件,collector 会将 metrics 指标数据通过远程的 remote write 的方式写入到 Prometheus 中。

Prometheus 为了能兼容 OpenTelemetry 写入过来的数据,需要开启相关
特性
才可以。

如果是 docker 启动的话需要传入相关参数:

docker run  -d -p 9292:9090 --name prometheus \
-v /prometheus/prometheus.yml:/etc/prometheus/prometheus.yml \
quay.io/prometheus/prometheus:v2.49.1 \
--config.file=/etc/prometheus/prometheus.yml \
--storage.tsdb.path=/prometheus \
--web.console.libraries=/etc/prometheus/console_libraries \
--web.console.templates=/etc/prometheus/consoles \
--enable-feature=exemplar-storage \
--enable-feature=otlp-write-receiver

--enable-feature=otlp-write-receiver
最主要的就是这个参数,用于开启接收 OTLP 格式的数据。

但使用这个 Push 特性就会丧失掉 Prometheus 的许多 Pull 特性,比如服务发现,定时抓取等,不过也还好,Push 和 Pull 可以同时使用,原本使用 Pull 抓取的组件依然不受影响。

修改 OpenTelemetry-Collector

接着我们需要修改下 Collector 的配置:

exporters:
  debug:
  otlp:
    endpoint: "jaeger:4317"
    tls:
      insecure: true
  otlphttp/prometheus:
    endpoint: http://prometheus:9292/api/v1/otlp
    tls:
      insecure: true      

processors:
  batch:

service:
  pipelines:
    traces:
      receivers:
      - otlp
      processors: [batch]
      exporters:
      - otlp
      - debug        
    metrics:
      exporters:
      - otlphttp/prometheus
      - debug
      processors:
      - batch
      receivers:
      - otlp

这里我们在
exporter
中新增了一个
otlphttp/prometheus
的节点,用于指定导出
prometheus

endpoint
地址。

同时我们还需要在
server.metrics.exporters
中配置相同的 key:
otlphttp/prometheus

需要注意的是这里我们一定得是配置在
metrics.exporters
这个节点下,如果配置在
traces.exporters
下时,相当于是告诉 collector 讲 trace 的数据导出到
otlphttp/prometheus.endpoint
这个 endpoint 里了。

所以重点是需要理解这里的配对关系。

运行效果

这样我们只需要将应用启动之后就可以在 Prometheus 中查询到应用上报的指标了。

java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.traces.exporter=otlp \
-Dotel.metrics.exporter=otlp \
-Dotel.logs.exporter=none \
-Dotel.service.name=java-demo \
-Dotel.exporter.otlp.protocol=grpc \
-Dotel.propagators=tracecontext,baggage \
-Dotel.exporter.otlp.endpoint=http://127.0.0.1:5317 -jar target/demo-0.0.1-SNAPSHOT.jar

# Run go app
export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:5317 OTEL_RESOURCE_ATTRIBUTES=service.name=k8s-combat
./k8s-combat

因为我们在 collector 中开启了 Debug 的 exporter,所以可以看到以下日志:

2024-07-22T06:34:08.060Z	info	MetricsExporter	{"kind": "exporter", "data_type": "metrics", "name": "debug", "resource metrics": 1, "metrics": 18, "data points": 44}

此时是可以说明指标上传成功的。

然后我们打开
Prometheus
的地址:
http://127.0.0.1:9292/graph
便可以查询到 Java 应用和 Go 应用上报的指标。

OpenTelemetry 的 javaagent 会自动上报 JVM 相关的指标。


而在 Go 程序中我们还是需要显式的配置一些埋点:

func initMeterProvider() *sdkmetric.MeterProvider {  
    ctx := context.Background()  
  
    exporter, err := otlpmetricgrpc.New(ctx)  
    if err != nil {  
       log.Printf("new otlp metric grpc exporter failed: %v", err)  
    }  
    mp := sdkmetric.NewMeterProvider(  
       sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),  
       sdkmetric.WithResource(initResource()),  
    )    otel.SetMeterProvider(mp)  
    return mp  
}

mp := initMeterProvider()
defer func() {
	if err := mp.Shutdown(context.Background()); err != nil {
		log.Printf("Error shutting down meter provider: %v", err)
	}
}()

和 Tracer 类似,我们首先也得在 main 函数中调用
initMeterProvider()
函数来初始化 Meter,此时它会返回一个
sdkmetric.MeterProvider
对象。

OpenTelemetry Go 的 SDK 中已经提供了对 go runtime 的自动埋点,我们只需要调用相关函数即可:

err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second))
if err != nil {
    log.Fatal(err)
}

之后我们启动应用,在 Prometheus 中就可以看到 Go 应用上报的相关指标了。
image.png

runtime_uptime_milliseconds_total Go 的运行时指标

Prometheus
中展示指标的 UI 能力有限,通常我们都是配合
grafana
进行展示的。
image.png

手动上报指标

当然除了 SDK 自动上报的指标之外,我们也可以类似于 trace 那样手动上报一些指标;

比如我就想记录某个函数调用的次数。

var meter =  otel.Meter("test.io/k8s/combat")  
apiCounter, err = meter.Int64Counter(  
    "api.counter",  
    metric.WithDescription("Number of API calls."),  
    metric.WithUnit("{call}"),  
)  
if err != nil {  
    log.Err(err)  
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {  
    defer apiCounter.Add(ctx, 1)  
    return &pb.HelloReply{Message: fmt.Sprintf("hostname:%s, in:%s, md:%v", name, in.Name, md)}, nil  
}

只需要创建一个
Int64Counter
类型的指标,然后在需要埋点处调用它的函数
apiCounter.Add(ctx, 1)
即可。

image.png
之后便可以在
Prometheus
中查到这个指标了。

除此之外 OpenTelemetry 中的 metrics 定义和 Prometheus 也是类似的,还有以下几种类型:

  • Counter
    :单调递增计数器,比如可以用来记录订单数、总的请求数。
  • UpDownCounter
    :与 Counter 类似,只不过它可以递减。
  • Gauge
    :用于记录随时在变化的值,比如内存使用量、CPU 使用量等。
  • Histogram
    :通常用于记录请求延迟、响应时间等。

在 Java 中也提供有类似的 API 可以完成自定义指标:

messageInCounter = meter    
        .counterBuilder(MESSAGE_IN_COUNTER)    
        .setUnit("{message}")    
        .setDescription("The total number of messages received for this topic.")    
        .buildObserver();

对于 Gauge 类型的数据用法如下,使用
buildWithCallback
回调函数上报数据,
OpenTelemetry
会在框架层面每 30s 回调一次。

public static void registerObservers() {      
    Meter meter = MetricsRegistration.getMeter();      
      
    meter.gaugeBuilder("pulsar_producer_num_msg_send")      
            .setDescription("The number of messages published in the last interval")      
            .ofLongs()      
            .buildWithCallback(      
                    r -> recordProducerMetrics(r, ProducerStats::getNumMsgsSent));  
  
private static void recordProducerMetrics(ObservableLongMeasurement observableLongMeasurement, Function<ProducerStats, Long> getter) {      
    for (Producer producer : CollectionHelper.PRODUCER_COLLECTION.list()) {      
        ProducerStats stats = producer.getStats();      
        String topic = producer.getTopic();      
        if (topic.endsWith(RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX)) {      
            continue;      
        }        observableLongMeasurement.record(getter.apply(stats),      
                Attributes.of(PRODUCER_NAME, producer.getProducerName(), TOPIC, topic));      
    }}

更多具体用法可以参考官方文档链接:
https://opentelemetry.io/docs/languages/java/instrumentation/#metrics

如果我们不想将数据通过 collector 而是直接上报到 Prometheus 中,使用 OpenTelemetry 框架也是可以实现的。

我们只需要配置下环境变量:

export OTEL_METRICS_EXPORTER=prometheus

这样我们就可以访问
http://127.0.0.1:9464/metrics
获取到当前应用暴露出来的指标,此时就可以在
Prometheus
里配置好采集 job 来获取数据。

scrape_configs:
  - job_name: "k8s-combat"
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets: ["k8s-combat:9464"]   

这就是典型的 Pull 模型,而 OpenTelemetry 推荐使用的是 Push 模型,数据由 OpenTelemetry 进行采集然后推送到 Prometheus。

这两种模式各有好处:

Pull模型 Push 模型
优点 可以在一个集中的配置里管理所有的抓取端点,也可以为每一个应用单独配置抓取频次等数据。 在 OpenTelemetry 的 collector中可以集中对指标做预处理之后再将过滤后的数据写入 Prometheus,更加的灵活。
缺点 1. 预处理指标比较麻烦,所有的数据是到了 Prometheus 后再经过relabel处理后再写入存储。
2. 需要配置服务发现
1. 额外需要维护一个类似于 collector 这样的指标网关的组件

比如我们是用和 Prometheus 兼容的 VictoriaMetrics 采集了 istio 的相关指标,但里面的指标太多了,我们需要删除掉一部分。

就需要在采集任务里编写规则:

apiVersion: operator.victoriametrics.com/v1beta1  
kind: VMPodScrape  
metadata:  
  name: isito-pod-scrape  
spec:  
  podMetricsEndpoints:  
    - scheme: http  
      scrape_interval: "30s"  
      scrapeTimeout: "30s"  
      path: /stats/prometheus  
      metricRelabelConfigs:  
        - regex: ^envoy_.*|^url\_\_\_\_.*|istio_request_bytes_sum|istio_request_bytes_count|istio_response_bytes_sum|istio_request_bytes_sum|istio_request_duration_milliseconds_sum|istio_response_bytes_count|istio_request_duration_milliseconds_count|^ostrich_apigateway.*|istio_request_messages_total|istio_response_messages_total  
          action: drop_metrics  
  namespaceSelector:  
    any: true

换成在 collector 中处理后,这些逻辑都可以全部移动到 collector 中集中处理。

总结

metrics 的使用相对于 trace 更简单一些,不需要理解复杂的 context、span 等概念,只需要搞清楚有哪几种 metrics 类型,分别应用在哪些不同的场景即可。

参考链接: