51学通信论坛2017新版

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 2600|回复: 0
打印 上一主题 下一主题

一个用Go写的集群数据分发工具

[复制链接]

 成长值: 15613

  • TA的每日心情
    开心
    2022-7-17 17:50
  • 2444

    主题

    2544

    帖子

    7万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    74104
    跳转到指定楼层
    楼主
    发表于 2017-11-15 21:41:36 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    背景
    在工作中遇到大数据(20G左右)批量部署的问题,刚开始通过scp或者rsync限速80M串行分发,大约每台机器的耗时为5分钟,极大的增加了大批量部署的时间和难度,各种难产。于是作者就花了一天多的时间做了个简易的传输工具,现在还不太成熟,希望对大家有启发,更希望大家多多提建议

    思路
    将传输完成的目标机器转换成数据源为其他机器提供数据源服务。(相当于《行尸走肉》中的僵尸一样,每一个被感染的个体都是传染源)
    语言选型
    对于数据传输来说,个人认为语言不是瓶颈而是传输的方法(串行、并行还是其他),之所以选择 go 是因为 go 可以编译后扔到机器上直接执行(Python需要安装各种依赖包增加了部署的难度)
    依赖
    由于公司机器都已安装FTP服务且已配置限速下载,所以工具本身不提供下载服务,下载限速也是写死在Client端的。
    架构
    C/S架构,即大家熟知的Client和Server结构。
    Client:等待接受Server下载任务并执行。
    Server:负责分发控制客户端的下载以及接受客户端的回传信息。
    效果
    第1次传输1台,第2次传输2台,第3次传输4台……第n次传输2(n-1)
    台,也就是说传输1+2+4=7台数据的时间相当于串行传输3台机器的时间,并且传输的数量越多,相对串行传输时间越少。
    废话不多说,直接上Server端代码。至于代码规范性,请大家忽略。代码以 GitHub 上为准。
    [root@docker100080 src]# cat p2pserver.go
    package main
    import (
    "bufio" "flag" "fmt" "github.com/gin-gonic/gin" "github.com/parnurzeal/gorequest" "io" "os" "strconv" "strings" "time"
    )
    //定义slice
    var ( SuccList[]string //成功列表 FailList[]string //失败列表 DstList []string //目的列表 //p2pConfig ListenPortstring Port *string Server *string Srcpath *string Dstpath *string Filename *string Timeout *int
    )
    var succ_ch = make(chan string, 10000)
    //读取文件,返回行 列表
    func Cat(filename string) []string { f,err := os.Open(filename) defer f.Close() if err != nil {
    panic(err) } reader:= bufio.NewReader(f)
    for{ line,err := reader.ReadString('\n') if err == io.EOF {
    break } line= strings.TrimSpace(line)
    if line != "" { DstList= append(DstList, line) } } returnDstList }
    //接收客户端返回的信息
    func accept() { router:= gin.Default() router.GET("/p2p",func(c *gin.Context) { status,_ := strconv.ParseBool(c.Query("status")) //判断客户端是否下载成功 host:= c.Query("host") src:= c.Query("src") //接受客户端返回的数据源 //fmt.Println(status,host) if status { succ_ch<- host succ_ch<- src SuccList= append(SuccList, host) //fmt.Println(SuccList,FailList) }else { fmt.Println("客户端下载失败:",host) succ_ch<- src } }) router.Run(ListenPort)
    //router.Run(":%s",Conf().Port)
    }
    //request向客户端发送下载任务
    func request(Server, port, src, dst,srcpath, dstpath string) { req:= gorequest.New() url:= fmt.Sprintf("http://%s:12306/Client?port=%s&src=%s&srcpath=%s&dstpath=%s&Server=%s&localhost=%s",dst, port, src, srcpath, dstpath, Server, dst) fmt.Println(url)
    //_,_, errs := req.Get(url).Timeout(10*time.Second).End() //设置url超时时间为10妙 _, _, errs := req.Get(url).End() //设置url超时时间为10妙 iflen(errs) != 0 { fmt.Println("请求客户端失败:",dst) succ_ch<- src } }
    //主程序执行分发控制
    func handle(dstlist []string) { index:= 0 lendst:= len(dstlist)
    for{
    select{ casesucchost := <-succ_ch: if index < lendst { request(*Server,*Port, succhost, dstlist[index], *Srcpath, *Dstpath) index= index + 1 }else { time.Sleep(time.Duration(*Timeout)* time.Second) if lendst != len(SuccList) {
    FailList= Set(dstlist, SuccList) fmt.Printf("成功机器数为%d,机器列表为,机器列表为%v\n",len(SuccList), SuccList) fmt.Printf("失败机器数为%d,机器列表为,机器列表为%v\n",len(FailList), FailList) }else { fmt.Printf("全部成功,成功机器数为%d,机器列表为,机器列表为%v\n",len(SuccList), SuccList) } fmt.Println("==============执行完毕=================" os.Exit(0) } } } }
    //初始化
    func init() {
    //解析commandline参数 flag.Usage= func() { fmt.Println("Usage:<-m host> [-f file] [-s srcpath] [-d dstpath] [-p port] [-ttimeout]") flag.PrintDefaults() } Filename= flag.String("f", "ip.txt", "File that contains thetarget machine") Srcpath= flag.String("s", "/home/xiaoju", "Data sourcepath") Dstpath= flag.String("d", "/home/xiaoju", "Data destinationpath") Port= flag.String("p", "12306", "Listen port") Server= flag.String("m", "", "ip or host name of the Server") Timeout= flag.Int("t", 1800, "If the Server does not receive the returnvalue within the specified time , that the transmission fails") flag.Parse() ListenPort= fmt.Sprintf(":%s", *Port) succ_ch<- *Server //将Server加入channel
    }
    //两个slice取差集
    func Set(one, two []string) []string { x:= []string{} if len(two) != 0 { for_, v := range one { fork, vv := range two { ifv == vv {
    break } if k == len(two)-1 { x= append(x, v) } } } }else { returnone } returnx }
    func main() {
    if*Server == "" { flag.Usage() os.Exit(2) } goaccept() handle(Cat(*Filename))}
    客户端代码,相关写死的参数可以自行修改或者作为变量传入
    服务执行完自动退出,主要目的:防止误操作,切断服务器后门。
    [root@docker100080src]# cat p2pClient.go
    /*根据公司背景客户端基于ftp下载下载限速为80M,且不能修改cut-dirs默认是2,且不能修改此服务下载完成后会自动退出*/
    package main
    import (
    "github.com/gin-gonic/gin" "github.com/parnurzeal/gorequest" "fmt" "os/exec" "sync"
    )
    var ( Url string
    Status string
    wg sync.WaitGroup)
    func main() { wg.Add(1)
    go accept() wg.Wait() request := gorequest.New() _, _,errs := request.Get(Url).End()
    if len(errs) != 0 { fmt.Println("访问Server失败") }
    }
    //接收Server请求
    func accept() { router := gin.Default() router.GET("/Client",func(c *gin.Context){ src := c.Query("src") Server:= c.Query("Server") srcpath := c.Query("srcpath") dstpath := c.Query("dstpath") localhost := c.Query("localhost") port := c.Query("port")
    go wget(src,srcpath,dstpath,Server,localhost,port) c.String(200,"客户端返回") }) router.Run(":12306")}
    func wget(src,srcpath,dstpath,Server,localhost,portstring) {
    defer wg.Done() download := fmt.Sprintf("wget -m -r -nH -P %s --limit-rate=80m--cut-dirs=2 ftp://%s%s ",dstpath,src,srcpath) fmt.Println(download) _,err :=exec.Command("sh","-c",download).CombinedOutput()
    if err != nil { Status = "false" }else { Status = "true" } fmt.Println(src,Server,dstpath,localhost) Url =fmt.Sprintf("http://%s:%s/p2p?host=%s&status=%s&src=%s",Server,port,localhost,Status,src)}
    PS0至于端口号是征用以前同事的,好记并且用的人少。相关问题欢迎留言
    PS1完整代码请关注 https://github.com/51reboot/spore

    声明:本文转载自网络。版权归原作者所有,如有侵权请联系删除。
    扫描并关注51学通信微信公众号,获取更多精彩通信课程分享。
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    Archiver|手机版|小黑屋|51学通信技术论坛

    GMT+8, 2025-1-31 13:03 , Processed in 0.086030 second(s), 32 queries .

    Powered by Discuz! X3

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表