|
- package handle
-
- import (
- "encoding/json"
- "fmt"
- "fts/config"
- "fts/etcdclient"
- "github.com/gorilla/websocket"
- _ "github.com/ipfs/go-ipfs-api"
- shell "github.com/ipfs/go-ipfs-api"
- "io"
- "log"
- "os"
- "os/exec"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- )
-
- var gobalLoginUserName string
-
- //key:filepath,value:hash
- var gobalFileMap = make(map[string] string)
- var gobalFileUpdateTimeMap = make(map[string] string)
-
- //var gobalFileChangeMap = make(map[string] string)
- var getLocalFileListDir string
- var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
-
-
-
- var ipfsPath=os.Getenv("IPFS-PATH")
-
- /**
- 文件上传下载进度
- */
- type processStruct struct {
- Size string `json:"size"`
- CurrentSize string `json:"currentSize"`
- Unit string `json:"unit"`
- CurrentUnit string `json:"currentUnit"`
- Process float64 `json:"process"`
- Hash string `json:"hash"`
- }
-
- func main() {
-
-
- //config.InitConfig()
- //InitLocalWorkSpace("320872793405132801","test1")
- //
- //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
- //
- //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
- //
- //GetLocalFileList("testOne")
- }
-
-
-
- /**
- 初始化本地工作目录
- @param userId 用户ID
- @param projectName 项目名称
- */
- func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
-
- //初始化当前登陆用户
- gobalLoginUserName = userName
-
- // 检查本地目录是否存在
- var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName
- _,err := os.Stat(projectPath)
- if err != nil {
- //创建文件目录
- os.MkdirAll(projectPath, os.ModePerm)
- }
-
- log.Println("切换本地工作目录至 "+projectPath)
-
- if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
- log.Println(err)
- return err
- }
-
- return nil
- }
-
- /**
- 初始化本地客户端配置
- */
- func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
-
-
- config.ServerIpfsUrl = ipfsApi
- log.Println("初始化客户端配置项IPFS—API:"+config.ServerIpfsUrl)
- ipfsBootstraps := strings.Split(ipfsBootstrap,";")
- for _, simpleBootstrap := range ipfsBootstraps {
- log.Println("增加引导节点:"+simpleBootstrap)
- cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
- err :=cmd.Run()
- if err!=nil{
- return err
- }
- }
- return nil
- }
-
- /**
- 下载指令
- @param hash ipfs哈希值
- @param projectName 项目名称
- @para fileName 文件名称
- @param dir 云文件目录
- */
- func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
-
- absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
- //检查目录
- _,err := os.Stat(absoluteDir)
- if err != nil {
- //创建文件目录
- err = os.MkdirAll(absoluteDir, os.ModePerm)
- if err!=nil{
- log.Println(err)
- return err
- }
- }
-
-
- var downloading bool = false
-
- //检测文件打开状态
- tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1)
-
- if err != nil && (!os.IsNotExist(err)) {
-
- log.Println("文件被占用,请关闭打开的软件")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
- return err
- }
- return err
- }
- defer tfile.Close()
-
- //serverSh := shell.NewShell(config.ServerIpfsUrl)
- ////检测引导节点是否连接成功
- //isUp := serverSh.IsUp()
- //if !isUp {
- // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- // return err
- // }
- // return nil
- //}
-
- cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
- progress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
- }()
-
- go func(){
- millSeconds := time.Now().UnixNano() / 1e6
- for content := range progress { // 通道关闭后会退出for range循环
- current :=time.Now().UnixNano() / 1e6
-
- if current-millSeconds>500{
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
- if err != nil {
- log.Printf("json.Marshal error %s\n", err)
- }
- millSeconds = current
- downloading = true
-
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- log.Println(err)
- break
- }
-
- }
- if strings.Index(content,"100.00%")!=-1{
-
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
- if err != nil {
- log.Printf("json.Marshal error %s\n", err)
- }
- downloading = true
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- panic(err)
- }
- break
-
- }
- }
- }()
- log.Println("下载资源连接中...")
-
- //设置30秒连接超时
- go func() {
- index :=0
- for true{
- index++
- if downloading==true{
- return
- }
- time.Sleep(time.Duration(1)*time.Second)
- if downloading==false && index==30{
- err = cmd.Process.Kill()
- log.Println("进程连接超时30s已被Kill")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return
- }
- return
- }
- }
- }()
-
- err = cmd.Wait()
- if err != nil {
- log.Printf("cmd.Run() failed with %s\n", err)
- }
- if errStdout != nil || errStderr != nil {
- log.Printf("failed to capture stdout or stderr\n")
- }
- outStr := string(stdout)
- log.Printf("out:%s", outStr)
-
- //TODO test 更新数据库hash
- key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
- err = etcdclient.ReplaceInto(key,hash)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err==nil{
- log.Println("下载成功")
- }
-
- //time.Sleep(time.Duration(6)*time.Second)
-
- defer close(progress)
-
- return nil
- }
-
- func contentToJSONByte(content string) ([]byte,error){
- sts :=strings.Split(content," ")
- if len(sts)<8{
- log.Println("字符长度小于8")
- return nil,nil
- }
- var processFloat float64
- if (len(sts)==9 || len(sts)==8){
- processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
- }else{
- processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
- }
-
- if processFloat==0{
- //log.Println("当前进度0")
- return nil,nil
- }
-
- pro :=&processStruct{
- Size:sts[4],
- CurrentSize: sts[1],
- Unit: sts[2],
- CurrentUnit: sts[5],
- Process: processFloat,
- Hash: "",
-
- }
- projson,err :=json.Marshal(pro)
-
- return projson,err
- }
-
-
- /**
- 上传本地文件
- @param absolutePath 文件本地绝对路径
- @param fileName 文件名称
- @param projectName 项目名称
- @param dir 云文件目录
- */
- func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{
-
- //本地拷贝文件
- absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
- //检查目录
- _,err := os.Stat(absoluteDir)
- if err != nil {
- //创建文件目录
- err = os.MkdirAll(absoluteDir, os.ModePerm)
- if err!=nil{
- return err
- }
- }
-
- //检测文件打开状态
- tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
- if err != nil {
- log.Println("文件被占用,请关闭打开的软件")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
- return err
- }
- return err
- }
- defer tfile.Close()
-
- serverSh := shell.NewShell(config.ServerIpfsUrl)
- //serverSh.SetTimeout(time.Duration(30)*time.Second)
- log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
- //检测引导节点是否连接成功
- isUp := serverSh.IsUp()
- if !isUp {
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- return nil
- }
-
-
- var uploading bool=false
-
- cmd := exec.Command(ipfsPath, "add",absolutePath)
- uploadProgress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
- }()
-
- go func(){
- millSeconds := time.Now().UnixNano() / 1e6
- for content := range uploadProgress { // 通道关闭后会退出for range循环
- current :=time.Now().UnixNano() / 1e6
- if current-millSeconds>500{
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
- if err != nil {
- log.Println("json.Marshal error %s\n", err)
- }
- uploading=true
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- break
- }
- millSeconds = current
- }
- if strings.Index(content,"100.00%")!=-1{
-
- projson,err := contentToJSONByte(content)
- if projson==nil && err==nil{
- continue
- }
-
-
- if err != nil {
- log.Println("json.Marshal error %s\n", err)
- }
- uploading=true
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- panic(err)
- }
- break
- }
- }
- }()
-
- log.Println("文件上传中...")
-
- //设置30秒连接超时
- go func() {
- index :=0
- for true{
- index++
- if uploading==true{
- return
- }
- time.Sleep(time.Duration(1)*time.Second)
- if uploading==false && index==30{
- err = cmd.Process.Kill()
- log.Println("进程连接超时30s已被Kill")
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return
- }
- return
- }
- }
- }()
-
- err = cmd.Wait()
-
- if err != nil {
- log.Println("cmd.Run() failed with %s\n", err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- return err
- }
- if errStdout != nil || errStderr != nil {
- log.Println("failed to capture stdout or stderr\n")
- }
- outStr := string(stdout)
- log.Printf("out:%s", outStr)
- /*sh.Get(hash,absoluteDir+fileName)
- if err != nil {
- return "",err
- }*/
- defer close(uploadProgress)
-
- prog := new(processStruct)
- prog.Hash=strings.Split(outStr," ")[1]
- prog.Process=100.00
-
- sh := shell.NewShell(config.GobalIpfsUrl)
- //sh.SetTimeout(time.Duration(30)*time.Second)
- objectStat,err :=sh.ObjectStat(prog.Hash)
- if err != nil {
- log.Println(err)
- return err
- }
- prog.Size=strconv.Itoa(objectStat.CumulativeSize)
-
- projson,err :=json.Marshal(prog)
-
- //
- cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash)
- err = cmd.Run()
- if err != nil {
- log.Println(err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- return err
- }
-
- //serverSh.SetTimeout(time.Duration(600)*time.Second)
-
- err = serverSh.Pin(prog.Hash)
- if err != nil {
- log.Println("引导节点备份失败")
- log.Println(err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
- return err
- }
- return err
- }
- log.Println("引导节点文件备份成功")
-
- //本地文件夹拷贝
- err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
- if err != nil {
- log.Println(err)
- return err
- }
-
- //TODO test 更新数据库hash
- key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
- err = etcdclient.ReplaceInto(key,prog.Hash)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
- log.Println(err)
- }
- log.Println("上传成功")
-
- return nil
- }
-
- func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
- var out []byte
- buf := make([]byte, 1024, 1024)
- for {
- n, err := r.Read(buf[:])
- if n > 0 {
- d := buf[:n]
- out = append(out, d...)
- progress <- string(d)
-
- }
- if err != nil {
- // Read returns io.EOF at the end of file, which is not an error for us
- if err == io.EOF {
- err = nil
- }
- return out, err
- }
- }
- // never reached
- panic(true)
- return nil, nil
- }
-
- /**
- 单个文件信息
- */
- type simpleFileInfo struct {
- Name string `json:"name" `
- Extension string `json:"extension"`
- RelativePath string `json:"relativePath"`
- AbsolutePath string `json:"absolutePath"`
- }
-
- var gobalFolderFileMap map[string] *simpleFileInfo
- var gobalRelativePath string
- /**
- 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
- @param id 文件id
- */
- func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
-
- gobalFolderFileMap = make(map[string] *simpleFileInfo)
- fileInfo,err :=os.Stat(absolutePath)
- if err!=nil{
- log.Println(err)
- return err
- }
-
- log.Println(filepath.Dir(absolutePath))
- //单个文件处理
-
- if !fileInfo.IsDir() {
- simpleFileInfo := new(simpleFileInfo)
- simpleFileInfo.Name=fileInfo.Name()
- simpleFileInfo.Extension=path.Ext(absolutePath)
- simpleFileInfo.RelativePath=""
- simpleFileInfo.AbsolutePath=absolutePath
- gobalFolderFileMap[absolutePath]=simpleFileInfo
- bytes,err :=json.Marshal(gobalFolderFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
- if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
- log.Println(err)
- return err
- }
- return nil
- }
-
- //文件目录处理
- gobalRelativePath = filepath.Dir(absolutePath)
- err =filepath.Walk(absolutePath, myWalkfunc)
-
- if err != nil {
- log.Println(err)
- return err
- }
-
- bytes,err :=json.Marshal(gobalFolderFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
- if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
- log.Println(err)
- return err
- }
-
- return nil
- }
-
- func myWalkfunc(path string, info os.FileInfo, err error) error {
-
- if info.IsDir()==false{
- simpleFileInfo := new(simpleFileInfo)
- simpleFileInfo.Name=info.Name()
- simpleFileInfo.Extension=filepath.Ext(path)
- simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
- simpleFileInfo.AbsolutePath=path
- gobalFolderFileMap[path]=simpleFileInfo
- return nil
- }
-
- return nil
- }
-
- /**
- 本地文件是否存在
- */
- func fileExist(path string) bool {
- _, err := os.Lstat(path)
- return !os.IsNotExist(err)
- }
-
- /**
- 获取本地文件列表
- */
- func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
-
- getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\")
- keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
-
- //定期校验缓存的本地文件状态
- go func() {
- for true {
- time.Sleep(time.Duration(5)*time.Minute)
- dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
- if err != nil {
- log.Println(err)
- continue
- }
- if dataMap!=nil && len(dataMap)>0{
- for k,_ := range dataMap {
- if !fileExist(config.LocalWorkSpaceDir+k){
- err = etcdclient.DeleteWithPrefix(k)
- if err != nil {
- log.Println(err)
- }
- }
- }
- }
- }
- }()
-
- for true {
-
- //优先etcd查询
- dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
- if err != nil {
- log.Println(err)
- return err
- }
- if dataMap!=nil && len(dataMap)>0{
-
- cacheMap := make(map[string] string)
- for k,v := range dataMap {
- cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
- }
-
- mapByte,err:=json.Marshal(cacheMap)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
- log.Println(err)
- return err
- }
- time.Sleep(time.Duration(1)*time.Minute)
- continue
- }
-
- // 不存在则初始化进etcd
-
- err =filepath.Walk(getLocalFileListDir,walkfunc)
- //路径错误
- if err != nil {
- log.Println(err)
- if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
- log.Println(err)
- return err
- }
- time.Sleep(time.Duration(1)*time.Minute)
- continue
- }
-
- mapByte,err:=json.Marshal(gobalFileMap)
- if err != nil {
- log.Println(err)
- return err
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
- log.Println(err)
- return err
- }
-
- cacheMap := make(map[string] string)
- for k,v := range gobalFileMap {
- k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
- cacheMap[k]=v
-
- }
-
- //异步缓存
- //go func() {
- err = etcdclient.BatchAdd(cacheMap)
- if err != nil {
- log.Println(err)
- }
- //}()
-
- //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
-
- //清空gobalFileMap
- gobalFileMap = make(map[string] string)
- time.Sleep(time.Duration(1)*time.Minute)
- }
-
- return nil
- }
-
-
-
- /**
- TODO 文件变更通知
- */
- func FileChangeNotify(){
-
- }
-
- /**
- TODO 监视文件变动
- */
- func WatchFile(filePaths string) error{
- if len(filePaths)==0{
- //TODO
- return nil
- }
- files := strings.Split(filePaths,";")
- for _,file := range files{
- err :=config.GobalWatch.Add(file)
- if err != nil {
- log.Println(err)
- }
- log.Println("文件[ "+file+" ]添加监听事件成功")
- }
-
- return nil
- }
-
- /**
- 打开方式
- */
- func OpenFileWith(filePath string) error{
-
- //判断文件有效性
- _,err := os.Stat(filePath)
- if err!=nil{
- return err
- }
-
- //filePath = strings.Replace(filePath," ","~1",1)
-
- cmd := exec.Command("rundll32.exe","shell32.dll,OpenAs_RunDLL",filePath);
-
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
- return nil
- }
-
- /**
- 手动检查软件更新
- 0:不强制更新
- 1:强制更新
- */
- func CheckForUpdates(forceUpdate string) error{
-
- tszdir :=os.Getenv("TSZDIR")
-
- //空格路径处理
- ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
-
-
- //判断文件有效性
- _,err := os.Stat(tszdir+config.UpdaterName)
- if err!=nil{
- return err
- }
-
-
- cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
-
- cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
-
- //检测到更新 kill所有客户端进程
- log.Println("close all process")
- cmd = exec.Command("cmd.exe","/c",ipfsPath);
- err =cmd.Run()
- if err!=nil{
- log.Println(err)
- return err
- }
- return nil
-
- }
-
- func walkfunc(path string, info os.FileInfo, err error) error {
-
- if info == nil{
- return nil
- }
-
- if info.IsDir()==false{
-
- sh := shell.NewShell(config.GobalIpfsUrl)
- file,err :=os.Open(path)
-
- if err != nil{
- log.Println(err)
- return err
- }
-
- defer file.Close()
-
- hash,err :=sh.Add(file)
- if err != nil {
- log.Println(err)
- return err
- }
-
- dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1)
-
- gobalFileMap[dir]=hash
-
- }
- return nil
- }
-
-
-
-
-
-
-
-
|