文件同步
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

1252 wiersze
29 KiB

  1. package handle
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "fts/config"
  8. "fts/etcdclient"
  9. "github.com/gorilla/websocket"
  10. _ "github.com/ipfs/go-ipfs-api"
  11. shell "github.com/ipfs/go-ipfs-api"
  12. "io"
  13. "io/ioutil"
  14. "log"
  15. "os"
  16. "os/exec"
  17. "path"
  18. "path/filepath"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  // Package-level state shared by the handlers in this file.
  var (
  	// gobalLoginUserName is the user name stored by InitLocalWorkSpace.
  	gobalLoginUserName string

  	// gobalFileMap maps a file path (key) to its hash (value).
  	gobalFileMap = make(map[string]string)

  	gobalFileUpdateTimeMap = make(map[string]string)

  	//var gobalFileChangeMap = make(map[string] string)

  	// getLocalFileListDir is the project directory most recently selected by
  	// InitLocalWorkSpace or SubscriptionFileChange.
  	getLocalFileListDir string

  	// gobalSubscriptionFileChangeSwitch is the file-change subscription switch.
  	gobalSubscriptionFileChangeSwitch int = 0

  	// ipfsPath starts as the raw IPFS-PATH environment value; the Init*
  	// functions replace it with the full path of the ipfs executable.
  	ipfsPath = os.Getenv("IPFS-PATH")
  )
  // processStruct is the upload/download progress record that is serialised
  // to JSON and written to the client.
  type processStruct struct {
  	Size              string  `json:"size"`
  	CurrentSize       string  `json:"currentSize"`
  	Unit              string  `json:"unit"`
  	CurrentUnit       string  `json:"currentUnit"`
  	Process           float64 `json:"process"` // percentage, e.g. 100.00 when finished
  	Hash              string  `json:"hash"`
  	CommitHistoryHash string  `json:"commitHistoryHash"`
  }
  // main does nothing. The commented-out calls are kept as manual usage
  // examples for the handlers in this file.
  func main() {
  	// config.InitConfig()
  	// InitLocalWorkSpace("320872793405132801","test1")
  	//
  	// DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
  	//
  	// UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
  	//
  	// GetLocalFileList("testOne")
  }
  53. /**
  54. 初始化本地工作目录
  55. @param userId 用户ID
  56. @param projectName 项目名称
  57. */
  58. func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){
  59. //空格路径处理
  60. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  61. //初始化当前登陆用户
  62. gobalLoginUserName = userName
  63. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  64. // 检查本地目录是否存在
  65. var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName
  66. _,err := os.Stat(projectPath)
  67. if err != nil {
  68. //创建文件目录
  69. os.MkdirAll(projectPath, os.ModePerm)
  70. }
  71. log.Println("切换本地工作目录至 "+projectPath)
  72. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
  73. log.Println(err)
  74. return err
  75. }
  76. return nil
  77. }
  78. func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
  79. if info == nil{
  80. return nil
  81. }
  82. if info.IsDir()==true{
  83. //config.GobalWatch.Remove(filePath)
  84. err = config.GobalWatch.Add(filePath)
  85. if err != nil {
  86. log.Println(err)
  87. return err
  88. }
  89. }
  90. return nil
  91. }
  92. /**
  93. 初始化本地客户端配置
  94. */
  95. func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
  96. //空格路径处理
  97. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  98. config.ServerIpfsUrl = ipfsApi
  99. log.Println("初始化客户端配置项IPFS—API:"+config.ServerIpfsUrl)
  100. ipfsBootstraps := strings.Split(ipfsBootstrap,";")
  101. for _, simpleBootstrap := range ipfsBootstraps {
  102. log.Println("增加引导节点:"+simpleBootstrap)
  103. cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
  104. err :=cmd.Run()
  105. if err!=nil{
  106. return err
  107. }
  108. }
  109. return nil
  110. }
  111. /**
  112. 下载指令
  113. @param hash ipfs哈希值
  114. @param projectName 项目名称
  115. @para fileName 文件名称
  116. @param dir 云文件目录
  117. */
  118. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
  119. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  120. //检查目录
  121. _,err := os.Stat(absoluteDir)
  122. if err != nil {
  123. //创建文件目录
  124. err = os.MkdirAll(absoluteDir, os.ModePerm)
  125. if err!=nil{
  126. log.Println(err)
  127. return err
  128. }
  129. }
  130. var downloading bool = false
  131. //检测文件打开状态
  132. //tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1)
  133. //
  134. //if err != nil && (!os.IsNotExist(err)) {
  135. //
  136. // log.Println("文件被占用,请关闭打开的软件")
  137. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  138. // return err
  139. // }
  140. // return err
  141. //}
  142. //defer tfile.Close()
  143. //serverSh := shell.NewShell(config.ServerIpfsUrl)
  144. ////检测引导节点是否连接成功
  145. //isUp := serverSh.IsUp()
  146. //if !isUp {
  147. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  148. // return err
  149. // }
  150. // return nil
  151. //}
  152. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
  153. progress := make(chan string,10000)
  154. var stdout, stderr []byte
  155. var errStdout, errStderr error
  156. stdoutIn, _ := cmd.StdoutPipe()
  157. stderrIn, _ := cmd.StderrPipe()
  158. cmd.Start()
  159. go func() {
  160. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  161. }()
  162. go func() {
  163. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  164. }()
  165. go func(){
  166. millSeconds := time.Now().UnixNano() / 1e6
  167. for content := range progress { // 通道关闭后会退出for range循环
  168. current :=time.Now().UnixNano() / 1e6
  169. if current-millSeconds>500{
  170. projson,err := contentToJSONByte(content)
  171. if projson==nil && err==nil{
  172. continue
  173. }
  174. if err != nil {
  175. log.Printf("json.Marshal error %s\n", err)
  176. }
  177. millSeconds = current
  178. downloading = true
  179. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  180. log.Println(err)
  181. break
  182. }
  183. }
  184. if strings.Index(content,"100.00%")!=-1{
  185. projson,err := contentToJSONByte(content)
  186. if projson==nil && err==nil{
  187. continue
  188. }
  189. if err != nil {
  190. log.Printf("json.Marshal error %s\n", err)
  191. }
  192. downloading = true
  193. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  194. panic(err)
  195. }
  196. break
  197. }
  198. }
  199. }()
  200. log.Println("下载资源连接中...")
  201. //设置30秒连接超时
  202. go func() {
  203. index :=0
  204. for true{
  205. index++
  206. if downloading==true{
  207. return
  208. }
  209. time.Sleep(time.Duration(1)*time.Second)
  210. if downloading==false && index==30{
  211. err = cmd.Process.Kill()
  212. log.Println("进程连接超时30s已被Kill")
  213. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  214. return
  215. }
  216. return
  217. }
  218. }
  219. }()
  220. err = cmd.Wait()
  221. if err != nil {
  222. log.Printf("cmd.Run() failed with %s\n", err)
  223. }
  224. if errStdout != nil || errStderr != nil {
  225. log.Printf("failed to capture stdout or stderr\n")
  226. }
  227. outStr := string(stdout)
  228. log.Printf("out:%s", outStr)
  229. //TODO test 更新数据库hash
  230. time.Sleep(200*time.Millisecond)
  231. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  232. err = etcdclient.ReplaceInto(key,hash)
  233. if err != nil {
  234. log.Println(err)
  235. return err
  236. }
  237. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  238. if err==nil{
  239. log.Println("下载成功")
  240. }
  241. //time.Sleep(time.Duration(6)*time.Second)
  242. defer close(progress)
  243. return nil
  244. }
  245. func contentToJSONByte(content string) ([]byte,error){
  246. sts :=strings.Split(content," ")
  247. if len(sts)<8{
  248. log.Println("字符长度小于8")
  249. return nil,nil
  250. }
  251. var processFloat float64
  252. if (len(sts)==9 || len(sts)==8){
  253. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  254. }else{
  255. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  256. }
  257. if processFloat==0{
  258. //log.Println("当前进度0")
  259. return nil,nil
  260. }
  261. pro :=&processStruct{
  262. Size:sts[4],
  263. CurrentSize: sts[1],
  264. Unit: sts[2],
  265. CurrentUnit: sts[5],
  266. Process: processFloat,
  267. Hash: "",
  268. }
  269. projson,err :=json.Marshal(pro)
  270. return projson,err
  271. }
  272. /**
  273. 上传本地文件
  274. @param absolutePath 文件本地绝对路径
  275. @param fileName 文件名称
  276. @param projectName 项目名称
  277. @param dir 云文件目录
  278. */
  279. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
  280. //本地拷贝文件
  281. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  282. //检查目录
  283. _,err := os.Stat(absoluteDir)
  284. if err != nil {
  285. //创建文件目录
  286. err = os.MkdirAll(absoluteDir, os.ModePerm)
  287. if err!=nil{
  288. return err
  289. }
  290. }
  291. //检测文件打开状态
  292. ///*tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  293. //if err != nil {
  294. // log.Println("文件被占用,请关闭打开的软件")
  295. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  296. // return err
  297. // }
  298. // return err
  299. //}
  300. //defer tfile.Close()*/
  301. serverSh := shell.NewShell(config.ServerIpfsUrl)
  302. //serverSh.SetTimeout(time.Duration(30)*time.Second)
  303. log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  304. //检测引导节点是否连接成功
  305. isUp := serverSh.IsUp()
  306. if !isUp {
  307. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  308. return err
  309. }
  310. return nil
  311. }
  312. var uploading bool=false
  313. cmd := exec.Command(ipfsPath, "add",absolutePath)
  314. uploadProgress := make(chan string,10000)
  315. var stdout, stderr []byte
  316. var errStdout, errStderr error
  317. stdoutIn, _ := cmd.StdoutPipe()
  318. stderrIn, _ := cmd.StderrPipe()
  319. cmd.Start()
  320. go func() {
  321. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  322. }()
  323. go func() {
  324. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  325. }()
  326. go func(){
  327. millSeconds := time.Now().UnixNano() / 1e6
  328. for content := range uploadProgress { // 通道关闭后会退出for range循环
  329. current :=time.Now().UnixNano() / 1e6
  330. if current-millSeconds>500{
  331. projson,err := contentToJSONByte(content)
  332. if projson==nil && err==nil{
  333. continue
  334. }
  335. if err != nil {
  336. log.Println("json.Marshal error %s\n", err)
  337. }
  338. uploading=true
  339. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  340. break
  341. }
  342. millSeconds = current
  343. }
  344. if strings.Index(content,"100.00%")!=-1{
  345. projson,err := contentToJSONByte(content)
  346. if projson==nil && err==nil{
  347. continue
  348. }
  349. if err != nil {
  350. log.Println("json.Marshal error %s\n", err)
  351. }
  352. uploading=true
  353. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  354. panic(err)
  355. }
  356. break
  357. }
  358. }
  359. }()
  360. log.Println("文件上传中...")
  361. //设置30秒连接超时
  362. go func() {
  363. index :=0
  364. for true{
  365. index++
  366. if uploading==true{
  367. return
  368. }
  369. time.Sleep(time.Duration(1)*time.Second)
  370. if uploading==false && index==30{
  371. err = cmd.Process.Kill()
  372. log.Println("进程连接超时30s已被Kill")
  373. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  374. return
  375. }
  376. return
  377. }
  378. }
  379. }()
  380. err = cmd.Wait()
  381. if err != nil {
  382. log.Println("cmd.Run() failed with %s\n", err)
  383. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  384. return err
  385. }
  386. return err
  387. }
  388. if errStdout != nil || errStderr != nil {
  389. log.Println("failed to capture stdout or stderr\n")
  390. }
  391. outStr := string(stdout)
  392. log.Printf("out:%s", outStr)
  393. /*sh.Get(hash,absoluteDir+fileName)
  394. if err != nil {
  395. return "",err
  396. }*/
  397. defer close(uploadProgress)
  398. prog := new(processStruct)
  399. prog.Hash=strings.Split(outStr," ")[1]
  400. prog.Process=100.00
  401. sh := shell.NewShell(config.GobalIpfsUrl)
  402. //sh.SetTimeout(time.Duration(30)*time.Second)
  403. objectStat,err :=sh.ObjectStat(prog.Hash)
  404. if err != nil {
  405. log.Println(err)
  406. return err
  407. }
  408. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  409. //
  410. cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash)
  411. err = cmd.Run()
  412. if err != nil {
  413. log.Println(err)
  414. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  415. return err
  416. }
  417. return err
  418. }
  419. //serverSh.SetTimeout(time.Duration(600)*time.Second)
  420. err = serverSh.Pin(prog.Hash)
  421. if err != nil {
  422. log.Println("引导节点备份失败")
  423. log.Println(err)
  424. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  425. return err
  426. }
  427. return err
  428. }
  429. log.Println("引导节点文件备份成功")
  430. //文件不存在则进行本地文件夹拷贝
  431. if !fileExist(fmt.Sprint((absoluteDir+"\\"+fileName))) {
  432. err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
  433. if err != nil {
  434. log.Println(err)
  435. return err
  436. }
  437. }
  438. //记录历史
  439. filenameall := path.Base(fileName)
  440. filesuffix := path.Ext(fileName)
  441. fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
  442. commitFilePath := absoluteDir+"\\"+fileprefix+".commit"
  443. commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,prog.Hash,note,creator,milestone)
  444. if err != nil {
  445. log.Println("历史文件写入失败")
  446. log.Println(err)
  447. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  448. return err
  449. }
  450. return err
  451. }
  452. prog.CommitHistoryHash=commitHistoryHash
  453. projson,err :=json.Marshal(prog)
  454. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  455. log.Println(err)
  456. return err
  457. }
  458. //更新数据库hash
  459. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  460. err = etcdclient.ReplaceInto(key,prog.Hash+";0")
  461. if err != nil {
  462. log.Println(err)
  463. return err
  464. }
  465. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  466. log.Println("上传成功")
  467. return nil
  468. }
  469. /**
  470. 记录提交记录
  471. */
  472. func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (string,error){
  473. commitHistory := new(commitHistory)
  474. //历史文件不存在则创建
  475. localSh :=shell.NewShell(config.GobalIpfsUrl)
  476. localSh.SetTimeout(5*time.Second)
  477. if len(currentHistoryHash)!=0 {
  478. os.Remove(path)
  479. err := localSh.Get(currentHistoryHash,path)
  480. if err != nil {
  481. log.Println("历史版本管理文件下载失败")
  482. return "",err
  483. }
  484. }
  485. //初始化历史管理文件
  486. exist := fileExist(path)
  487. if !exist {
  488. commitFile,err := os.Create(path)
  489. if err != nil {
  490. log.Println("历史版本管理文件创建失败")
  491. return "",err
  492. }
  493. commitFile.Close()
  494. }
  495. //设置文件隐藏属性
  496. attribCmd :=exec.Command("attrib","+h",path)
  497. err :=attribCmd.Run()
  498. if err != nil {
  499. log.Println("设置文件隐藏属性失败")
  500. return "",err
  501. }
  502. //读取历史管理文件
  503. content ,err :=ioutil.ReadFile(path)
  504. if len(content)!=0{
  505. rows :=strings.Split(string(content),"\n")
  506. endRow :=rows[len(rows)-2]
  507. columns :=strings.Split(endRow,"\t")
  508. commitHistory.Version,_=strconv.Atoi(columns[6])
  509. commitHistory.Version++
  510. commitHistory.ParentHash=columns[1]
  511. }else{
  512. commitHistory.Version=1
  513. commitHistory.ParentHash="0000000000000000000000000000000000000000"
  514. }
  515. commitHistory.CurrentHash=hash
  516. commitHistory.Milestone = milestone
  517. commitHistory.Creator = creator
  518. commitHistory.Note = note
  519. commitHistory.CreateTime=time.Now().Unix()
  520. if commitHistory.ParentHash==commitHistory.CurrentHash{
  521. return commitHistory.CurrentHash,nil
  522. }
  523. file,err :=os.OpenFile(path,os.O_APPEND,0666)
  524. if err != nil{
  525. log.Println(err)
  526. return "",err
  527. }
  528. //写入历史管理文件
  529. w :=bufio.NewWriter(file)
  530. writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
  531. fmt.Fprintln(w,writeContent)
  532. err = w.Flush()
  533. if err != nil {
  534. log.Println("历史版本管理文件写入失败")
  535. return "",err
  536. }
  537. file.Close()
  538. addFile,err :=os.Open(path)
  539. if err != nil{
  540. log.Println(err)
  541. return "",err
  542. }
  543. defer addFile.Close()
  544. //add 历史管理文件
  545. historyHash,err:=localSh.Add(addFile)
  546. if err != nil {
  547. log.Println("历史版本管理文件上传失败")
  548. return "",err
  549. }
  550. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  551. serverSh.SetTimeout(5*time.Second)
  552. err = serverSh.Pin(historyHash)
  553. if err != nil {
  554. log.Println("历史版本管理文件备份失败")
  555. return "",err
  556. }
  557. return historyHash,nil
  558. }
  // copyAndCapture reads r to the end in chunks of up to 1024 bytes. Every
  // chunk is appended to the returned slice and sent, as a string, on
  // progress. io.EOF ends the loop without an error; any other read error is
  // returned together with what was captured so far.
  //
  // w is accepted for interface compatibility but nothing is written to it.
  func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  	var captured []byte
  	chunk := make([]byte, 1024)
  	for {
  		n, readErr := r.Read(chunk)
  		if n > 0 {
  			captured = append(captured, chunk[:n]...)
  			progress <- string(chunk[:n])
  		}
  		switch readErr {
  		case nil:
  			continue
  		case io.EOF:
  			return captured, nil
  		default:
  			return captured, readErr
  		}
  	}
  }
  // simpleFileInfo describes a single file reported by GetFolderFileInfo.
  type simpleFileInfo struct {
  	Name         string `json:"name" `
  	Extension    string `json:"extension"`
  	RelativePath string `json:"relativePath"`
  	AbsolutePath string `json:"absolutePath"`
  }

  var (
  	// gobalFolderFileMap collects the files found by the current
  	// GetFolderFileInfo call, keyed by absolute path.
  	gobalFolderFileMap map[string]*simpleFileInfo

  	// gobalRelativePath is the prefix that myWalkfunc removes from a path
  	// before computing RelativePath.
  	gobalRelativePath string
  )
  592. /**
  593. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  594. @param id 文件id
  595. */
  596. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  597. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  598. fileInfo,err :=os.Stat(absolutePath)
  599. if err!=nil{
  600. log.Println(err)
  601. return err
  602. }
  603. log.Println(filepath.Dir(absolutePath))
  604. //单个文件处理
  605. if !fileInfo.IsDir() {
  606. simpleFileInfo := new(simpleFileInfo)
  607. simpleFileInfo.Name=fileInfo.Name()
  608. simpleFileInfo.Extension=path.Ext(absolutePath)
  609. simpleFileInfo.RelativePath=""
  610. simpleFileInfo.AbsolutePath=absolutePath
  611. gobalFolderFileMap[absolutePath]=simpleFileInfo
  612. bytes,err :=json.Marshal(gobalFolderFileMap)
  613. if err != nil {
  614. log.Println(err)
  615. return err
  616. }
  617. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  618. log.Println(err)
  619. return err
  620. }
  621. return nil
  622. }
  623. //文件目录处理
  624. gobalRelativePath = filepath.Dir(absolutePath)
  625. err =filepath.Walk(absolutePath, myWalkfunc)
  626. if err != nil {
  627. log.Println(err)
  628. return err
  629. }
  630. bytes,err :=json.Marshal(gobalFolderFileMap)
  631. if err != nil {
  632. log.Println(err)
  633. return err
  634. }
  635. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  636. log.Println(err)
  637. return err
  638. }
  639. return nil
  640. }
  641. func myWalkfunc(path string, info os.FileInfo, err error) error {
  642. if info.IsDir()==false{
  643. simpleFileInfo := new(simpleFileInfo)
  644. simpleFileInfo.Name=info.Name()
  645. simpleFileInfo.Extension=filepath.Ext(path)
  646. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  647. simpleFileInfo.AbsolutePath=path
  648. gobalFolderFileMap[path]=simpleFileInfo
  649. return nil
  650. }
  651. return nil
  652. }
  // fileExist reports whether path exists locally. It returns false only
  // when os.Lstat fails with a "does not exist" error; any other outcome,
  // including other Lstat errors, counts as existing.
  func fileExist(path string) bool {
  	if _, err := os.Lstat(path); os.IsNotExist(err) {
  		return false
  	}
  	return true
  }
  660. /**
  661. 获取本地文件列表
  662. */
  663. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  664. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  665. projectPath := getLocalFileListDir
  666. log.Println("切换文件列表:"+projectPath)
  667. keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
  668. //添加监控
  669. err := filepath.Walk(projectPath,watchWalkfunc)
  670. if err != nil {
  671. log.Println(err)
  672. return err
  673. }
  674. //初始化通道
  675. if config.GobalWatchChannelMap[projectPath] != nil {
  676. close(config.GobalWatchChannelMap[projectPath])
  677. }
  678. config.GobalWatchChannelMap[projectPath]=make(chan string,100)
  679. log.Println(projectPath+"添加文件监控")
  680. log.Println(config.GobalWatchChannelMap[projectPath])
  681. //定期校验缓存的本地文件状态
  682. dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix)
  683. if err != nil {
  684. log.Println(err)
  685. }
  686. if dataMapa!=nil && len(dataMapa)>0{
  687. for k,_ := range dataMapa {
  688. if !fileExist(config.LocalWorkSpaceDir+k){
  689. err = etcdclient.DeleteWithPrefix(k)
  690. if err != nil {
  691. log.Println(err)
  692. }
  693. }
  694. }
  695. }
  696. //优先etcd查询
  697. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  698. if err != nil {
  699. log.Println(err)
  700. return err
  701. }
  702. if dataMap==nil || len(dataMap)==0{
  703. // 不存在则初始化进etcd
  704. err =filepath.Walk(getLocalFileListDir,walkfunc)
  705. //路径错误
  706. if err != nil {
  707. log.Println(err)
  708. if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
  709. log.Println(err)
  710. return err
  711. }
  712. }
  713. mapByte,err:=json.Marshal(gobalFileMap)
  714. if err != nil {
  715. log.Println(err)
  716. return err
  717. }
  718. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  719. log.Println(err)
  720. return err
  721. }
  722. cacheMap := make(map[string] string)
  723. for k,v := range gobalFileMap {
  724. k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
  725. cacheMap[k]=v
  726. }
  727. //异步缓存
  728. //go func() {
  729. err = etcdclient.BatchAdd(cacheMap)
  730. if err != nil {
  731. log.Println(err)
  732. }
  733. //}()
  734. //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  735. //清空gobalFileMap
  736. gobalFileMap = make(map[string] string)
  737. }
  738. err=sendFileListFromEtcd(keyPrefix,projectName,conn)
  739. if err != nil {
  740. log.Println(err)
  741. return err
  742. }
  743. //ch :=config.GobalWatchChannelMap[getLocalFileListDir]
  744. log.Println(config.GobalWatchChannelMap[getLocalFileListDir])
  745. for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[getLocalFileListDir] {
  746. log.Println(actionAndModifyFilePathStr)
  747. actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
  748. if actionAndModifyFilePath[0]=="remove"{
  749. k := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  750. queryMap,err :=etcdclient.QueryWithPrefix(k)
  751. if len(queryMap)==0{
  752. continue
  753. }
  754. err = etcdclient.DeleteWithPrefix(k)
  755. if err != nil {
  756. log.Println(err)
  757. }
  758. }else if actionAndModifyFilePath[0]=="write"{
  759. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  760. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  761. if err != nil {
  762. log.Println(err)
  763. continue
  764. }
  765. if len(querymap)==0{
  766. continue
  767. }
  768. oldValue := strings.Split(querymap[queryKey],";")
  769. newValue := oldValue[0]+";" +"1"
  770. err = etcdclient.ReplaceInto(queryKey,newValue)
  771. if err!=nil{
  772. log.Println(err)
  773. continue
  774. }
  775. }else if actionAndModifyFilePath[0]=="create"{
  776. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  777. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  778. if err != nil {
  779. log.Println(err)
  780. continue
  781. }
  782. if len(querymap)==0{
  783. continue
  784. }
  785. oldValue := strings.Split(querymap[queryKey],";")
  786. newValue := oldValue[0]+";" +"1"
  787. err = etcdclient.ReplaceInto(queryKey,newValue)
  788. if err!=nil{
  789. log.Println(err)
  790. continue
  791. }
  792. }
  793. err = sendFileListFromEtcd(keyPrefix,projectName,conn)
  794. if err != nil {
  795. log.Println(err)
  796. return err
  797. }
  798. }
  799. return nil
  800. }
  801. func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
  802. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  803. if err != nil {
  804. log.Println(err)
  805. return err
  806. }
  807. if dataMap!=nil && len(dataMap)>0{
  808. cacheMap := make(map[string] string)
  809. for k,v := range dataMap {
  810. //历史数据加默认值
  811. if len(strings.Split(v, ";"))==1{
  812. v=v+";0"
  813. }
  814. cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
  815. }
  816. mapByte,err:=json.Marshal(cacheMap)
  817. if err != nil {
  818. log.Println(err)
  819. return err
  820. }
  821. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  822. log.Println(err)
  823. return err
  824. }
  825. return nil
  826. }else{
  827. log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName)
  828. }
  829. return nil
  830. }
  // OpenFileWith shows the "Open with" dialog for filePath by running
  // rundll32.exe with shell32.dll,OpenAs_RunDLL. It returns the os.Stat error
  // when filePath cannot be stat'ed, or the error of the command.
  func OpenFileWith(filePath string) error {
  	if _, statErr := os.Stat(filePath); statErr != nil {
  		return statErr
  	}
  	//filePath = strings.Replace(filePath," ","~1",1)
  	runErr := exec.Command("rundll32.exe", "shell32.dll,OpenAs_RunDLL", filePath).Run()
  	if runErr != nil {
  		log.Println(runErr)
  		return runErr
  	}
  	return nil
  }
  849. /**
  850. 手动检查软件更新
  851. 0:不强制更新
  852. 1:强制更新
  853. */
  854. func CheckForUpdates(forceUpdate string) error{
  855. tszdir :=os.Getenv("TSZDIR")
  856. //空格路径处理
  857. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
  858. //判断文件有效性
  859. _,err := os.Stat(tszdir+config.UpdaterName)
  860. if err!=nil{
  861. return err
  862. }
  863. cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
  864. err =cmd.Run()
  865. if err!=nil{
  866. log.Println(err)
  867. return err
  868. }
  869. cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
  870. err =cmd.Run()
  871. if err!=nil{
  872. log.Println(err)
  873. return err
  874. }
  875. //检测到更新 kill所有客户端进程
  876. log.Println("close all process")
  877. cmd = exec.Command("cmd.exe","/c",ipfsPath);
  878. err =cmd.Run()
  879. if err!=nil{
  880. log.Println(err)
  881. return err
  882. }
  883. return nil
  884. }
  885. func walkfunc(filePath string, info os.FileInfo, err error) error {
  886. if info == nil{
  887. return nil
  888. }
  889. if info.IsDir()==false{
  890. //历史文件不扫描
  891. if path.Ext(filePath)==".commit" {
  892. return nil
  893. }
  894. sh := shell.NewShell(config.GobalIpfsUrl)
  895. file,err :=os.Open(filePath)
  896. if err != nil{
  897. log.Println(err)
  898. return err
  899. }
  900. defer file.Close()
  901. hash,err :=sh.Add(file)
  902. if err != nil {
  903. log.Println(err)
  904. return err
  905. }
  906. dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(getLocalFileListDir+"\\"),"",1)
  907. gobalFileMap[dir]=hash
  908. }
  909. return nil
  910. }
  911. /**
  912. 查询历史文件
  913. path 文件路径
  914. hash 历史版本文件hash
  915. */
  916. func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
  917. result := make(map[int] *commitHistory)
  918. //校验文件路径
  919. _,err :=os.Stat(filePath)
  920. if err != nil {
  921. log.Println("文件 "+filePath+"not found")
  922. return nil,errors.New("参数错误!")
  923. }
  924. if len(hash) == 0 {
  925. return result,nil
  926. }
  927. //根据hash更新文件
  928. localSh := shell.NewShell(config.GobalIpfsUrl)
  929. //连接失败判断
  930. localSh.SetTimeout(5*time.Second)
  931. ext :=path.Ext(filePath)
  932. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  933. os.Remove(commitFilePath)
  934. err = localSh.Get(hash,commitFilePath)
  935. if err!=nil {
  936. log.Println("文件"+hash+"下载失败")
  937. return result,errors.New("历史文件获取失败,请稍后重试")
  938. }
  939. //设置文件隐藏属性
  940. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  941. err =attribCmd.Run()
  942. if err != nil {
  943. log.Println("设置文件隐藏属性失败")
  944. return result,err
  945. }
  946. //解析历史版本文件
  947. contentByte,err := ioutil.ReadFile(commitFilePath)
  948. content := string(contentByte)
  949. if content==""{
  950. return result,nil
  951. }
  952. rows :=strings.Split(content,"\n")
  953. length := len(rows)
  954. var index int = 0
  955. for i:=length-2;i>=0;i--{
  956. columns := strings.Split(rows[i],"\t")
  957. commitHistoryInstance := new(commitHistory)
  958. commitHistoryInstance.ParentHash =columns[0]
  959. commitHistoryInstance.CurrentHash=columns[1]
  960. commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
  961. commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
  962. commitHistoryInstance.Creator=columns[2]
  963. commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
  964. commitHistoryInstance.Note = columns[4]
  965. result[index]=commitHistoryInstance
  966. index++
  967. }
  968. return result,nil
  969. }
  970. /**
  971. 设定某个历史版本为里程碑版本
  972. @param filePath 文件绝对路径
  973. @param commitHistoryHash 历史版本管理文件hash
  974. @param hash 文件hash
  975. @param milestone 是否是里程碑
  976. */
  977. func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
  978. //result := make(map[int] *commitHistory)
  979. //校验文件路径
  980. _,err :=os.Stat(filePath)
  981. if err != nil {
  982. log.Println("文件 "+filePath+"not found")
  983. return "",errors.New("参数错误!")
  984. }
  985. if len(commitHistoryHash) == 0 {
  986. log.Println("参数hash must not empty")
  987. return "",errors.New("参数错误!")
  988. }
  989. //根据hash更新文件
  990. localSh := shell.NewShell(config.GobalIpfsUrl)
  991. //连接失败判断
  992. localSh.SetTimeout(5*time.Second)
  993. ext :=path.Ext(filePath)
  994. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  995. os.Remove(commitFilePath)
  996. err = localSh.Get(commitHistoryHash,commitFilePath)
  997. if err!=nil {
  998. log.Println("文件"+commitHistoryHash+"下载失败")
  999. return "",errors.New("历史文件获取失败,请稍后重试")
  1000. }
  1001. //设置文件隐藏属性
  1002. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1003. err =attribCmd.Run()
  1004. if err != nil {
  1005. log.Println("设置文件隐藏属性失败")
  1006. return "",err
  1007. }
  1008. //解析历史版本文件
  1009. contentByte,err := ioutil.ReadFile(commitFilePath)
  1010. content := string(contentByte)
  1011. if content==""{
  1012. return "",nil
  1013. }
  1014. rows :=strings.Split(content,"\n")
  1015. length := len(rows)
  1016. resultString :=""
  1017. for i:=0;i<length-1;i++{
  1018. columns := strings.Split(rows[i],"\t")
  1019. if columns[1]==hash{
  1020. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
  1021. continue
  1022. }
  1023. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
  1024. }
  1025. os.Remove(commitFilePath)
  1026. fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
  1027. w := bufio.NewWriter(fw)
  1028. w.WriteString(resultString)
  1029. if err != nil {
  1030. log.Println(err)
  1031. return "",err
  1032. }
  1033. w.Flush()
  1034. fw.Close()
  1035. addFile,err :=os.Open(commitFilePath)
  1036. if err != nil{
  1037. log.Println(err)
  1038. return "",err
  1039. }
  1040. defer addFile.Close()
  1041. //add 历史管理文件
  1042. historyHash,err:=localSh.Add(addFile)
  1043. if err != nil {
  1044. log.Println("历史版本管理文件上传失败")
  1045. return "",err
  1046. }
  1047. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  1048. serverSh.SetTimeout(5*time.Second)
  1049. err = serverSh.Pin(historyHash)
  1050. if err != nil {
  1051. log.Println("历史版本管理文件备份失败")
  1052. return "",err
  1053. }
  1054. return historyHash,nil
  1055. }
  // commitHistory is one entry of a file's commit history, as stored in the
  // ".commit" history file and returned to the client.
  type commitHistory struct {
  	ParentHash  string `json:"parentHash"`  // hash recorded by the previous entry
  	CurrentHash string `json:"currentHash"` // hash recorded by this entry
  	Creator     string `json:"creator"`
  	CreateTime  int64  `json:"createTime"` // Unix time in seconds
  	Note        string `json:"note"`
  	Version     int    `json:"version"`
  	Milestone   bool   `json:"milestone"`
  }