文件同步
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

638 rivejä
14 KiB

  1. package handle
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "fts/config"
  6. "github.com/gorilla/websocket"
  7. _ "github.com/ipfs/go-ipfs-api"
  8. shell "github.com/ipfs/go-ipfs-api"
  9. "io"
  10. "log"
  11. "os"
  12. "os/exec"
  13. "path"
  14. "path/filepath"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. var gobalLoginUserId string
  20. //key:filepath,value:hash
  21. var gobalFileMap = make(map[string] string)
  22. var gobalFileUpdateTimeMap = make(map[string] string)
  23. //var gobalFileChangeMap = make(map[string] string)
  24. var getLocalFileListDir string
  25. var gobalSubscriptionFileChangeSwitch int =0 //订阅文件变更开关
  26. var ipfsPath=os.Getenv("IPFS-PATH")
  27. /**
  28. 文件上传下载进度
  29. */
  30. type processStruct struct {
  31. Size string `json:"size"`
  32. CurrentSize string `json:"currentSize"`
  33. Unit string `json:"unit"`
  34. CurrentUnit string `json:"currentUnit"`
  35. Process float64 `json:"process"`
  36. Hash string `json:"hash"`
  37. }
  38. func main() {
  39. //config.InitConfig()
  40. //InitLocalWorkSpace("320872793405132801","test1")
  41. //
  42. //DownCommand("QmTp2hEo8eXRp6wg7jXv1BLCMh5a4F3B7buAUZNZUu772j","testOne","hello.txt","a/b/")
  43. //
  44. //UploadCommand("C:\\Users\\yuan_rh\\Downloads\\QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","QmRzN7uW6HCVAkGMXNWv3rC9dqPJijvEgvtW6DKsQQE8Js","testOne","a/b/")
  45. //
  46. //GetLocalFileList("testOne")
  47. }
  48. /**
  49. 初始化本地工作目录
  50. @param userId 用户ID
  51. @param projectName 项目名称
  52. */
  53. func InitLocalWorkSpace(conn *websocket.Conn,userId,projectName string) (error){
  54. //空格路径处理
  55. ipfsPath=strings.Replace(os.Getenv("IPFS-PATH"),"\"","",1)+"\\ipfs.exe"
  56. //初始化当前登陆用户
  57. gobalLoginUserId = userId
  58. // 检查本地目录是否存在
  59. var projectPath = config.LocalWorkSpaceDir +"\\"+userId+"\\"+projectName
  60. _,err := os.Stat(projectPath)
  61. if err != nil {
  62. //创建文件目录
  63. os.MkdirAll(projectPath, os.ModePerm)
  64. /*os.MkdirAll(projectPath+"\\我的文件", os.ModePerm)
  65. os.MkdirAll(projectPath+"\\工作文件", os.ModePerm)
  66. os.MkdirAll(projectPath+"\\协作文件", os.ModePerm)
  67. os.MkdirAll(projectPath+"\\公共文件", os.ModePerm)*/
  68. }
  69. log.Println("切换本地工作目录至 "+projectPath)
  70. if err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(projectPath))); err != nil {
  71. log.Println(err)
  72. return err
  73. }
  74. return nil
  75. }
  76. /**
  77. 下载指令
  78. @param hash ipfs哈希值
  79. @param projectName 项目名称
  80. @para fileName 文件名称
  81. @param dir 云文件目录
  82. */
  83. func DownCommand(conn *websocket.Conn, hash, projectName, fileName, dir string) error{
  84. absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
  85. //检查目录
  86. _,err := os.Stat(absoluteDir)
  87. if err != nil {
  88. //创建文件目录
  89. err = os.MkdirAll(absoluteDir, os.ModePerm)
  90. if err!=nil{
  91. log.Println(err)
  92. return err
  93. }
  94. }
  95. var downloading bool = false
  96. //检测文件打开状态
  97. tfile,err := os.OpenFile(fmt.Sprint(absoluteDir+"\\"+fileName),os.O_RDWR,1)
  98. if err != nil && (!os.IsNotExist(err)) {
  99. log.Println("文件被占用,请关闭打开的软件")
  100. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  101. return err
  102. }
  103. return err
  104. }
  105. defer tfile.Close()
  106. //serverSh := shell.NewShell(config.ServerIpfsUrl)
  107. ////检测引导节点是否连接成功
  108. //isUp := serverSh.IsUp()
  109. //if !isUp {
  110. // if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  111. // return err
  112. // }
  113. // return nil
  114. //}
  115. cmd := exec.Command(ipfsPath,"get", hash,"-o",fmt.Sprint(absoluteDir+"\\"+fileName))
  116. progress := make(chan string,10000)
  117. var stdout, stderr []byte
  118. var errStdout, errStderr error
  119. stdoutIn, _ := cmd.StdoutPipe()
  120. stderrIn, _ := cmd.StderrPipe()
  121. cmd.Start()
  122. go func() {
  123. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
  124. }()
  125. go func() {
  126. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
  127. }()
  128. go func(){
  129. millSeconds := time.Now().UnixNano() / 1e6
  130. for content := range progress { // 通道关闭后会退出for range循环
  131. current :=time.Now().UnixNano() / 1e6
  132. if current-millSeconds>500{
  133. projson,err := contentToJSONByte(content)
  134. if projson==nil && err==nil{
  135. continue
  136. }
  137. if err != nil {
  138. log.Printf("json.Marshal error %s\n", err)
  139. }
  140. millSeconds = current
  141. downloading = true
  142. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  143. log.Println(err)
  144. break
  145. }
  146. }
  147. if strings.Index(content,"100.00%")!=-1{
  148. projson,err := contentToJSONByte(content)
  149. if projson==nil && err==nil{
  150. continue
  151. }
  152. if err != nil {
  153. log.Printf("json.Marshal error %s\n", err)
  154. }
  155. downloading = true
  156. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  157. panic(err)
  158. }
  159. break
  160. }
  161. }
  162. }()
  163. log.Println("下载资源连接中...")
  164. //设置30秒连接超时
  165. go func() {
  166. index :=0
  167. for true{
  168. index++
  169. if downloading==true{
  170. return
  171. }
  172. time.Sleep(time.Duration(1)*time.Second)
  173. if downloading==false && index==30{
  174. err = cmd.Process.Kill()
  175. log.Println("进程连接超时30s已被Kill")
  176. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  177. return
  178. }
  179. return
  180. }
  181. }
  182. }()
  183. err = cmd.Wait()
  184. if err != nil {
  185. log.Printf("cmd.Run() failed with %s\n", err)
  186. }
  187. if errStdout != nil || errStderr != nil {
  188. log.Printf("failed to capture stdout or stderr\n")
  189. }
  190. outStr := string(stdout)
  191. log.Printf("out:%s", outStr)
  192. if err==nil{
  193. log.Println("下载成功")
  194. }
  195. //time.Sleep(time.Duration(6)*time.Second)
  196. defer close(progress)
  197. return nil
  198. }
  199. func contentToJSONByte(content string) ([]byte,error){
  200. sts :=strings.Split(content," ")
  201. if len(sts)<8{
  202. log.Println("字符长度小于8")
  203. return nil,nil
  204. }
  205. var processFloat float64
  206. if (len(sts)==9 || len(sts)==8){
  207. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
  208. }else{
  209. processFloat,_ =strconv.ParseFloat(strings.Replace(sts[8],"%","",1), 64)
  210. }
  211. if processFloat==0{
  212. //log.Println("当前进度0")
  213. return nil,nil
  214. }
  215. pro :=&processStruct{
  216. Size:sts[4],
  217. CurrentSize: sts[1],
  218. Unit: sts[2],
  219. CurrentUnit: sts[5],
  220. Process: processFloat,
  221. Hash: "",
  222. }
  223. projson,err :=json.Marshal(pro)
  224. return projson,err
  225. }
  226. /**
  227. 上传本地文件
  228. @param absolutePath 文件本地绝对路径
  229. @param fileName 文件名称
  230. @param projectName 项目名称
  231. @param dir 云文件目录
  232. */
  233. func UploadCommand(conn *websocket.Conn,absolutePath,fileName,projectName,dir string) error{
  234. //本地拷贝文件
  235. absoluteDir := config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\"+dir
  236. //检查目录
  237. _,err := os.Stat(absoluteDir)
  238. if err != nil {
  239. //创建文件目录
  240. err = os.MkdirAll(absoluteDir, os.ModePerm)
  241. if err!=nil{
  242. return err
  243. }
  244. }
  245. //检测文件打开状态
  246. tfile,err := os.OpenFile(absolutePath,os.O_RDWR,1)
  247. if err != nil {
  248. log.Println("文件被占用,请关闭打开的软件")
  249. if err := conn.WriteMessage(websocket.TextMessage, []byte("-2")); err != nil {
  250. return err
  251. }
  252. return err
  253. }
  254. defer tfile.Close()
  255. serverSh := shell.NewShell(config.ServerIpfsUrl)
  256. serverSh.SetTimeout(time.Duration(30)*time.Second)
  257. log.Println("检测引导节点存活情况"+config.ServerIpfsUrl)
  258. //检测引导节点是否连接成功
  259. isUp := serverSh.IsUp()
  260. if !isUp {
  261. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  262. return err
  263. }
  264. return nil
  265. }
  266. var uploading bool=false
  267. cmd := exec.Command(ipfsPath, "add",absolutePath)
  268. uploadProgress := make(chan string,10000)
  269. var stdout, stderr []byte
  270. var errStdout, errStderr error
  271. stdoutIn, _ := cmd.StdoutPipe()
  272. stderrIn, _ := cmd.StderrPipe()
  273. cmd.Start()
  274. go func() {
  275. stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
  276. }()
  277. go func() {
  278. stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
  279. }()
  280. go func(){
  281. millSeconds := time.Now().UnixNano() / 1e6
  282. for content := range uploadProgress { // 通道关闭后会退出for range循环
  283. current :=time.Now().UnixNano() / 1e6
  284. if current-millSeconds>500{
  285. projson,err := contentToJSONByte(content)
  286. if projson==nil && err==nil{
  287. continue
  288. }
  289. if err != nil {
  290. log.Println("json.Marshal error %s\n", err)
  291. }
  292. uploading=true
  293. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  294. break
  295. }
  296. millSeconds = current
  297. }
  298. if strings.Index(content,"100.00%")!=-1{
  299. projson,err := contentToJSONByte(content)
  300. if projson==nil && err==nil{
  301. continue
  302. }
  303. if err != nil {
  304. log.Println("json.Marshal error %s\n", err)
  305. }
  306. uploading=true
  307. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  308. panic(err)
  309. }
  310. break
  311. }
  312. }
  313. }()
  314. log.Println("文件上传中...")
  315. //设置30秒连接超时
  316. go func() {
  317. index :=0
  318. for true{
  319. index++
  320. if uploading==true{
  321. return
  322. }
  323. time.Sleep(time.Duration(1)*time.Second)
  324. if uploading==false && index==30{
  325. err = cmd.Process.Kill()
  326. log.Println("进程连接超时30s已被Kill")
  327. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  328. return
  329. }
  330. return
  331. }
  332. }
  333. }()
  334. err = cmd.Wait()
  335. if err != nil {
  336. log.Println("cmd.Run() failed with %s\n", err)
  337. }
  338. if errStdout != nil || errStderr != nil {
  339. log.Println("failed to capture stdout or stderr\n")
  340. }
  341. outStr := string(stdout)
  342. log.Printf("out:%s", outStr)
  343. /*sh.Get(hash,absoluteDir+fileName)
  344. if err != nil {
  345. return "",err
  346. }*/
  347. defer close(uploadProgress)
  348. prog := new(processStruct)
  349. prog.Hash=strings.Split(outStr," ")[1]
  350. prog.Process=100.00
  351. sh := shell.NewShell(config.GobalIpfsUrl)
  352. sh.SetTimeout(time.Duration(30)*time.Second)
  353. objectStat,err :=sh.ObjectStat(prog.Hash)
  354. if err != nil {
  355. log.Println(err)
  356. return err
  357. }
  358. prog.Size=strconv.Itoa(objectStat.CumulativeSize)
  359. projson,err :=json.Marshal(prog)
  360. serverSh.SetTimeout(time.Duration(30)*time.Second)
  361. err = serverSh.Pin(prog.Hash)
  362. if err != nil {
  363. log.Println("引导节点备份失败")
  364. log.Println(err)
  365. if err := conn.WriteMessage(websocket.TextMessage, []byte("-1")); err != nil {
  366. return err
  367. }
  368. return err
  369. }
  370. if err := conn.WriteMessage(websocket.TextMessage, projson); err != nil {
  371. log.Println(err)
  372. }
  373. log.Println("引导节点文件备份成功")
  374. //本地文件夹拷贝
  375. err = sh.Get(prog.Hash,fmt.Sprint((absoluteDir+"\\"+fileName)))
  376. if err != nil {
  377. log.Println(err)
  378. return err
  379. }
  380. log.Println("上传成功")
  381. return nil
  382. }
  383. func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
  384. var out []byte
  385. buf := make([]byte, 1024, 1024)
  386. for {
  387. n, err := r.Read(buf[:])
  388. if n > 0 {
  389. d := buf[:n]
  390. out = append(out, d...)
  391. progress <- string(d)
  392. }
  393. if err != nil {
  394. // Read returns io.EOF at the end of file, which is not an error for us
  395. if err == io.EOF {
  396. err = nil
  397. }
  398. return out, err
  399. }
  400. }
  401. // never reached
  402. panic(true)
  403. return nil, nil
  404. }
  405. /**
  406. 单个文件信息
  407. */
  408. type simpleFileInfo struct {
  409. Name string `json:"name" `
  410. Extension string `json:"extension"`
  411. RelativePath string `json:"relativePath"`
  412. AbsolutePath string `json:"absolutePath"`
  413. }
  414. var gobalFolderFileMap map[string] *simpleFileInfo
  415. var gobalRelativePath string
  416. /**
  417. 获取指定目录或文件的文件信息,如果是目录递归获取文件信息
  418. @param id 文件id
  419. */
  420. func GetFolderFileInfo(conn *websocket.Conn,absolutePath string) error{
  421. gobalFolderFileMap = make(map[string] *simpleFileInfo)
  422. fileInfo,err :=os.Stat(absolutePath)
  423. if err!=nil{
  424. log.Println(err)
  425. return err
  426. }
  427. log.Println(filepath.Dir(absolutePath))
  428. //单个文件处理
  429. if !fileInfo.IsDir() {
  430. simpleFileInfo := new(simpleFileInfo)
  431. simpleFileInfo.Name=fileInfo.Name()
  432. simpleFileInfo.Extension=path.Ext(absolutePath)
  433. simpleFileInfo.RelativePath=""
  434. simpleFileInfo.AbsolutePath=absolutePath
  435. gobalFolderFileMap[absolutePath]=simpleFileInfo
  436. bytes,err :=json.Marshal(gobalFolderFileMap)
  437. if err != nil {
  438. log.Println(err)
  439. return err
  440. }
  441. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  442. log.Println(err)
  443. return err
  444. }
  445. return nil
  446. }
  447. //文件目录处理
  448. gobalRelativePath = filepath.Dir(absolutePath)
  449. err =filepath.Walk(absolutePath, myWalkfunc)
  450. if err != nil {
  451. log.Println(err)
  452. return err
  453. }
  454. bytes,err :=json.Marshal(gobalFolderFileMap)
  455. if err != nil {
  456. log.Println(err)
  457. return err
  458. }
  459. if err := conn.WriteMessage(websocket.TextMessage, bytes); err != nil {
  460. log.Println(err)
  461. return err
  462. }
  463. return nil
  464. }
  465. func myWalkfunc(path string, info os.FileInfo, err error) error {
  466. if info.IsDir()==false{
  467. simpleFileInfo := new(simpleFileInfo)
  468. simpleFileInfo.Name=info.Name()
  469. simpleFileInfo.Extension=filepath.Ext(path)
  470. simpleFileInfo.RelativePath=filepath.Dir(strings.Replace(path,gobalRelativePath,"",1))
  471. simpleFileInfo.AbsolutePath=path
  472. gobalFolderFileMap[path]=simpleFileInfo
  473. return nil
  474. }
  475. return nil
  476. }
  477. /**
  478. 获取本地文件列表
  479. */
  480. func SubscriptionFileChange(conn *websocket.Conn, projectName string) error{
  481. getLocalFileListDir = fmt.Sprint(config.LocalWorkSpaceDir+"\\"+gobalLoginUserId+"\\"+projectName+"\\")
  482. for {
  483. err :=filepath.Walk(getLocalFileListDir,walkfunc)
  484. if err != nil {
  485. log.Println(err)
  486. time.Sleep(time.Duration(1)*time.Minute)
  487. continue
  488. }
  489. mapByte,err:=json.Marshal(gobalFileMap)
  490. if err != nil {
  491. log.Println(err)
  492. return err
  493. }
  494. if err := conn.WriteMessage(websocket.TextMessage, mapByte); err != nil {
  495. log.Println(err)
  496. return err
  497. }
  498. log.Println("检查本地目录文件变更状态,执行睡眠时间1分钟")
  499. //清空gobalFileMap
  500. gobalFileMap = make(map[string] string)
  501. time.Sleep(time.Duration(1)*time.Minute)
  502. }
  503. return nil
  504. }
  505. func walkfunc(path string, info os.FileInfo, err error) error {
  506. if info.IsDir()==false{
  507. sh := shell.NewShell(config.GobalIpfsUrl)
  508. file,err :=os.Open(path)
  509. if err != nil{
  510. log.Println(err)
  511. return err
  512. }
  513. defer file.Close()
  514. hash,err :=sh.Add(file)
  515. if err != nil {
  516. log.Println(err)
  517. return err
  518. }
  519. dir :=strings.Replace(fmt.Sprint(path),fmt.Sprint(getLocalFileListDir),"",1)
  520. gobalFileMap[dir]=hash
  521. }
  522. return nil
  523. }