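Dgraph is a graph database launched in 2016 by Manish Rai Jain, who founded the company after leaving Google. It is written in Go, uses RDF as its underlying data model, has a storage engine adapted from BadgerDB, and relies on Raft to keep reads and writes strongly consistent. For an introduction and basic usage, see "Dgraph 入门与学习" (Getting Started with Dgraph). This post walks through the Dgraph query flow from the source code's point of view.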
Here is a rough call-stack diagram of a Dgraph query:
[Figure: Dgraph Query function call stack]
1. Key data structures
The main data structures used along the Dgraph query path are listed below:
- gql.GraphQuery
- api.Request
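- gql.Result: holds the list of queries, their corresponding variable-use lists, and the mutation block
- query.Request: wraps the state used while executing a query. ReadTs, Cache and GqlQuery are set initially; Subgraphs, Vars and Latency are filled in while the query is processed
- edgraph.queryContext: the query context, which carries all the variables needed to process a query, mutation or upsert request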
- query.SubGraph
- worker.functionContext
1.1 gql.GraphQuery
// gql.GraphQuery - stores the parsed query in a tree format. It is converted to a query.SubGraph before the query is processed.
type GraphQuery struct {
UID []uint64
Attr string
Langs []string
Alias string
IsCount bool
IsInternal bool
IsGroupby bool
Var string
NeedsVar []VarContext
Func *Function
Expand string // Which variable to expand with.
Args map[string]string
// Query can have multiple sort parameters.
Order []*pb.Order
Children []*GraphQuery
Filter *FilterTree
MathExp *MathTree
Normalize bool
Recurse bool
RecurseArgs RecurseArgs
ShortestPathArgs ShortestPathArgs
Cascade []string
IgnoreReflex bool
Facets *pb.FacetParams
FacetsFilter *FilterTree
GroupbyAttrs []GroupByAttr
FacetVar map[string]string
FacetsOrder []*FacetOrder
// Used by ACL-enabled queries to curtail the results to the accessible params only
AllowedPreds []string
// Internal fields below.
// If gq.fragment is non-empty, then it is a fragment reference / spread.
fragment string
// True for blocks that don't have a starting function and hence no starting nodes. They are
// used to aggregate and get variables defined in another block.
IsEmpty bool
}
1.2 api.Request
// api.Request -
type Request struct {
StartTs uint64 `protobuf:"varint,1,opt,name=start_ts,json=startTs,proto3" json:"start_ts,omitempty"`
Query string `protobuf:"bytes,4,opt,name=query,proto3" json:"query,omitempty"`
Vars map[string]string `protobuf:"bytes,5,rep,name=vars,proto3" json:"vars,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
ReadOnly bool `protobuf:"varint,6,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"`
BestEffort bool `protobuf:"varint,7,opt,name=best_effort,json=bestEffort,proto3" json:"best_effort,omitempty"`
Mutations []*Mutation `protobuf:"bytes,12,rep,name=mutations,proto3" json:"mutations,omitempty"`
CommitNow bool `protobuf:"varint,13,opt,name=commit_now,json=commitNow,proto3" json:"commit_now,omitempty"`
RespFormat Request_RespFormat `protobuf:"varint,14,opt,name=resp_format,json=respFormat,proto3,enum=api.Request_RespFormat" json:"resp_format,omitempty"`
Hash string `protobuf:"bytes,15,opt,name=hash,proto3" json:"hash,omitempty"`
}
1.3 gql.Result
// gql.Result - holds the list of queries, their corresponding variable-use lists, and the mutation block
type Result struct {
Query []*GraphQuery
QueryVars []*Vars
Schema *pb.SchemaRequest
}
1.4 query.Request
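// query.Request - wraps the state used while executing a query. ReadTs, Cache and GqlQuery are set initially; Subgraphs, Vars and Latency are filled in while the query is processed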
type Request struct {
ReadTs uint64 // ReadTs for the transaction.
Cache int // 0 represents use txn cache, 1 represents not to use cache.
Latency *Latency
GqlQuery *gql.Result
Subgraphs []*SubGraph
Vars map[string]varValue
}
1.5 edgraph.queryContext
// edgraph.queryContext -
type queryContext struct {
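// The request that has not been parsed yet. It contains a query, one or more mutations, or both (in the case of an upsert)
req *api.Request
// The list of mutations after parsing req.Mutations
gmuList []*gql.Mutation
// The result of parsing req.Query
gqlRes gql.Result
// Conditional variables used in the (modified) query to decide whether the condition in an upsert is true. The string is empty if the corresponding mutation is not a conditional upsert. Note that len(condVars) == len(gmuList)
condVars []string
// Maps variable names to the UIDs of UID variables. These variables are either dummy variables used for upsert conditions or variables used in the mutation block of the incoming request
uidRes map[string][]string
// Maps the value variables used in the mutation block of the request (from variable name to value)
valRes map[string]map[uint64]types.Val
// Stores the latency numbers
latency *query.Latency
// Stores the opencensus span used throughout the query processing
span *trace.Span
// Indicates whether the given request comes from GraphQL admin.
graphql bool
// Stores the GraphQL field for which the query is being processed. It is set only if the request is a query from the GraphQL layer and is nil otherwise. Examples of the nil case: a DQL query, or a mutation executed from the GraphQL layer
gqlField gqlSchema.Field
// Tracks the number of N-Quads that will be inserted as part of this request. In some cases (mostly upserts), the number of N-Quads to insert can be huge (we have seen up to 1B) and cause an OOM, so the number that can be inserted in a single request is limited.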
nquadsCount int
}
1.6 query.SubGraph
// query.SubGraph
type SubGraph struct {
ReadTs uint64
Cache int
Attr string
UnknownAttr bool
// read only parameters which are populated before the execution of the query and are used to
// execute this query.
Params params
// count stores the count of an edge (predicate). There would be one value corresponding to each
// uid in SrcUIDs.
counts []uint32
// valueMatrix is a slice of ValueList. If this SubGraph is for a scalar predicate type, then
// there would be one list for each uid in SrcUIDs storing the value of the predicate.
// The individual elements of the slice are a ValueList because we support scalar predicates
// of list type. For non-list type scalar predicates, there would be only one value in every
// ValueList.
valueMatrix []*pb.ValueList
// uidMatrix is a slice of List. There would be one List corresponding to each uid in SrcUIDs.
// In graph terms, a list is a slice of outgoing edges from a node.
uidMatrix []*pb.List
// facetsMatrix contains the facet values. There would a list corresponding to each uid in
// uidMatrix.
facetsMatrix []*pb.FacetsList
ExpandPreds []*pb.ValueList
GroupbyRes []*groupResults // one result for each uid list.
LangTags []*pb.LangList
// SrcUIDs is a list of unique source UIDs. They are always copies of destUIDs
// of parent nodes in GraphQL structure.
SrcUIDs *pb.List
// SrcFunc specified using func. Should only be non-nil at root. At other levels,
// filters are used.
SrcFunc *Function
FilterOp string
Filters []*SubGraph // List of filters specified at the current node.
facetsFilter *pb.FilterTree
MathExp *mathTree
Children []*SubGraph // children of the current node, should be empty for leaf nodes.
// destUIDs is a list of destination UIDs, after applying filters, pagination.
DestUIDs *pb.List
List bool // whether predicate is of list type
pathMeta *pathMetadata
}
1.7 worker.functionContext
// worker.functionContext -
type functionContext struct {
tokens []string
geoQuery *types.GeoQueryData
intersectDest bool
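// eqTokens is used by compareAttr functions. It stores the value corresponding to each function argument. The `eq` function may have multiple arguments, but other compareAttr functions have only one.
// TODO(@Animesh): change the field names to ones that explain their uses better. Check whether we really need all of ineqValue, eqTokens, tokens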
eqTokens []types.Val
ineqValueToken []string
n int
threshold []int64
uidsPresent []uint64
fname string
fnType FuncType
regex *cregexp.Regexp
isFuncAtRoot bool
isStringFn bool
atype types.TypeID
}
func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) error {}
2. Walkthrough
2.1 Service initialization and listeners
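The dgraph/cmd/alpha directory is the entry point of Dgraph Alpha. It is mainly responsible for initializing the program and setting up the listeners for the various services, as shown in the figure:
[Figure: Dgraph Alpha initialization and service listeners]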
The HTTP handler, for its part, mainly wraps the input in an api.Request struct and then calls the service provider, edgraph.Server:
// dgraph/cmd/alpha/http.go - queryHandler
req := api.Request{
Vars: params.Variables,
Query: params.Query,
StartTs: startTs,
Hash: hash,
}
// dgraph/cmd/alpha/http.go - queryHandler
resp, err := (&edgraph.Server{}).Query(ctx, &req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
return
}
Note that this edgraph.Server is also the provider of the RPC service:
// dgraph/cmd/alpha/run.go - serveGRPC
api.RegisterDgraphServer(s, &edgraph.Server{})
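2.2 The service provider: edgraph.Server
First, Query is the externally exposed interface, and it appears to do little more than check a few preconditions. The real work is done by the call to doQuery: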
// edgraph/server.go - Query
return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)})
Inside doQuery, note the queryContext struct. As mentioned above, it is the query context that carries all the variables needed to process a query, mutation or upsert request:
// edgraph/server.go - doQuery
qc := &queryContext{
req: req.req,
latency: l,
span: span,
graphql: isGraphQL,
gqlField: req.gqlField,
}
// parseRequest in turn calls validateQuery
if rerr = parseRequest(qc); rerr != nil {
return
}
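Everything so far only parses and validates the query context (parseRequest, validateQuery), further completing and filling in the queryContext struct. The query itself is then handed to processQuery:
// edgraph/server.go - doQuery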
if resp, rerr = processQuery(ctx, qc); rerr != nil {
// if rerr is just some error from GraphQL encoding, then we need to continue the normal
// execution ignoring the error as we still need to assign latency info to resp. If we can
// change the api.Response proto to have a field to contain GraphQL errors, that would be
// great. Otherwise, we will have to do such checks a lot and that would make code ugly.
if qc.gqlField != nil && x.IsGqlErrorList(rerr) {
gqlErrs = rerr
} else {
return
}
}
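In processQuery, a query.Request struct is constructed and filled in step by step, and then the component that actually carries out the business logic, query.Request, is invoked: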
// edgraph/server.go - processQuery
qr := query.Request{
Latency: qc.latency,
GqlQuery: &qc.gqlRes,
}
// Core processing happens here.
er, err := qr.Process(ctx)
2.3 The service implementer: query.Request
The query.Request.Process function starts with a call to ProcessQuery:
// query/query.go - Process
err = req.ProcessQuery(ctx)
ProcessQuery contains two loops: one converts the entries of query.Request.GqlQuery.Query into query.SubGraph values, and the other carries out the queries:
// query/query.go - ProcessQuery
sg, err := ToSubGraph(ctx, gq)
// query/query.go - ToSubGraph
func ToSubGraph(ctx context.Context, gq *gql.GraphQuery) (*SubGraph, error) {
sg, err := newGraph(ctx, gq)
if err != nil {
return nil, err
}
err = treeCopy(gq, sg)
if err != nil {
return nil, err
}
return sg, err
}
// query/query.go - ProcessQuery
switch {
case sg.Params.Alias == "shortest":
// We allow only one shortest path block per query.
go func() {
shortestSg, err = shortestPath(ctx, sg)
errChan <- err
}()
case sg.Params.Recurse:
go func() {
errChan <- recurse(ctx, sg)
}()
default:
go ProcessGraph(ctx, sg, nil, errChan)
}
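Tracing further, both shortestPath and recurse eventually call ProcessGraph as well. The next step is to block on the channel and wait for the results from ProcessGraph.
A comment at the top of query/query.go explains well how ProcessGraph works: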
/*
* QUERY:
* Let's take this query from GraphQL as example:
* {
* me {
* id
* firstName
* lastName
* birthday {
* month
* day
* }
* friends {
* name
* }
* }
* }
*
* REPRESENTATION:
* This would be represented in SubGraph format pb.y, as such:
* SubGraph [result uid = me]
* |
* Children
* |
* --> SubGraph [Attr = "xid"]
* --> SubGraph [Attr = "firstName"]
* --> SubGraph [Attr = "lastName"]
* --> SubGraph [Attr = "birthday"]
* |
* Children
* |
* --> SubGraph [Attr = "month"]
* --> SubGraph [Attr = "day"]
* --> SubGraph [Attr = "friends"]
* |
* Children
* |
* --> SubGraph [Attr = "name"]
*
* ALGORITHM:
* This is a rough and simple algorithm of how to process this SubGraph query
* and populate the results:
*
* For a given entity, a new SubGraph can be started off with NewGraph(id).
* Given a SubGraph, is the Query field empty? [Step a]
* - If no, run (or send it to server serving the attribute) query
* and populate result.
* Iterate over children and copy Result Uids to child Query Uids.
* Set Attr. Then for each child, use goroutine to run Step:a.
* Wait for goroutines to finish.
* Return errors, if any.
*/
After some special-case queries have been optimized, a query task is created with createTaskQuery and then executed:
// query/query.go - ProcessGraph
taskQuery, err := createTaskQuery(ctx, sg)
if err != nil {
rch <- err
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
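2.4 The one doing the grunt work: worker
Dgraph is built to be distributed, and Dgraph Alpha nodes are split into different groups. ProcessTaskOverNetwork therefore first looks up the ID of the group that the current query belongs to and checks whether that group is served by the current instance. If it is, processTask runs immediately; otherwise a remote RPC call is made.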
// worker/task.go - ProcessTaskOverNetwork
gid, err := groups().BelongsToReadOnly(attr, q.ReadTs)
// worker/task.go - ProcessTaskOverNetwork
if groups().ServesGroup(gid) {
// No need for a network call, as this should be run from within this instance.
return processTask(ctx, q, gid)
}
result, err := processWithBackupRequest(ctx, gid,
func(ctx context.Context, c pb.WorkerClient) (interface{}, error) {
return c.ServeTask(ctx, q)
})
The remote RPC call goes to the ServeTask interface on another Dgraph Alpha instance:
// worker/task.go - processWithBackupRequest
reply, err := invokeNetworkRequest(ctx, addrs[0], f)
// worker/task.go
func invokeNetworkRequest(ctx context.Context, addr string,
f func(context.Context, pb.WorkerClient) (interface{}, error)) (interface{}, error) {
pl, err := conn.GetPools().Get(addr)
if err != nil {
return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
}
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
}
c := pb.NewWorkerClient(pl.Get())
return f(ctx, c)
}
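In the end this still comes back to a processTask call, so we keep tracing processTask here.
As its comment says, processTask processes the query, accumulates the results and returns them: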
// worker/task.go - processTask
out, err := qs.helpProcessTask(ctx, q, gid)
The actual query happens in helpProcessTask:
- Step 1: determine the query function with parseSrcFn (which calls parseFuncType and parseFuncTypeHelper)
- Step 2: call the corresponding function to fetch the data
// worker/task.go - helpProcessTask
srcFn, err := parseSrcFn(ctx, q)
// worker/task.go - helpProcessTask
if srcFn.fnType == hasFn && srcFn.isFuncAtRoot {
span.Annotate(nil, "handleHasFunction")
if err := qs.handleHasFunction(ctx, q, out, srcFn); err != nil {
return nil, err
}
}
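2.5 Final aggregation: Filter, Groupby, Aggregation, etc.
Once the data has been fetched, it is post-processed: sorting, grouping, applying @filter, and so on. This step is called out separately because it is where you would hook in if you wanted to extend the algorithms.
Enter populatePostAggregation. It is a recursive function, and at the end it calls valueVarAggregation: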
// query/query.go - populatePostAggregation
func (sg *SubGraph) populatePostAggregation(doneVars map[string]varValue, path []*SubGraph,
parent *SubGraph) error {
for idx := 0; idx < len(sg.Children); idx++ {
child := sg.Children[idx]
path = append(path, sg)
err := child.populatePostAggregation(doneVars, path, sg)
path = path[:len(path)-1]
if err != nil {
return err
}
}
return sg.valueVarAggregation(doneVars, path, parent)
}
valueVarAggregation is the "best" place to plug in a data-processing algorithm:
// query/query.go - valueVarAggregation
switch {
case sg.IsGroupBy():
if err := sg.processGroupBy(doneVars, path); err != nil {
return err
}
}
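3. Summary
This first look at the Dgraph Alpha code has two goals: getting familiar with how Dgraph Alpha operates, and preparing for custom extensions later on. Some concrete operational details and algorithms are therefore skipped and left for a deeper dive later, and the exact role of each struct is not explored in depth.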