关键修改在方案迭代

任务介绍

  • 任务名称:SCDB 支持 explain statement
  • 技术方向:SCQL
  • 任务难度:进阶🌟🌟
  • 任务预估完成时间:4周
  • 任务 Reviewer:@tongke6

详细要求

参照 MySQL 的 explain statement 和 Broker explain query 的实现,为 SCDB 增加执行 explain statement 的能力,通过 explain statement,可以查看 query 实际执行的执行图,以 graphviz dot 表示返回。

能力要求

  • 了解基本 git 操作,熟练使用 Go 语言
  • 熟悉基本的 SQL 用法
  • 熟悉 SCQL 的基本工作流程和代码结构

操作说明

架构研究

整体架构

SCDB模式采用client-server架构,客户端将query提交并等待,如果为DQL:服务端解析,编译,返回执行计划子图给客户端,客户端引擎仅执行可见子图;如果为DDL/DCL:服务端解析,编译并在服务端引擎执行,不返回执行结果给客户端(可能会返回报错等信息)

核心组件

App 结构充当主应用程序容器,用于保存所有核心组件和配置。

HTTP API 端点

SCDB 服务器公开了多个用于查询操作的 REST 端点:

查询处理管道

SCDB 服务器实现了一个复杂的查询处理管道,用于处理 DQL(数据查询语言)和 DDL/DCL 操作:

查询类型检测

服务器将查询分类为不同的类型以进行适当的处理:

会话管理

SCDB 服务器管理查询会话以跟踪执行状态和结果:

会话存储

会话存储在内存缓存中,并具有可配置的过期时间:

Cache Type: *cache.Cache (go-cache library)

Expiration: Configured via SessionExpireTime

Cleanup: Automatic via SessionCheckInterval

引擎协调

SCDB 服务器协调多个 SCQLEngine 实例之间的执行:

分布式执行计划

服务器为每个引擎创建特定于参与方的执行计划:

安全验证(Security and Authentication)

SCDB 服务器通过多个层实施安全策略:

安全配置(Security Configuration)

错误处理及监控

源码解读

客户端发起query,配置时间,日志,错误信息等等;核心处理函数为submitAndGet

// SubmitAndGetHandler is the gin handler for the synchronous submit-and-get
// endpoint. It decodes the request body into an SCDBQueryRequest, delegates
// the work to submitAndGet, writes the response back in the encoding the
// request arrived in, and emits one monitor log line per request.
//
// The HTTP status is always 200; failures are reported through the status
// code carried inside the response body.
func (app *App) SubmitAndGetHandler(c *gin.Context) {
	timeStart := time.Now()
	logEntry := &logutil.MonitorLogEntry{
		ActionName: fmt.Sprintf("%v@%v", "SCDBSubmitAndGetHandler", c.FullPath()),
	}

	request := &scql.SCDBQueryRequest{}
	inputEncodingType, err := message.DeserializeFrom(c.Request.Body, request, c.Request.Header.Get("Content-Type"))
	if err != nil {
		logEntry.Reason = constant.ReasonInvalidRequestFormat
		logEntry.ErrorMsg = err.Error()
		logEntry.CostTime = time.Since(timeStart)
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		c.String(http.StatusOK, errorResponse(scql.Code_BAD_REQUEST, "invalid request body", message.EncodingTypeJson))
		return
	}

	resp := app.submitAndGet(c.Request.Context(), request)

	logEntry.RequestID = request.BizRequestId
	logEntry.SessionID = resp.ScdbSessionId
	logEntry.RawRequest = SCDBQueryRequestToLogString(request)

	body, err := message.SerializeTo(resp, inputEncodingType)
	if err != nil {
		// The serialization error used to be discarded, so whatever body
		// SerializeTo returned alongside it was sent to the client and the
		// request was logged as if nothing went wrong. Report it instead,
		// using the same JSON error envelope as the bad-request path above.
		logEntry.ErrorMsg = fmt.Sprintf("serialize response: %v", err)
		logEntry.CostTime = time.Since(timeStart)
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		c.String(http.StatusOK, errorResponse(scql.Code_INTERNAL, "failed to serialize response", message.EncodingTypeJson))
		return
	}

	setResponseContentType(c, inputEncodingType)
	c.String(http.StatusOK, body)

	// Cost time is measured after the response has been written.
	logEntry.CostTime = time.Since(timeStart)

	if resp.Status.Code != int32(scql.Code_OK) {
		logEntry.Reason = constant.ReasonInvalidRequest
		logEntry.ErrorMsg = resp.Status.Message
		logrus.Errorf("%v|ClientIP:%v", logEntry, c.ClientIP())
		return
	}
	logrus.Infof("%v|ClientIP:%v", logEntry, c.ClientIP())
}

核心函数submitAndGet配置会话,认证信息;如果query为DQL,则调用submitAndGetDQL;如果不是DQL,调用runSQL函数

// submitAndGet carries the core logic behind SubmitAndGetHandler: it opens a
// session for the request, authenticates the requesting user, and then routes
// the query to submitAndGetDQL when it is a DQL statement or to runSQL for
// every other statement kind.
//
// Failures before routing are mapped to an error response: session creation
// to Code_INTERNAL, authentication to Code_UNAUTHENTICATED, and query
// classification to Code_SQL_PARSE_ERROR.
func (app *App) submitAndGet(ctx context.Context, req *scql.SCDBQueryRequest) *scql.SCDBQueryResultResponse {
	sess, err := newSession(ctx, req, app.storage)
	if err != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_INTERNAL, err.Error())
	}

	// Authenticate the requesting user against the session.
	auth.BindPartyAuthenticator(sess, app.partyAuthenticator)
	if authErr := sess.authenticateUser(req.User); authErr != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_UNAUTHENTICATED, authErr.Error())
	}

	dql, err := isDQL(req.Query)
	if err != nil {
		return newErrorSCDBQueryResultResponse(scql.Code_SQL_PARSE_ERROR, err.Error())
	}
	sess.isDQLRequest = dql

	if !dql {
		// runSQL records its outcome on the session rather than returning it.
		app.runSQL(sess)
		return sess.result
	}
	return app.submitAndGetDQL(ctx, sess)
}

submitAndGetDQL配置状态,报错等信息,核心在runDQL中实现

// submitAndGetDQL runs a DQL query synchronously (runDQL with async=false)
// and returns the session's result. When runDQL fails, the error is turned
// into a fetch-error response: a *status.Status found in the error chain
// contributes its own code and message, anything else is reported as
// Code_INTERNAL with the error text.
func (app *App) submitAndGetDQL(ctx context.Context, s *session) *scql.SCDBQueryResultResponse {
	resp, err := app.runDQL(ctx, s, false)
	if err == nil {
		s.setResultWithOutputColumnsAndAffectedRows(resp.OutColumns, resp.AffectedRows)
		return s.result
	}

	var st *status.Status
	if !errors.As(err, &st) {
		return newErrorFetchResponse(s.id, scql.Code_INTERNAL, err.Error())
	}
	return newErrorFetchResponse(s.id, st.Code(), st.Message())
}

runDQL函数调用编译函数,翻译函数,配置日志,会话等信息,核心是编译分布式执行计划图信息,并返回给对应的客户端

// runDQL compiles the session's query into per-party sub-graphs and hands
// them to the engines for execution.
//
// Steps, in order:
//  1. Build a compile request from the session and compile it with a fresh
//     interpreter.
//  2. Record compile-time warning/threshold values on the session variables
//     and log the plan's ExeGraphDot text.
//  3. Look up the engine connection info of every party in the compiled plan
//     from the session's storage.
//  4. Build one RunExecutionPlanRequest per party sub-graph.
//  5. Create the engine client/stub and the executor, store them on the
//     session, and run the execution plan.
//
// If async is true the result is delivered later through the engine callback
// and this function returns (nil, nil) on success; otherwise it returns the
// response produced by RunExecutionPlan. Any failure along the way is
// returned as the error with a nil response.
func (app *App) runDQL(ctx context.Context, s *session, async bool) (*scql.SCDBQueryResultResponse, error) {
	compileReq, err := app.buildCompileRequest(ctx, s)
	if err != nil {
		return nil, err
	}
	intrpr := interpreter.NewInterpreter()
	compiledPlan, err := intrpr.Compile(ctx, compileReq)

	if err != nil {
		return nil, err
	}

	// Keep the group-by-threshold information from compilation on the session.
	s.GetSessionVars().AffectedByGroupThreshold = compiledPlan.Warning.GetMayAffectedByGroupThreshold()
	s.GetSessionVars().GroupByThreshold = compileReq.CompileOpts.SecurityCompromise.GroupByThreshold
	logrus.Infof("Execution Plan:\n%s\n", compiledPlan.GetExplain().GetExeGraphDot())

	// Parameters shared by every party; Parties is filled in below and
	// PartyCode is set per party on a clone.
	sessionStartParams := &scql.JobStartParams{
		JobId:         s.id,
		SpuRuntimeCfg: compiledPlan.GetSpuRuntimeConf(),
		TimeZone:      s.GetSessionVars().GetTimeZone(),
	}
	var partyCodes []string
	for _, p := range compiledPlan.Parties {
		partyCodes = append(partyCodes, p.GetCode())
	}

	var partyInfo *graph.PartyInfo
	{
		// Load the user rows whose party_code is one of the plan's parties.
		db := s.GetSessionVars().Storage
		var users []storage.User
		result := db.Model(&storage.User{}).Where("party_code in ?", partyCodes).Find(&users)
		if result.Error != nil {
			return nil, result.Error
		}
		partyMap := make(map[string]*graph.Participant)
		for _, u := range users {
			participant := &graph.Participant{
				PartyCode: u.PartyCode,
				// EngineEndpoints is stored as a ';'-separated list.
				Endpoints: strings.Split(u.EngineEndpoints, ";"),
				Token:     u.EngineToken,
				PubKey:    u.EnginePubKey,
			}
			partyMap[u.PartyCode] = participant
		}
		// Walk partyCodes (not the query result) so participants and ranks
		// follow the order of compiledPlan.Parties.
		participants := make([]*graph.Participant, 0, len(partyCodes))
		for i, code := range partyCodes {
			party, exists := partyMap[code]
			if !exists {
				return nil, fmt.Errorf("could not find info for party %s", code)
			}
			participants = append(participants, party)
			sessionStartParams.Parties = append(sessionStartParams.Parties, &scql.JobStartParams_Party{
				Code:      code,
				Name:      code,
				// The first endpoint is used as the party's host.
				Host:      party.Endpoints[0],
				Rank:      int32(i),
				PublicKey: party.PubKey,
			})
		}
		partyInfo = graph.NewPartyInfo(participants)
	}

	// One request per party sub-graph. Note the loop variable "graph"
	// shadows the graph package inside the loop body.
	pbRequests := make(map[string]*scql.RunExecutionPlanRequest)
	for party, graph := range compiledPlan.SubGraphs {
		// Clone so each party gets its own PartyCode without touching the
		// shared parameters.
		startParams, ok := proto.Clone(sessionStartParams).(*scql.JobStartParams)
		if !ok {
			return nil, fmt.Errorf("failed to clone session start params")
		}
		startParams.PartyCode = party
		// Callback URL pointing back at this SCDB instance.
		cbURL := url.URL{
			Scheme: app.config.Protocol,
			Host:   app.config.SCDBHost,
			Path:   engineCallbackPath,
		}
		pbRequests[party] = &scql.RunExecutionPlanRequest{
			JobParams:     startParams,
			Graph:         graph,
			Async:         async,
			CallbackUrl:   cbURL.String(),
			// Graph checksum verification is switched off for these requests.
			GraphChecksum: &scql.GraphChecksum{CheckGraphChecksum: false},
		}
	}

	// Engine client configured from the app's engine settings (mode, timeout,
	// TLS, content type, protocol).
	engineClient := executor.NewEngineClient(
		app.config.Engine.ClientMode,
		app.config.Engine.ClientTimeout,
		&bcfg.TLSConf{
			Mode:       app.config.Engine.TLSCfg.Mode,
			CertPath:   app.config.Engine.TLSCfg.CertFile,
			KeyPath:    app.config.Engine.TLSCfg.KeyFile,
			CACertPath: app.config.Engine.TLSCfg.CACertFile,
		},
		app.config.Engine.ContentType,
		app.config.Engine.Protocol,
	)
	// The stub is stored on the session before the executor is created.
	s.engineStub = executor.NewEngineStub(
		s.id,
		app.config.Protocol,
		app.config.SCDBHost,
		engineCallbackPath,
		engineClient,
	)

	// Output column names come from the compiled plan's schema.
	var outputNames []string
	for _, col := range compiledPlan.GetSchema().GetColumns() {
		outputNames = append(outputNames, col.GetName())
	}

	exec, err := executor.NewExecutor(pbRequests, outputNames, s.engineStub, s.id, partyInfo)
	if err != nil {
		return nil, err
	}
	// The executor is stored on the session before the plan is run.
	s.executor = exec
	resp, err := exec.RunExecutionPlan(ctx, async)
	if err != nil {
		return nil, err
	}

	if async {
		// In async mode, result will be set in callback
		return nil, nil
	}
	return resp, nil
}

如果submitAndGet调用isDQL判断不是DQL,则会调用runSQL函数,配置状态,报错,模式等信息,核心在run函数中实现

// runSQL executes a non-DQL (DDL/DCL) statement for the given session and
// records the outcome on the session instead of returning it.
//
// The deferred function is the single exit point: on success it stores the
// result tensors via setResult; on failure it sets the session status (using
// the code and message of a *status.Status found in the error chain, or
// Code_INTERNAL otherwise) and logs the error.
func (app *App) runSQL(s *session) {
	var (
		err     error
		tensors = []*scql.Tensor{}
	)
	defer func() {
		if err == nil {
			s.setResult(tensors)
			return
		}
		var st *status.Status
		if errors.As(err, &st) {
			s.setStatus(int32(st.Code()), st.Message())
		} else {
			s.setStatus(int32(scql.Code_INTERNAL), err.Error())
		}
		logrus.Errorf("Failed to runSQL, sessionid: %v, err: %v", s.id, err.Error())
	}()

	var needSchema bool
	if needSchema, err = isQueryNeedInfoSchema(s.request.Query); err != nil {
		return
	}

	// The info schema is only loaded from storage when the statement needs
	// it (e.g. it is skipped for create user statements).
	var is infoschema.InfoSchema
	if needSchema {
		if is, err = storage.QueryDBInfoSchema(s.GetSessionVars().Storage, s.request.DbName); err != nil {
			return
		}
	}

	tensors, err = scdbexecutor.Run(s, s.request.Query, is)
}

Run函数按parse-plan-execute三个阶段执行DDL/DCL:解析阶段的核心是ParseOneStmt,计划阶段的核心是BuildLogicalPlan;执行阶段较为复杂,总体采用open-next-close三阶段模型

// Run executes a single DDL/DCL statement on SCDB and returns the result as
// tensors. It proceeds in three steps: parse and preprocess the statement,
// build a logical plan, then build an executor and pull chunks from it until
// an empty chunk is produced. The first error encountered is returned with a
// nil result.
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) ([]*scql.Tensor, error) {
	// Step 1: Parsing
	node, err := parser.New().ParseOneStmt(stmt, "", "")
	if err != nil {
		return nil, err
	}
	if err = core.Preprocess(ctx, node, is); err != nil {
		return nil, err
	}

	// Step 2: Planning
	bg := context.Background()
	plan, _, err := core.BuildLogicalPlan(bg, ctx, node, is)
	if err != nil {
		return nil, err
	}

	// Step 3: Executing
	builder := newExecutorBuilder(ctx, is)
	exec := builder.build(plan)
	if builder.err != nil {
		return nil, builder.err
	}
	if err = exec.Open(bg); err != nil {
		return nil, err
	}

	// The chunk layout mirrors the return types of the plan's schema columns.
	cols := plan.Schema().Columns
	fieldTypes := make([]*types.FieldType, 0, len(cols))
	for _, col := range cols {
		fieldTypes = append(fieldTypes, col.RetType)
	}

	ck := chunk.New(fieldTypes, ResultMaxRows, ResultMaxRows)
	var tensors []*scql.Tensor
	for {
		if err = exec.Next(bg, ck); err != nil {
			return nil, err
		}
		// The chunk is merged before the emptiness check, so the final
		// empty chunk is passed to mergeResultFromChunk as well.
		if tensors, err = mergeResultFromChunk(ck, plan, tensors); err != nil {
			return nil, err
		}
		if ck.NumRows() == 0 {
			return tensors, nil
		}
	}
}

方案迭代

核心问题:explain是作为DDL/DCL还是作为DQL处理

1.作为DDL/DCL

最开始观察到show语句是在DDL/DCL流程中实现的,但DDL/DCL流程没有编译执行计划图的过程,故不考虑

2.作为DQL

DQL执行流程存在编译执行计划图的过程,更适合

最终选取explain作为DQL执行实现

首先确保能接受explain语句,在isDQL中扩展

// isDQL reports whether sql should be handled on the DQL path.
//
// A statement is DQL when it is a SELECT or UNION, or an EXPLAIN whose
// target is a SELECT or UNION. An EXPLAIN wrapping any other statement is
// deliberately NOT treated as DQL: an ExplainStmt can also wrap a ShowStmt
// (presumably how the parser represents DESCRIBE-style statements; confirm
// against the parser), and classifying every ExplainStmt as DQL would pull
// those statements off the non-DQL path that handles them.
//
// It returns an error when sql cannot be parsed as a single statement.
func isDQL(sql string) (bool, error) {
	p := parser.New()
	stmt, err := p.ParseOneStmt(sql, "", "")
	if err != nil {
		return false, err
	}

	switch s := stmt.(type) {
	case *ast.SelectStmt, *ast.UnionStmt:
		return true, nil
	case *ast.ExplainStmt:
		// Only EXPLAIN of a query goes through plan-graph compilation.
		switch s.Stmt.(type) {
		case *ast.SelectStmt, *ast.UnionStmt:
			return true, nil
		}
	}

	return false, nil
}

在编译阶段,扩展buildExplain,使explain能够支持select和union语句

// buildExplain builds the plan for the statement wrapped by an EXPLAIN
// statement by dispatching on the wrapped statement's type: SHOW, SELECT and
// UNION are delegated to buildShow, buildSelect and buildUnion respectively.
// Any other wrapped statement yields ErrUnsupportedType.
func (b *PlanBuilder) buildExplain(ctx context.Context, explain *ast.ExplainStmt) (Plan, error) {
	if show, ok := explain.Stmt.(*ast.ShowStmt); ok {
		return b.buildShow(ctx, show)
	}
	if sel, ok := explain.Stmt.(*ast.SelectStmt); ok {
		return b.buildSelect(ctx, sel)
	}
	if union, ok := explain.Stmt.(*ast.UnionStmt); ok {
		return b.buildUnion(ctx, union)
	}
	return nil, ErrUnsupportedType.GenWithStack("Unsupported explain stmt %T", explain.Stmt)
}

编译计划后,跳过发送给SCDB engine步骤,直接将dot形式的计划图返回

	// Decide whether the session's query is an EXPLAIN statement.
	// NOTE(review): isExplainQuery is not shown in this excerpt; presumably it
	// parses the query and checks the statement type — confirm.
	isExplain, err := isExplainQuery(s.request.GetQuery())
	if err != nil {
		return nil, err
	}

	// For EXPLAIN, return right away with an OK status whose Message carries
	// the compiled plan's ExeGraphDot text. No output columns, affected rows
	// or warnings are reported.
	if isExplain {
		return &scql.SCDBQueryResultResponse{
			Status: &scql.Status{
				Code:    int32(scql.Code_OK),
				Message: compiledPlan.GetExplain().GetExeGraphDot(),
			},
			OutColumns:    nil,
			ScdbSessionId: s.id,
			AffectedRows:  0,
			Warnings:      nil,
		}, nil
	}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐