文件同步
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

1638 行
41 KiB

  1. package handle
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "crypto/md5"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "fts/config"
  11. "fts/etcdclient"
  12. "fts/nsqclient"
  13. "github.com/gorilla/websocket"
  14. _ "github.com/ipfs/go-ipfs-api"
  15. shell "github.com/ipfs/go-ipfs-api"
  16. "io"
  17. "io/ioutil"
  18. "log"
  19. "net/http"
  20. "os"
  21. "os/exec"
  22. "path"
  23. "path/filepath"
  24. "strconv"
  25. "strings"
  26. "time"
  27. )
  28. //登陆账号
  29. var gobalLoginUserName string
  30. //登陆账号Id
  31. var gobalLoginUserId string
  32. //key:filepath,value:hash
  33. var gobalFileMap = make(map[string] string)
  34. var gobalFileDownLoadingMap = make(map[string] int)
  35. //手动上传文件,非自动上传
  36. var goabalAddFileMap = make(map[string] int)
  37. //本地项目空间目录
  38. var gobalLocalProjectDir string
  39. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  40. //全局消息通知管道
  41. var GobalMessageNotify = make(chan string,1000)
  42. var ipfsPath=os.Getenv("IPFS-PATH")
  43. /**
  44. 文件上传下载进度
  45. */
  46. type processStruct struct {
  47. Size string `json:"size"`
  48. CurrentSize string `json:"currentSize"`
  49. Unit string `json:"unit"`
  50. CurrentUnit string `json:"currentUnit"`
  51. Process float64 `json:"process"`
  52. Hash string `json:"hash"`
  53. CommitHistoryHash string `json:"commitHistoryHash"`
  54. Version int `json:"version"`
  55. }
  56. /**
  57. 初始化本地客户端配置,包括ipfs网关、引导节点
  58. @param ipfsApi ipfs网关 例如:http://192.168.1.1:5001
  59. @param ipfsBootstrap ipfs引导节点,多个用;分割 例如:/dns/www.lockingos.org/tcp/4001/p2p/12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP
  60. */
  61. func InitClientConfig(ipfsApi,ipfsBootstrap string) error{
  62. //空格路径处理
  63. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  64. config.ServerIpfsUrl = ipfsApi
  65. log.Println("配置客户端网关:"+config.ServerIpfsUrl)
  66. ipfsBootstraps := strings.Split(ipfsBootstrap,";")
  67. for _, simpleBootstrap := range ipfsBootstraps {
  68. log.Println("配置引导节点:"+simpleBootstrap)
  69. cmd := exec.Command(ipfsPath,"bootstrap", "add", simpleBootstrap)
  70. err :=cmd.Run()
  71. if err!=nil{
  72. return err
  73. }
  74. }
  75. return nil
  76. }
  77. /**
  78. 初始化本地工作目录
  79. @param userName 用户登陆账号
  80. @param userId 用户ID
  81. @param projectName 项目名称
  82. */
  83. func InitLocalWorkSpace(conn *websocket.Conn, userName, userId, projectName string) (error){
  84. //空格路径处理
  85. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  86. //初始化当前登陆用户信息
  87. gobalLoginUserName = userName
  88. gobalLoginUserId = userId
  89. //初始化本地工作空间绝对路径
  90. gobalLocalProjectDir = fmt.Sprint(config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName)
  91. // 检查本地目录是否存在
  92. _,err := os.Stat(gobalLocalProjectDir)
  93. if err != nil {
  94. //创建文件目录
  95. os.MkdirAll(gobalLocalProjectDir, os.ModePerm)
  96. }
  97. log.Println("进入项目空间:"+gobalLocalProjectDir)
  98. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(gobalLocalProjectDir))); err != nil {
  99. log.Println(err)
  100. return err
  101. }
  102. return nil
  103. }
  104. //工作空间增加文件监听事件
  105. func watchWalkfunc(filePath string, info os.FileInfo, err error) error {
  106. if info == nil{
  107. return nil
  108. }
  109. if info.IsDir()==true{
  110. //config.GobalWatch.Remove(filePath)
  111. err = config.GobalWatch.Add(filePath)
  112. if err != nil {
  113. log.Println(err)
  114. return err
  115. }
  116. }
  117. return nil
  118. }
  119. /**
  120. 下载指令
  121. @param hash ipfs哈希值
  122. @param projectName 项目名称
  123. @para fileName 文件名称
  124. @param dir 云文件目录
  125. */
  126. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, nodeDir string) error{
  127. //检查文件目录是否存在,不存在则创建
  128. fileDir := gobalLocalProjectDir+"\\"+nodeDir
  129. _,err := os.Stat(fileDir)
  130. if err != nil {
  131. //创建文件目录
  132. err = os.MkdirAll(fileDir, os.ModePerm)
  133. if err!=nil{
  134. log.Println(err)
  135. return err
  136. }
  137. }
  138. //下载启动标识,有下载进度则设置为true
  139. var downloading bool = false
  140. //检测文件打开状态
  141. tfile,err := os.OpenFile(fmt.Sprint(fileDir+"\\"+fileName),os.O_RDWR,1)
  142. if err != nil && (!os.IsNotExist(err)) {
  143. log.Println("文件被占用,请关闭打开的软件")
  144. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  145. return err
  146. }
  147. return err
  148. }
  149. defer tfile.Close()
  150. //正在下载标识,标识用于不监测更新改动
  151. gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=1
  152. //构建本地cmd执行 ipfs get
  153. progress := make(chan string,10000)
  154. var stdout, stderr []byte
  155. var errStdout, errStderr error
  156. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(fileDir+"\\"+fileName))
  157. stdoutIn, _ := cmd.StdoutPipe()
  158. stderrIn, _ := cmd.StderrPipe()
  159. cmd.Start()
  160. go func() {
  161. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  162. }()
  163. go func() {
  164. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  165. }()
  166. log.Println("资源连接中...")
  167. //异步定时读取进度反馈给前端,每500ms返回一次进度
  168. go func(){
  169. millSeconds := time.Now().UnixNano() / 1e6
  170. for content := range progress { // 通道关闭后会退出for range循环
  171. log.Println(">>>"+content)
  172. current :=time.Now().UnixNano() / 1e6
  173. if !downloading{
  174. log.Println("资源连接成功,下载中...")
  175. }
  176. downloading = true
  177. if current-millSeconds>500{
  178. projson,err := contentToJSONByte(content)
  179. if projson==nil && err==nil{
  180. continue
  181. }
  182. if err != nil {
  183. log.Printf("json.Marshal error %s\n", err)
  184. }
  185. //设置下载启动标识和下载时间戳
  186. millSeconds = current
  187. //反馈前端
  188. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  189. log.Println(err)
  190. break
  191. }
  192. }
  193. if strings.Index(content,"100.00%")!=-1{
  194. projson,err := contentToJSONByte(content)
  195. if projson==nil && err==nil{
  196. continue
  197. }
  198. if err != nil {
  199. log.Printf("json.Marshal error %s\n", err)
  200. }
  201. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  202. log.Println(err)
  203. }
  204. break
  205. }
  206. }
  207. }()
  208. //设置30秒连接超时,30秒未启动下载则下载失败
  209. go func() {
  210. index :=0
  211. for true{
  212. //启动下载则不做超时判断
  213. if downloading==true{
  214. return
  215. }
  216. index++
  217. time.Sleep(time.Duration(1)*time.Second)
  218. if downloading==false && index==30{
  219. err = cmd.Process.Kill()
  220. log.Println("资源连接超时(30s),下载进程被终止")
  221. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  222. return
  223. }
  224. return
  225. }
  226. }
  227. }()
  228. //等待下载执行完成
  229. err = cmd.Wait()
  230. if err != nil {
  231. log.Printf("cmd.Run() failed with %s\n", err)
  232. }
  233. if errStdout != nil || errStderr != nil {
  234. log.Printf("failed to capture stdout or stderr\n")
  235. }
  236. outStr := string(stdout)
  237. log.Printf("out:%s", outStr)
  238. //更新Etcd数据库的文件key对应hash值
  239. time.Sleep(200*time.Millisecond)
  240. key := gobalLoginUserName+"\\"+projectName+"\\"+nodeDir+"\\"+fileName
  241. err = etcdclient.ReplaceInto(key,hash+";0")
  242. if err != nil {
  243. log.Println(err)
  244. return err
  245. }
  246. //发送消息至文件变更订阅
  247. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  248. //下载完成反馈
  249. log.Printf("叮,资源文件[ %v ]下载完成",fileName)
  250. defer time.Sleep(5*time.Second);gobalFileDownLoadingMap[fmt.Sprint(fileDir+"\\"+fileName)]=0
  251. defer close(progress)
  252. return nil
  253. }
  254. //读取解析cmd返回文件上传或下载进度信息
  255. func contentToJSONByte(content string) ([]byte,error){
  256. sts :=strings.Split(content," ")
  257. if len(sts)<8{
  258. log.Println("字符长度小于8")
  259. return nil,nil
  260. }
  261. var processFloat float64
  262. if (len(sts)==9 || len(sts)==8){
  263. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  264. }else{
  265. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  266. }
  267. if processFloat==0{
  268. //log.Println("当前进度0")
  269. return nil,nil
  270. }
  271. pro :=&processStruct{
  272. Size:sts[4],
  273. CurrentSize: sts[1],
  274. Unit: sts[2],
  275. CurrentUnit: sts[5],
  276. Process: processFloat,
  277. Hash: "",
  278. }
  279. projson,err :=json.Marshal(pro)
  280. return projson,err
  281. }
  282. /**
  283. 上传本地文件
  284. @param absolutePath 文件本地绝对路径
  285. @param fileName 文件名称
  286. @param projectName 项目名称
  287. @param dir 云文件目录
  288. @param currentHistoryHash 当前文件的历史版本管理文件hash
  289. @param note 备注
  290. @param creator 创建人
  291. @param milestone 是否事里程碑
  292. */
  293. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir,currentHistoryHash,note,creator string,milestone bool) error{
  294. //本地文件目录
  295. fileDir := gobalLocalProjectDir+"\\"+dir
  296. //检查目录
  297. _,err := os.Stat(fileDir)
  298. if err != nil {
  299. //创建文件目录
  300. err = os.MkdirAll(fileDir, os.ModePerm)
  301. if err!=nil{
  302. return err
  303. }
  304. }
  305. //检测文件打开状态
  306. tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  307. if err != nil {
  308. log.Println("文件被占用,请关闭打开的软件")
  309. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  310. return err
  311. }
  312. return err
  313. }
  314. defer tfile.Close()
  315. serverSh := shell.NewShell(config.ServerIpfsUrl)
  316. //serverSh.SetTimeout(time.Duration(30)*time.Second)
  317. //log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  318. //检测引导节点是否连接成功
  319. isUp := serverSh.IsUp()
  320. if !isUp {
  321. log.Println("备份节点网络连接不通!")
  322. if conn==nil{
  323. return errors.New("备份节点连失联")
  324. }else{
  325. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  326. return err
  327. }
  328. }
  329. return nil
  330. }
  331. //上传启动标识,有上传进度则设置为true
  332. var uploading bool=false
  333. //cmd执行ipfs add
  334. cmd := exec.Command(ipfsPath, "add",absolutePath)
  335. uploadProgress := make(chan string,10000)
  336. var stdout, stderr []byte
  337. var errStdout, errStderr error
  338. stdoutIn, _ := cmd.StdoutPipe()
  339. stderrIn, _ := cmd.StderrPipe()
  340. cmd.Start()
  341. go func() {
  342. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  343. }()
  344. go func() {
  345. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  346. }()
  347. //异步给前端反馈上传进度
  348. go func(){
  349. first := true
  350. millSeconds := time.Now().UnixNano() / 1e6
  351. current :=time.Now().UnixNano() / 1e6
  352. for content := range uploadProgress { // 通道关闭后会退出for range循环
  353. current =time.Now().UnixNano() / 1e6
  354. if first {
  355. projson,err := contentToJSONByte(content)
  356. if projson==nil && err==nil{
  357. continue
  358. }
  359. if err != nil {
  360. log.Println("json.Marshal error %s\n", err)
  361. }
  362. //设置上传启动标识为true
  363. uploading=true
  364. if conn!=nil{
  365. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  366. break
  367. }
  368. }
  369. millSeconds = current
  370. first=false
  371. }
  372. if current-millSeconds>500{
  373. projson,err := contentToJSONByte(content)
  374. if projson==nil && err==nil{
  375. continue
  376. }
  377. if err != nil {
  378. log.Println("json.Marshal error %s\n", err)
  379. }
  380. uploading=true
  381. if conn!=nil{
  382. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  383. break
  384. }
  385. }
  386. millSeconds = current
  387. }
  388. if (strings.Index(content,"90.00%")!=-1){
  389. projson,err := contentToJSONByte(content)
  390. if projson==nil && err==nil{
  391. continue
  392. }
  393. if err != nil {
  394. log.Println("json.Marshal error %s\n", err)
  395. }
  396. uploading=true
  397. if conn!=nil{
  398. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  399. break
  400. }
  401. }
  402. break
  403. }
  404. }
  405. }()
  406. log.Println("资源上传中...")
  407. //上传未启动超时时间30s
  408. go func() {
  409. index :=0
  410. for true{
  411. if uploading==true{
  412. return
  413. }
  414. index++
  415. time.Sleep(time.Duration(1)*time.Second)
  416. if uploading==false && index==30{
  417. err = cmd.Process.Kill()
  418. log.Println("资源连接超时(30s),上传进程被终止")
  419. if conn!=nil{
  420. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  421. return
  422. }
  423. }
  424. return
  425. }
  426. }
  427. }()
  428. //等待执行完成
  429. err = cmd.Wait()
  430. if err != nil {
  431. log.Println("cmd.Run() failed with %s\n", err)
  432. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  433. return err
  434. }
  435. return err
  436. }
  437. if errStdout != nil || errStderr != nil {
  438. log.Println("failed to capture stdout or stderr\n")
  439. }
  440. outStr := string(stdout)
  441. fileHash := strings.Split(outStr," ")[1]
  442. log.Printf("out:%s", outStr)
  443. defer close(uploadProgress)
  444. //ipfs provide
  445. cmd = exec.Command(ipfsPath,"dht","provide",fileHash)
  446. err = cmd.Run()
  447. if err != nil {
  448. log.Println(err)
  449. if conn!=nil{
  450. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  451. return err
  452. }
  453. }
  454. return err
  455. }
  456. //判断备份节点对等节点是否已添加连接
  457. sh := shell.NewShell(config.GobalIpfsUrl)
  458. idOut,err :=sh.ID()
  459. localId :=idOut.ID
  460. swarmConnInfos,err :=serverSh.SwarmPeers(context.Background())
  461. hasConnect := false
  462. for _,swarmconn := range swarmConnInfos.Peers {
  463. if swarmconn.Peer==localId{
  464. hasConnect=true
  465. break
  466. }
  467. }
  468. if !hasConnect{
  469. log.Println("中继处理")
  470. swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
  471. errT := serverSh.SwarmConnect(context.Background(),swarmConnectAddr)
  472. if errT!=nil{
  473. log.Println("中继失败,引导节点备份失败")
  474. log.Println(err)
  475. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  476. return err
  477. }
  478. }
  479. }
  480. //后台备份
  481. go func() {
  482. err = serverSh.Pin(fileHash)
  483. if err!=nil{
  484. log.Printf("资源[ %v ]备份节点备份失败", fileName)
  485. log.Println(err)
  486. }
  487. log.Printf("资源[ %v ]备份节点备份成功", fileName)
  488. }()
  489. //文件不存在则进行本地文件夹拷贝
  490. if !fileExist(fmt.Sprint((fileDir+"\\"+fileName))) {
  491. //记录新增文件,新增文件不做post请求,前端自行post
  492. goabalAddFileMap[fileName] = 1
  493. err = sh.Get(fileHash,fmt.Sprint((fileDir+"\\"+fileName)))
  494. if err != nil {
  495. log.Println(err)
  496. return err
  497. }
  498. }
  499. //构建历史版本记录、写本地历史版本管理文件,上传至ipfs
  500. filenameall := path.Base(fileName)
  501. filesuffix := path.Ext(fileName)
  502. fileprefix := filenameall[0:len(filenameall) - len(filesuffix)]
  503. commitFilePath := fileDir+"\\"+fileprefix+".commit"
  504. commitVersion,commitHistoryHash,err := commitRecord(commitFilePath,currentHistoryHash,fileHash,note,creator,milestone)
  505. if err != nil {
  506. log.Printf("资源[ %v ]历史版本记录失败", fileName)
  507. log.Println(err)
  508. if conn!=nil{
  509. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  510. return err
  511. }
  512. }
  513. return err
  514. }
  515. //读取文件属性,构建100%进度对象
  516. objectStat,err :=sh.ObjectStat(fileHash)
  517. if err != nil {
  518. log.Println(err)
  519. return err
  520. }
  521. prog := new(processStruct)
  522. prog.Hash=fileHash
  523. prog.Process=100.00
  524. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  525. prog.CommitHistoryHash=commitHistoryHash
  526. prog.Version=commitVersion
  527. projson,err :=json.Marshal(prog)
  528. if conn!=nil{
  529. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  530. log.Println(err)
  531. return err
  532. }
  533. }
  534. //更新Etcd数据库hash
  535. key := gobalLoginUserName+"\\"+projectName+"\\"+dir+"\\"+fileName
  536. err = etcdclient.ReplaceInto(key,prog.Hash+";0")
  537. if err != nil {
  538. log.Println(err)
  539. return err
  540. }
  541. //自动协同逻辑
  542. //if conn==nil{
  543. // folderName := strings.Split(dir,"\\")[0]
  544. // var relativePath string
  545. // if len(strings.Split(dir, "\\"))==1{
  546. // relativePath = ""
  547. // }else{
  548. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  549. // }
  550. // size,_ := strconv.ParseInt(prog.Size,10,64)
  551. // err = postUpdateFile(projectName,folderName,relativePath,fileHash, fileName, commitHistoryHash, currentHistoryHash, gobalLoginUserId, size, commitVersion)
  552. // if err!=nil{
  553. // return err
  554. // }
  555. //}
  556. //发送文件至文件变更订阅
  557. config.GobalWatchChannelMap[config.LocalWorkSpaceDir+gobalLoginUserName+"\\"+projectName] <- ";"
  558. log.Printf("叮,资源文件[ %v ]上传完成",fileName)
  559. return nil
  560. }
  561. //文件信息入参
  562. type FileParam struct {
  563. Digest string `json:"digest"` //md5(ProjectName|FolderName|RelativePath|FileName|FileVersion|UserName)
  564. ProjectName string `json:"projectName"`
  565. FolderName string `json:"folderName"`
  566. RelativePath string `json:"relativePath"`
  567. IpfsCid string `json:"ipfsCid"`
  568. FileName string `json:"fileName"`
  569. FileSize int64 `json:"fileSize,string"`
  570. FileVersion int `json:"fileVersion"`
  571. UserId int64 `json:"userId,string"`
  572. HistoryCurrentIpfsCid string `json:"historyCurrentIpfsCid"`
  573. HistoryPreIpfsCid string `json:"historyPreIpfsCid"`
  574. }
  575. //获取文件的历史版本管理文件最新hash值
  576. func postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath string)(string,error){
  577. url:=config.ServerUrl+"/api/pms/sdk/queryFileHistoryCurrentCid"
  578. contentType := "application/json"
  579. fileParam :=FileParam{
  580. ProjectName: projectName,
  581. FolderName: folderName,
  582. RelativePath: relativePath,
  583. FileName: fileName,
  584. IpfsCid: "",
  585. FileSize: 0,
  586. FileVersion: 0,
  587. UserId: 0,
  588. HistoryPreIpfsCid: "",
  589. HistoryCurrentIpfsCid: "",
  590. }
  591. text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,fileParam.IpfsCid,fileName,fileParam.FileSize,fileParam.FileVersion,fileParam.UserId,fileParam.HistoryCurrentIpfsCid,fileParam.HistoryPreIpfsCid)
  592. textByte := []byte(text)
  593. md5Byte := md5.Sum(textByte)
  594. digest := fmt.Sprintf("%x", md5Byte)
  595. fileParam.Digest=digest
  596. jsonData,err :=json.Marshal(fileParam)
  597. if err!=nil{
  598. log.Printf("json序列化化错误!")
  599. return "",err
  600. }
  601. //log.Print(string(jsonData[:]))
  602. resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
  603. if err != nil {
  604. fmt.Println("post failed, err:%v\n", err)
  605. return "",err
  606. }
  607. defer resp.Body.Close()
  608. b, err := ioutil.ReadAll(resp.Body)
  609. if err != nil {
  610. fmt.Println("get resp failed,err:%v\n", err)
  611. return "",err
  612. }
  613. //log.Printf("post response:%v", string(b))
  614. result := make(map[string] interface{})
  615. err=json.Unmarshal(b,&result)
  616. if err!=nil{
  617. log.Printf("字符串%v反序列化出错", string(b[:]))
  618. }
  619. if result["Msg"].(string)==""{
  620. log.Printf("资源[ %v ]历史版本管理文件Hash:%v",fileName,result["Data"].(string))
  621. return result["Data"].(string),nil
  622. }
  623. return "",errors.New(result["Msg"].(string))
  624. }
  625. //更新文件记录
  626. func postUpdateFile(projectName, folderName, relativePath, ipfsCid, fileName, historyCurrentIpfsCid, historyPreIpfsCid,userId string, fileSize int64, fileVersion int)(err error){
  627. url:=config.ServerUrl+"/api/pms/sdk/updateFile"
  628. contentType := "application/json"
  629. intUserId,_:=strconv.ParseInt(userId,10,64)
  630. fileParam :=FileParam{
  631. ProjectName: projectName,
  632. FolderName: folderName,
  633. RelativePath: relativePath,
  634. IpfsCid: ipfsCid,
  635. FileName: fileName,
  636. FileSize: fileSize,
  637. FileVersion: fileVersion,
  638. UserId: intUserId,
  639. HistoryCurrentIpfsCid: historyCurrentIpfsCid,
  640. HistoryPreIpfsCid: historyPreIpfsCid,
  641. }
  642. text:=fmt.Sprintf("%v|%v|%v|%v|%v|%v|%v|%v|%v|%v",projectName,folderName,relativePath,ipfsCid,fileName,fileSize,fileVersion,userId,historyCurrentIpfsCid,historyPreIpfsCid)
  643. textByte := []byte(text)
  644. md5Byte := md5.Sum(textByte)
  645. digest := fmt.Sprintf("%x", md5Byte)
  646. fileParam.Digest=digest
  647. jsonData,err :=json.Marshal(fileParam)
  648. if err!=nil{
  649. log.Printf("json序列化化错误!")
  650. return err
  651. }
  652. resp, err := http.Post(url, contentType, bytes.NewReader(jsonData))
  653. if err != nil {
  654. fmt.Println("post failed, err:%v\n", err)
  655. return err
  656. }
  657. defer resp.Body.Close()
  658. b, err := ioutil.ReadAll(resp.Body)
  659. if err != nil {
  660. fmt.Println("get resp failed,err:%v\n", err)
  661. return err
  662. }
  663. //log.Printf("post response:%v", string(b))
  664. result := make(map[string] interface{})
  665. err=json.Unmarshal(b,&result)
  666. if err!=nil{
  667. log.Printf("字符串%v反序列化出错", string(b[:]))
  668. return err
  669. }
  670. log.Printf("资源[ %v ]服务记录更新成功",fileName)
  671. return nil
  672. }
  673. /**
  674. 记录提交记录
  675. */
  676. func commitRecord(path,currentHistoryHash,hash,note,creator string, milestone bool) (int,string,error){
  677. commitHistory := new(commitHistory)
  678. //历史文件不存在则创建
  679. localSh :=shell.NewShell(config.GobalIpfsUrl)
  680. localSh.SetTimeout(10*time.Second)
  681. if len(currentHistoryHash)!=0 {
  682. os.Remove(path)
  683. err := localSh.Get(currentHistoryHash,path)
  684. if err != nil {
  685. log.Println("历史版本管理文件下载失败")
  686. return -1,"",err
  687. }
  688. }
  689. //初始化历史管理文件
  690. exist := fileExist(path)
  691. if !exist {
  692. commitFile,err := os.Create(path)
  693. if err != nil {
  694. log.Println("历史版本管理文件创建失败")
  695. return -1,"",err
  696. }
  697. commitFile.Close()
  698. }
  699. //设置文件隐藏属性
  700. attribCmd :=exec.Command("attrib","+h",path)
  701. err :=attribCmd.Run()
  702. if err != nil {
  703. log.Println("设置文件隐藏属性失败")
  704. return -1,"",err
  705. }
  706. //读取历史管理文件
  707. content ,err :=ioutil.ReadFile(path)
  708. if len(content)!=0{
  709. rows :=strings.Split(string(content),"\n")
  710. endRow :=rows[len(rows)-2]
  711. columns :=strings.Split(endRow,"\t")
  712. commitHistory.Version,_=strconv.Atoi(columns[6])
  713. commitHistory.Version++
  714. commitHistory.ParentHash=columns[1]
  715. }else{
  716. commitHistory.Version=1
  717. commitHistory.ParentHash="0000000000000000000000000000000000000000"
  718. }
  719. commitHistory.CurrentHash=hash
  720. commitHistory.Milestone = milestone
  721. commitHistory.Creator = creator
  722. commitHistory.Note = note
  723. commitHistory.CreateTime=time.Now().Unix()
  724. if commitHistory.ParentHash==commitHistory.CurrentHash{
  725. if commitHistory.Version>1{
  726. commitHistory.Version--
  727. }
  728. return commitHistory.Version,currentHistoryHash,nil
  729. }
  730. file,err :=os.OpenFile(path,os.O_APPEND,0666)
  731. if err != nil{
  732. log.Println(err)
  733. return -1,"",err
  734. }
  735. //写入历史管理文件
  736. w :=bufio.NewWriter(file)
  737. writeContent:=fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t",commitHistory.ParentHash,commitHistory.CurrentHash,commitHistory.Creator,commitHistory.CreateTime,commitHistory.Note,commitHistory.Milestone,commitHistory.Version)
  738. fmt.Fprintln(w,writeContent)
  739. err = w.Flush()
  740. if err != nil {
  741. log.Println("历史版本管理文件写入失败")
  742. return -1,"",err
  743. }
  744. file.Close()
  745. addFile,err :=os.Open(path)
  746. if err != nil{
  747. log.Println(err)
  748. return -1,"",err
  749. }
  750. defer addFile.Close()
  751. //add 历史管理文件
  752. historyHash,err:=localSh.Add(addFile)
  753. if err != nil {
  754. log.Println("历史版本管理文件上传失败")
  755. return -1,"",err
  756. }
  757. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  758. serverSh.SetTimeout(30*time.Second)
  759. err = serverSh.Pin(historyHash)
  760. if err != nil {
  761. log.Println("历史版本管理文件备份失败")
  762. return -1,"",err
  763. }
  764. return commitHistory.Version,historyHash,nil
  765. }
  766. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  767. var out []byte
  768. buf := make([]byte, 1024, 1024)
  769. for {
  770. n, err := r.Read(buf[:])
  771. if n > 0 {
  772. d := buf[:n]
  773. out = append(out, d...)
  774. progress <- string(d)
  775. }
  776. if err != nil {
  777. // Read returns io.EOF at the end of file, which is not an error for us
  778. if err == io.EOF {
  779. err = nil
  780. }
  781. return out, err
  782. }
  783. }
  784. // never reached
  785. panic(true)
  786. return nil, nil
  787. }
  788. /**
  789. 单个文件信息
  790. */
  791. type simpleFileInfo struct {
  792. Name string `json:"name" `
  793. Extension string `json:"extension"`
  794. RelativePath string `json:"relativePath"`
  795. AbsolutePath string `json:"absolutePath"`
  796. }
  797. var gobalFolderFileMap map[string] *simpleFileInfo
  798. var gobalRelativePath string
  799. /**
  800. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  801. @param id 文件id
  802. */
  803. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  804. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  805. fileInfo,err :=os.Stat(absolutePath)
  806. if err!=nil{
  807. log.Println(err)
  808. return err
  809. }
  810. log.Println(filepath.Dir(absolutePath))
  811. //单个文件处理
  812. if !fileInfo.IsDir() {
  813. simpleFileInfo := new(simpleFileInfo)
  814. simpleFileInfo.Name=fileInfo.Name()
  815. simpleFileInfo.Extension=path.Ext(absolutePath)
  816. simpleFileInfo.RelativePath=""
  817. simpleFileInfo.AbsolutePath=absolutePath
  818. gobalFolderFileMap[absolutePath]=simpleFileInfo
  819. bytes,err :=json.Marshal(gobalFolderFileMap)
  820. if err != nil {
  821. log.Println(err)
  822. return err
  823. }
  824. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  825. log.Println(err)
  826. return err
  827. }
  828. return nil
  829. }
  830. //文件目录处理
  831. gobalRelativePath = filepath.Dir(absolutePath)
  832. err =filepath.Walk(absolutePath, myWalkfunc)
  833. if err != nil {
  834. log.Println(err)
  835. return err
  836. }
  837. bytes,err :=json.Marshal(gobalFolderFileMap)
  838. if err != nil {
  839. log.Println(err)
  840. return err
  841. }
  842. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  843. log.Println(err)
  844. return err
  845. }
  846. return nil
  847. }
  848. func myWalkfunc(path string, info os.FileInfo, err error) error {
  849. if info.IsDir()==false{
  850. simpleFileInfo := new(simpleFileInfo)
  851. simpleFileInfo.Name=info.Name()
  852. simpleFileInfo.Extension=filepath.Ext(path)
  853. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  854. simpleFileInfo.AbsolutePath=path
  855. gobalFolderFileMap[path]=simpleFileInfo
  856. return nil
  857. }
  858. return nil
  859. }
  860. /**
  861. 本地文件是否存在
  862. */
  863. func fileExist(path string) bool {
  864. _, err := os.Lstat(path)
  865. return !os.IsNotExist(err)
  866. }
  867. /**
  868. 获取本地文件列表
  869. */
  870. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  871. projectPath := gobalLocalProjectDir
  872. //log.Println("切换文件列表:"+projectPath)
  873. keyPrefix := gobalLoginUserName+"\\"+projectName+"\\"
  874. //添加监控
  875. err := filepath.Walk(projectPath,watchWalkfunc)
  876. if err != nil {
  877. log.Println(err)
  878. return err
  879. }
  880. //初始化通道
  881. if config.GobalWatchChannelMap[projectPath] != nil {
  882. close(config.GobalWatchChannelMap[projectPath])
  883. }
  884. config.GobalWatchChannelMap[projectPath]=make(chan string,100)
  885. log.Println("添加文件监控:"+projectPath)
  886. //定期校验缓存的本地文件状态
  887. dataMapa,err := etcdclient.QueryWithPrefix(keyPrefix)
  888. if err != nil {
  889. log.Println(err)
  890. }
  891. if dataMapa!=nil && len(dataMapa)>0{
  892. for k,_ := range dataMapa {
  893. if !fileExist(config.LocalWorkSpaceDir+k){
  894. err = etcdclient.DeleteWithPrefix(k)
  895. if err != nil {
  896. log.Println(err)
  897. }
  898. }
  899. }
  900. }
  901. //优先etcd查询
  902. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  903. if err != nil {
  904. log.Println(err)
  905. return err
  906. }
  907. if dataMap==nil || len(dataMap)==0{
  908. // 不存在则初始化进etcd
  909. err =filepath.Walk(gobalLocalProjectDir,walkfunc)
  910. //路径错误
  911. if err != nil {
  912. log.Println(err)
  913. if err := conn.WriteMessage(websocket.TextMessage, []byte("{}")); err != nil {
  914. log.Println(err)
  915. return err
  916. }
  917. }
  918. mapByte,err:=json.Marshal(gobalFileMap)
  919. if err != nil {
  920. log.Println(err)
  921. return err
  922. }
  923. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  924. log.Println(err)
  925. return err
  926. }
  927. cacheMap := make(map[string] string)
  928. for k,v := range gobalFileMap {
  929. k := strings.Replace(k,config.LocalWorkSpaceDir,"",1)
  930. cacheMap[k]=v
  931. }
  932. //异步缓存
  933. //go func() {
  934. err = etcdclient.BatchAdd(cacheMap)
  935. if err != nil {
  936. log.Println(err)
  937. }
  938. //}()
  939. //log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  940. //清空gobalFileMap
  941. gobalFileMap = make(map[string] string)
  942. }
  943. err=sendFileListFromEtcd(keyPrefix,projectName,conn)
  944. if err != nil {
  945. log.Println(err)
  946. return err
  947. }
  948. for actionAndModifyFilePathStr :=range config.GobalWatchChannelMap[gobalLocalProjectDir] {
  949. //log.Println(actionAndModifyFilePathStr)
  950. actionAndModifyFilePath := strings.Split(actionAndModifyFilePathStr,";")
  951. queryKey := strings.Replace(actionAndModifyFilePath[1],config.LocalWorkSpaceDir,"",1)
  952. //当前登陆用户判断
  953. if gobalLoginUserName != strings.Split(queryKey,"\\")[0] && actionAndModifyFilePathStr!=";"{
  954. log.Printf("非法用户修改%v", actionAndModifyFilePathStr)
  955. continue
  956. }
  957. if actionAndModifyFilePath[0]=="remove"{
  958. queryMap,err :=etcdclient.QueryWithPrefix(queryKey)
  959. if len(queryMap)==0{
  960. continue
  961. }
  962. err = etcdclient.DeleteWithPrefix(queryKey)
  963. if err != nil {
  964. log.Println(err)
  965. }
  966. }else if actionAndModifyFilePath[0]=="write"{
  967. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  968. if err != nil {
  969. log.Println(err)
  970. continue
  971. }
  972. if len(querymap)==0{
  973. continue
  974. }
  975. //更新判断
  976. if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
  977. continue
  978. }
  979. oldValue := strings.Split(querymap[queryKey],";")
  980. newValue := oldValue[0]+";" +"1"
  981. err = etcdclient.ReplaceInto(queryKey,newValue)
  982. if err!=nil{
  983. log.Println(err)
  984. continue
  985. }
  986. log.Printf("文件变更 [ %v ] write", actionAndModifyFilePathStr)
  987. //保存即同步逻辑,如果非新增文件则自动post
  988. //获取文件的历史版本管理文件hash
  989. //filePath := actionAndModifyFilePath[1]
  990. //fileName :=filepath.Base(filePath)
  991. //folderName := strings.Split(queryKey,"\\")[2]
  992. //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
  993. //var relativePath string
  994. //if len(strings.Split(dir, "\\"))==1{
  995. // relativePath = ""
  996. //}else{
  997. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  998. //}
  999. //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
  1000. //if err!=nil{
  1001. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1002. // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
  1003. // continue
  1004. //}
  1005. //
  1006. ////自动更新文件
  1007. //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
  1008. //if err!=nil{
  1009. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1010. // log.Printf("UploadCommand 返回失败,%v",err.Error())
  1011. // //记录修改状态
  1012. // newValue := oldValue[0]+";" +"1"
  1013. // err = etcdclient.ReplaceInto(queryKey,newValue)
  1014. // if err!=nil{
  1015. // log.Println(err)
  1016. // continue
  1017. // }
  1018. // continue
  1019. //}
  1020. //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
  1021. //continue
  1022. }else if actionAndModifyFilePath[0]=="create"{
  1023. querymap,err := etcdclient.QueryWithPrefix(queryKey)
  1024. if err != nil {
  1025. log.Println(err)
  1026. continue
  1027. }
  1028. if len(querymap)==0{
  1029. continue
  1030. }
  1031. //更新判断
  1032. if gobalFileDownLoadingMap[actionAndModifyFilePath[1]]==1{
  1033. continue
  1034. }
  1035. oldValue := strings.Split(querymap[queryKey],";")
  1036. newValue := oldValue[0]+";" +"1"
  1037. err = etcdclient.ReplaceInto(queryKey,newValue)
  1038. if err!=nil{
  1039. log.Println(err)
  1040. continue
  1041. }
  1042. log.Printf("文件变更 [ %v ] create", actionAndModifyFilePathStr)
  1043. //如果非新增文件则自动post
  1044. //if goabalAddFileMap[]
  1045. //获取文件的历史版本管理文件hash
  1046. //filePath := actionAndModifyFilePath[1]
  1047. //fileName :=filepath.Base(filePath)
  1048. //folderName := strings.Split(queryKey,"\\")[2]
  1049. //dir := strings.Replace(strings.Replace(queryKey,"\\"+fileName,"",1),strings.Split(queryKey,"\\")[0]+"\\"+strings.Split(queryKey,"\\")[1]+"\\","",1)
  1050. //var relativePath string
  1051. //if len(strings.Split(dir, "\\"))==1{
  1052. // relativePath = ""
  1053. //}else{
  1054. // relativePath = strings.Replace(relativePath, folderName+"\\","",1)
  1055. //}
  1056. //historyCurrentHash,err :=postGetHistoryCurrentHash(projectName, folderName, fileName, relativePath)
  1057. //if err!=nil{
  1058. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1059. // log.Printf("postGetHistoryCurrentHash 返回失败,%v",err.Error())
  1060. // continue
  1061. //}
  1062. //
  1063. ////自动更新文件
  1064. //err =UploadCommand(nil,filePath,fileName,projectName,dir,historyCurrentHash,"",gobalLoginUserName,false)
  1065. //if err!=nil{
  1066. // GobalMessageNotify <- fmt.Sprintf("文件更新失败\n%v",fileName)
  1067. // log.Printf("UploadCommand 返回失败,%v",err.Error())
  1068. // //记录修改状态
  1069. // newValue := oldValue[0]+";" +"1"
  1070. // err = etcdclient.ReplaceInto(queryKey,newValue)
  1071. // if err!=nil{
  1072. // log.Println(err)
  1073. // continue
  1074. // }
  1075. // continue
  1076. //}
  1077. //GobalMessageNotify <- fmt.Sprintf("文件更新成功\n%v",fileName)
  1078. //continue
  1079. }
  1080. err = sendFileListFromEtcd(keyPrefix,projectName,conn)
  1081. if err != nil {
  1082. log.Println(err)
  1083. return err
  1084. }
  1085. }
  1086. return nil
  1087. }
  1088. func sendFileListFromEtcd(keyPrefix,projectName string,conn *websocket.Conn) error{
  1089. dataMap,err := etcdclient.QueryWithPrefix(keyPrefix)
  1090. if err != nil {
  1091. log.Println(err)
  1092. return err
  1093. }
  1094. if dataMap!=nil && len(dataMap)>0{
  1095. cacheMap := make(map[string] string)
  1096. for k,v := range dataMap {
  1097. //历史数据加默认值
  1098. if len(strings.Split(v, ";"))==1{
  1099. v=v+";0"
  1100. }
  1101. cacheMap[strings.Replace(k,gobalLoginUserName+"\\"+projectName+"\\","",1)]=v
  1102. }
  1103. mapByte,err:=json.Marshal(cacheMap)
  1104. if err != nil {
  1105. log.Println(err)
  1106. return err
  1107. }
  1108. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  1109. log.Println(err)
  1110. return err
  1111. }
  1112. return nil
  1113. }else{
  1114. log.Println("未查询到数据,keyPrefix:"+keyPrefix+",projectName:"+projectName)
  1115. }
  1116. return nil
  1117. }
  // OpenFileWith launches the shell "Open with" handler
  // (rundll32.exe shell32.dll,OpenAs_RunDLL) for filePath.
  //
  // The path is checked with os.Stat first; a stat failure is returned as-is
  // without starting the command. A failure of the command itself is logged
  // and returned.
  func OpenFileWith(filePath string) error {
  	if _, statErr := os.Stat(filePath); statErr != nil {
  		return statErr
  	}
  	openAs := exec.Command("rundll32.exe", "shell32.dll,OpenAs_RunDLL", filePath)
  	if runErr := openAs.Run(); runErr != nil {
  		log.Println(runErr)
  		return runErr
  	}
  	return nil
  }
  1136. /**
  1137. 手动检查软件更新
  1138. 0:不强制更新
  1139. 1:强制更新
  1140. */
  1141. func CheckForUpdates(forceUpdate string) error{
  1142. tszdir :=os.Getenv("TSZDIR")
  1143. //空格路径处理
  1144. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\stop.vbs"
  1145. //判断文件有效性
  1146. _,err := os.Stat(tszdir+config.UpdaterName)
  1147. if err!=nil{
  1148. return err
  1149. }
  1150. cmd := exec.Command(tszdir+config.UpdaterName,"/justcheck");
  1151. err =cmd.Run()
  1152. if err!=nil{
  1153. log.Println(err)
  1154. return err
  1155. }
  1156. cmd = exec.Command(tszdir+config.UpdaterName,"/checknow");
  1157. err =cmd.Run()
  1158. if err!=nil{
  1159. log.Println(err)
  1160. return err
  1161. }
  1162. //检测到更新 kill所有客户端进程
  1163. log.Println("close all process")
  1164. cmd = exec.Command("cmd.exe","/c",ipfsPath);
  1165. err =cmd.Run()
  1166. if err!=nil{
  1167. log.Println(err)
  1168. return err
  1169. }
  1170. return nil
  1171. }
  1172. func walkfunc(filePath string, info os.FileInfo, err error) error {
  1173. if info == nil{
  1174. return nil
  1175. }
  1176. if info.IsDir()==false{
  1177. //历史文件不扫描
  1178. if path.Ext(filePath)==".commit" {
  1179. return nil
  1180. }
  1181. sh := shell.NewShell(config.GobalIpfsUrl)
  1182. file,err :=os.Open(filePath)
  1183. if err != nil{
  1184. log.Println(err)
  1185. return err
  1186. }
  1187. defer file.Close()
  1188. hash,err :=sh.Add(file)
  1189. if err != nil {
  1190. log.Println(err)
  1191. return err
  1192. }
  1193. dir :=strings.Replace(fmt.Sprint(filePath),fmt.Sprint(gobalLocalProjectDir+"\\"),"",1)
  1194. gobalFileMap[dir]=hash
  1195. }
  1196. return nil
  1197. }
  1198. /**
  1199. 查询历史文件
  1200. path 文件路径
  1201. hash 历史版本文件hash
  1202. */
  1203. func QueryCommitHistory(filePath,hash string) (map[int] *commitHistory,error){
  1204. result := make(map[int] *commitHistory)
  1205. //校验文件路径
  1206. _,err :=os.Stat(filePath)
  1207. if err != nil {
  1208. log.Println("文件 "+filePath+"not found")
  1209. return nil,errors.New("参数错误!")
  1210. }
  1211. if len(hash) == 0 {
  1212. return result,nil
  1213. }
  1214. //根据hash更新文件
  1215. localSh := shell.NewShell(config.GobalIpfsUrl)
  1216. //连接失败判断
  1217. localSh.SetTimeout(5*time.Second)
  1218. ext :=path.Ext(filePath)
  1219. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  1220. os.Remove(commitFilePath)
  1221. err = localSh.Get(hash,commitFilePath)
  1222. if err!=nil {
  1223. log.Println("文件"+hash+"下载失败")
  1224. return result,errors.New("历史文件获取失败,请稍后重试")
  1225. }
  1226. //设置文件隐藏属性
  1227. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1228. err =attribCmd.Run()
  1229. if err != nil {
  1230. log.Println("设置文件隐藏属性失败")
  1231. return result,err
  1232. }
  1233. //解析历史版本文件
  1234. contentByte,err := ioutil.ReadFile(commitFilePath)
  1235. content := string(contentByte)
  1236. if content==""{
  1237. return result,nil
  1238. }
  1239. rows :=strings.Split(content,"\n")
  1240. length := len(rows)
  1241. var index int = 0
  1242. for i:=length-2;i>=0;i--{
  1243. columns := strings.Split(rows[i],"\t")
  1244. commitHistoryInstance := new(commitHistory)
  1245. commitHistoryInstance.ParentHash =columns[0]
  1246. commitHistoryInstance.CurrentHash=columns[1]
  1247. commitHistoryInstance.Version,_=strconv.Atoi(columns[6])
  1248. commitHistoryInstance.Milestone,_=strconv.ParseBool(columns[5])
  1249. commitHistoryInstance.Creator=columns[2]
  1250. commitHistoryInstance.CreateTime,_=strconv.ParseInt(columns[3], 10, 64)
  1251. commitHistoryInstance.Note = columns[4]
  1252. result[index]=commitHistoryInstance
  1253. index++
  1254. }
  1255. return result,nil
  1256. }
  1257. /**
  1258. 设定某个历史版本为里程碑版本
  1259. @param filePath 文件绝对路径
  1260. @param commitHistoryHash 历史版本管理文件hash
  1261. @param hash 文件hash
  1262. @param milestone 是否是里程碑
  1263. */
  1264. func EditCommitHistoryMilestoneHandler(filePath,commitHistoryHash,hash string,milestone bool) (string,error){
  1265. //result := make(map[int] *commitHistory)
  1266. //校验文件路径
  1267. _,err :=os.Stat(filePath)
  1268. if err != nil {
  1269. log.Println("文件 "+filePath+"not found")
  1270. return "",errors.New("参数错误!")
  1271. }
  1272. if len(commitHistoryHash) == 0 {
  1273. log.Println("参数hash must not empty")
  1274. return "",errors.New("参数错误!")
  1275. }
  1276. //根据hash更新文件
  1277. localSh := shell.NewShell(config.GobalIpfsUrl)
  1278. //连接失败判断
  1279. localSh.SetTimeout(5*time.Second)
  1280. ext :=path.Ext(filePath)
  1281. commitFilePath :=strings.Replace(filePath,ext,".commit",1)
  1282. os.Remove(commitFilePath)
  1283. err = localSh.Get(commitHistoryHash,commitFilePath)
  1284. if err!=nil {
  1285. log.Println("文件"+commitHistoryHash+"下载失败")
  1286. return "",errors.New("历史文件获取失败,请稍后重试")
  1287. }
  1288. //设置文件隐藏属性
  1289. attribCmd :=exec.Command("attrib","+h",commitFilePath)
  1290. err =attribCmd.Run()
  1291. if err != nil {
  1292. log.Println("设置文件隐藏属性失败")
  1293. return "",err
  1294. }
  1295. //解析历史版本文件
  1296. contentByte,err := ioutil.ReadFile(commitFilePath)
  1297. content := string(contentByte)
  1298. if content==""{
  1299. return "",nil
  1300. }
  1301. rows :=strings.Split(content,"\n")
  1302. length := len(rows)
  1303. resultString :=""
  1304. for i:=0;i<length-1;i++{
  1305. columns := strings.Split(rows[i],"\t")
  1306. if columns[1]==hash{
  1307. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],milestone,columns[6])
  1308. continue
  1309. }
  1310. resultString=resultString+fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\t%v\t\n",columns[0],columns[1],columns[2],columns[3],columns[4],columns[5],columns[6])
  1311. }
  1312. os.Remove(commitFilePath)
  1313. fw, err := os.OpenFile(commitFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)//os.O_TRUNC清空文件重新写入,否则原文件内容可能残留
  1314. w := bufio.NewWriter(fw)
  1315. w.WriteString(resultString)
  1316. if err != nil {
  1317. log.Println(err)
  1318. return "",err
  1319. }
  1320. w.Flush()
  1321. fw.Close()
  1322. addFile,err :=os.Open(commitFilePath)
  1323. if err != nil{
  1324. log.Println(err)
  1325. return "",err
  1326. }
  1327. defer addFile.Close()
  1328. //add 历史管理文件
  1329. historyHash,err:=localSh.Add(addFile)
  1330. if err != nil {
  1331. log.Println("历史版本管理文件上传失败")
  1332. return "",err
  1333. }
  1334. serverSh :=shell.NewShell(config.ServerIpfsUrl)
  1335. serverSh.SetTimeout(5*time.Second)
  1336. err = serverSh.Pin(historyHash)
  1337. if err != nil {
  1338. log.Println("历史版本管理文件备份失败")
  1339. return "",err
  1340. }
  1341. return historyHash,nil
  1342. }
  // commitHistory is one record of a file's commit history, serialized to JSON
  // with the field names given in the struct tags.
  type commitHistory struct {
  	// ParentHash is the hash stored in the first column of a history row.
  	ParentHash string `json:"parentHash"`
  	// CurrentHash is the hash of this version.
  	CurrentHash string `json:"currentHash"`
  	// Creator is the author of this version.
  	Creator string `json:"creator"`
  	// CreateTime is the creation timestamp as an integer (unit not shown here).
  	CreateTime int64 `json:"createTime"`
  	// Note is the free-text remark attached to this version.
  	Note string `json:"note"`
  	// Version is the version number.
  	Version int `json:"version"`
  	// Milestone reports whether this version is flagged as a milestone.
  	Milestone bool `json:"milestone"`
  }
  1355. /**
  1356. 消息通知
  1357. @param userId 用户ID
  1358. */
  1359. func MessageNotify(conn *websocket.Conn, userId string) (error){
  1360. msgKey :=fmt.Sprintf("lockingMsg\\%v",userId)
  1361. //返回全量通知消息列表
  1362. err :=queryEtcdToWebSocket(conn, msgKey)
  1363. if err!=nil{
  1364. log.Println(err)
  1365. return err
  1366. }
  1367. //消费通知消息到本地
  1368. nsqclient.Consumers(config.NsqTopic,(config.NsqChanelPrefix+userId),config.NsqAddr)
  1369. for message := range nsqclient.MsgQueue {
  1370. messOb :=nsqclient.LockingMsg{}
  1371. err:=json.Unmarshal([]byte(message),&messOb)
  1372. if err!=nil{
  1373. log.Println(err)
  1374. continue
  1375. }
  1376. for _, acceptUserId := range messOb.UserIds {
  1377. //log.Println(strconv.FormatInt(acceptUserId,10)+">>>"+userId)
  1378. if strconv.FormatInt(acceptUserId,10)==userId{
  1379. messagekey := msgKey+"\\"+strconv.FormatInt(messOb.Id,10)
  1380. err =etcdclient.ReplaceInto(messagekey,message)
  1381. if err!=nil{
  1382. nsqclient.MsgQueue <- message
  1383. log.Println(err)
  1384. }
  1385. err = queryEtcdToWebSocket(conn, messagekey)
  1386. if err!=nil{
  1387. nsqclient.MsgQueue <- message
  1388. log.Println(err)
  1389. }
  1390. }
  1391. }
  1392. }
  1393. return nil
  1394. }
  1395. /**
  1396. 消息通知
  1397. @param userId 用户ID
  1398. */
  1399. func MessageMarkReadHandler(conn *websocket.Conn, userId,messageId string) (err error){
  1400. msgKey :=fmt.Sprintf("lockingMsg\\%v\\%v",userId,messageId)
  1401. err = etcdclient.DeleteWithPrefix(msgKey)
  1402. return err
  1403. }
  1404. /**
  1405. 查询etcd对应key值发送给前端
  1406. */
  1407. func queryEtcdToWebSocket(conn *websocket.Conn,etcdKeyPrefix string) (err error){
  1408. msgs,err := etcdclient.QueryWithPrefix(etcdKeyPrefix)
  1409. if err!=nil{
  1410. log.Println(err)
  1411. return err
  1412. }
  1413. mapByte,err :=json.Marshal(msgs)
  1414. if err!=nil{
  1415. log.Println(err)
  1416. return err
  1417. }
  1418. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  1419. log.Println(err)
  1420. return err
  1421. }
  1422. return nil
  1423. }