Iawen's Blog

I like doodling freely like this, because I like the wind......

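DGraph is a graph database that Manish Rai Jain, a former Google engineer, launched in 2016 after leaving Google to start his own company. It is written in Go, its underlying data model is RDF, its storage engine is a modified version of BadgerDB, and it uses Raft to guarantee strong consistency for reads and writes. For a basic introduction and usage, see: Getting Started with Dgraph. This post analyzes the Dgraph Query flow from the source-code perspective.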

First, a rough call-stack diagram of Dgraph Query:
[Figure: Dgraph Query function call stack]

1. Main data structures

Here is a rough list of the main data structures used in Dgraph Query:

  • gql.GraphQuery
  • api.Request
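  • gql.Result: holds the list of queries, the corresponding list of variable usages, and the schema request
  • query.Request: wraps the state used while executing a query. ReadTs, Cache and GqlQuery are set up front; Subgraphs, Vars and Latency are populated as the query is processed
  • edgraph.queryContext: the query context, used to pass along all the variables needed to process a query, mutation or upsert request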
  • query.SubGraph
  • worker.functionContext

1.1 gql.GraphQuery

// gql.GraphQuery - stores the parsed query as a tree. Before the query is processed, it is converted into a query.SubGraph.
type GraphQuery struct {
    UID        []uint64
    Attr       string
    Langs      []string
    Alias      string
    IsCount    bool
    IsInternal bool
    IsGroupby  bool
    Var        string
    NeedsVar   []VarContext
    Func       *Function
    Expand     string // Which variable to expand with.

    Args map[string]string
    // Query can have multiple sort parameters.
    Order            []*pb.Order
    Children         []*GraphQuery
    Filter           *FilterTree
    MathExp          *MathTree
    Normalize        bool
    Recurse          bool
    RecurseArgs      RecurseArgs
    ShortestPathArgs ShortestPathArgs
    Cascade          []string
    IgnoreReflex     bool
    Facets           *pb.FacetParams
    FacetsFilter     *FilterTree
    GroupbyAttrs     []GroupByAttr
    FacetVar         map[string]string
    FacetsOrder      []*FacetOrder

    // Used by ACL-enabled queries to restrict the results to only the accessible params.
    AllowedPreds []string

    // Internal fields below.
    // If gq.fragment is nonempty, then it is a fragment reference / spread.
    fragment string

    // True for blocks that don't have a starting function and hence no starting nodes. They are
    // used to aggregate and get variables defined in another block.
    IsEmpty bool
}

1.2 api.Request

// api.Request - the client request (protobuf-generated): query text, variables, mutations and transaction options
type Request struct {
        StartTs    uint64             `protobuf:"varint,1,opt,name=start_ts,json=startTs,proto3" json:"start_ts,omitempty"`
        Query      string             `protobuf:"bytes,4,opt,name=query,proto3" json:"query,omitempty"`
        Vars       map[string]string  `protobuf:"bytes,5,rep,name=vars,proto3" json:"vars,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
        ReadOnly   bool               `protobuf:"varint,6,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"`
        BestEffort bool               `protobuf:"varint,7,opt,name=best_effort,json=bestEffort,proto3" json:"best_effort,omitempty"`
        Mutations  []*Mutation        `protobuf:"bytes,12,rep,name=mutations,proto3" json:"mutations,omitempty"`
        CommitNow  bool               `protobuf:"varint,13,opt,name=commit_now,json=commitNow,proto3" json:"commit_now,omitempty"`
        RespFormat Request_RespFormat `protobuf:"varint,14,opt,name=resp_format,json=respFormat,proto3,enum=api.Request_RespFormat" json:"resp_format,omitempty"`
        Hash       string             `protobuf:"bytes,15,opt,name=hash,proto3" json:"hash,omitempty"`
}

1.3 gql.Result

// gql.Result - holds the list of queries, the corresponding list of variable usages, and the schema request
type Result struct {
    Query     []*GraphQuery
    QueryVars []*Vars
    Schema    *pb.SchemaRequest
}

1.4 query.Request

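// query.Request - wraps the state used while executing a query. ReadTs, Cache and GqlQuery are set up front; Subgraphs, Vars and Latency are populated as the query is processed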
type Request struct {
    ReadTs   uint64 // ReadTs for the transaction.
    Cache    int    // 0 represents use txn cache, 1 represents not to use cache.
    Latency  *Latency
    GqlQuery *gql.Result

    Subgraphs []*SubGraph

    Vars map[string]varValue
}

1.5 edgraph.queryContext

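// edgraph.queryContext - the query context, used to pass along all the variables needed to process a query, mutation or upsert request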
type queryContext struct {
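    // req is the incoming, not-yet-parsed request, containing a query, mutations, or both (in case of an upsert)
    req *api.Request
    // gmuList is the list of mutations after parsing req.Mutations
    gmuList []*gql.Mutation
    // gqlRes is the result of parsing req.Query
    gqlRes gql.Result
    // condVars are the condition variables used in the (modified) query to determine whether the condition in an upsert is true. The string is empty if the corresponding mutation is not a conditional upsert. Note that len(condVars) == len(gmuList).
    condVars []string
    // uidRes stores the mapping from variable names to UIDs for UID variables. These are either dummy variables used for conditional upserts or variables used in the mutation block of the incoming request.
    uidRes map[string][]string
    // valRes stores the mapping from variable names to values for the value variables used in the mutation block of the request
    valRes map[string]map[uint64]types.Val
    // latency stores the latency numbers
    latency *query.Latency
    // span stores the opencensus span used throughout query processing
    span *trace.Span
    // graphql indicates whether the given request comes from graphql admin
    graphql bool
    // gqlField stores the GraphQL field for which the query is being processed. It is set only if the request is a query from the GraphQL layer; otherwise it is nil. Example nil cases: a DQL query, or a mutation executed from the GraphQL layer.
    gqlField gqlSchema.Field
    // nquadsCount tracks the number of nquads to be inserted as part of this request. In some cases (mostly upserts) this number can be huge (we have seen up to 1B) and cause OOM, so the number of nquads that can be inserted in a single request is limited.
    nquadsCount int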
    nquadsCount int
}

1.6 query.SubGraph

// query.SubGraph - one node of the query tree, holding both the request parameters and the fetched results
type SubGraph struct {
    ReadTs      uint64
    Cache       int
    Attr        string
    UnknownAttr bool
    // read only parameters which are populated before the execution of the query and are used to
    // execute this query.
    Params params

    // count stores the count of an edge (predicate). There would be one value corresponding to each
    // uid in SrcUIDs.
    counts []uint32
    // valueMatrix is a slice of ValueList. If this SubGraph is for a scalar predicate type, then
    // there would be one list for each uid in SrcUIDs storing the value of the predicate.
    // The individual elements of the slice are a ValueList because we support scalar predicates
    // of list type. For non-list type scalar predicates, there would be only one value in every
    // ValueList.
    valueMatrix []*pb.ValueList
    // uidMatrix is a slice of List. There would be one List corresponding to each uid in SrcUIDs.
    // In graph terms, a list is a slice of outgoing edges from a node.
    uidMatrix []*pb.List

    // facetsMatrix contains the facet values. There would be a list corresponding to each uid in
    // uidMatrix.
    facetsMatrix []*pb.FacetsList
    ExpandPreds  []*pb.ValueList
    GroupbyRes   []*groupResults // one result for each uid list.
    LangTags     []*pb.LangList

    // SrcUIDs is a list of unique source UIDs. They are always copies of destUIDs
    // of parent nodes in GraphQL structure.
    SrcUIDs *pb.List
    // SrcFunc specified using func. Should only be non-nil at root. At other levels,
    // filters are used.
    SrcFunc *Function

    FilterOp     string
    Filters      []*SubGraph // List of filters specified at the current node.
    facetsFilter *pb.FilterTree
    MathExp      *mathTree
    Children     []*SubGraph // children of the current node, should be empty for leaf nodes.

    // destUIDs is a list of destination UIDs, after applying filters, pagination.
    DestUIDs *pb.List
    List     bool // whether predicate is of list type

    pathMeta *pathMetadata
}

1.7 worker.functionContext

// worker.functionContext - the parsed state of the function (e.g. eq, has, regexp) that a task evaluates
type functionContext struct {
    tokens        []string
    geoQuery      *types.GeoQueryData
    intersectDest bool

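    // eqTokens is used by the compareAttr functions. It stores the value corresponding to each function argument. The `eq` function may have multiple arguments, but the other compareAttr functions have only one.
    // TODO(@Animesh): change the field names to better explain their purpose. Check whether we really need all of ineqValue, eqTokens and tokens.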
    eqTokens       []types.Val
    ineqValueToken []string
    n              int
    threshold      []int64
    uidsPresent    []uint64
    fname          string
    fnType         FuncType
    regex          *cregexp.Regexp
    isFuncAtRoot   bool
    isStringFn     bool
    atype          types.TypeID
}

// One of the per-function handlers in worker/task.go (signature only, body omitted):
func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) error

2. Walkthrough of the flow

2.1 Service initialization and listening

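The dgraph/cmd/alpha directory is the entry point of Dgraph Alpha. It is mainly responsible for program initialization and for setting up the listeners of the various services, as shown in the figure:
[Figure: Dgraph Alpha initialization and service listeners]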

The HTTP handler mainly wraps the request in an api.Request struct and then calls the service provider, edgraph.Server:

// dgraph/cmd/alpha/http.go - queryHandler
req := api.Request{
    Vars:    params.Variables,
    Query:   params.Query,
    StartTs: startTs,
    Hash:    hash,
}
// dgraph/cmd/alpha/http.go - queryHandler
resp, err := (&edgraph.Server{}).Query(ctx, &req)
if err != nil {
    x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
    return
}

Note that edgraph.Server is also the provider of the RPC (gRPC) service:

// dgraph/cmd/alpha/run.go - serveGRPC
api.RegisterDgraphServer(s, &edgraph.Server{})

2.2 The service provider: edgraph.Server

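The Query function is the externally exposed interface. It only does some simple preprocessing of preconditions, and the real work is delegated to the doQuery function: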

// edgraph/server.go - Query
return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)})

In doQuery, note the queryContext struct. As mentioned above, it is the query context, used to pass along all the variables needed to process a query, mutation or upsert request:

// edgraph/server.go - doQuery
qc := &queryContext{
    req:      req.req,
    latency:  l,
    span:     span,
    graphql:  isGraphQL,
    gqlField: req.gqlField,
}

// parseRequest in turn calls validateQuery
if rerr = parseRequest(qc); rerr != nil {
    return
}

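Up to this point, the work (parseRequest, validateQuery) only parses and validates the query context, further completing and populating the queryContext struct. The actual execution is handed to processQuery:

// edgraph/server.go - doQuery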
if resp, rerr = processQuery(ctx, qc); rerr != nil {
    // if rerr is just some error from GraphQL encoding, then we need to continue the normal
    // execution ignoring the error as we still need to assign latency info to resp. If we can
    // change the api.Response proto to have a field to contain GraphQL errors, that would be
    // great. Otherwise, we will have to do such checks a lot and that would make code ugly.
    if qc.gqlField != nil && x.IsGqlErrorList(rerr) {
        gqlErrs = rerr
    } else {
        return
    }
}

In processQuery, a query.Request struct is constructed and filled in step by step. Its Process method, which carries out the actual business logic, is then called:

// edgraph/server.go - processQuery
qr := query.Request{
    Latency:  qc.latency,
    GqlQuery: &qc.gqlRes,
}

// Core processing happens here.
er, err := qr.Process(ctx)

2.3 The implementer: query.Request

query.Request.Process starts with a call to ProcessQuery:

// query/query.go - Process
err = req.ProcessQuery(ctx)

ProcessQuery contains two loops. One converts query.Request.GqlQuery.Query into query.SubGraph values, and the other executes the queries:

// query/query.go - ProcessQuery
sg, err := ToSubGraph(ctx, gq)

// query/query.go - ToSubGraph
func ToSubGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) {
	sg, err := newGraph(ctx, gq)
	if err != nil {
		return nil, err
	}
	err = treeCopy(gq, sg)
	if err != nil {
		return nil, err
	}
	return sg, err
}
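ToSubGraph builds the root with newGraph and then lets treeCopy mirror the rest of the GraphQuery tree. The following is a minimal sketch of that recursion. It assumes stripped-down types that keep only Attr, the starting UIDs and the children. The real treeCopy also carries over params, filters, ordering and more, all omitted here.

```go
package main

import "fmt"

// GraphQuery and SubGraph are hypothetical, stripped-down stand-ins for
// gql.GraphQuery and query.SubGraph that keep only the tree shape.
type GraphQuery struct {
	Attr     string
	UID      []uint64
	Children []*GraphQuery
}

type SubGraph struct {
	Attr     string
	SrcUIDs  []uint64
	Children []*SubGraph
}

// newGraph builds the root SubGraph; at the root the starting UIDs come
// straight from the query.
func newGraph(gq *GraphQuery) *SubGraph {
	return &SubGraph{Attr: gq.Attr, SrcUIDs: gq.UID}
}

// treeCopy walks the GraphQuery children depth-first and mirrors each one as
// a child SubGraph. Child SrcUIDs stay empty: they are filled at execution
// time from the parent's results.
func treeCopy(gq *GraphQuery, sg *SubGraph) {
	for _, gchild := range gq.Children {
		child := &SubGraph{Attr: gchild.Attr}
		sg.Children = append(sg.Children, child)
		treeCopy(gchild, child)
	}
}

func toSubGraph(gq *GraphQuery) *SubGraph {
	sg := newGraph(gq)
	treeCopy(gq, sg)
	return sg
}

// describe renders the tree as nested Attr names, e.g. me(name friends(name)).
func describe(sg *SubGraph) string {
	s := sg.Attr
	if len(sg.Children) > 0 {
		s += "("
		for i, c := range sg.Children {
			if i > 0 {
				s += " "
			}
			s += describe(c)
		}
		s += ")"
	}
	return s
}

func main() {
	gq := &GraphQuery{Attr: "me", UID: []uint64{0x1}, Children: []*GraphQuery{
		{Attr: "name"},
		{Attr: "friends", Children: []*GraphQuery{{Attr: "name"}}},
	}}
	fmt.Println(describe(toSubGraph(gq))) // me(name friends(name))
}
```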

// query/query.go - ProcessQuery
switch {
case sg.Params.Alias == "shortest":
    // We allow only one shortest path block per query.
    go func() {
        shortestSg, err = shortestPath(ctx, sg)
        errChan <- err
    }()
case sg.Params.Recurse:
    go func() {
        errChan <- recurse(ctx, sg)
    }()
default:
    go ProcessGraph(ctx, sg, nil, errChan)
}

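Tracing further shows that shortestPath and recurse both eventually call ProcessGraph. ProcessQuery then blocks on the channel, waiting for the results of ProcessGraph.
The comment at the top of query/query.go explains well how ProcessGraph works: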

/*
 * QUERY:
 * Let's take this query from GraphQL as example:
 * {
 *   me {
 *     id
 *     firstName
 *     lastName
 *     birthday {
 *       month
 *       day
 *     }
 *     friends {
 *       name
 *     }
 *   }
 * }
 *
 * REPRESENTATION:
 * This would be represented in SubGraph format pb.y, as such:
 * SubGraph [result uid = me]
 *    |
 *  Children
 *    |
 *    --> SubGraph [Attr = "xid"]
 *    --> SubGraph [Attr = "firstName"]
 *    --> SubGraph [Attr = "lastName"]
 *    --> SubGraph [Attr = "birthday"]
 *           |
 *         Children
 *           |
 *           --> SubGraph [Attr = "month"]
 *           --> SubGraph [Attr = "day"]
 *    --> SubGraph [Attr = "friends"]
 *           |
 *         Children
 *           |
 *           --> SubGraph [Attr = "name"]
 *
 * ALGORITHM:
 * This is a rough and simple algorithm of how to process this SubGraph query
 * and populate the results:
 *
 * For a given entity, a new SubGraph can be started off with NewGraph(id).
 * Given a SubGraph, is the Query field empty? [Step a]
 *   - If no, run (or send it to server serving the attribute) query
 *     and populate result.
 * Iterate over children and copy Result Uids to child Query Uids.
 *     Set Attr. Then for each child, use goroutine to run Step:a.
 * Wait for goroutines to finish.
 * Return errors, if any.
 */

Inside ProcessGraph, after some special-case optimizations, a task query is created with createTaskQuery and then executed:

// query/query.go - ProcessGraph
taskQuery, err := createTaskQuery(ctx, sg)
if err != nil {
    rch <- err
    return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)

2.4 Just a laborer: worker

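Dgraph is built to be distributed, and Dgraph Alpha instances are divided into groups. ProcessTaskOverNetwork therefore first obtains the ID of the group that the queried predicate belongs to. It then checks whether that group is served by the current instance. If it is, processTask is executed immediately; otherwise a remote RPC call is made.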

// worker/task.go - ProcessTaskOverNetwork
gid, err := groups().BelongsToReadOnly(attr, q.ReadTs)

// worker/task.go - ProcessTaskOverNetwork
if groups().ServesGroup(gid) {
    // No need for a network call, as this should be run from within this instance.
    return processTask(ctx, q, gid)
}

result, err := processWithBackupRequest(ctx, gid,
    func(ctx context.Context, c pb.WorkerClient) (interface{}, error) {
        return c.ServeTask(ctx, q)
    })

The remote RPC calls the ServeTask endpoint on another Dgraph Alpha instance:

// worker/task.go - processWithBackupRequest
reply, err := invokeNetworkRequest(ctx, addrs[0], f)

// worker/task.go
func invokeNetworkRequest(ctx context.Context, addr string,
	f func(context.Context, pb.WorkerClient) (interface{}, error)) (interface{}, error) {
	pl, err := conn.GetPools().Get(addr)
	if err != nil {
		return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
	}

	if span := otrace.FromContext(ctx); span != nil {
		span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
	}
	c := pb.NewWorkerClient(pl.Get())
	return f(ctx, c)
}

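On the remote side, ServeTask also ends up calling processTask, so we keep tracing processTask.
As its comment says, processTask processes the query, accumulates the results and returns them: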

// worker/task.go - processTask
out, err := qs.helpProcessTask(ctx, q, gid)

The actual querying is done in helpProcessTask:

  • Step 1: get the query function with parseSrcFn (which calls parseFuncType and parseFuncTypeHelper)
  • Step 2: call the corresponding handler to fetch the data
// worker/task.go - helpProcessTask
srcFn, err := parseSrcFn(ctx, q)

// worker/task.go - helpProcessTask
if srcFn.fnType == hasFn && srcFn.isFuncAtRoot {
    span.Annotate(nil, "handleHasFunction")
    if err := qs.handleHasFunction(ctx, q, out, srcFn); err != nil {
        return nil, err
    }
}
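The two steps of helpProcessTask amount to classifying the function and then dispatching on its type. The sketch below shows that classify-then-dispatch shape. The `FuncType` constants, the name-to-type mapping in `parseFuncType` and the returned handler labels are illustrative assumptions, not Dgraph's complete table.

```go
package main

import (
	"fmt"
	"strings"
)

// FuncType classifies a function; this small set is illustrative only.
type FuncType int

const (
	notAFunction FuncType = iota
	compareAttrFn
	regexFn
	hasFn
)

// functionContext keeps just the fields needed for dispatch.
type functionContext struct {
	fname        string
	fnType       FuncType
	isFuncAtRoot bool
}

// parseFuncType maps a function name to its category (step 1).
func parseFuncType(name string) FuncType {
	switch strings.ToLower(name) {
	case "eq", "le", "lt", "ge", "gt":
		return compareAttrFn
	case "regexp":
		return regexFn
	case "has":
		return hasFn
	default:
		return notAFunction
	}
}

// dispatch picks a handler label for the parsed function (step 2).
func dispatch(fc functionContext) string {
	switch {
	case fc.fnType == hasFn && fc.isFuncAtRoot:
		return "handleHasFunction"
	case fc.fnType == regexFn:
		return "handleRegexFunction"
	case fc.fnType == compareAttrFn:
		return "handleCompareFunction"
	default:
		return "plain predicate lookup"
	}
}

func main() {
	for _, name := range []string{"has", "regexp", "eq", ""} {
		fc := functionContext{fname: name, fnType: parseFuncType(name), isFuncAtRoot: true}
		fmt.Printf("%q -> %s\n", name, dispatch(fc))
	}
}
```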

2.5 The final pass: Filter/Groupby/Aggregation, etc.

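Once the data has been fetched, it goes through post-processing such as sorting, grouping and applying @filter. I single this part out because it is where you would hook in if you want to extend the algorithms.
Take populatePostAggregation: it is a recursive function, and at the end it calls valueVarAggregation: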

// query/query.go - populatePostAggregation
func (sg *SubGraph) populatePostAggregation(doneVars map[string]varValue, path []*SubGraph,
	parent *SubGraph) error {
	for idx := 0; idx < len(sg.Children); idx++ {
		child := sg.Children[idx]
		path = append(path, sg)
		err := child.populatePostAggregation(doneVars, path, sg)
		path = path[:len(path)-1]
		if err != nil {
			return err
		}
	}
	return sg.valueVarAggregation(doneVars, path, parent)
}

This makes valueVarAggregation the 'best' entry point for plugging in data-processing algorithms:

// query/query.go - valueVarAggregation
switch {
case sg.IsGroupBy():
    if err := sg.processGroupBy(doneVars, path); err != nil {
        return err
    }
}
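The following illustrates why valueVarAggregation works as an extension point. It is a minimal sketch of a switch-based post-processing step with a group-by count and two simple aggregators. `aggNode`, its fields and the supported function names are hypothetical, not Dgraph's implementation.

```go
package main

import "fmt"

// aggNode is a hypothetical stand-in for a SubGraph at the point where
// valueVarAggregation runs: its values have already been fetched.
type aggNode struct {
	groupBy bool
	keys    []string // group key of each fetched value (used when groupBy is set)
	aggFunc string   // "sum" or "max" in this sketch; empty means no aggregation
	values  []int64
	result  map[string]int64
}

// valueVarAggregation picks the post-processing step for the node.
// Supporting a new algorithm means adding another case (or aggFunc) here.
func (n *aggNode) valueVarAggregation() error {
	switch {
	case n.groupBy:
		n.result = map[string]int64{}
		for _, k := range n.keys {
			n.result[k]++ // count the members of each group
		}
	case n.aggFunc != "":
		if n.aggFunc != "sum" && n.aggFunc != "max" {
			return fmt.Errorf("unsupported aggregation %q", n.aggFunc)
		}
		if len(n.values) == 0 {
			return fmt.Errorf("%s over empty input", n.aggFunc)
		}
		acc := n.values[0]
		for _, v := range n.values[1:] {
			if n.aggFunc == "sum" {
				acc += v
			} else if v > acc {
				acc = v
			}
		}
		n.result = map[string]int64{n.aggFunc: acc}
	}
	return nil
}

func main() {
	nodes := []*aggNode{
		{groupBy: true, keys: []string{"cat", "dog", "cat"}},
		{aggFunc: "sum", values: []int64{3, 4, 5}},
	}
	for _, n := range nodes {
		if err := n.valueVarAggregation(); err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(n.result)
	}
	// map[cat:2 dog:1]
	// map[sum:12]
}
```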

3. Summary

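This first look at the Dgraph Alpha code had two goals: to get familiar with how Dgraph Alpha works, and to prepare for my own extensions later. I therefore skipped some concrete details and algorithms, leaving them for a deeper dive later, and did not dig into the exact role of each struct.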