|
背景
在工作中遇到大数据(20G左右)批量部署的问题,刚开始通过scp或者rsync限速80M串行分发,大约每台机器的耗时为5分钟,极大的增加了大批量部署的时间和难度,各种难产。于是作者就花了一天多的时间做了个简易的传输工具,现在还不太成熟,希望对大家有启发,更希望大家多多提建议
。
思路
将传输完成的目标机器转换成数据源为其他机器提供数据源服务。(相当于《行尸走肉》中的僵尸一样,每一个被感染的个体都是传染源)
语言选型
对于数据传输来说,个人认为语言不是瓶颈而是传输的方法(串行、并行还是其他),之所以选择 go 是因为 go 可以编译后扔到机器上直接执行(Python需要安装各种依赖包增加了部署的难度)
依赖
由于公司机器都已安装FTP服务且已配置限速下载,所以工具本身不提供下载服务,下载限速也是写死在Client端的。
架构
C/S架构,即大家熟知的Client和Server结构。
Client:等待接受Server下载任务并执行。
Server:负责分发控制客户端的下载以及接受客户端的回传信息。
效果
第1次传输1台,第2次传输2台,第3次传输4台……第n次传输2(n-1)
台,也就是说传输1+2+4=7台数据的时间相当于串行传输3台机器的时间,并且传输的数量越多,相对串行传输时间越少。
废话不多说,直接上Server端代码。至于代码规范性,请大家忽略。代码以 GitHub 上为准。
[root@docker100080 src]# cat p2pserver.go
package main
import (
"bufio" "flag" "fmt" "github.com/gin-gonic/gin" "github.com/parnurzeal/gorequest" "io" "os" "strconv" "strings" "time"
)
//定义slice
var ( SuccList[]string //成功列表 FailList[]string //失败列表 DstList []string //目的列表 //p2pConfig ListenPortstring Port *string Server *string Srcpath *string Dstpath *string Filename *string Timeout *int
)
var succ_ch = make(chan string, 10000)
//读取文件,返回行 列表
func Cat(filename string) []string { f,err := os.Open(filename) defer f.Close() if err != nil {
panic(err) } reader:= bufio.NewReader(f)
for{ line,err := reader.ReadString('\n') if err == io.EOF {
break } line= strings.TrimSpace(line)
if line != "" { DstList= append(DstList, line) } } returnDstList }
//接收客户端返回的信息
func accept() { router:= gin.Default() router.GET("/p2p",func(c *gin.Context) { status,_ := strconv.ParseBool(c.Query("status")) //判断客户端是否下载成功 host:= c.Query("host") src:= c.Query("src") //接受客户端返回的数据源 //fmt.Println(status,host) if status { succ_ch<- host succ_ch<- src SuccList= append(SuccList, host) //fmt.Println(SuccList,FailList) }else { fmt.Println("客户端下载失败:",host) succ_ch<- src } }) router.Run(ListenPort)
//router.Run(":%s",Conf().Port)
}
//request向客户端发送下载任务
func request(Server, port, src, dst,srcpath, dstpath string) { req:= gorequest.New() url:= fmt.Sprintf("http://%s:12306/Client?port=%s&src=%s&srcpath=%s&dstpath=%s&Server=%s&localhost=%s",dst, port, src, srcpath, dstpath, Server, dst) fmt.Println(url)
//_,_, errs := req.Get(url).Timeout(10*time.Second).End() //设置url超时时间为10妙 _, _, errs := req.Get(url).End() //设置url超时时间为10妙 iflen(errs) != 0 { fmt.Println("请求客户端失败:",dst) succ_ch<- src } }
//主程序执行分发控制
func handle(dstlist []string) { index:= 0 lendst:= len(dstlist)
for{
select{ casesucchost := <-succ_ch: if index < lendst { request(*Server,*Port, succhost, dstlist[index], *Srcpath, *Dstpath) index= index + 1 }else { time.Sleep(time.Duration(*Timeout)* time.Second) if lendst != len(SuccList) {
FailList= Set(dstlist, SuccList) fmt.Printf("成功机器数为%d,机器列表为,机器列表为%v\n",len(SuccList), SuccList) fmt.Printf("失败机器数为%d,机器列表为,机器列表为%v\n",len(FailList), FailList) }else { fmt.Printf("全部成功,成功机器数为%d,机器列表为,机器列表为%v\n",len(SuccList), SuccList) } fmt.Println("==============执行完毕=================" os.Exit(0) } } } }
//初始化
func init() {
//解析commandline参数 flag.Usage= func() { fmt.Println("Usage:<-m host> [-f file] [-s srcpath] [-d dstpath] [-p port] [-ttimeout]") flag.PrintDefaults() } Filename= flag.String("f", "ip.txt", "File that contains thetarget machine") Srcpath= flag.String("s", "/home/xiaoju", "Data sourcepath") Dstpath= flag.String("d", "/home/xiaoju", "Data destinationpath") Port= flag.String("p", "12306", "Listen port") Server= flag.String("m", "", "ip or host name of the Server") Timeout= flag.Int("t", 1800, "If the Server does not receive the returnvalue within the specified time , that the transmission fails") flag.Parse() ListenPort= fmt.Sprintf(":%s", *Port) succ_ch<- *Server //将Server加入channel
}
//两个slice取差集
func Set(one, two []string) []string { x:= []string{} if len(two) != 0 { for_, v := range one { fork, vv := range two { ifv == vv {
break } if k == len(two)-1 { x= append(x, v) } } } }else { returnone } returnx }
func main() {
if*Server == "" { flag.Usage() os.Exit(2) } goaccept() handle(Cat(*Filename))}
客户端代码,相关写死的参数可以自行修改或者作为变量传入
服务执行完自动退出,主要目的:防止误操作,切断服务器后门。
[root@docker100080src]# cat p2pClient.go
/*根据公司背景客户端基于ftp下载下载限速为80M,且不能修改cut-dirs默认是2,且不能修改此服务下载完成后会自动退出*/
package main
import (
"github.com/gin-gonic/gin" "github.com/parnurzeal/gorequest" "fmt" "os/exec" "sync"
)
var ( Url string
Status string
wg sync.WaitGroup)
func main() { wg.Add(1)
go accept() wg.Wait() request := gorequest.New() _, _,errs := request.Get(Url).End()
if len(errs) != 0 { fmt.Println("访问Server失败") }
}
//接收Server请求
func accept() { router := gin.Default() router.GET("/Client",func(c *gin.Context){ src := c.Query("src") Server:= c.Query("Server") srcpath := c.Query("srcpath") dstpath := c.Query("dstpath") localhost := c.Query("localhost") port := c.Query("port")
go wget(src,srcpath,dstpath,Server,localhost,port) c.String(200,"客户端返回") }) router.Run(":12306")}
func wget(src,srcpath,dstpath,Server,localhost,portstring) {
defer wg.Done() download := fmt.Sprintf("wget -m -r -nH -P %s --limit-rate=80m--cut-dirs=2 ftp://%s%s ",dstpath,src,srcpath) fmt.Println(download) _,err :=exec.Command("sh","-c",download).CombinedOutput()
if err != nil { Status = "false" }else { Status = "true" } fmt.Println(src,Server,dstpath,localhost) Url =fmt.Sprintf("http://%s:%s/p2p?host=%s&status=%s&src=%s",Server,port,localhost,Status,src)}
PS0:至于端口号是征用以前同事的,好记并且用的人少。相关问题欢迎留言
PS1:完整代码请关注 https://github.com/51reboot/spore
声明:本文转载自网络。版权归原作者所有,如有侵权请联系删除。 |
扫描并关注51学通信微信公众号,获取更多精彩通信课程分享。
|