Iawen's Blog

我喜欢这样自由的随手涂鸦, 因为我喜欢风......

Hadoop MapReduce是一个软件框架, 可以轻松地编写应用程序, 以可靠, 容错的方式在商品硬件的大型群集(数千个节点)上并行并行处理大量数据(多TB数据集)。

MapReduce 作业通常将输入数据集拆分为独立的块, 这些任务由map tasks以完全并行的方式进行处理。框架对Map的输出进行排序, 然后将其输入到Reduce任务。通常, 作业的输入和输出都存储在文件系统中。该框架负责安排任务, 监视任务并重新执行失败的任务。
今天, 出于测试和学习的原因, 我想在Go中编写一个简单的MapReduce作业。

1. 用例

我们有一个包含约18K足球运动员信息的csv文件(我从https://www.kaggle.com下载)。目标是按年龄或我们需要的任何其他属性排序。
演示的样本结构
我们仅为演示创建一个简单的结构, 即使我们在原始csv中拥有大多数键或项, 也将基于“名称”或“年龄”进行过滤/缩减。

备注: 我使用的是https://www.kaggle.com/antoinekrajnc/soccer-players-statistics

// 示例PlayerInfo结构
type PlayerInfo struct {
    // ID          string  `json:"ID"`
    // PlayerID    string  `json:"PlayerID"`
    Name        string  `json:"Name"`
    Age         int     `json:"Age"`
}

2. 开启档案

通过使用标准Golang实现打开文件。

// 打开文件
f, err := os.Open(absPath)
if err != nil {
    panic(err)
}
defer f.Close()

lines, err := csv.NewReader(f).ReadAll()
if err != nil {
    panic(err)
}

3. 建立频道

不要通过共享内存进行通信; 而是通过通信共享内存。

该频道是Go的美丽优势之一。

// 实施渠道
lists := make(chan []PlayerInfo)

finalValue := make(chan []PlayerInfo)

var wg sync.WaitGroup
wg.Add(len(lines))

4. 映射器实现

此处的Mapper函数用于确保将列表变平并在我们要处理struct时返回

// Map -
func Map(player []string) []PlayerInfo {
	list := []PlayerInfo{}
	age, _ := strconv.Atoi(player[14])
	list = append(list, PlayerInfo{
		// ID:   player[1],
		Name: player[0],
		Age:  age,
	})
	return list
}

5. 减速器实施

此实现是要添加的过滤器。在此示例中, 我们减少玩家的年龄, 使其不超过20岁。

// Reducer -
func Reducer(mapList chan []PlayerInfo, sendFianlValue chan []PlayerInfo) {
	final := []PlayerInfo{}
	for list := range mapList {
		for _, value := range list {
			if value.Age <= 20 {
				final = append(final, value)
			}
		}
	}
	sendFianlValue <- final
}

6. 使用goroutine调用mapper和reducer

var wg sync.WaitGroup
wg.Add(len(lines))
beginTime := time.Now()
for _, line := range lines {
    go func(player []string) {
        defer wg.Done()
        lists <- Map(player)
    }(line)
}

go Reducer(lists, finalValue)
wg.Wait()
close(lists)

fmt.Println(<-finalValue)
fmt.Println("\n<<<<<<<<<<<<<<<<", time.Now().Sub(beginTime))

7. 结果

因为使用的数据集不一样, 所以代码与原文略有不同

0

8. 摘要

这只是一个简单的示例, 演示了如何在Go中实现MapReduce。而且我没有针对任何生产用例优化代码。这意味着我们缺少单元测试, 基准测试等。

原文转自: https://medium.com/@jayhuang75/a-simple-mapreduce-in-go-42c929b000c5