package handle import ( "encoding/json" "fmt" "fts/config" "github.com/gorilla/websocket" _ "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api" "io" "log" "os" "os/exec" "path" "path/filepath" "strconv" "strings" "time" ) var gobalLoginUserId string //key:filepath,value:hash var gobalFileMap = make(map[string] string) var gobalFileUpdateTimeMap = make(map[string] string) //var gobalFileChangeMap = make(map[string] string) var getLocalFileListDir string var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关 var ipfsPath=os.Getenv("IPFS-PATH") /** 文件上传下载进度 */ type processStruct struct { Size string `json:"size"` CurrentSize string `json:"currentSize"` Unit string `json:"unit"` CurrentUnit string `json:"currentUnit"` Process float64 `json:"process"` Hash string `json:"hash"` } func main() { //config.InitConfig() //InitLocalWorkSpace("320872793405132801","test1") // //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/") // //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/") // //GetLocalFileList("testOne") } /** 初始化本地工作目录 @param userId 用户ID @param projectName 项目名称 */ func InitLocalWorkSpace(conn *websocket.Conn,userId,projectName string) (error){ //空格路径处理 ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe" //初始化当前登陆用户 gobalLoginUserId = userId // 检查本地目录是否存在 var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName _,err := os.Stat(projectPath) if err != nil { //创建文件目录 os.MkdirAll(projectPath, os.ModePerm) /*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm) os.MkdirAll(projectPath+"\\工作文件", os.ModePerm) os.MkdirAll(projectPath+"\\协作文件", os.ModePerm) os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/ } log.Println("切换本地工作目录至 "+projectPath) if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil { log.Println(err) return err } return nil } /** 下载指令 @param hash ipfs哈希值 @param projectName 项目名称 @para fileName 文件名称 @param dir 云文件目录 */ func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{ absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir //检查目录 _,err := os.Stat(absoluteDir) if err != nil { //创建文件目录 err = os.MkdirAll(absoluteDir, os.ModePerm) if err!=nil{ log.Println(err) return err } } var downloading bool = false //检测文件打开状态 tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1) if err != nil && (!os.IsNotExist(err)) { log.Println("文件被占用,请关闭打开的软件") if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { return err } return err } defer tfile.Close() //serverSh := shell.NewShell(config.ServerIpfsUrl) ////检测引导节点是否连接成功 //isUp := serverSh.IsUp() //if !isUp { // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { // return err // } // return nil //} cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName)) progress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress) }() go func(){ millSeconds := time.Now().UnixNano() / 1e6 for content := range progress { // 通道关闭后会退出for range循环 current :=time.Now().UnixNano() / 1e6 if current-millSeconds>500{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Printf("json.Marshal error %s\n", err) } millSeconds = current downloading = true if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { log.Println(err) break } } if strings.Index(content,"100.00%")!=-1{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Printf("json.Marshal error %s\n", err) } downloading = true if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { panic(err) } break } } }() log.Println("下载资源连接中...") //设置30秒连接超时 go func() { index :=0 for true{ index++ if downloading==true{ return } time.Sleep(time.Duration(1)*time.Second) if downloading==false && index==30{ err = cmd.Process.Kill() log.Println("进程连接超时30s已被Kill") if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return } return } } }() err = cmd.Wait() if err != nil { log.Printf("cmd.Run() failed with %s\n", err) } if errStdout != nil || errStderr != nil { log.Printf("failed to capture stdout or stderr\n") } outStr, errStr := string(stdout), string(stderr) log.Printf("out:%s ,err:%s", outStr, errStr) if err==nil{ log.Println("下载成功") } //time.Sleep(time.Duration(6)*time.Second) defer close(progress) return nil } func contentToJSONByte(content string) ([]byte,error){ sts :=strings.Split(content," ") if len(sts)<8{ log.Println("字符长度小于8") return nil,nil } var processFloat float64 if (len(sts)==9 || len(sts)==8){ processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64) }else{ processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64) } if processFloat==0{ //log.Println("当前进度0") return nil,nil } pro :=&processStruct{ Size:sts[4], CurrentSize: sts[1], Unit: sts[2], CurrentUnit: sts[5], Process: processFloat, Hash: "", } projson,err :=json.Marshal(pro) return projson,err } /** 上传本地文件 @param absolutePath 文件本地绝对路径 @param fileName 文件名称 @param projectName 项目名称 @param dir 云文件目录 */ func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{ //本地拷贝文件 absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir //检查目录 _,err := os.Stat(absoluteDir) if err != nil { //创建文件目录 err = os.MkdirAll(absoluteDir, os.ModePerm) if err!=nil{ return err } } //检测文件打开状态 tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1) if err != nil { log.Println("文件被占用,请关闭打开的软件") if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil { return err } return err } defer tfile.Close() serverSh := shell.NewShell(config.ServerIpfsUrl) //检测引导节点是否连接成功 isUp := serverSh.IsUp() if !isUp { if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } return nil } var uploading bool=false cmd := exec.Command(ipfsPath, "add",absolutePath) uploadProgress := make(chan string,10000) var stdout, stderr []byte var errStdout, errStderr error stdoutIn, _ := cmd.StdoutPipe() stderrIn, _ := cmd.StderrPipe() cmd.Start() go func() { stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress) }() go func() { stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress) }() go func(){ millSeconds := time.Now().UnixNano() / 1e6 for content := range uploadProgress { // 通道关闭后会退出for range循环 current :=time.Now().UnixNano() / 1e6 if current-millSeconds>500{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Println("json.Marshal error %s\n", err) } uploading=true if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { break } millSeconds = current } if strings.Index(content,"100.00%")!=-1{ projson,err := contentToJSONByte(content) if projson==nil && err==nil{ continue } if err != nil { log.Println("json.Marshal error %s\n", err) } uploading=true if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { panic(err) } break } } }() log.Println("文件上传中...") //设置30秒连接超时 go func() { index :=0 for true{ index++ if uploading==true{ return } time.Sleep(time.Duration(1)*time.Second) if uploading==false && index==30{ err = cmd.Process.Kill() log.Println("进程连接超时30s已被Kill") if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return } return } } }() err = cmd.Wait() if err != nil { log.Println("cmd.Run() failed with %s\n", err) } if errStdout != nil || errStderr != nil { log.Println("failed to capture stdout or stderr\n") } outStr, errStr := string(stdout), string(stderr) log.Printf("out:%s,err:%s", outStr, errStr) /*sh.Get(hash,absoluteDir+fileName) if err != nil { return "",err }*/ defer close(uploadProgress) prog := new(processStruct) prog.Hash=strings.Split(outStr," ")[1] prog.Process=100.00 sh := shell.NewShell(config.GobalIpfsUrl) objectStat,err :=sh.ObjectStat(prog.Hash) if err != nil { log.Println(err) return err } prog.Size=strconv.Itoa(objectStat.CumulativeSize) projson,err :=json.Marshal(prog) err = serverSh.Pin(prog.Hash) if err != nil { log.Println("引导节点备份失败") log.Println(err) if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil { return err } return err } if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil { log.Println(err) } log.Println("引导节点文件备份成功") //本地文件夹拷贝 err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName))) if err != nil { log.Println(err) return err } log.Println("上传成功") return nil } func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) { var out []byte buf := make([]byte, 1024, 1024) for { n, err := r.Read(buf[:]) if n > 0 { d := buf[:n] out = append(out, d...) progress <- string(d) } if err != nil { // Read returns io.EOF at the end of file, which is not an error for us if err == io.EOF { err = nil } return out, err } } // never reached panic(true) return nil, nil } /** 单个文件信息 */ type simpleFileInfo struct { Name string `json:"name" ` Extension string `json:"extension"` RelativePath string `json:"relativePath"` AbsolutePath string `json:"absolutePath"` } var gobalFolderFileMap = make(map[string] *simpleFileInfo) var gobalRelativePath string /** 获取指定目录或文件的文件信息,如果是目录递归获取文件信息 @param id 文件id */ func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{ fileInfo,err :=os.Stat(absolutePath) if err!=nil{ log.Println(err) return err } log.Println(filepath.Dir(absolutePath)) //单个文件处理 if !fileInfo.IsDir() { simpleFileInfo := new(simpleFileInfo) simpleFileInfo.Name=fileInfo.Name() simpleFileInfo.Extension=path.Ext(absolutePath) simpleFileInfo.RelativePath="" simpleFileInfo.AbsolutePath=absolutePath gobalFolderFileMap[absolutePath]=simpleFileInfo bytes,err :=json.Marshal(gobalFolderFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil { log.Println(err) return err } return nil } //文件目录处理 gobalRelativePath = filepath.Dir(absolutePath)+"\\" err =filepath.Walk(absolutePath, myWalkfunc) if err != nil { log.Println(err) return err } bytes,err :=json.Marshal(gobalFolderFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil { log.Println(err) return err } return nil } func myWalkfunc(path string, info os.FileInfo, err error) error { if info.IsDir()==false{ simpleFileInfo := new(simpleFileInfo) simpleFileInfo.Name=info.Name() simpleFileInfo.Extension=filepath.Ext(path) simpleFileInfo.RelativePath=strings.Replace(path,gobalRelativePath,"",1) simpleFileInfo.AbsolutePath=path gobalFolderFileMap[path]=simpleFileInfo return nil } return nil } /** 获取本地文件列表 */ func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{ getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\") for { err :=filepath.Walk(getLocalFileListDir,walkfunc) if err != nil { log.Println(err) time.Sleep(time.Duration(1)*time.Minute) continue } mapByte,err:=json.Marshal(gobalFileMap) if err != nil { log.Println(err) return err } if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil { log.Println(err) return err } log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟") //清空gobalFileMap gobalFileMap = make(map[string] string) time.Sleep(time.Duration(1)*time.Minute) } return nil } func walkfunc(path string, info os.FileInfo, err error) error { if info.IsDir()==false{ sh := shell.NewShell(config.GobalIpfsUrl) file,err :=os.Open(path) if err != nil{ log.Println(err) return err } defer file.Close() hash,err :=sh.Add(file) if err != nil { log.Println(err) return err } dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1) gobalFileMap[dir]=hash } return nil }