文件同步
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

1244 行
28 KiB

  1. package handle
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "fts/config"
  8. "fts/etcdclient"
  9. "github.com/gorilla/websocket"
  10. _ "github.com/ipfs/go-ipfs-api"
  11. shell "github.com/ipfs/go-ipfs-api"
  12. "io"
  13. "io/ioutil"
  14. "log"
  15. "os"
  16. "os/exec"
  17. "path"
  18. "path/filepath"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. var gobalLoginUserName string
  24. //key:filepath,value:hash
  25. var gobalFileMap = make(map[string] string)
  26. var gobalFileUpdateTimeMap = make(map[string] string)
  27. //var gobalFileChangeMap = make(map[string] string)
  28. var getLocalFileListDir string
  29. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  30. var ipfsPath=os.Getenv("IPFS-PATH")
  31. /**
  32. 文件上传下载进度
  33. */
  34. type processStruct struct {
  35. Size string `json:"size"`
  36. CurrentSize string `json:"currentSize"`
  37. Unit string `json:"unit"`
  38. CurrentUnit string `json:"currentUnit"`
  39. Process float64 `json:"process"`
  40. Hash string `json:"hash"`
  41. CommitHistoryHash string `json:"commitHistoryHash"`
  42. }
  43. func main() {
  44. //config.InitConfig()
  45. //InitLocalWorkSpace("320872793405132801","test1")
  46. //
  47. //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
  48. //
  49. //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
  50. //
  51. //GetLocalFileList("testOne")
  52. }
  53. /**
  54. 初始化本地工作目录
  55. @param userId 用户ID
  56. @param projectName 项目名称
  57. */
  58. func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){
  59. //空格路径处理
  60. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  61. //初始化当前登陆用户
  62. gobalLoginUserName = userName
  63. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  64. // 检查本地目录是否存在
  65. var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName
  66. _,err := os.Stat(projectPath)
  67. if err != nil {
  68. //创建文件目录
  69. os.MkdirAll(projectPath, os.ModePerm)
  70. }
  71. log.Println("切换本地工作目录至 "+projectPath)
  72. err = filepath.Walk(projectPath,watchWalkfunc)
  73. if err != nil {
  74. log.Println(err)
  75. return err
  76. }
  77. config.GobalWatchChannelMap[projectPath]=make(chan string,100)
  78. log.Println(projectPath+"添加文件监控")
  79. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
  80. log.Println(err)
  81. return err
  82. }
  83. return nil
  84. }
  85. func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
  86. if info == nil{
  87. return nil
  88. }
  89. if info.IsDir()==true{
  90. err = config.GobalWatch.Add(filePath)
  91. if err != nil {
  92. log.Println(err)
  93. return err
  94. }
  95. }
  96. return nil
  97. }
  98. /**
  99. 初始化本地客户端配置
  100. */
  101. func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
  102. //空格路径处理
  103. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  104. config.ServerIpfsUrl = ipfsApi
  105. log.Println("初始化客户端配置项IPFS—API:"+config.ServerIpfsUrl)
  106. ipfsBootstraps := strings.Split(ipfsBootstrap,";")
  107. for _, simpleBootstrap := range ipfsBootstraps {
  108. log.Println("增加引导节点:"+simpleBootstrap)
  109. cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
  110. err :=cmd.Run()
  111. if err!=nil{
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. /**
  118. 下载指令
  119. @param hash ipfs哈希值
  120. @param projectName 项目名称
  121. @para fileName 文件名称
  122. @param dir 云文件目录
  123. */
  124. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
  125. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  126. //检查目录
  127. _,err := os.Stat(absoluteDir)
  128. if err != nil {
  129. //创建文件目录
  130. err = os.MkdirAll(absoluteDir, os.ModePerm)
  131. if err!=nil{
  132. log.Println(err)
  133. return err
  134. }
  135. }
  136. var downloading bool = false
  137. //检测文件打开状态
  138. //tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1)
  139. //
  140. //if err != nil && (!os.IsNotExist(err)) {
  141. //
  142. // log.Println("文件被占用,请关闭打开的软件")
  143. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  144. // return err
  145. // }
  146. // return err
  147. //}
  148. //defer tfile.Close()
  149. //serverSh := shell.NewShell(config.ServerIpfsUrl)
  150. ////检测引导节点是否连接成功
  151. //isUp := serverSh.IsUp()
  152. //if !isUp {
  153. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  154. // return err
  155. // }
  156. // return nil
  157. //}
  158. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
  159. progress := make(chan string,10000)
  160. var stdout, stderr []byte
  161. var errStdout, errStderr error
  162. stdoutIn, _ := cmd.StdoutPipe()
  163. stderrIn, _ := cmd.StderrPipe()
  164. cmd.Start()
  165. go func() {
  166. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  167. }()
  168. go func() {
  169. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  170. }()
  171. go func(){
  172. millSeconds := time.Now().UnixNano() / 1e6
  173. for content := range progress { // 通道关闭后会退出for range循环
  174. current :=time.Now().UnixNano() / 1e6
  175. if current-millSeconds>500{
  176. projson,err := contentToJSONByte(content)
  177. if projson==nil && err==nil{
  178. continue
  179. }
  180. if err != nil {
  181. log.Printf("json.Marshal error %s\n", err)
  182. }
  183. millSeconds = current
  184. downloading = true
  185. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  186. log.Println(err)
  187. break
  188. }
  189. }
  190. if strings.Index(content,"100.00%")!=-1{
  191. projson,err := contentToJSONByte(content)
  192. if projson==nil && err==nil{
  193. continue
  194. }
  195. if err != nil {
  196. log.Printf("json.Marshal error %s\n", err)
  197. }
  198. downloading = true
  199. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  200. panic(err)
  201. }
  202. break
  203. }
  204. }
  205. }()
  206. log.Println("下载资源连接中...")
  207. //设置30秒连接超时
  208. go func() {
  209. index :=0
  210. for true{
  211. index++
  212. if downloading==true{
  213. return
  214. }
  215. time.Sleep(time.Duration(1)*time.Second)
  216. if downloading==false && index==30{
  217. err = cmd.Process.Kill()
  218. log.Println("进程连接超时30s已被Kill")
  219. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  220. return
  221. }
  222. return
  223. }
  224. }
  225. }()
  226. err = cmd.Wait()
  227. if err != nil {
  228. log.Printf("cmd.Run() failed with %s\n", err)
  229. }
  230. if errStdout != nil || errStderr != nil {
  231. log.Printf("failed to capture stdout or stderr\n")
  232. }
  233. outStr := string(stdout)
  234. log.Printf("out:%s", outStr)
  235. //TODO test 更新数据库hash
  236. time.Sleep(200*time.Millisecond)
  237. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  238. err = etcdclient.ReplaceInto(key,hash)
  239. if err != nil {
  240. log.Println(err)
  241. return err
  242. }
  243. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  244. if err==nil{
  245. log.Println("下载成功")
  246. }
  247. //time.Sleep(time.Duration(6)*time.Second)
  248. defer close(progress)
  249. return nil
  250. }
  251. func contentToJSONByte(content string) ([]byte,error){
  252. sts :=strings.Split(content," ")
  253. if len(sts)<8{
  254. log.Println("字符长度小于8")
  255. return nil,nil
  256. }
  257. var processFloat float64
  258. if (len(sts)==9 || len(sts)==8){
  259. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  260. }else{
  261. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  262. }
  263. if processFloat==0{
  264. //log.Println("当前进度0")
  265. return nil,nil
  266. }
  267. pro :=&processStruct{
  268. Size:sts[4],
  269. CurrentSize: sts[1],
  270. Unit: sts[2],
  271. CurrentUnit: sts[5],
  272. Process: processFloat,
  273. Hash: "",
  274. }
  275. projson,err :=json.Marshal(pro)
  276. return projson,err
  277. }
  278. /**
  279. 上传本地文件
  280. @param absolutePath 文件本地绝对路径
  281. @param fileName 文件名称
  282. @param projectName 项目名称
  283. @param dir 云文件目录
  284. */
  285. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
  286. //本地拷贝文件
  287. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  288. //检查目录
  289. _,err := os.Stat(absoluteDir)
  290. if err != nil {
  291. //创建文件目录
  292. err = os.MkdirAll(absoluteDir, os.ModePerm)
  293. if err!=nil{
  294. return err
  295. }
  296. }
  297. //检测文件打开状态
  298. ///*tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  299. //if err != nil {
  300. // log.Println("文件被占用,请关闭打开的软件")
  301. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  302. // return err
  303. // }
  304. // return err
  305. //}
  306. //defer tfile.Close()*/
  307. serverSh := shell.NewShell(config.ServerIpfsUrl)
  308. //serverSh.SetTimeout(time.Duration(30)*time.Second)
  309. log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  310. //检测引导节点是否连接成功
  311. isUp := serverSh.IsUp()
  312. if !isUp {
  313. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  314. return err
  315. }
  316. return nil
  317. }
  318. var uploading bool=false
  319. cmd := exec.Command(ipfsPath, "add",absolutePath)
  320. uploadProgress := make(chan string,10000)
  321. var stdout, stderr []byte
  322. var errStdout, errStderr error
  323. stdoutIn, _ := cmd.StdoutPipe()
  324. stderrIn, _ := cmd.StderrPipe()
  325. cmd.Start()
  326. go func() {
  327. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  328. }()
  329. go func() {
  330. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  331. }()
  332. go func(){
  333. millSeconds := time.Now().UnixNano() / 1e6
  334. for content := range uploadProgress { // 通道关闭后会退出for range循环
  335. current :=time.Now().UnixNano() / 1e6
  336. if current-millSeconds>500{
  337. projson,err := contentToJSONByte(content)
  338. if projson==nil && err==nil{
  339. continue
  340. }
  341. if err != nil {
  342. log.Println("json.Marshal error %s\n", err)
  343. }
  344. uploading=true
  345. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  346. break
  347. }
  348. millSeconds = current
  349. }
  350. if strings.Index(content,"100.00%")!=-1{
  351. projson,err := contentToJSONByte(content)
  352. if projson==nil && err==nil{
  353. continue
  354. }
  355. if err != nil {
  356. log.Println("json.Marshal error %s\n", err)
  357. }
  358. uploading=true
  359. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  360. panic(err)
  361. }
  362. break
  363. }
  364. }
  365. }()
  366. log.Println("文件上传中...")
  367. //设置30秒连接超时
  368. go func() {
  369. index :=0
  370. for true{
  371. index++
  372. if uploading==true{
  373. return
  374. }
  375. time.Sleep(time.Duration(1)*time.Second)
  376. if uploading==false && index==30{
  377. err = cmd.Process.Kill()
  378. log.Println("进程连接超时30s已被Kill")
  379. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  380. return
  381. }
  382. return
  383. }
  384. }
  385. }()
  386. err = cmd.Wait()
  387. if err != nil {
  388. log.Println("cmd.Run() failed with %s\n", err)
  389. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  390. return err
  391. }
  392. return err
  393. }
  394. if errStdout != nil || errStderr != nil {
  395. log.Println("failed to capture stdout or stderr\n")
  396. }
  397. outStr := string(stdout)
  398. log.Printf("out:%s", outStr)
  399. /*sh.Get(hash,absoluteDir+fileName)
  400. if err != nil {
  401. return "",err
  402. }*/
  403. defer close(uploadProgress)
  404. prog := new(processStruct)
  405. prog.Hash=strings.Split(outStr," ")[1]
  406. prog.Process=100.00
  407. sh := shell.NewShell(config.GobalIpfsUrl)
  408. //sh.SetTimeout(time.Duration(30)*time.Second)
  409. objectStat,err :=sh.ObjectStat(prog.Hash)
  410. if err != nil {
  411. log.Println(err)
  412. return err
  413. }
  414. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  415. //
  416. cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash)
  417. err = cmd.Run()
  418. if err != nil {
  419. log.Println(err)
  420. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  421. return err
  422. }
  423. return err
  424. }
  425. //serverSh.SetTimeout(time.Duration(600)*time.Second)
  426. err = serverSh.Pin(prog.Hash)
  427. if err != nil {
  428. log.Println("引导节点备份失败")
  429. log.Println(err)
  430. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  431. return err
  432. }
  433. return err
  434. }
  435. log.Println("引导节点文件备份成功")
  436. //本地文件夹拷贝
  437. err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
  438. if err != nil {
  439. log.Println(err)
  440. return err
  441. }
  442. //记录历史
  443. filenameall := path.Base(fileName)
  444. filesuffix := path.Ext(fileName)
  445. fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
  446. commitFilePath := absoluteDir+"\\"+fileprefix+".commit"
  447. commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,prog.Hash,note,creator,milestone)
  448. if err != nil {
  449. log.Println("历史文件写入失败")
  450. log.Println(err)
  451. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  452. return err
  453. }
  454. return err
  455. }
  456. prog.CommitHistoryHash=commitHistoryHash
  457. projson,err :=json.Marshal(prog)
  458. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  459. log.Println(err)
  460. return err
  461. }
  462. //更新数据库hash
  463. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  464. err = etcdclient.ReplaceInto(key,prog.Hash+";0")
  465. if err != nil {
  466. log.Println(err)
  467. return err
  468. }
  469. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  470. log.Println("上传成功")
  471. return nil
  472. }
  473. /**
  474. 记录提交记录
  475. */
  476. func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (string,error){
  477. commitHistory := new(commitHistory)
  478. //历史文件不存在则创建
  479. localSh :=shell.NewShell(config.GobalIpfsUrl)
  480. localSh.SetTimeout(5*time.Second)
  481. if len(currentHistoryHash)!=0 {
  482. os.Remove(path)
  483. err := localSh.Get(currentHistoryHash,path)
  484. if err != nil {
  485. log.Println("历史版本管理文件下载失败")
  486. return "",err
  487. }
  488. }
  489. //初始化历史管理文件
  490. exist := fileExist(path)
  491. if !exist {
  492. commitFile,err := os.Create(path)
  493. if err != nil {
  494. log.Println("历史版本管理文件创建失败")
  495. return "",err
  496. }
  497. commitFile.Close()
  498. }
  499. //设置文件隐藏属性
  500. attribCmd :=exec.Command("attrib","+h",path)
  501. err :=attribCmd.Run()
  502. if err != nil {
  503. log.Println("设置文件隐藏属性失败")
  504. return "",err
  505. }
  506. //读取历史管理文件
  507. content ,err :=ioutil.ReadFile(path)
  508. if len(content)!=0{
  509. rows :=strings.Split(string(content),"\n")
  510. endRow :=rows[len(rows)-2]
  511. columns :=strings.Split(endRow,"\t")
  512. commitHistory.Version,_=strconv.Atoi(columns[6])
  513. commitHistory.Version++
  514. commitHistory.ParentHash=columns[1]
  515. }else{
  516. commitHistory.Version=1
  517. commitHistory.ParentHash="0000000000000000000000000000000000000000"
  518. }
  519. commitHistory.CurrentHash=hash
  520. commitHistory.Milestone = milestone
  521. commitHistory.Creator = creator
  522. commitHistory.Note = note
  523. commitHistory.CreateTime=time.Now().Unix()
  524. if commitHistory.ParentHash==commitHistory.CurrentHash{
  525. return commitHistory.CurrentHash,nil
  526. }
  527. file,err :=os.OpenFile(path,os.O_APPEND,0666)
  528. if err != nil{
  529. log.Println(err)
  530. return "",err
  531. }
  532. //写入历史管理文件
  533. w :=bufio.NewWriter(file)
  534. writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
  535. fmt.Fprintln(w,writeContent)
  536. err = w.Flush()
  537. if err != nil {
  538. log.Println("历史版本管理文件写入失败")
  539. return "",err
  540. }
  541. file.Close()
  542. addFile,err :=os.Open(path)
  543. if err != nil{
  544. log.Println(err)
  545. return "",err
  546. }
  547. defer addFile.Close()
  548. //add 历史管理文件
  549. historyHash,err:=localSh.Add(addFile)
  550. if err != nil {
  551. log.Println("历史版本管理文件上传失败")
  552. return "",err
  553. }
  554. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  555. serverSh.SetTimeout(5*time.Second)
  556. err = serverSh.Pin(historyHash)
  557. if err != nil {
  558. log.Println("历史版本管理文件备份失败")
  559. return "",err
  560. }
  561. return historyHash,nil
  562. }
  563. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  564. var out []byte
  565. buf := make([]byte, 1024, 1024)
  566. for {
  567. n, err := r.Read(buf[:])
  568. if n > 0 {
  569. d := buf[:n]
  570. out = append(out, d...)
  571. progress <- string(d)
  572. }
  573. if err != nil {
  574. // Read returns io.EOF at the end of file, which is not an error for us
  575. if err == io.EOF {
  576. err = nil
  577. }
  578. return out, err
  579. }
  580. }
  581. // never reached
  582. panic(true)
  583. return nil, nil
  584. }
  585. /**
  586. 单个文件信息
  587. */
  588. type simpleFileInfo struct {
  589. Name string `json:"name" `
  590. Extension string `json:"extension"`
  591. RelativePath string `json:"relativePath"`
  592. AbsolutePath string `json:"absolutePath"`
  593. }
  594. var gobalFolderFileMap map[string] *simpleFileInfo
  595. var gobalRelativePath string
  596. /**
  597. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  598. @param id 文件id
  599. */
  600. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  601. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  602. fileInfo,err :=os.Stat(absolutePath)
  603. if err!=nil{
  604. log.Println(err)
  605. return err
  606. }
  607. log.Println(filepath.Dir(absolutePath))
  608. //单个文件处理
  609. if !fileInfo.IsDir() {
  610. simpleFileInfo := new(simpleFileInfo)
  611. simpleFileInfo.Name=fileInfo.Name()
  612. simpleFileInfo.Extension=path.Ext(absolutePath)
  613. simpleFileInfo.RelativePath=""
  614. simpleFileInfo.AbsolutePath=absolutePath
  615. gobalFolderFileMap[absolutePath]=simpleFileInfo
  616. bytes,err :=json.Marshal(gobalFolderFileMap)
  617. if err != nil {
  618. log.Println(err)
  619. return err
  620. }
  621. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  622. log.Println(err)
  623. return err
  624. }
  625. return nil
  626. }
  627. //文件目录处理
  628. gobalRelativePath = filepath.Dir(absolutePath)
  629. err =filepath.Walk(absolutePath, myWalkfunc)
  630. if err != nil {
  631. log.Println(err)
  632. return err
  633. }
  634. bytes,err :=json.Marshal(gobalFolderFileMap)
  635. if err != nil {
  636. log.Println(err)
  637. return err
  638. }
  639. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  640. log.Println(err)
  641. return err
  642. }
  643. return nil
  644. }
  645. func myWalkfunc(path string, info os.FileInfo, err error) error {
  646. if info.IsDir()==false{
  647. simpleFileInfo := new(simpleFileInfo)
  648. simpleFileInfo.Name=info.Name()
  649. simpleFileInfo.Extension=filepath.Ext(path)
  650. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  651. simpleFileInfo.AbsolutePath=path
  652. gobalFolderFileMap[path]=simpleFileInfo
  653. return nil
  654. }
  655. return nil
  656. }
  657. /**
  658. 本地文件是否存在
  659. */
  660. func fileExist(path string) bool {
  661. _, err := os.Lstat(path)
  662. return !os.IsNotExist(err)
  663. }
  664. /**
  665. 获取本地文件列表
  666. */
  667. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  668. //getLocalFileListDir := fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  669. log.Println("切换文件列表:"+getLocalFileListDir)
  670. keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
  671. //定期校验缓存的本地文件状态
  672. go func() {
  673. for true {
  674. time.Sleep(time.Duration(5)*time.Minute)
  675. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  676. if err != nil {
  677. log.Println(err)
  678. continue
  679. }
  680. if dataMap!=nil && len(dataMap)>0{
  681. for k,_ := range dataMap {
  682. if !fileExist(config.LocalWorkSpaceDir+k){
  683. err = etcdclient.DeleteWithPrefix(k)
  684. if err != nil {
  685. log.Println(err)
  686. }
  687. }
  688. }
  689. }
  690. }
  691. }()
  692. //优先etcd查询
  693. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  694. if err != nil {
  695. log.Println(err)
  696. return err
  697. }
  698. if dataMap==nil || len(dataMap)==0{
  699. // 不存在则初始化进etcd
  700. err =filepath.Walk(getLocalFileListDir,walkfunc)
  701. //路径错误
  702. if err != nil {
  703. log.Println(err)
  704. if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
  705. log.Println(err)
  706. return err
  707. }
  708. }
  709. mapByte,err:=json.Marshal(gobalFileMap)
  710. if err != nil {
  711. log.Println(err)
  712. return err
  713. }
  714. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  715. log.Println(err)
  716. return err
  717. }
  718. cacheMap := make(map[string] string)
  719. for k,v := range gobalFileMap {
  720. k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
  721. cacheMap[k]=v
  722. }
  723. //异步缓存
  724. //go func() {
  725. err = etcdclient.BatchAdd(cacheMap)
  726. if err != nil {
  727. log.Println(err)
  728. }
  729. //}()
  730. //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  731. //清空gobalFileMap
  732. gobalFileMap = make(map[string] string)
  733. }
  734. err=sendFileListFromEtcd(keyPrefix,projectName,conn)
  735. if err != nil {
  736. log.Println(err)
  737. return err
  738. }
  739. ch :=config.GobalWatchChannelMap[getLocalFileListDir]
  740. for actionAndModifyFilePathStr :=range ch {
  741. actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
  742. if actionAndModifyFilePath[0]=="remove"{
  743. k := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  744. queryMap,err :=etcdclient.QueryWithPrefix(k)
  745. if len(queryMap)==0{
  746. continue
  747. }
  748. err = etcdclient.DeleteWithPrefix(k)
  749. if err != nil {
  750. log.Println(err)
  751. }
  752. }else if actionAndModifyFilePath[0]=="write"{
  753. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  754. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  755. if err != nil {
  756. log.Println(err)
  757. continue
  758. }
  759. if len(querymap)==0{
  760. continue
  761. }
  762. oldValue := strings.Split(querymap[queryKey],";")
  763. newValue := oldValue[0]+";" +"1"
  764. err = etcdclient.ReplaceInto(queryKey,newValue)
  765. if err!=nil{
  766. log.Println(err)
  767. continue
  768. }
  769. }else if actionAndModifyFilePath[0]=="create"{
  770. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  771. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  772. if err != nil {
  773. log.Println(err)
  774. continue
  775. }
  776. if len(querymap)==0{
  777. continue
  778. }
  779. oldValue := strings.Split(querymap[queryKey],";")
  780. newValue := oldValue[0]+";" +"1"
  781. err = etcdclient.ReplaceInto(queryKey,newValue)
  782. if err!=nil{
  783. log.Println(err)
  784. continue
  785. }
  786. }
  787. err = sendFileListFromEtcd(keyPrefix,projectName,conn)
  788. if err != nil {
  789. log.Println(err)
  790. return err
  791. }
  792. }
  793. return nil
  794. }
  795. func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
  796. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  797. if err != nil {
  798. log.Println(err)
  799. return err
  800. }
  801. if dataMap!=nil && len(dataMap)>0{
  802. cacheMap := make(map[string] string)
  803. for k,v := range dataMap {
  804. //历史数据加默认值
  805. if len(strings.Split(v, ";"))==1{
  806. v=v+";0"
  807. }
  808. cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
  809. }
  810. mapByte,err:=json.Marshal(cacheMap)
  811. if err != nil {
  812. log.Println(err)
  813. return err
  814. }
  815. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  816. log.Println(err)
  817. return err
  818. }
  819. return nil
  820. }
  821. return nil
  822. }
  823. /**
  824. 打开方式
  825. */
  826. func OpenFileWith(filePath string) error{
  827. //判断文件有效性
  828. _,err := os.Stat(filePath)
  829. if err!=nil{
  830. return err
  831. }
  832. //filePath = strings.Replace(filePath," ","~1",1)
  833. cmd := exec.Command("rundll32.exe","shell32.dll,OpenAs_RunDLL",filePath);
  834. err =cmd.Run()
  835. if err!=nil{
  836. log.Println(err)
  837. return err
  838. }
  839. return nil
  840. }
  841. /**
  842. 手动检查软件更新
  843. 0:不强制更新
  844. 1:强制更新
  845. */
  846. func CheckForUpdates(forceUpdate string) error{
  847. tszdir :=os.Getenv("TSZDIR")
  848. //空格路径处理
  849. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
  850. //判断文件有效性
  851. _,err := os.Stat(tszdir+config.UpdaterName)
  852. if err!=nil{
  853. return err
  854. }
  855. cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
  856. err =cmd.Run()
  857. if err!=nil{
  858. log.Println(err)
  859. return err
  860. }
  861. cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
  862. err =cmd.Run()
  863. if err!=nil{
  864. log.Println(err)
  865. return err
  866. }
  867. //检测到更新 kill所有客户端进程
  868. log.Println("close all process")
  869. cmd = exec.Command("cmd.exe","/c",ipfsPath);
  870. err =cmd.Run()
  871. if err!=nil{
  872. log.Println(err)
  873. return err
  874. }
  875. return nil
  876. }
  877. func walkfunc(filePath string, info os.FileInfo, err error) error {
  878. if info == nil{
  879. return nil
  880. }
  881. if info.IsDir()==false{
  882. //历史文件不扫描
  883. if path.Ext(filePath)==".commit" {
  884. return nil
  885. }
  886. sh := shell.NewShell(config.GobalIpfsUrl)
  887. file,err :=os.Open(filePath)
  888. if err != nil{
  889. log.Println(err)
  890. return err
  891. }
  892. defer file.Close()
  893. hash,err :=sh.Add(file)
  894. if err != nil {
  895. log.Println(err)
  896. return err
  897. }
  898. dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(getLocalFileListDir+"\\"),"",1)
  899. gobalFileMap[dir]=hash
  900. }
  901. return nil
  902. }
  903. /**
  904. 查询历史文件
  905. path 文件路径
  906. hash 历史版本文件hash
  907. */
  908. func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
  909. result := make(map[int] *commitHistory)
  910. //校验文件路径
  911. _,err :=os.Stat(filePath)
  912. if err != nil {
  913. log.Println("文件 "+filePath+"not found")
  914. return nil,errors.New("参数错误!")
  915. }
  916. if len(hash) == 0 {
  917. return result,nil
  918. }
  919. //根据hash更新文件
  920. localSh := shell.NewShell(config.GobalIpfsUrl)
  921. //连接失败判断
  922. localSh.SetTimeout(5*time.Second)
  923. ext :=path.Ext(filePath)
  924. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  925. os.Remove(commitFilePath)
  926. err = localSh.Get(hash,commitFilePath)
  927. if err!=nil {
  928. log.Println("文件"+hash+"下载失败")
  929. return result,errors.New("历史文件获取失败,请稍后重试")
  930. }
  931. //设置文件隐藏属性
  932. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  933. err =attribCmd.Run()
  934. if err != nil {
  935. log.Println("设置文件隐藏属性失败")
  936. return result,err
  937. }
  938. //解析历史版本文件
  939. contentByte,err := ioutil.ReadFile(commitFilePath)
  940. content := string(contentByte)
  941. if content==""{
  942. return result,nil
  943. }
  944. rows :=strings.Split(content,"\n")
  945. length := len(rows)
  946. var index int = 0
  947. for i:=length-2;i>=0;i--{
  948. columns := strings.Split(rows[i],"\t")
  949. commitHistoryInstance := new(commitHistory)
  950. commitHistoryInstance.ParentHash =columns[0]
  951. commitHistoryInstance.CurrentHash=columns[1]
  952. commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
  953. commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
  954. commitHistoryInstance.Creator=columns[2]
  955. commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
  956. commitHistoryInstance.Note = columns[4]
  957. result[index]=commitHistoryInstance
  958. index++
  959. }
  960. return result,nil
  961. }
  962. /**
  963. 设定某个历史版本为里程碑版本
  964. @param filePath 文件绝对路径
  965. @param commitHistoryHash 历史版本管理文件hash
  966. @param hash 文件hash
  967. @param milestone 是否是里程碑
  968. */
  969. func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
  970. //result := make(map[int] *commitHistory)
  971. //校验文件路径
  972. _,err :=os.Stat(filePath)
  973. if err != nil {
  974. log.Println("文件 "+filePath+"not found")
  975. return "",errors.New("参数错误!")
  976. }
  977. if len(commitHistoryHash) == 0 {
  978. log.Println("参数hash must not empty")
  979. return "",errors.New("参数错误!")
  980. }
  981. //根据hash更新文件
  982. localSh := shell.NewShell(config.GobalIpfsUrl)
  983. //连接失败判断
  984. localSh.SetTimeout(5*time.Second)
  985. ext :=path.Ext(filePath)
  986. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  987. os.Remove(commitFilePath)
  988. err = localSh.Get(commitHistoryHash,commitFilePath)
  989. if err!=nil {
  990. log.Println("文件"+commitHistoryHash+"下载失败")
  991. return "",errors.New("历史文件获取失败,请稍后重试")
  992. }
  993. //设置文件隐藏属性
  994. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  995. err =attribCmd.Run()
  996. if err != nil {
  997. log.Println("设置文件隐藏属性失败")
  998. return "",err
  999. }
  1000. //解析历史版本文件
  1001. contentByte,err := ioutil.ReadFile(commitFilePath)
  1002. content := string(contentByte)
  1003. if content==""{
  1004. return "",nil
  1005. }
  1006. rows :=strings.Split(content,"\n")
  1007. length := len(rows)
  1008. resultString :=""
  1009. for i:=0;i<length-1;i++{
  1010. columns := strings.Split(rows[i],"\t")
  1011. if columns[1]==hash{
  1012. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
  1013. continue
  1014. }
  1015. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
  1016. }
  1017. os.Remove(commitFilePath)
  1018. fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
  1019. w := bufio.NewWriter(fw)
  1020. w.WriteString(resultString)
  1021. if err != nil {
  1022. log.Println(err)
  1023. return "",err
  1024. }
  1025. w.Flush()
  1026. fw.Close()
  1027. addFile,err :=os.Open(commitFilePath)
  1028. if err != nil{
  1029. log.Println(err)
  1030. return "",err
  1031. }
  1032. defer addFile.Close()
  1033. //add 历史管理文件
  1034. historyHash,err:=localSh.Add(addFile)
  1035. if err != nil {
  1036. log.Println("历史版本管理文件上传失败")
  1037. return "",err
  1038. }
  1039. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  1040. serverSh.SetTimeout(5*time.Second)
  1041. err = serverSh.Pin(historyHash)
  1042. if err != nil {
  1043. log.Println("历史版本管理文件备份失败")
  1044. return "",err
  1045. }
  1046. return historyHash,nil
  1047. }
  1048. /*
  1049. 提交历史
  1050. */
  1051. type commitHistory struct {
  1052. ParentHash string `json:"parentHash"`
  1053. CurrentHash string `json:"currentHash"`
  1054. Creator string `json:"creator"`
  1055. CreateTime int64 `json:"createTime"`
  1056. Note string `json:"note"`
  1057. Version int `json:"version"`
  1058. Milestone bool `json:"milestone"`
  1059. }