package handle import ( "bufio" "bytes" "context" "crypto/md5" "encoding/json" "errors" "fmt" "fts/config" "fts/etcdclient" "fts/nsqclient" "github.com/gorilla/websocket" _ "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api" "io" "io/ioutil" "log" "net/http" "os" "os/exec" "path" "path/filepath" "strconv" "strings" "time" ) //登陆账号 var gobalLoginUserName string //登陆账号Id var gobalLoginUserId string //key:filepath,value:hash var gobalFileMap = make(map[string] string) var gobalFileDownLoadingMap = make(map[string] int) //手动上传文件,非自动上传 var goabalAddFileMap = make(map[string] int) //本地项目空间目录 var gobalLocalProjectDir string var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关 //全局消息通知管道 var GobalMessageNotify = make(chan string,1000) var ipfsPath=os.Getenv("IPFS-PATH") /** 文件上传下载进度 */ type processStruct struct { Size string `json:"size"` CurrentSize string `json:"currentSize"` Unit string `json:"unit"` CurrentUnit string `json:"currentUnit"` Process float64 `json:"process"` Hash string `json:"hash"` CommitHistoryHash string `json:"commitHistoryHash"` Version int `json:"version"` } /** 初始化本地客户端配置,包括ipfs网关、引导节点 @param ipfsApi ipfs网关 例如:http://192.168.1.1:5001 @param ipfsBootstrap ipfs引导节点,多个用;分割 例如:/dns/www.lockingos.org/tcp/4001/p2p/12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP */ func InitClientConfig(ipfsApi,ipfsBootstrap string) error{ //空格路径处理 ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe" config.ServerIpfsUrl = ipfsApi log.Println("配置客户端网关:"+config.ServerIpfsUrl) ipfsBootstraps := strings.Split(ipfsBootstrap,";") for _, simpleBootstrap := range ipfsBootstraps { log.Println("配置引导节点:"+simpleBootstrap) cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap) err :=cmd.Run() if err!=nil{ return err } } return nil } /** 初始化本地工作目录 @param userName 用户登陆账号 @param userId 用户ID @param projectName 项目名称 */ func InitLocalWorkSpace(conn *websocket.Conn, userName, userId, projectName string) (error){ //空格路径处理 ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe" //初始化当前登陆用户信息 gobalLoginUserName = userName gobalLoginUserId = userId //初始化本地工作空间绝对路径 gobalLocalProjectDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName) // 检查本地目录是否存在 _,err := os.Stat(gobalLocalProjectDir) if err != nil { //创建文件目录 os.MkdirAll(gobalLocalProjectDir, os.ModePerm) } log.Println("进入项目空间:"+gobalLocalProjectDir) if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(gobalLocalProjectDir))); err != nil { log.Println(err) return err } return nil } //工作空间增加文件监听事件 func watchWalkfunc(filePath string, info os.FileInfo, err error) error { if info == nil{ return nil } if info.IsDir()==true{ //config.GobalWatch.Remove(filePath) err = config.GobalWatch.Add(filePath) if err != nil { log.Println(err) return err } } return nil } /** 下载指令 @param hash ipfs哈希值 @param projectName 项目名称 @para fileName 文件名称 @param dir 云文件目录 */ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir string) error{ //检查文件目录是否存在,不存在则创建 fileDir := gobalLocalProjectDir+"\\"+nodeDir _,err := os.Stat(fileDir) if err != nil { //创建文件目录 err = os.MkdirAll(fileDir, os.ModePerm) if err!=nil{ log.Println(err) return err } } //下载启动标识,有下载进度则设置为true var downloading bool = false //检测文件打开状态 tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1) if err != nil && (!os.IsNotExist(err)) { log.Println("文件被占用,请关闭打开的软件") if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { return err } return err } defer tfile.Close() //正在下载标识,标识用于不监测更新改动 gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1 //构建本地cmd执行 ipfs get progress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(fileDir+"\\"+fileName)) stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress) }() log.Println("资源连接中...") //异步定时读取进度反馈给前端,每500ms返回一次进度 go func(){ millSeconds := time.Now().UnixNano() / 1e6 for content := range progress { // 通道关闭后会退出for range循环 log.Println(">>>"+content) current :=time.Now().UnixNano() / 1e6 if !downloading{ log.Println("资源连接成功,下载中...") } downloading = true if current-millSeconds>500{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Printf("json.Marshal error %s\n", err) } //设置下载启动标识和下载时间戳 millSeconds = current //反馈前端 if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { log.Println(err) break } } if strings.Index(content,"100.00%")!=-1{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Printf("json.Marshal error %s\n", err) } if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { log.Println(err) } break } } }() //设置30秒连接超时,30秒未启动下载则下载失败 go func() { index :=0 for true{ //启动下载则不做超时判断 if downloading==true{ return } index++ time.Sleep(time.Duration(1)*time.Second) if downloading==false && index==30{ err = cmd.Process.Kill() log.Println("资源连接超时(30s),下载进程被终止") if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return } return } } }() //等待下载执行完成 err = cmd.Wait() if err != nil { log.Printf("cmd.Run() failed with %s\n", err) } if errStdout != nil || errStderr != nil { log.Printf("failed to capture stdout or stderr\n") } outStr := string(stdout) log.Printf("out:%s", outStr) //更新Etcd数据库的文件key对应hash值 time.Sleep(200*time.Millisecond) key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName err = etcdclient.ReplaceInto(key,hash+";0") if err != nil { log.Println(err) return err } //发送消息至文件变更订阅 config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";" //下载完成反馈 log.Printf("叮,资源文件[ %v ]下载完成",fileName) defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0 defer close(progress) return nil } //读取解析cmd返回文件上传或下载进度信息 func contentToJSONByte(content string) ([]byte,error){ sts :=strings.Split(content," ") if len(sts)<8{ log.Println("字符长度小于8") return nil,nil } var processFloat float64 if (len(sts)==9 || len(sts)==8){ processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64) }else{ processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64) } if processFloat==0{ //log.Println("当前进度0") return nil,nil } pro :=&processStruct{ Size:sts[4], CurrentSize: sts[1], Unit: sts[2], CurrentUnit: sts[5], Process: processFloat, Hash: "", } projson,err :=json.Marshal(pro) return projson,err } /** 上传本地文件 @param absolutePath 文件本地绝对路径 @param fileName 文件名称 @param projectName 项目名称 @param dir 云文件目录 @param currentHistoryHash 当前文件的历史版本管理文件hash @param note 备注 @param creator 创建人 @param milestone 是否事里程碑 */ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{ //本地文件目录 fileDir := gobalLocalProjectDir+"\\"+dir //检查目录 _,err := os.Stat(fileDir) if err != nil { //创建文件目录 err = os.MkdirAll(fileDir, os.ModePerm) if err!=nil{ return err } } //检测文件打开状态 tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1) if err != nil { log.Println("文件被占用,请关闭打开的软件") if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { return err } return err } defer tfile.Close() serverSh := shell.NewShell(config.ServerIpfsUrl) //serverSh.SetTimeout(time.Duration(30)*time.Second) //log.Println("检测引导节点存活情况"+config.ServerIpfsUrl) //检测引导节点是否连接成功 isUp := serverSh.IsUp() if !isUp { log.Println("备份节点网络连接不通!") if conn==nil{ return errors.New("备份节点连失联") }else{ if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } } return nil } //上传启动标识,有上传进度则设置为true var uploading bool=false //cmd执行ipfs add cmd := exec.Command(ipfsPath, "add",absolutePath) uploadProgress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress) }() //异步给前端反馈上传进度 go func(){ first := true millSeconds := time.Now().UnixNano() / 1e6 current :=time.Now().UnixNano() / 1e6 for content := range uploadProgress { // 通道关闭后会退出for range循环 current =time.Now().UnixNano() / 1e6 if first { projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Println("json.Marshal error %s\n", err) } //设置上传启动标识为true uploading=true if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { break } } millSeconds = current first=false } if current-millSeconds>500{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Println("json.Marshal error %s\n", err) } uploading=true if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { break } } millSeconds = current } if (strings.Index(content,"90.00%")!=-1){ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Println("json.Marshal error %s\n", err) } uploading=true if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { break } } break } } }() log.Println("资源上传中...") //上传未启动超时时间30s go func() { index :=0 for true{ if uploading==true{ return } index++ time.Sleep(time.Duration(1)*time.Second) if uploading==false && index==30{ err = cmd.Process.Kill() log.Println("资源连接超时(30s),上传进程被终止") if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return } } return } } }() //等待执行完成 err = cmd.Wait() if err != nil { log.Println("cmd.Run() failed with %s\n", err) if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } return err } if errStdout != nil || errStderr != nil { log.Println("failed to capture stdout or stderr\n") } outStr := string(stdout) fileHash := strings.Split(outStr," ")[1] log.Printf("out:%s", outStr) defer close(uploadProgress) //ipfs provide cmd = exec.Command(ipfsPath,"dht","provide",fileHash) err = cmd.Run() if err != nil { log.Println(err) if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } } return err } //判断备份节点对等节点是否已添加连接 sh := shell.NewShell(config.GobalIpfsUrl) idOut,err :=sh.ID() localId :=idOut.ID swarmConnInfos,err :=serverSh.SwarmPeers(context.Background()) hasConnect := false for _,swarmconn := range swarmConnInfos.Peers { if swarmconn.Peer==localId{ hasConnect=true break } } if !hasConnect{ log.Println("中继处理") swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId errT := serverSh.SwarmConnect(context.Background(),swarmConnectAddr) if errT!=nil{ log.Println("中继失败,引导节点备份失败") log.Println(err) if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } } } //后台备份 go func() { err = serverSh.Pin(fileHash) if err!=nil{ log.Printf("资源[ %v ]备份节点备份失败", fileName) log.Println(err) } log.Printf("资源[ %v ]备份节点备份成功", fileName) }() //文件不存在则进行本地文件夹拷贝 if !fileExist(fmt.Sprint((fileDir+"\\"+fileName))) { //记录新增文件,新增文件不做post请求,前端自行post goabalAddFileMap[fileName] = 1 err = sh.Get(fileHash,fmt.Sprint((fileDir+"\\"+fileName))) if err != nil { log.Println(err) return err } } //构建历史版本记录、写本地历史版本管理文件,上传至ipfs filenameall := path.Base(fileName) filesuffix := path.Ext(fileName) fileprefix := filenameall[0:len(filenameall) - len(filesuffix)] commitFilePath := fileDir+"\\"+fileprefix+".commit" commitVersion,commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,fileHash,note,creator,milestone) if err != nil { log.Printf("资源[ %v ]历史版本记录失败", fileName) log.Println(err) if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } } return err } //读取文件属性,构建100%进度对象 objectStat,err :=sh.ObjectStat(fileHash) if err != nil { log.Println(err) return err } prog := new(processStruct) prog.Hash=fileHash prog.Process=100.00 prog.Size=strconv.Itoa(objectStat.CumulativeSize) prog.CommitHistoryHash=commitHistoryHash prog.Version=commitVersion projson,err :=json.Marshal(prog) if conn!=nil{ if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { log.Println(err) return err } } //更新Etcd数据库hash key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName err = etcdclient.ReplaceInto(key,prog.Hash+";0") if err != nil { log.Println(err) return err } //自动协同逻辑 //if conn==nil{ // folderName := strings.Split(dir,"\\")[0] // var relativePath string // if len(strings.Split(dir, "\\"))==1{ // relativePath = "" // }else{ // relativePath = strings.Replace(relativePath, folderName+"\\","",1) // } // size,_ := strconv.ParseInt(prog.Size,10,64) // err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion) // if err!=nil{ // return err // } //} //发送文件至文件变更订阅 config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";" log.Printf("叮,资源文件[ %v ]上传完成",fileName) return nil } //文件信息入参 type FileParam struct { Digest string `json:"digest"` //md5(ProjectName|FolderName|RelativePath|FileName|FileVersion|UserName) ProjectName string `json:"projectName"` FolderName string `json:"folderName"` RelativePath string `json:"relativePath"` IpfsCid string `json:"ipfsCid"` FileName string `json:"fileName"` FileSize int64 `json:"fileSize,string"` FileVersion int `json:"fileVersion"` UserId int64 `json:"userId,string"` HistoryCurrentIpfsCid string `json:"historyCurrentIpfsCid"` HistoryPreIpfsCid string `json:"historyPreIpfsCid"` } //获取文件的历史版本管理文件最新hash值 func postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath string)(string,error){ url:=config.ServerUrl+"/api/pms/sdk/queryFileHistoryCurrentCid" contentType := "application/json" fileParam :=FileParam{ ProjectName: projectName, FolderName: folderName, RelativePath: relativePath, FileName: fileName, IpfsCid: "", FileSize: 0, FileVersion: 0, UserId: 0, HistoryPreIpfsCid: "", HistoryCurrentIpfsCid: "", } text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,fileParam.IpfsCid,fileName,fileParam.FileSize,fileParam.FileVersion,fileParam.UserId,fileParam.HistoryCurrentIpfsCid,fileParam.HistoryPreIpfsCid) textByte := []byte(text) md5Byte := md5.Sum(textByte) digest := fmt.Sprintf("%x", md5Byte) fileParam.Digest=digest jsonData,err :=json.Marshal(fileParam) if err!=nil{ log.Printf("json序列化化错误!") return "",err } //log.Print(string(jsonData[:])) resp, err := http.Post(url, contentType, bytes.NewReader(jsonData)) if err != nil { fmt.Println("post failed, err:%v\n", err) return "",err } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println("get resp failed,err:%v\n", err) return "",err } //log.Printf("post response:%v", string(b)) result := make(map[string] interface{}) err=json.Unmarshal(b,&result) if err!=nil{ log.Printf("字符串%v反序列化出错", string(b[:])) } if result["Msg"].(string)==""{ log.Printf("资源[ %v ]历史版本管理文件Hash:%v",fileName,result["Data"].(string)) return result["Data"].(string),nil } return "",errors.New(result["Msg"].(string)) } //更新文件记录 func postUpdateFile(projectName, folderName, relativePath, ipfsCid, fileName, historyCurrentIpfsCid, historyPreIpfsCid,userId string, fileSize int64, fileVersion int)(err error){ url:=config.ServerUrl+"/api/pms/sdk/updateFile" contentType := "application/json" intUserId,_:=strconv.ParseInt(userId,10,64) fileParam :=FileParam{ ProjectName: projectName, FolderName: folderName, RelativePath: relativePath, IpfsCid: ipfsCid, FileName: fileName, FileSize: fileSize, FileVersion: fileVersion, UserId: intUserId, HistoryCurrentIpfsCid: historyCurrentIpfsCid, HistoryPreIpfsCid: historyPreIpfsCid, } text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,ipfsCid,fileName,fileSize,fileVersion,userId,historyCurrentIpfsCid,historyPreIpfsCid) textByte := []byte(text) md5Byte := md5.Sum(textByte) digest := fmt.Sprintf("%x", md5Byte) fileParam.Digest=digest jsonData,err :=json.Marshal(fileParam) if err!=nil{ log.Printf("json序列化化错误!") return err } resp, err := http.Post(url, contentType, bytes.NewReader(jsonData)) if err != nil { fmt.Println("post failed, err:%v\n", err) return err } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println("get resp failed,err:%v\n", err) return err } //log.Printf("post response:%v", string(b)) result := make(map[string] interface{}) err=json.Unmarshal(b,&result) if err!=nil{ log.Printf("字符串%v反序列化出错", string(b[:])) return err } log.Printf("资源[ %v ]服务记录更新成功",fileName) return nil } /** 记录提交记录 */ func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (int,string,error){ commitHistory := new(commitHistory) //历史文件不存在则创建 localSh :=shell.NewShell(config.GobalIpfsUrl) localSh.SetTimeout(10*time.Second) if len(currentHistoryHash)!=0 { os.Remove(path) err := localSh.Get(currentHistoryHash,path) if err != nil { log.Println("历史版本管理文件下载失败") return -1,"",err } } //初始化历史管理文件 exist := fileExist(path) if !exist { commitFile,err := os.Create(path) if err != nil { log.Println("历史版本管理文件创建失败") return -1,"",err } commitFile.Close() } //设置文件隐藏属性 attribCmd :=exec.Command("attrib","+h",path) err :=attribCmd.Run() if err != nil { log.Println("设置文件隐藏属性失败") return -1,"",err } //读取历史管理文件 content ,err :=ioutil.ReadFile(path) if len(content)!=0{ rows :=strings.Split(string(content),"\n") endRow :=rows[len(rows)-2] columns :=strings.Split(endRow,"\t") commitHistory.Version,_=strconv.Atoi(columns[6]) commitHistory.Version++ commitHistory.ParentHash=columns[1] }else{ commitHistory.Version=1 commitHistory.ParentHash="0000000000000000000000000000000000000000" } commitHistory.CurrentHash=hash commitHistory.Milestone = milestone commitHistory.Creator = creator commitHistory.Note = note commitHistory.CreateTime=time.Now().Unix() if commitHistory.ParentHash==commitHistory.CurrentHash{ if commitHistory.Version>1{ commitHistory.Version-- } return commitHistory.Version,currentHistoryHash,nil } file,err :=os.OpenFile(path,os.O_APPEND,0666) if err != nil{ log.Println(err) return -1,"",err } //写入历史管理文件 w :=bufio.NewWriter(file) writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version) fmt.Fprintln(w,writeContent) err = w.Flush() if err != nil { log.Println("历史版本管理文件写入失败") return -1,"",err } file.Close() addFile,err :=os.Open(path) if err != nil{ log.Println(err) return -1,"",err } defer addFile.Close() //add 历史管理文件 historyHash,err:=localSh.Add(addFile) if err != nil { log.Println("历史版本管理文件上传失败") return -1,"",err } serverSh :=shell.NewShell(config.ServerIpfsUrl) serverSh.SetTimeout(30*time.Second) err = serverSh.Pin(historyHash) if err != nil { log.Println("历史版本管理文件备份失败") return -1,"",err } return commitHistory.Version,historyHash,nil } func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) { var out []byte buf := make([]byte, 1024, 1024) for { n, err := r.Read(buf[:]) if n > 0 { d := buf[:n] out = append(out, d...) progress <- string(d) } if err != nil { // Read returns io.EOF at the end of file, which is not an error for us if err == io.EOF { err = nil } return out, err } } // never reached panic(true) return nil, nil } /** 单个文件信息 */ type simpleFileInfo struct { Name string `json:"name" ` Extension string `json:"extension"` RelativePath string `json:"relativePath"` AbsolutePath string `json:"absolutePath"` } var gobalFolderFileMap map[string] *simpleFileInfo var gobalRelativePath string /** 获取指定目录或文件的文件信息,如果是目录递归获取文件信息 @param id 文件id */ func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{ gobalFolderFileMap = make(map[string] *simpleFileInfo) fileInfo,err :=os.Stat(absolutePath) if err!=nil{ log.Println(err) return err } log.Println(filepath.Dir(absolutePath)) //单个文件处理 if !fileInfo.IsDir() { simpleFileInfo := new(simpleFileInfo) simpleFileInfo.Name=fileInfo.Name() simpleFileInfo.Extension=path.Ext(absolutePath) simpleFileInfo.RelativePath="" simpleFileInfo.AbsolutePath=absolutePath gobalFolderFileMap[absolutePath]=simpleFileInfo bytes,err :=json.Marshal(gobalFolderFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil { log.Println(err) return err } return nil } //文件目录处理 gobalRelativePath = filepath.Dir(absolutePath) err =filepath.Walk(absolutePath, myWalkfunc) if err != nil { log.Println(err) return err } bytes,err :=json.Marshal(gobalFolderFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil { log.Println(err) return err } return nil } func myWalkfunc(path string, info os.FileInfo, err error) error { if info.IsDir()==false{ simpleFileInfo := new(simpleFileInfo) simpleFileInfo.Name=info.Name() simpleFileInfo.Extension=filepath.Ext(path) simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1)) simpleFileInfo.AbsolutePath=path gobalFolderFileMap[path]=simpleFileInfo return nil } return nil } /** 本地文件是否存在 */ func fileExist(path string) bool { _, err := os.Lstat(path) return !os.IsNotExist(err) } /** 获取本地文件列表 */ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ projectPath := gobalLocalProjectDir //log.Println("切换文件列表:"+projectPath) keyPrefix := gobalLoginUserName+"\\"+projectName+"\\" //添加监控 err := filepath.Walk(projectPath,watchWalkfunc) if err != nil { log.Println(err) return err } //初始化通道 if config.GobalWatchChannelMap[projectPath] != nil { close(config.GobalWatchChannelMap[projectPath]) } config.GobalWatchChannelMap[projectPath]=make(chan string,100) log.Println("添加文件监控:"+projectPath) //定期校验缓存的本地文件状态 dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix) if err != nil { log.Println(err) } if dataMapa!=nil && len(dataMapa)>0{ for k,_ := range dataMapa { if !fileExist(config.LocalWorkSpaceDir+k){ err = etcdclient.DeleteWithPrefix(k) if err != nil { log.Println(err) } } } } //优先etcd查询 dataMap,err := etcdclient.QueryWithPrefix(keyPrefix) if err != nil { log.Println(err) return err } if dataMap==nil || len(dataMap)==0{ // 不存在则初始化进etcd err =filepath.Walk(gobalLocalProjectDir,walkfunc) //路径错误 if err != nil { log.Println(err) if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil { log.Println(err) return err } } mapByte,err:=json.Marshal(gobalFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { log.Println(err) return err } cacheMap := make(map[string] string) for k,v := range gobalFileMap { k := strings.Replace(k,config.LocalWorkSpaceDir,"",1) cacheMap[k]=v } //异步缓存 //go func() { err = etcdclient.BatchAdd(cacheMap) if err != nil { log.Println(err) } //}() //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟") //清空gobalFileMap gobalFileMap = make(map[string] string) } err=sendFileListFromEtcd(keyPrefix,projectName,conn) if err != nil { log.Println(err) return err } for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[gobalLocalProjectDir] { //log.Println(actionAndModifyFilePathStr) actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";") queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1) //当前登陆用户判断 if gobalLoginUserName != strings.Split(queryKey,"\\")[0] && actionAndModifyFilePathStr!=";"{ log.Printf("非法用户修改%v", actionAndModifyFilePathStr) continue } if actionAndModifyFilePath[0]=="remove"{ queryMap,err :=etcdclient.QueryWithPrefix(queryKey) if len(queryMap)==0{ continue } err = etcdclient.DeleteWithPrefix(queryKey) if err != nil { log.Println(err) } }else if actionAndModifyFilePath[0]=="write"{ querymap,err := etcdclient.QueryWithPrefix(queryKey) if err != nil { log.Println(err) continue } if len(querymap)==0{ continue } //更新判断 if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{ continue } oldValue := strings.Split(querymap[queryKey],";") newValue := oldValue[0]+";" +"1" err = etcdclient.ReplaceInto(queryKey,newValue) if err!=nil{ log.Println(err) continue } log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr) //保存即同步逻辑,如果非新增文件则自动post //获取文件的历史版本管理文件hash //filePath := actionAndModifyFilePath[1] //fileName :=filepath.Base(filePath) //folderName := strings.Split(queryKey,"\\")[2] //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) //var relativePath string //if len(strings.Split(dir, "\\"))==1{ // relativePath = "" //}else{ // relativePath = strings.Replace(relativePath, folderName+"\\","",1) //} //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) //if err!=nil{ // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) // continue //} // ////自动更新文件 //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) //if err!=nil{ // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) // log.Printf("UploadCommand 返回失败,%v",err.Error()) // //记录修改状态 // newValue := oldValue[0]+";" +"1" // err = etcdclient.ReplaceInto(queryKey,newValue) // if err!=nil{ // log.Println(err) // continue // } // continue //} //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) //continue }else if actionAndModifyFilePath[0]=="create"{ querymap,err := etcdclient.QueryWithPrefix(queryKey) if err != nil { log.Println(err) continue } if len(querymap)==0{ continue } //更新判断 if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{ continue } oldValue := strings.Split(querymap[queryKey],";") newValue := oldValue[0]+";" +"1" err = etcdclient.ReplaceInto(queryKey,newValue) if err!=nil{ log.Println(err) continue } log.Printf("文件变更 [ %v ] create", actionAndModifyFilePathStr) //如果非新增文件则自动post //if goabalAddFileMap[] //获取文件的历史版本管理文件hash //filePath := actionAndModifyFilePath[1] //fileName :=filepath.Base(filePath) //folderName := strings.Split(queryKey,"\\")[2] //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1) //var relativePath string //if len(strings.Split(dir, "\\"))==1{ // relativePath = "" //}else{ // relativePath = strings.Replace(relativePath, folderName+"\\","",1) //} //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath) //if err!=nil{ // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error()) // continue //} // ////自动更新文件 //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false) //if err!=nil{ // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName) // log.Printf("UploadCommand 返回失败,%v",err.Error()) // //记录修改状态 // newValue := oldValue[0]+";" +"1" // err = etcdclient.ReplaceInto(queryKey,newValue) // if err!=nil{ // log.Println(err) // continue // } // continue //} //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName) //continue } err = sendFileListFromEtcd(keyPrefix,projectName,conn) if err != nil { log.Println(err) return err } } return nil } func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{ dataMap,err := etcdclient.QueryWithPrefix(keyPrefix) if err != nil { log.Println(err) return err } if dataMap!=nil && len(dataMap)>0{ cacheMap := make(map[string] string) for k,v := range dataMap { //历史数据加默认值 if len(strings.Split(v, ";"))==1{ v=v+";0" } cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v } mapByte,err:=json.Marshal(cacheMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { log.Println(err) return err } return nil }else{ log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName) } return nil } /** 打开方式 */ func OpenFileWith(filePath string) error{ //判断文件有效性 _,err := os.Stat(filePath) if err!=nil{ return err } //filePath = strings.Replace(filePath," ","~1",1) cmd := exec.Command("rundll32.exe","shell32.dll,OpenAs_RunDLL",filePath); err =cmd.Run() if err!=nil{ log.Println(err) return err } return nil } /** 手动检查软件更新 0:不强制更新 1:强制更新 */ func CheckForUpdates(forceUpdate string) error{ tszdir :=os.Getenv("TSZDIR") //空格路径处理 ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs" //判断文件有效性 _,err := os.Stat(tszdir+config.UpdaterName) if err!=nil{ return err } cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck"); err =cmd.Run() if err!=nil{ log.Println(err) return err } cmd = exec.Command(tszdir+config.UpdaterName,"/checknow"); err =cmd.Run() if err!=nil{ log.Println(err) return err } //检测到更新 kill所有客户端进程 log.Println("close all process") cmd = exec.Command("cmd.exe","/c",ipfsPath); err =cmd.Run() if err!=nil{ log.Println(err) return err } return nil } func walkfunc(filePath string, info os.FileInfo, err error) error { if info == nil{ return nil } if info.IsDir()==false{ //历史文件不扫描 if path.Ext(filePath)==".commit" { return nil } sh := shell.NewShell(config.GobalIpfsUrl) file,err :=os.Open(filePath) if err != nil{ log.Println(err) return err } defer file.Close() hash,err :=sh.Add(file) if err != nil { log.Println(err) return err } dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(gobalLocalProjectDir+"\\"),"",1) gobalFileMap[dir]=hash } return nil } /** 查询历史文件 path 文件路径 hash 历史版本文件hash */ func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){ result := make(map[int] *commitHistory) //校验文件路径 _,err :=os.Stat(filePath) if err != nil { log.Println("文件 "+filePath+"not found") return nil,errors.New("参数错误!") } if len(hash) == 0 { return result,nil } //根据hash更新文件 localSh := shell.NewShell(config.GobalIpfsUrl) //连接失败判断 localSh.SetTimeout(5*time.Second) ext :=path.Ext(filePath) commitFilePath :=strings.Replace(filePath,ext,".commit",1) os.Remove(commitFilePath) err = localSh.Get(hash,commitFilePath) if err!=nil { log.Println("文件"+hash+"下载失败") return result,errors.New("历史文件获取失败,请稍后重试") } //设置文件隐藏属性 attribCmd :=exec.Command("attrib","+h",commitFilePath) err =attribCmd.Run() if err != nil { log.Println("设置文件隐藏属性失败") return result,err } //解析历史版本文件 contentByte,err := ioutil.ReadFile(commitFilePath) content := string(contentByte) if content==""{ return result,nil } rows :=strings.Split(content,"\n") length := len(rows) var index int = 0 for i:=length-2;i>=0;i--{ columns := strings.Split(rows[i],"\t") commitHistoryInstance := new(commitHistory) commitHistoryInstance.ParentHash =columns[0] commitHistoryInstance.CurrentHash=columns[1] commitHistoryInstance.Version,_=strconv.Atoi(columns[6]) commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5]) commitHistoryInstance.Creator=columns[2] commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64) commitHistoryInstance.Note = columns[4] result[index]=commitHistoryInstance index++ } return result,nil } /** 设定某个历史版本为里程碑版本 @param filePath 文件绝对路径 @param commitHistoryHash 历史版本管理文件hash @param hash 文件hash @param milestone 是否是里程碑 */ func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){ //result := make(map[int] *commitHistory) //校验文件路径 _,err :=os.Stat(filePath) if err != nil { log.Println("文件 "+filePath+"not found") return "",errors.New("参数错误!") } if len(commitHistoryHash) == 0 { log.Println("参数hash must not empty") return "",errors.New("参数错误!") } //根据hash更新文件 localSh := shell.NewShell(config.GobalIpfsUrl) //连接失败判断 localSh.SetTimeout(5*time.Second) ext :=path.Ext(filePath) commitFilePath :=strings.Replace(filePath,ext,".commit",1) os.Remove(commitFilePath) err = localSh.Get(commitHistoryHash,commitFilePath) if err!=nil { log.Println("文件"+commitHistoryHash+"下载失败") return "",errors.New("历史文件获取失败,请稍后重试") } //设置文件隐藏属性 attribCmd :=exec.Command("attrib","+h",commitFilePath) err =attribCmd.Run() if err != nil { log.Println("设置文件隐藏属性失败") return "",err } //解析历史版本文件 contentByte,err := ioutil.ReadFile(commitFilePath) content := string(contentByte) if content==""{ return "",nil } rows :=strings.Split(content,"\n") length := len(rows) resultString :="" for i:=0;i>>"+userId) if strconv.FormatInt(acceptUserId,10)==userId{ messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10) err =etcdclient.ReplaceInto(messagekey,message) if err!=nil{ nsqclient.MsgQueue <- message log.Println(err) } err = queryEtcdToWebSocket(conn, messagekey) if err!=nil{ nsqclient.MsgQueue <- message log.Println(err) } } } } return nil } /** 消息通知 @param userId 用户ID */ func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){ msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId) err = etcdclient.DeleteWithPrefix(msgKey) return err } /** 查询etcd对应key值发送给前端 */ func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){ msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix) if err!=nil{ log.Println(err) return err } mapByte,err :=json.Marshal(msgs) if err!=nil{ log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { log.Println(err) return err } return nil }