OSCP代码季-SCDB 支持 explain statement
为SCDB增加explain statement功能,支持查看查询执行计划图(graphviz dot格式)
关键修改见“方案迭代”一节
任务介绍
- 任务名称:SCDB 支持 explain statement
- 技术方向:SCQL
- 任务难度:进阶🌟🌟
- 任务预估完成时间:4周
- 任务 Reviewer:@tongke6
详细要求
参照 MySQL 的 explain statement 和 Broker explain query 的实现,为 SCDB 增加执行 explain statement 的能力,通过 explain statement,可以查看 query 实际执行的执行图,以 graphviz dot 表示返回。
- 开发要求:请添加必要的单元测试和集成测试,并在 SCDB 模式下验证效果
- 代码规范:GO语言规范可参考:https://github.com/uber-go/guide/blob/master/style.md 整体和现有代码风格保持一致
- 提交说明:开发完成后,关联该 ISSUE 并提交代码至 https://github.com/secretflow/scql
能力要求
- 了解基本 git 操作,熟练使用 Go 语言
- 熟悉基本的 SQL 用法
- 熟悉 SCQL 的基本工作流程和代码结构
操作说明
- Broker explain query 接口定义:https://github.com/secretflow/scql/blob/main/api/broker.proto#L68
- 相关代码逻辑:https://github.com/secretflow/scql/blob/main/pkg/broker/services/intra/query_handler.go#L319
- 建议1:可先参考手册体验 P2P 模式,并尝试 explain query:SecretFlow
- 建议2:可考虑复用SCDB API 中的 SubmitAndGet 接口,通过增加额外参数实现对 explain statement 的支持。
架构研究
整体架构
SCDB 模式采用 client-server 架构:客户端提交 query 并等待结果。如果是 DQL,服务端负责解析和编译,生成分布式执行计划,并为每个参与方构造其可见的子图,下发给该参与方的 SCQLEngine 执行,执行结果经服务端返回给客户端;如果是 DDL/DCL,服务端直接在 SCDB 内部解析、生成逻辑计划并执行(不经过 SCQLEngine),执行结果(例如 SHOW 语句的输出)或错误信息返回给客户端。
核心组件
App
该结构体充当主应用程序容器,保存所有核心组件和配置。
HTTP API 端点
SCDB 服务器公开了多个用于查询操作的 REST 端点:
查询处理管道
SCDB 服务器实现了一个复杂的查询处理管道,用于处理 DQL(数据查询语言)和 DDL/DCL 操作:
查询类型检测
服务器将查询分类为不同的类型以进行适当的处理:
会话管理
SCDB 服务器管理查询会话以跟踪执行状态和结果:
会话存储
会话存储在内存缓存中,并具有可配置的过期时间:
Cache Type: *cache.Cache
(go-cache library)
Expiration: Configured via SessionExpireTime
Cleanup: Automatic via SessionCheckInterval
引擎协调
SCDB 服务器协调多个 SCQLEngine 实例之间的执行:
分布式执行计划
服务器为每个引擎创建特定于参与方的执行计划:
安全验证(Security and Authentication)
SCDB 服务器通过多个层实施安全策略:
安全配置(Security Configuration)
错误处理及监控
源码解读
客户端发起 query 后,由 SubmitAndGetHandler 处理:反序列化请求、记录耗时与监控日志、处理错误并序列化响应;核心逻辑在 submitAndGet 中实现
// SubmitAndGetHandler serves the synchronous SCDB query endpoint.
//
// It decodes an SCDBQueryRequest from the HTTP body using the encoding
// indicated by the Content-Type header, runs it through submitAndGet and writes
// the SCDBQueryResultResponse back in the same encoding as the request.
// Failures are reported inside the response status; the HTTP status code is
// always 200. Every request produces exactly one monitor log line.
func (app *App) SubmitAndGetHandler(c *gin.Context) {
	timeStart := time.Now()
	logEntry := &logutil.MonitorLogEntry{
		ActionName: fmt.Sprintf("%v@%v", "SCDBSubmitAndGetHandler", c.FullPath()),
	}

	request := &scql.SCDBQueryRequest{}
	inputEncodingType, err := message.DeserializeFrom(c.Request.Body, request, c.Request.Header.Get("Content-Type"))
	if err != nil {
		logEntry.Reason = constant.ReasonInvalidRequestFormat
		logEntry.ErrorMsg = err.Error()
		logEntry.CostTime = time.Since(timeStart)
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		c.String(http.StatusOK, errorResponse(scql.Code_BAD_REQUEST, "invalid request body", message.EncodingTypeJson))
		return
	}

	resp := app.submitAndGet(c.Request.Context(), request)
	logEntry.RequestID = request.BizRequestId
	// Proto getters are nil-safe, so a missing Status/response cannot panic here.
	logEntry.SessionID = resp.GetScdbSessionId()
	logEntry.RawRequest = SCDBQueryRequestToLogString(request)

	body, err := message.SerializeTo(resp, inputEncodingType)
	if err != nil {
		// Surface encoding failures instead of writing whatever body
		// SerializeTo produced alongside the error.
		logEntry.ErrorMsg = fmt.Sprintf("serialize response: %v", err)
		logEntry.CostTime = time.Since(timeStart)
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		c.String(http.StatusOK, errorResponse(scql.Code_INTERNAL, "failed to serialize response", message.EncodingTypeJson))
		return
	}
	setResponseContentType(c, inputEncodingType)
	c.String(http.StatusOK, body)

	logEntry.CostTime = time.Since(timeStart)
	if resp.GetStatus().GetCode() != int32(scql.Code_OK) {
		logEntry.Reason = constant.ReasonInvalidRequest
		logEntry.ErrorMsg = resp.GetStatus().GetMessage()
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		return
	}
	logrus.Infof("%v|ClientIP:%v", logEntry, c.ClientIP())
}
核心函数 submitAndGet 创建会话并认证用户;如果 query 为 DQL,则调用 submitAndGetDQL;否则调用 runSQL
// submitAndGet implements the core logic of SubmitAndGetHandler.
//
// It creates a session for req, binds the party authenticator and authenticates
// req.User, then routes the query by statement kind:
//   - DQL is compiled and dispatched to the engines via submitAndGetDQL;
//   - anything else (DDL/DCL) runs on SCDB itself via runSQL, whose outcome is
//     read back from the session.
//
// Failures before routing produce an error response with code INTERNAL
// (session creation), UNAUTHENTICATED (authentication) or SQL_PARSE_ERROR
// (statement classification).
func (app *App) submitAndGet(ctx context.Context, req *scql.SCDBQueryRequest) *scql.SCDBQueryResultResponse {
	s, err := newSession(ctx, req, app.storage)
	if err != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_INTERNAL, err.Error())
	}

	auth.BindPartyAuthenticator(s, app.partyAuthenticator)
	if authErr := s.authenticateUser(req.User); authErr != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_UNAUTHENTICATED, authErr.Error())
	}

	dql, parseErr := isDQL(req.Query)
	if parseErr != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_SQL_PARSE_ERROR, parseErr.Error())
	}
	s.isDQLRequest = dql

	if !dql {
		// DDL/DCL: runSQL records its result (or error status) on the session.
		app.runSQL(s)
		return s.result
	}
	return app.submitAndGetDQL(ctx, s)
}
submitAndGetDQL 以同步方式(async=false)调用 runDQL,并把执行结果或错误转换为响应;核心逻辑在 runDQL 中实现
// submitAndGetDQL runs a DQL query synchronously (runDQL with async=false) and
// converts the outcome into a response.
//
// On success the session result is rebuilt from the returned output columns and
// affected-row count and returned. On failure an error fetch response is built:
// errors wrapping a *status.Status keep their SCQL code and message, all others
// are reported as Code_INTERNAL.
//
// NOTE(review): only resp.OutColumns and resp.AffectedRows are carried over, so
// any payload runDQL puts in resp.Status.Message (e.g. an EXPLAIN dot graph)
// may be lost here — confirm against setResultWithOutputColumnsAndAffectedRows.
func (app *App) submitAndGetDQL(ctx context.Context, s *session) *scql.SCDBQueryResultResponse {
	resp, err := app.runDQL(ctx, s, false)
	if err == nil {
		s.setResultWithOutputColumnsAndAffectedRows(resp.OutColumns, resp.AffectedRows)
		return s.result
	}

	var st *status.Status
	if !errors.As(err, &st) {
		return newErrorFetchResponse(s.id, scql.Code_INTERNAL, err.Error())
	}
	return newErrorFetchResponse(s.id, st.Code(), st.Message())
}
runDQL 先构建编译请求并调用解释器(interpreter)编译查询,记录执行计划日志并设置会话变量;核心是生成分布式执行计划,并把各参与方的子图下发给对应参与方的 SCQLEngine 执行
// runDQL compiles the session's DQL query into a distributed execution plan and
// sends each participating party its own sub-graph to run on its SCQLEngine.
//
// Steps: build the compile request and compile it; record group-threshold
// information on the session variables; resolve every party's engine endpoint,
// token and public key from storage; build one RunExecutionPlanRequest per party
// sub-graph; then run them through a new executor stored on the session.
//
// If parameter async is true, the query result will be notified by engine, this
// function will always return nil. Otherwise it returns the result collected by
// the executor.
func (app *App) runDQL(ctx context.Context, s *session, async bool) (*scql.SCDBQueryResultResponse, error) {
	compileReq, err := app.buildCompileRequest(ctx, s)
	if err != nil {
		return nil, err
	}
	intrpr := interpreter.NewInterpreter()
	compiledPlan, err := intrpr.Compile(ctx, compileReq)
	if err != nil {
		return nil, err
	}
	s.GetSessionVars().AffectedByGroupThreshold = compiledPlan.Warning.GetMayAffectedByGroupThreshold()
	s.GetSessionVars().GroupByThreshold = compileReq.CompileOpts.SecurityCompromise.GroupByThreshold
	logrus.Infof("Execution Plan:\n%s\n", compiledPlan.GetExplain().GetExeGraphDot())

	sessionStartParams := &scql.JobStartParams{
		JobId:         s.id,
		SpuRuntimeCfg: compiledPlan.GetSpuRuntimeConf(),
		TimeZone:      s.GetSessionVars().GetTimeZone(),
	}

	partyCodes := make([]string, 0, len(compiledPlan.Parties))
	for _, p := range compiledPlan.Parties {
		partyCodes = append(partyCodes, p.GetCode())
	}

	// Resolve engine connection info for every party. A party's rank in the job
	// is its position in compiledPlan.Parties.
	var partyInfo *graph.PartyInfo
	{
		db := s.GetSessionVars().Storage
		var users []storage.User
		result := db.Model(&storage.User{}).Where("party_code in ?", partyCodes).Find(&users)
		if result.Error != nil {
			return nil, fmt.Errorf("query engine info for parties %v: %w", partyCodes, result.Error)
		}
		partyMap := make(map[string]*graph.Participant, len(users))
		for _, u := range users {
			partyMap[u.PartyCode] = &graph.Participant{
				PartyCode: u.PartyCode,
				// EngineEndpoints is stored as a ';'-separated list.
				Endpoints: strings.Split(u.EngineEndpoints, ";"),
				Token:     u.EngineToken,
				PubKey:    u.EnginePubKey,
			}
		}
		participants := make([]*graph.Participant, 0, len(partyCodes))
		for i, code := range partyCodes {
			party, exists := partyMap[code]
			if !exists {
				return nil, fmt.Errorf("could not find info for party %s", code)
			}
			participants = append(participants, party)
			sessionStartParams.Parties = append(sessionStartParams.Parties, &scql.JobStartParams_Party{
				Code: code,
				Name: code,
				// strings.Split always yields at least one element, so [0] is safe.
				Host:      party.Endpoints[0],
				Rank:      int32(i),
				PublicKey: party.PubKey,
			})
		}
		partyInfo = graph.NewPartyInfo(participants)
	}

	// The engine callback URL is the same for every party; build it once.
	cbURL := url.URL{
		Scheme: app.config.Protocol,
		Host:   app.config.SCDBHost,
		Path:   engineCallbackPath,
	}
	callbackURL := cbURL.String()

	pbRequests := make(map[string]*scql.RunExecutionPlanRequest, len(compiledPlan.SubGraphs))
	// subGraph (not "graph") so the imported graph package is not shadowed.
	for party, subGraph := range compiledPlan.SubGraphs {
		// Each party gets its own copy of the shared start params.
		startParams, ok := proto.Clone(sessionStartParams).(*scql.JobStartParams)
		if !ok {
			return nil, fmt.Errorf("failed to clone session start params")
		}
		startParams.PartyCode = party
		pbRequests[party] = &scql.RunExecutionPlanRequest{
			JobParams:     startParams,
			Graph:         subGraph,
			Async:         async,
			CallbackUrl:   callbackURL,
			GraphChecksum: &scql.GraphChecksum{CheckGraphChecksum: false},
		}
	}

	engineClient := executor.NewEngineClient(
		app.config.Engine.ClientMode,
		app.config.Engine.ClientTimeout,
		&bcfg.TLSConf{
			Mode:       app.config.Engine.TLSCfg.Mode,
			CertPath:   app.config.Engine.TLSCfg.CertFile,
			KeyPath:    app.config.Engine.TLSCfg.KeyFile,
			CACertPath: app.config.Engine.TLSCfg.CACertFile,
		},
		app.config.Engine.ContentType,
		app.config.Engine.Protocol,
	)
	s.engineStub = executor.NewEngineStub(
		s.id,
		app.config.Protocol,
		app.config.SCDBHost,
		engineCallbackPath,
		engineClient,
	)

	var outputNames []string
	for _, col := range compiledPlan.GetSchema().GetColumns() {
		outputNames = append(outputNames, col.GetName())
	}

	exec, err := executor.NewExecutor(pbRequests, outputNames, s.engineStub, s.id, partyInfo)
	if err != nil {
		return nil, err
	}
	s.executor = exec
	resp, err := exec.RunExecutionPlan(ctx, async)
	if err != nil {
		return nil, err
	}
	if async {
		// In async mode, result will be set in callback
		return nil, nil
	}
	return resp, nil
}
如果 submitAndGet 通过 isDQL 判断 query 不是 DQL,则调用 runSQL:按需从存储中查询 infoschema,并在 defer 中根据执行结果设置会话结果或错误状态;核心逻辑在 scdbexecutor.Run 中实现
// runSQL runs a DDL/DCL statement directly on SCDB, without involving engines.
//
// It returns nothing; the outcome is recorded on the session instead. On
// success the result tensors are stored with s.setResult. On failure the
// session status is set from the error, and the error is logged. The status
// uses the SCQL code and message when err wraps a *status.Status, and
// Code_INTERNAL otherwise.
//
// The info schema of s.request.DbName is loaded from storage only when the
// statement needs it (e.g. CREATE USER does not).
func (app *App) runSQL(s *session) {
	var err error
	rt := []*scql.Tensor{}
	// Every return below reports through this deferred handler, so err must
	// always be assigned (never shadowed) in the body.
	defer func() {
		if err == nil {
			s.setResult(rt)
			return
		}
		var st *status.Status
		if errors.As(err, &st) {
			s.setStatus(int32(st.Code()), st.Message())
		} else {
			s.setStatus(int32(scql.Code_INTERNAL), err.Error())
		}
		logrus.Errorf("Failed to runSQL, sessionid: %v, err: %v", s.id, err.Error())
	}()

	var needSchema bool
	if needSchema, err = isQueryNeedInfoSchema(s.request.Query); err != nil {
		return
	}

	var is infoschema.InfoSchema
	// skip query infoschema from storage for create user statement
	if needSchema {
		if is, err = storage.QueryDBInfoSchema(s.GetSessionVars().Storage, s.request.DbName); err != nil {
			return
		}
	}
	rt, err = scdbexecutor.Run(s, s.request.Query, is)
}
Run 函数按解析(parse)、计划(plan)、执行(execute)三个阶段执行 DDL/DCL:解析阶段的核心是 ParseOneStmt,计划阶段的核心是 BuildLogicalPlan,执行阶段较为复杂,整体采用 open-next-close 迭代模型
// Run runs a DDL/DCL statement on SCDB and returns its result tensors.
//
// The statement goes through three stages:
//  1. Parsing: ParseOneStmt, then Preprocess against is.
//  2. Planning: BuildLogicalPlan.
//  3. Executing: the executor tree built from the plan follows the
//     Open/Next/Close protocol. Rows are pulled in chunks of at most
//     ResultMaxRows and merged into the result until an empty chunk is returned.
//
// is may be nil for statements that do not need schema information.
// The executor is always closed once it has been opened successfully. A Close
// failure is returned only when no earlier error occurred.
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) (result []*scql.Tensor, err error) {
	// Step 1: Parsing
	p := parser.New()
	stmtNode, err := p.ParseOneStmt(stmt, "", "")
	if err != nil {
		return nil, err
	}
	if err := core.Preprocess(ctx, stmtNode, is); err != nil {
		return nil, err
	}

	// Step 2: Planning
	lp, _, err := core.BuildLogicalPlan(context.Background(), ctx, stmtNode, is)
	if err != nil {
		return nil, err
	}

	// Step 3: Executing
	eb := newExecutorBuilder(ctx, is)
	exec := eb.build(lp)
	if eb.err != nil {
		return nil, eb.err
	}
	if err := exec.Open(context.Background()); err != nil {
		return nil, err
	}
	defer func() {
		if closeErr := exec.Close(); closeErr != nil && err == nil {
			result, err = nil, closeErr
		}
	}()

	retTypes := make([]*types.FieldType, 0, len(lp.Schema().Columns))
	for _, c := range lp.Schema().Columns {
		retTypes = append(retTypes, c.RetType)
	}
	ck := chunk.New(retTypes, ResultMaxRows, ResultMaxRows)
	for {
		if err := exec.Next(context.Background(), ck); err != nil {
			return nil, err
		}
		if result, err = mergeResultFromChunk(ck, lp, result); err != nil {
			return nil, err
		}
		// An empty chunk signals that the executor is exhausted.
		if ck.NumRows() == 0 {
			break
		}
	}
	return result, nil
}
方案迭代
核心问题:explain是作为DDL/DCL还是作为DQL处理
1.作为DDL/DCL
最开始观察到 show 语句是在 DDL/DCL 路径中实现的,但 DDL/DCL 路径没有编译执行计划图的过程,故不考虑
2.作为DQL
DQL执行流程存在编译执行计划图的过程,更适合
最终选取explain作为DQL执行实现
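首先在 isDQL 中扩展,使 explain 语句能被识别为 DQL。注意:TiDB 系解析器会把 `DESC <表名>` / `EXPLAIN <表名>` 也解析为内层为 ShowStmt 的 ExplainStmt,这类语句原本走 DDL/DCL 路径(buildExplain → buildShow),因此只应把内层为 SELECT/UNION 的 explain 视为 DQL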
// isDQL reports whether sql must take the DQL path (compile the statement and
// dispatch it to the engines) rather than running on SCDB itself.
//
// SELECT and UNION statements are DQL. An EXPLAIN statement is DQL only when
// the statement it explains is a SELECT or UNION. The parser also produces
// ExplainStmt nodes that wrap a ShowStmt; in the TiDB-derived grammar this
// includes DESC/EXPLAIN <table>. Those are handled by buildExplain/buildShow on
// the DDL/DCL path and must not be routed to the DQL compiler.
// A parse error is returned to the caller.
func isDQL(sql string) (bool, error) {
	p := parser.New()
	stmt, err := p.ParseOneStmt(sql, "", "")
	if err != nil {
		return false, err
	}
	switch s := stmt.(type) {
	case *ast.SelectStmt, *ast.UnionStmt:
		return true, nil
	case *ast.ExplainStmt:
		switch s.Stmt.(type) {
		case *ast.SelectStmt, *ast.UnionStmt:
			return true, nil
		}
	}
	return false, nil
}
在编译阶段,扩展 buildExplain,使 explain 除 show 语句外还支持 select 和 union 语句
// buildExplain builds the plan of an EXPLAIN statement by building the plan of
// the statement it wraps:
//   - SELECT via buildSelect;
//   - UNION via buildUnion;
//   - SHOW via buildShow.
//
// Any other wrapped statement type is rejected with ErrUnsupportedType.
func (b *PlanBuilder) buildExplain(ctx context.Context, explain *ast.ExplainStmt) (Plan, error) {
	switch inner := explain.Stmt.(type) {
	case *ast.SelectStmt:
		return b.buildSelect(ctx, inner)
	case *ast.UnionStmt:
		return b.buildUnion(ctx, inner)
	case *ast.ShowStmt:
		return b.buildShow(ctx, inner)
	}
	return nil, ErrUnsupportedType.GenWithStack("Unsupported explain stmt %T", explain.Stmt)
}
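编译出执行计划后(这段代码应位于 runDQL 中 intrpr.Compile 成功之后),如果 query 是 explain 语句,则跳过向各参与方 SCQLEngine 下发执行计划的步骤,直接把 graphviz dot 格式的执行计划图放在响应的 Status.Message 中返回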
// EXPLAIN short-circuit. This snippet refers to compiledPlan and s and returns
// (*scql.SCDBQueryResultResponse, error), so it presumably sits inside runDQL
// right after intrpr.Compile succeeds — confirm placement. For an EXPLAIN query
// the compiled plan is not sent to any engine. Its graphviz dot rendering is
// returned instead.
// NOTE(review): isExplainQuery presumably parses the query again, although
// isDQL has already parsed it; consider carrying the statement kind on the
// session instead.
isExplain, err := isExplainQuery(s.request.GetQuery())
if err != nil {
return nil, err
}
if isExplain {
// The dot graph is carried in Status.Message; no output columns are produced.
// NOTE(review): submitAndGetDQL rebuilds the result from resp.OutColumns and
// resp.AffectedRows only, which may drop this Status.Message — verify the dot
// graph actually reaches the client.
// NOTE(review): when runDQL is called with async=true its contract is to return
// nil and deliver results via the engine callback; no engine runs here, so
// confirm the async path also receives the explain result.
return &scql.SCDBQueryResultResponse{
Status: &scql.Status{
Code: int32(scql.Code_OK),
Message: compiledPlan.GetExplain().GetExeGraphDot(),
},
OutColumns: nil,
ScdbSessionId: s.id,
AffectedRows: 0,
Warnings: nil,
}, nil
}
更多推荐
所有评论(0)