文件同步
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1294 line
30 KiB

  1. package handle
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "fts/config"
  9. "fts/etcdclient"
  10. "github.com/gorilla/websocket"
  11. _ "github.com/ipfs/go-ipfs-api"
  12. shell "github.com/ipfs/go-ipfs-api"
  13. "io"
  14. "io/ioutil"
  15. "log"
  16. "os"
  17. "os/exec"
  18. "path"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "time"
  23. )
  24. var gobalLoginUserName string
  25. //key:filepath,value:hash
  26. var gobalFileMap = make(map[string] string)
  27. var gobalFileUpdateTimeMap = make(map[string] string)
  28. //var gobalFileChangeMap = make(map[string] string)
  29. var getLocalFileListDir string
  30. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  31. var ipfsPath=os.Getenv("IPFS-PATH")
  32. /**
  33. 文件上传下载进度
  34. */
  35. type processStruct struct {
  36. Size string `json:"size"`
  37. CurrentSize string `json:"currentSize"`
  38. Unit string `json:"unit"`
  39. CurrentUnit string `json:"currentUnit"`
  40. Process float64 `json:"process"`
  41. Hash string `json:"hash"`
  42. CommitHistoryHash string `json:"commitHistoryHash"`
  43. }
  44. func main() {
  45. //config.InitConfig()
  46. //InitLocalWorkSpace("320872793405132801","test1")
  47. //
  48. //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
  49. //
  50. //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
  51. //
  52. //GetLocalFileList("testOne")
  53. }
  54. /**
  55. 初始化本地工作目录
  56. @param userId 用户ID
  57. @param projectName 项目名称
  58. */
  59. func InitLocalWorkSpace(conn *websocket.Conn,userName,projectName string) (error){
  60. //空格路径处理
  61. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  62. //初始化当前登陆用户
  63. gobalLoginUserName = userName
  64. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  65. // 检查本地目录是否存在
  66. var projectPath = config.LocalWorkSpaceDir+userName+"\\"+projectName
  67. _,err := os.Stat(projectPath)
  68. if err != nil {
  69. //创建文件目录
  70. os.MkdirAll(projectPath, os.ModePerm)
  71. }
  72. log.Println("切换本地工作目录至 "+projectPath)
  73. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
  74. log.Println(err)
  75. return err
  76. }
  77. return nil
  78. }
  79. func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
  80. if info == nil{
  81. return nil
  82. }
  83. if info.IsDir()==true{
  84. //config.GobalWatch.Remove(filePath)
  85. err = config.GobalWatch.Add(filePath)
  86. if err != nil {
  87. log.Println(err)
  88. return err
  89. }
  90. }
  91. return nil
  92. }
  93. /**
  94. 初始化本地客户端配置
  95. */
  96. func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
  97. //空格路径处理
  98. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  99. config.ServerIpfsUrl = ipfsApi
  100. log.Println("初始化客户端配置项IPFS—API:"+config.ServerIpfsUrl)
  101. ipfsBootstraps := strings.Split(ipfsBootstrap,";")
  102. for _, simpleBootstrap := range ipfsBootstraps {
  103. log.Println("增加引导节点:"+simpleBootstrap)
  104. cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
  105. err :=cmd.Run()
  106. if err!=nil{
  107. return err
  108. }
  109. }
  110. return nil
  111. }
  112. /**
  113. 下载指令
  114. @param hash ipfs哈希值
  115. @param projectName 项目名称
  116. @para fileName 文件名称
  117. @param dir 云文件目录
  118. */
  119. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
  120. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  121. //检查目录
  122. _,err := os.Stat(absoluteDir)
  123. if err != nil {
  124. //创建文件目录
  125. err = os.MkdirAll(absoluteDir, os.ModePerm)
  126. if err!=nil{
  127. log.Println(err)
  128. return err
  129. }
  130. }
  131. var downloading bool = false
  132. //检测文件打开状态
  133. //tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1)
  134. //
  135. //if err != nil && (!os.IsNotExist(err)) {
  136. //
  137. // log.Println("文件被占用,请关闭打开的软件")
  138. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  139. // return err
  140. // }
  141. // return err
  142. //}
  143. //defer tfile.Close()
  144. //serverSh := shell.NewShell(config.ServerIpfsUrl)
  145. ////检测引导节点是否连接成功
  146. //isUp := serverSh.IsUp()
  147. //if !isUp {
  148. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  149. // return err
  150. // }
  151. // return nil
  152. //}
  153. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
  154. progress := make(chan string,10000)
  155. var stdout, stderr []byte
  156. var errStdout, errStderr error
  157. stdoutIn, _ := cmd.StdoutPipe()
  158. stderrIn, _ := cmd.StderrPipe()
  159. cmd.Start()
  160. go func() {
  161. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  162. }()
  163. go func() {
  164. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  165. }()
  166. go func(){
  167. millSeconds := time.Now().UnixNano() / 1e6
  168. for content := range progress { // 通道关闭后会退出for range循环
  169. current :=time.Now().UnixNano() / 1e6
  170. if current-millSeconds>500{
  171. projson,err := contentToJSONByte(content)
  172. if projson==nil && err==nil{
  173. continue
  174. }
  175. if err != nil {
  176. log.Printf("json.Marshal error %s\n", err)
  177. }
  178. millSeconds = current
  179. downloading = true
  180. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  181. log.Println(err)
  182. break
  183. }
  184. }
  185. if strings.Index(content,"100.00%")!=-1{
  186. projson,err := contentToJSONByte(content)
  187. if projson==nil && err==nil{
  188. continue
  189. }
  190. if err != nil {
  191. log.Printf("json.Marshal error %s\n", err)
  192. }
  193. downloading = true
  194. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  195. panic(err)
  196. }
  197. break
  198. }
  199. }
  200. }()
  201. log.Println("下载资源连接中...")
  202. //设置30秒连接超时
  203. go func() {
  204. index :=0
  205. for true{
  206. index++
  207. if downloading==true{
  208. return
  209. }
  210. time.Sleep(time.Duration(1)*time.Second)
  211. if downloading==false && index==30{
  212. err = cmd.Process.Kill()
  213. log.Println("进程连接超时30s已被Kill")
  214. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  215. return
  216. }
  217. return
  218. }
  219. }
  220. }()
  221. err = cmd.Wait()
  222. if err != nil {
  223. log.Printf("cmd.Run() failed with %s\n", err)
  224. }
  225. if errStdout != nil || errStderr != nil {
  226. log.Printf("failed to capture stdout or stderr\n")
  227. }
  228. outStr := string(stdout)
  229. log.Printf("out:%s", outStr)
  230. //TODO test 更新数据库hash
  231. time.Sleep(200*time.Millisecond)
  232. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  233. err = etcdclient.ReplaceInto(key,hash)
  234. if err != nil {
  235. log.Println(err)
  236. return err
  237. }
  238. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  239. if err==nil{
  240. log.Println("下载成功")
  241. }
  242. //time.Sleep(time.Duration(6)*time.Second)
  243. defer close(progress)
  244. return nil
  245. }
  246. func contentToJSONByte(content string) ([]byte,error){
  247. sts :=strings.Split(content," ")
  248. if len(sts)<8{
  249. log.Println("字符长度小于8")
  250. return nil,nil
  251. }
  252. var processFloat float64
  253. if (len(sts)==9 || len(sts)==8){
  254. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  255. }else{
  256. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  257. }
  258. if processFloat==0{
  259. //log.Println("当前进度0")
  260. return nil,nil
  261. }
  262. pro :=&processStruct{
  263. Size:sts[4],
  264. CurrentSize: sts[1],
  265. Unit: sts[2],
  266. CurrentUnit: sts[5],
  267. Process: processFloat,
  268. Hash: "",
  269. }
  270. projson,err :=json.Marshal(pro)
  271. return projson,err
  272. }
  273. /**
  274. 上传本地文件
  275. @param absolutePath 文件本地绝对路径
  276. @param fileName 文件名称
  277. @param projectName 项目名称
  278. @param dir 云文件目录
  279. */
  280. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
  281. //本地拷贝文件
  282. absoluteDir := config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName+"\\"+dir
  283. //检查目录
  284. _,err := os.Stat(absoluteDir)
  285. if err != nil {
  286. //创建文件目录
  287. err = os.MkdirAll(absoluteDir, os.ModePerm)
  288. if err!=nil{
  289. return err
  290. }
  291. }
  292. //检测文件打开状态
  293. ///*tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  294. //if err != nil {
  295. // log.Println("文件被占用,请关闭打开的软件")
  296. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  297. // return err
  298. // }
  299. // return err
  300. //}
  301. //defer tfile.Close()*/
  302. serverSh := shell.NewShell(config.ServerIpfsUrl)
  303. //serverSh.SetTimeout(time.Duration(30)*time.Second)
  304. log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  305. //检测引导节点是否连接成功
  306. isUp := serverSh.IsUp()
  307. if !isUp {
  308. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  309. return err
  310. }
  311. return nil
  312. }
  313. var uploading bool=false
  314. cmd := exec.Command(ipfsPath, "add",absolutePath)
  315. uploadProgress := make(chan string,10000)
  316. var stdout, stderr []byte
  317. var errStdout, errStderr error
  318. stdoutIn, _ := cmd.StdoutPipe()
  319. stderrIn, _ := cmd.StderrPipe()
  320. cmd.Start()
  321. go func() {
  322. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  323. }()
  324. go func() {
  325. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  326. }()
  327. go func(){
  328. first := true
  329. millSeconds := time.Now().UnixNano() / 1e6
  330. current :=time.Now().UnixNano() / 1e6
  331. for content := range uploadProgress { // 通道关闭后会退出for range循环
  332. if first {
  333. projson,err := contentToJSONByte(content)
  334. if projson==nil && err==nil{
  335. continue
  336. }
  337. if err != nil {
  338. log.Println("json.Marshal error %s\n", err)
  339. }
  340. uploading=true
  341. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  342. break
  343. }
  344. millSeconds = current
  345. first=false
  346. }
  347. if current-millSeconds>500{
  348. projson,err := contentToJSONByte(content)
  349. if projson==nil && err==nil{
  350. continue
  351. }
  352. if err != nil {
  353. log.Println("json.Marshal error %s\n", err)
  354. }
  355. uploading=true
  356. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  357. break
  358. }
  359. millSeconds = current
  360. }
  361. if (strings.Index(content,"90.00%")!=-1){
  362. projson,err := contentToJSONByte(content)
  363. if projson==nil && err==nil{
  364. continue
  365. }
  366. if err != nil {
  367. log.Println("json.Marshal error %s\n", err)
  368. }
  369. uploading=true
  370. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  371. panic(err)
  372. }
  373. break
  374. }
  375. }
  376. }()
  377. log.Println("文件上传中...")
  378. //设置30秒连接超时
  379. go func() {
  380. index :=0
  381. for true{
  382. index++
  383. if uploading==true{
  384. return
  385. }
  386. time.Sleep(time.Duration(1)*time.Second)
  387. if uploading==false && index==30{
  388. err = cmd.Process.Kill()
  389. log.Println("进程连接超时30s已被Kill")
  390. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  391. return
  392. }
  393. return
  394. }
  395. }
  396. }()
  397. err = cmd.Wait()
  398. if err != nil {
  399. log.Println("cmd.Run() failed with %s\n", err)
  400. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  401. return err
  402. }
  403. return err
  404. }
  405. if errStdout != nil || errStderr != nil {
  406. log.Println("failed to capture stdout or stderr\n")
  407. }
  408. outStr := string(stdout)
  409. log.Printf("out:%s", outStr)
  410. /*sh.Get(hash,absoluteDir+fileName)
  411. if err != nil {
  412. return "",err
  413. }*/
  414. defer close(uploadProgress)
  415. prog := new(processStruct)
  416. prog.Hash=strings.Split(outStr," ")[1]
  417. prog.Process=100.00
  418. sh := shell.NewShell(config.GobalIpfsUrl)
  419. //sh.SetTimeout(time.Duration(30)*time.Second)
  420. objectStat,err :=sh.ObjectStat(prog.Hash)
  421. if err != nil {
  422. log.Println(err)
  423. return err
  424. }
  425. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  426. //
  427. cmd = exec.Command(ipfsPath,"dht","provide",prog.Hash)
  428. err = cmd.Run()
  429. if err != nil {
  430. log.Println(err)
  431. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  432. return err
  433. }
  434. return err
  435. }
  436. //serverSh.SetTimeout(time.Duration(600)*time.Second)
  437. //判断备份节点对等节点是否已添加连接
  438. idOut,err :=sh.ID()
  439. localId :=idOut.ID
  440. swarmConnInfos,err :=serverSh.SwarmPeers(context.Background())
  441. hasConnect := false
  442. for _,swarmconn := range swarmConnInfos.Peers {
  443. if swarmconn.Peer==localId{
  444. hasConnect=true
  445. break
  446. }
  447. }
  448. if !hasConnect{
  449. log.Println("中继处理")
  450. swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
  451. errT := serverSh.SwarmConnect(context.Background(),swarmConnectAddr)
  452. if errT!=nil{
  453. log.Println("中继失败,引导节点备份失败")
  454. log.Println(err)
  455. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  456. return err
  457. }
  458. }
  459. }
  460. err = serverSh.Pin(prog.Hash)
  461. if err!=nil{
  462. log.Println("引导节点备份失败")
  463. log.Println(err)
  464. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  465. return err
  466. }
  467. }
  468. log.Println("引导节点文件备份成功")
  469. //文件不存在则进行本地文件夹拷贝
  470. if !fileExist(fmt.Sprint((absoluteDir+"\\"+fileName))) {
  471. err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
  472. if err != nil {
  473. log.Println(err)
  474. return err
  475. }
  476. }
  477. //记录历史
  478. filenameall := path.Base(fileName)
  479. filesuffix := path.Ext(fileName)
  480. fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
  481. commitFilePath := absoluteDir+"\\"+fileprefix+".commit"
  482. commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,prog.Hash,note,creator,milestone)
  483. if err != nil {
  484. log.Println("历史文件写入失败")
  485. log.Println(err)
  486. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  487. return err
  488. }
  489. return err
  490. }
  491. prog.CommitHistoryHash=commitHistoryHash
  492. projson,err :=json.Marshal(prog)
  493. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  494. log.Println(err)
  495. return err
  496. }
  497. //更新数据库hash
  498. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  499. err = etcdclient.ReplaceInto(key,prog.Hash+";0")
  500. if err != nil {
  501. log.Println(err)
  502. return err
  503. }
  504. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  505. log.Println("上传成功")
  506. return nil
  507. }
  508. /**
  509. 记录提交记录
  510. */
  511. func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (string,error){
  512. commitHistory := new(commitHistory)
  513. //历史文件不存在则创建
  514. localSh :=shell.NewShell(config.GobalIpfsUrl)
  515. localSh.SetTimeout(5*time.Second)
  516. if len(currentHistoryHash)!=0 {
  517. os.Remove(path)
  518. err := localSh.Get(currentHistoryHash,path)
  519. if err != nil {
  520. log.Println("历史版本管理文件下载失败")
  521. return "",err
  522. }
  523. }
  524. //初始化历史管理文件
  525. exist := fileExist(path)
  526. if !exist {
  527. commitFile,err := os.Create(path)
  528. if err != nil {
  529. log.Println("历史版本管理文件创建失败")
  530. return "",err
  531. }
  532. commitFile.Close()
  533. }
  534. //设置文件隐藏属性
  535. attribCmd :=exec.Command("attrib","+h",path)
  536. err :=attribCmd.Run()
  537. if err != nil {
  538. log.Println("设置文件隐藏属性失败")
  539. return "",err
  540. }
  541. //读取历史管理文件
  542. content ,err :=ioutil.ReadFile(path)
  543. if len(content)!=0{
  544. rows :=strings.Split(string(content),"\n")
  545. endRow :=rows[len(rows)-2]
  546. columns :=strings.Split(endRow,"\t")
  547. commitHistory.Version,_=strconv.Atoi(columns[6])
  548. commitHistory.Version++
  549. commitHistory.ParentHash=columns[1]
  550. }else{
  551. commitHistory.Version=1
  552. commitHistory.ParentHash="0000000000000000000000000000000000000000"
  553. }
  554. commitHistory.CurrentHash=hash
  555. commitHistory.Milestone = milestone
  556. commitHistory.Creator = creator
  557. commitHistory.Note = note
  558. commitHistory.CreateTime=time.Now().Unix()
  559. if commitHistory.ParentHash==commitHistory.CurrentHash{
  560. return commitHistory.CurrentHash,nil
  561. }
  562. file,err :=os.OpenFile(path,os.O_APPEND,0666)
  563. if err != nil{
  564. log.Println(err)
  565. return "",err
  566. }
  567. //写入历史管理文件
  568. w :=bufio.NewWriter(file)
  569. writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
  570. fmt.Fprintln(w,writeContent)
  571. err = w.Flush()
  572. if err != nil {
  573. log.Println("历史版本管理文件写入失败")
  574. return "",err
  575. }
  576. file.Close()
  577. addFile,err :=os.Open(path)
  578. if err != nil{
  579. log.Println(err)
  580. return "",err
  581. }
  582. defer addFile.Close()
  583. //add 历史管理文件
  584. historyHash,err:=localSh.Add(addFile)
  585. if err != nil {
  586. log.Println("历史版本管理文件上传失败")
  587. return "",err
  588. }
  589. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  590. serverSh.SetTimeout(60*time.Second)
  591. err = serverSh.Pin(historyHash)
  592. if err != nil {
  593. log.Println("历史版本管理文件备份失败")
  594. return "",err
  595. }
  596. return historyHash,nil
  597. }
  598. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  599. var out []byte
  600. buf := make([]byte, 1024, 1024)
  601. for {
  602. n, err := r.Read(buf[:])
  603. if n > 0 {
  604. d := buf[:n]
  605. out = append(out, d...)
  606. progress <- string(d)
  607. }
  608. if err != nil {
  609. // Read returns io.EOF at the end of file, which is not an error for us
  610. if err == io.EOF {
  611. err = nil
  612. }
  613. return out, err
  614. }
  615. }
  616. // never reached
  617. panic(true)
  618. return nil, nil
  619. }
  620. /**
  621. 单个文件信息
  622. */
  623. type simpleFileInfo struct {
  624. Name string `json:"name" `
  625. Extension string `json:"extension"`
  626. RelativePath string `json:"relativePath"`
  627. AbsolutePath string `json:"absolutePath"`
  628. }
  629. var gobalFolderFileMap map[string] *simpleFileInfo
  630. var gobalRelativePath string
  631. /**
  632. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  633. @param id 文件id
  634. */
  635. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  636. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  637. fileInfo,err :=os.Stat(absolutePath)
  638. if err!=nil{
  639. log.Println(err)
  640. return err
  641. }
  642. log.Println(filepath.Dir(absolutePath))
  643. //单个文件处理
  644. if !fileInfo.IsDir() {
  645. simpleFileInfo := new(simpleFileInfo)
  646. simpleFileInfo.Name=fileInfo.Name()
  647. simpleFileInfo.Extension=path.Ext(absolutePath)
  648. simpleFileInfo.RelativePath=""
  649. simpleFileInfo.AbsolutePath=absolutePath
  650. gobalFolderFileMap[absolutePath]=simpleFileInfo
  651. bytes,err :=json.Marshal(gobalFolderFileMap)
  652. if err != nil {
  653. log.Println(err)
  654. return err
  655. }
  656. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  657. log.Println(err)
  658. return err
  659. }
  660. return nil
  661. }
  662. //文件目录处理
  663. gobalRelativePath = filepath.Dir(absolutePath)
  664. err =filepath.Walk(absolutePath, myWalkfunc)
  665. if err != nil {
  666. log.Println(err)
  667. return err
  668. }
  669. bytes,err :=json.Marshal(gobalFolderFileMap)
  670. if err != nil {
  671. log.Println(err)
  672. return err
  673. }
  674. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  675. log.Println(err)
  676. return err
  677. }
  678. return nil
  679. }
  680. func myWalkfunc(path string, info os.FileInfo, err error) error {
  681. if info.IsDir()==false{
  682. simpleFileInfo := new(simpleFileInfo)
  683. simpleFileInfo.Name=info.Name()
  684. simpleFileInfo.Extension=filepath.Ext(path)
  685. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  686. simpleFileInfo.AbsolutePath=path
  687. gobalFolderFileMap[path]=simpleFileInfo
  688. return nil
  689. }
  690. return nil
  691. }
  692. /**
  693. 本地文件是否存在
  694. */
  695. func fileExist(path string) bool {
  696. _, err := os.Lstat(path)
  697. return !os.IsNotExist(err)
  698. }
  699. /**
  700. 获取本地文件列表
  701. */
  702. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  703. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  704. projectPath := getLocalFileListDir
  705. log.Println("切换文件列表:"+projectPath)
  706. keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
  707. //添加监控
  708. err := filepath.Walk(projectPath,watchWalkfunc)
  709. if err != nil {
  710. log.Println(err)
  711. return err
  712. }
  713. //初始化通道
  714. if config.GobalWatchChannelMap[projectPath] != nil {
  715. close(config.GobalWatchChannelMap[projectPath])
  716. }
  717. config.GobalWatchChannelMap[projectPath]=make(chan string,100)
  718. log.Println(projectPath+"添加文件监控")
  719. log.Println(config.GobalWatchChannelMap[projectPath])
  720. //定期校验缓存的本地文件状态
  721. dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix)
  722. if err != nil {
  723. log.Println(err)
  724. }
  725. if dataMapa!=nil && len(dataMapa)>0{
  726. for k,_ := range dataMapa {
  727. if !fileExist(config.LocalWorkSpaceDir+k){
  728. err = etcdclient.DeleteWithPrefix(k)
  729. if err != nil {
  730. log.Println(err)
  731. }
  732. }
  733. }
  734. }
  735. //优先etcd查询
  736. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  737. if err != nil {
  738. log.Println(err)
  739. return err
  740. }
  741. if dataMap==nil || len(dataMap)==0{
  742. // 不存在则初始化进etcd
  743. err =filepath.Walk(getLocalFileListDir,walkfunc)
  744. //路径错误
  745. if err != nil {
  746. log.Println(err)
  747. if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
  748. log.Println(err)
  749. return err
  750. }
  751. }
  752. mapByte,err:=json.Marshal(gobalFileMap)
  753. if err != nil {
  754. log.Println(err)
  755. return err
  756. }
  757. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  758. log.Println(err)
  759. return err
  760. }
  761. cacheMap := make(map[string] string)
  762. for k,v := range gobalFileMap {
  763. k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
  764. cacheMap[k]=v
  765. }
  766. //异步缓存
  767. //go func() {
  768. err = etcdclient.BatchAdd(cacheMap)
  769. if err != nil {
  770. log.Println(err)
  771. }
  772. //}()
  773. //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  774. //清空gobalFileMap
  775. gobalFileMap = make(map[string] string)
  776. }
  777. err=sendFileListFromEtcd(keyPrefix,projectName,conn)
  778. if err != nil {
  779. log.Println(err)
  780. return err
  781. }
  782. //ch :=config.GobalWatchChannelMap[getLocalFileListDir]
  783. log.Println(config.GobalWatchChannelMap[getLocalFileListDir])
  784. for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[getLocalFileListDir] {
  785. log.Println(actionAndModifyFilePathStr)
  786. actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
  787. if actionAndModifyFilePath[0]=="remove"{
  788. k := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  789. queryMap,err :=etcdclient.QueryWithPrefix(k)
  790. if len(queryMap)==0{
  791. continue
  792. }
  793. err = etcdclient.DeleteWithPrefix(k)
  794. if err != nil {
  795. log.Println(err)
  796. }
  797. }else if actionAndModifyFilePath[0]=="write"{
  798. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  799. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  800. if err != nil {
  801. log.Println(err)
  802. continue
  803. }
  804. if len(querymap)==0{
  805. continue
  806. }
  807. oldValue := strings.Split(querymap[queryKey],";")
  808. newValue := oldValue[0]+";" +"1"
  809. err = etcdclient.ReplaceInto(queryKey,newValue)
  810. if err!=nil{
  811. log.Println(err)
  812. continue
  813. }
  814. }else if actionAndModifyFilePath[0]=="create"{
  815. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  816. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  817. if err != nil {
  818. log.Println(err)
  819. continue
  820. }
  821. if len(querymap)==0{
  822. continue
  823. }
  824. oldValue := strings.Split(querymap[queryKey],";")
  825. newValue := oldValue[0]+";" +"1"
  826. err = etcdclient.ReplaceInto(queryKey,newValue)
  827. if err!=nil{
  828. log.Println(err)
  829. continue
  830. }
  831. }
  832. err = sendFileListFromEtcd(keyPrefix,projectName,conn)
  833. if err != nil {
  834. log.Println(err)
  835. return err
  836. }
  837. }
  838. return nil
  839. }
  840. func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
  841. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  842. if err != nil {
  843. log.Println(err)
  844. return err
  845. }
  846. if dataMap!=nil && len(dataMap)>0{
  847. cacheMap := make(map[string] string)
  848. for k,v := range dataMap {
  849. //历史数据加默认值
  850. if len(strings.Split(v, ";"))==1{
  851. v=v+";0"
  852. }
  853. cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
  854. }
  855. mapByte,err:=json.Marshal(cacheMap)
  856. if err != nil {
  857. log.Println(err)
  858. return err
  859. }
  860. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  861. log.Println(err)
  862. return err
  863. }
  864. return nil
  865. }else{
  866. log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName)
  867. }
  868. return nil
  869. }
  870. /**
  871. 打开方式
  872. */
  873. func OpenFileWith(filePath string) error{
  874. //判断文件有效性
  875. _,err := os.Stat(filePath)
  876. if err!=nil{
  877. return err
  878. }
  879. //filePath = strings.Replace(filePath," ","~1",1)
  880. cmd := exec.Command("rundll32.exe","shell32.dll,OpenAs_RunDLL",filePath);
  881. err =cmd.Run()
  882. if err!=nil{
  883. log.Println(err)
  884. return err
  885. }
  886. return nil
  887. }
  888. /**
  889. 手动检查软件更新
  890. 0:不强制更新
  891. 1:强制更新
  892. */
  893. func CheckForUpdates(forceUpdate string) error{
  894. tszdir :=os.Getenv("TSZDIR")
  895. //空格路径处理
  896. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
  897. //判断文件有效性
  898. _,err := os.Stat(tszdir+config.UpdaterName)
  899. if err!=nil{
  900. return err
  901. }
  902. cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
  903. err =cmd.Run()
  904. if err!=nil{
  905. log.Println(err)
  906. return err
  907. }
  908. cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
  909. err =cmd.Run()
  910. if err!=nil{
  911. log.Println(err)
  912. return err
  913. }
  914. //检测到更新 kill所有客户端进程
  915. log.Println("close all process")
  916. cmd = exec.Command("cmd.exe","/c",ipfsPath);
  917. err =cmd.Run()
  918. if err!=nil{
  919. log.Println(err)
  920. return err
  921. }
  922. return nil
  923. }
  924. func walkfunc(filePath string, info os.FileInfo, err error) error {
  925. if info == nil{
  926. return nil
  927. }
  928. if info.IsDir()==false{
  929. //历史文件不扫描
  930. if path.Ext(filePath)==".commit" {
  931. return nil
  932. }
  933. sh := shell.NewShell(config.GobalIpfsUrl)
  934. file,err :=os.Open(filePath)
  935. if err != nil{
  936. log.Println(err)
  937. return err
  938. }
  939. defer file.Close()
  940. hash,err :=sh.Add(file)
  941. if err != nil {
  942. log.Println(err)
  943. return err
  944. }
  945. dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(getLocalFileListDir+"\\"),"",1)
  946. gobalFileMap[dir]=hash
  947. }
  948. return nil
  949. }
  950. /**
  951. 查询历史文件
  952. path 文件路径
  953. hash 历史版本文件hash
  954. */
  955. func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
  956. result := make(map[int] *commitHistory)
  957. //校验文件路径
  958. _,err :=os.Stat(filePath)
  959. if err != nil {
  960. log.Println("文件 "+filePath+"not found")
  961. return nil,errors.New("参数错误!")
  962. }
  963. if len(hash) == 0 {
  964. return result,nil
  965. }
  966. //根据hash更新文件
  967. localSh := shell.NewShell(config.GobalIpfsUrl)
  968. //连接失败判断
  969. localSh.SetTimeout(5*time.Second)
  970. ext :=path.Ext(filePath)
  971. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  972. os.Remove(commitFilePath)
  973. err = localSh.Get(hash,commitFilePath)
  974. if err!=nil {
  975. log.Println("文件"+hash+"下载失败")
  976. return result,errors.New("历史文件获取失败,请稍后重试")
  977. }
  978. //设置文件隐藏属性
  979. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  980. err =attribCmd.Run()
  981. if err != nil {
  982. log.Println("设置文件隐藏属性失败")
  983. return result,err
  984. }
  985. //解析历史版本文件
  986. contentByte,err := ioutil.ReadFile(commitFilePath)
  987. content := string(contentByte)
  988. if content==""{
  989. return result,nil
  990. }
  991. rows :=strings.Split(content,"\n")
  992. length := len(rows)
  993. var index int = 0
  994. for i:=length-2;i>=0;i--{
  995. columns := strings.Split(rows[i],"\t")
  996. commitHistoryInstance := new(commitHistory)
  997. commitHistoryInstance.ParentHash =columns[0]
  998. commitHistoryInstance.CurrentHash=columns[1]
  999. commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
  1000. commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
  1001. commitHistoryInstance.Creator=columns[2]
  1002. commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
  1003. commitHistoryInstance.Note = columns[4]
  1004. result[index]=commitHistoryInstance
  1005. index++
  1006. }
  1007. return result,nil
  1008. }
  1009. /**
  1010. 设定某个历史版本为里程碑版本
  1011. @param filePath 文件绝对路径
  1012. @param commitHistoryHash 历史版本管理文件hash
  1013. @param hash 文件hash
  1014. @param milestone 是否是里程碑
  1015. */
  1016. func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
  1017. //result := make(map[int] *commitHistory)
  1018. //校验文件路径
  1019. _,err :=os.Stat(filePath)
  1020. if err != nil {
  1021. log.Println("文件 "+filePath+"not found")
  1022. return "",errors.New("参数错误!")
  1023. }
  1024. if len(commitHistoryHash) == 0 {
  1025. log.Println("参数hash must not empty")
  1026. return "",errors.New("参数错误!")
  1027. }
  1028. //根据hash更新文件
  1029. localSh := shell.NewShell(config.GobalIpfsUrl)
  1030. //连接失败判断
  1031. localSh.SetTimeout(5*time.Second)
  1032. ext :=path.Ext(filePath)
  1033. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  1034. os.Remove(commitFilePath)
  1035. err = localSh.Get(commitHistoryHash,commitFilePath)
  1036. if err!=nil {
  1037. log.Println("文件"+commitHistoryHash+"下载失败")
  1038. return "",errors.New("历史文件获取失败,请稍后重试")
  1039. }
  1040. //设置文件隐藏属性
  1041. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1042. err =attribCmd.Run()
  1043. if err != nil {
  1044. log.Println("设置文件隐藏属性失败")
  1045. return "",err
  1046. }
  1047. //解析历史版本文件
  1048. contentByte,err := ioutil.ReadFile(commitFilePath)
  1049. content := string(contentByte)
  1050. if content==""{
  1051. return "",nil
  1052. }
  1053. rows :=strings.Split(content,"\n")
  1054. length := len(rows)
  1055. resultString :=""
  1056. for i:=0;i<length-1;i++{
  1057. columns := strings.Split(rows[i],"\t")
  1058. if columns[1]==hash{
  1059. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
  1060. continue
  1061. }
  1062. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
  1063. }
  1064. os.Remove(commitFilePath)
  1065. fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
  1066. w := bufio.NewWriter(fw)
  1067. w.WriteString(resultString)
  1068. if err != nil {
  1069. log.Println(err)
  1070. return "",err
  1071. }
  1072. w.Flush()
  1073. fw.Close()
  1074. addFile,err :=os.Open(commitFilePath)
  1075. if err != nil{
  1076. log.Println(err)
  1077. return "",err
  1078. }
  1079. defer addFile.Close()
  1080. //add 历史管理文件
  1081. historyHash,err:=localSh.Add(addFile)
  1082. if err != nil {
  1083. log.Println("历史版本管理文件上传失败")
  1084. return "",err
  1085. }
  1086. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  1087. serverSh.SetTimeout(5*time.Second)
  1088. err = serverSh.Pin(historyHash)
  1089. if err != nil {
  1090. log.Println("历史版本管理文件备份失败")
  1091. return "",err
  1092. }
  1093. return historyHash,nil
  1094. }
  1095. /*
  1096. 提交历史
  1097. */
  1098. type commitHistory struct {
  1099. ParentHash string `json:"parentHash"`
  1100. CurrentHash string `json:"currentHash"`
  1101. Creator string `json:"creator"`
  1102. CreateTime int64 `json:"createTime"`
  1103. Note string `json:"note"`
  1104. Version int `json:"version"`
  1105. Milestone bool `json:"milestone"`
  1106. }