|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846 |
- package handler
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/gogo/protobuf/sortkeys"
- "github.com/gorilla/websocket"
- ipfs "github.com/ipfs/go-ipfs-api"
- "github.com/pkg/errors"
- "io"
- "locking-kit-server/consts"
- "locking-kit-server/db"
- "locking-kit-server/env"
- "log"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- )
-
- /**
- * @author yuanrh
- * @description websocket服务核心逻辑
- * @date 2021/6/28 11:11
- **/
-
- var TaskToWebChanel chan map[string] interface{}
-
- var TaskToDownloadOrUploadChanel chan map[string] interface{}
-
- var MessageToWebChanel chan map[string] interface{}
-
- type ProjLockingMsg struct {
- Id int64 ` description:"主键ID" json:"Id,string"`
- UserId int64 ` description:"用户ID" json:"UserId,string"`
- Title string `description:"消息标题"`
- Body string `description:"消息主体"`
- Status int8 `description:"消息状态 0:未读,1:已读"`
- Type int8 `description:"消息类型 "`
- Parameter string `description:"参数"`
- UnixTime int64 `description:"unix时间戳" json:"UnixTime,string"`
- CreateUserId int64 `description:"创建者" json:"CreateUserId,string"`
- CreateTime time.Time `description:"创建时间"`
- ModifyUserId int64 `description:"修改者" json:"ModifyUserId,string"`
- ModifyTime time.Time `description:"修改时间"`
- }
-
-
- //监听任务同步情况
- func SubscriptionTaskSyncHandler(conn *websocket.Conn) (err error){
-
- if env.LoginStatus == 0{
- return nil
- }
-
- //获取全部任务,包括 同步中、已同步、同步异常
- prefix := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC)
- task,err := db.QueryWithPrefix(prefix)
- if err != nil{
- log.Printf("监听同步任务失败,%v", err)
- return err
- }
-
- var taskIds []int64
-
- //排序
- for k, _ := range task {
-
- taskId,err :=strconv.ParseInt(strings.Split(k, "/")[3], 10, 64)
- if err!=nil{
- log.Println(err)
- }
-
- taskIds = append(taskIds, taskId)
- }
- sortkeys.Int64s(taskIds)
-
- for _,taskId := range taskIds {
-
- key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId)
- data,err := taskStringToMap(task[key])
- if err !=nil{
- log.Println(err)
- continue
- }
- TaskToWebChanel <- data
- }
-
- //加入下载队列
- for index,taskId := range taskIds {
-
- key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,taskId)
- data,err := taskStringToMap(task[key])
- if err !=nil{
- log.Println(err)
- continue
- }
-
- //解析开机下载任务
- //优先下载中
- if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_ING {
- TaskToDownloadOrUploadChanel <- data
- continue
- }
-
- if data[consts.TASK_SYNC_STATUS] == consts.TASK_SYNC_STATUS_WAIT {
- TaskToDownloadOrUploadChanel <- data
- }
-
- //
- if len(taskIds)>1000 && index<(len(taskIds)-1001){
- err = db.DeleteWithPrefix(key)
- log.Println(err)
- }
- }
-
- //备份
- go func() {
- for true {
- syncUploadToBackUpNode()
- time.Sleep(2 * time.Minute)
- }
- }()
-
- //开启异步下载/上传任务
- syncDownloadOrUpload()
-
- //任务同步信道
- for v := range TaskToWebChanel {
-
- value := CopyMap(v)
- data,err := json.Marshal(value)
- if err!=nil{
- log.Println(err)
- }
-
- if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
- log.Printf("监听同步任务失败,%v", err)
- return err
- }
- }
-
- return nil
- }
-
- //任务对象字符串转map对象
- func taskStringToMap(task string)(data map[string] interface{}, err error){
-
- err = json.Unmarshal([]byte(task),&data)
- if err != nil{
- log.Println(err)
- return data,err
- }
- return data,err
- }
-
- //处理下载、上传队列
- func syncDownloadOrUpload(){
- go func() {
- log.Print("开启异步处理下载、上传队列...")
- for v := range TaskToDownloadOrUploadChanel {
- my := CopyMap(v)
- //延迟2秒进入下载
- time.Sleep(2*time.Second)
- if my[consts.TASK_TYPE]==consts.TASK_TYPE_DOWNLOAD{
- downLoadTask(my)
- continue
- }
- upLoadTask(my)
- }
- }()
- }
-
- //执行单个上传任务
- func upLoadTask(task map[string] interface{}){
-
- remoteIpfsApi := ipfs.NewShell(env.IpfsApi)
-
- isUp := remoteIpfsApi.IsUp()
- if !isUp {
- log.Println("盒子节点网络连接不通!")
- //TaskToDownloadOrUploadChanel <- task
- //return
- }
-
- //上传启动标识,有上传进度则设置为true
- var uploading bool=false
-
- //cmd ipfs get
- cmd := exec.Command(env.IpfsPath, "add", task[consts.TASK_ABSOLUTE_PATH].(string))
- uploadProgress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, uploadProgress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, uploadProgress)
- }()
-
- //上传
- updateTaskProgress(task, 0)
-
- //异步给前端反馈上传进度
- go func(){
- first := true
- //millSeconds := time.Now().UnixNano() / 1e6
- //current :=time.Now().UnixNano() / 1e6
- for content := range uploadProgress { // 通道关闭后会退出for range循环
- //current =time.Now().UnixNano() / 1e6
- if first {
- pro,err := contentToJSONByte(content)
-
- if err!=nil{
- log.Print(err)
- continue
- }
-
- if pro.Process == 0{
- continue
- }
-
- //设置上传启动标识为true
- updateTaskProgress(task, pro.Process)
- uploading=true
- //millSeconds = current
- first=false
- }
-
- //if current-millSeconds>500{
- pro,err := contentToJSONByte(content)
-
- if err!=nil{
- log.Print(err)
- continue
- }
-
- if pro.Process == 0{
- continue
- }
-
- uploading=true
- updateTaskProgress(task, pro.Process)
- //millSeconds = current
- //}
- }
- }()
-
- log.Println("资源上传中...")
- //
- //设置30秒连接超时,30秒未启动上传则上传失败
- go func() {
- index :=0
- for true{
- if uploading==true{
- return
- }
- index++
- time.Sleep(time.Duration(1)*time.Second)
- if uploading==false && index==30{
- log.Println("资源连接超时(30s),上传进程被终止")
- err := cmd.Process.Kill()
- if err!=nil{
- log.Println(err)
- }
- updateTaskStatusToFail(task)
- return
- }
-
- if index>30{
- return
- }
- }
- }()
-
- //等待执行完成
- err := cmd.Wait()
- log.Println("add 执行完成")
- if err != nil {
- log.Println("cmd.Run() failed with %s\n", err)
- updateTaskStatusToFail(task)
- return
- }
- if errStdout != nil || errStderr != nil {
- log.Println("failed to capture stdout or stderr\n")
- updateTaskStatusToFail(task)
- return
- }
- outStr := string(stdout)
- fileHash := strings.Split(outStr," ")[1]
- task["IpfsCid"] = fileHash
- log.Printf("out:%s", outStr)
- defer close(uploadProgress)
-
- //ipfs provide
- cmd = exec.Command(env.IpfsPath,"dht","provide",fileHash)
- err = cmd.Run()
- if err != nil {
- log.Println(err)
- return
- }
-
- //加入ipfs盒子节点备份队列
- task[consts.TASK_IPFS_API]= env.IpfsApi
- err = insertFileNodeBackupTask(task)
- if err!=nil{
- log.Println(err)
- }
-
- //更新远程服务中心文件对象Hash值和历史记录
- var id string
- var projId string
- var folderId string
- var modifyUserId string
- var fileSzie string
- log.Println(task)
- if task["Id"]!=nil {
- id = task["Id"].(string)
- }
- projId = task["ProjId"].(string)
- folderId = task["FolderId"].(string)
- modifyUserId = task["ModifyUserId"].(string)
- fileSzie = strconv.FormatInt(interfaceToInt64(task["FileSize"]),10)
-
- returnData, err := replaceIntoArchiveById(id, projId, folderId, task["ArchName"].(string), task["Extension"].(string), task["IpfsCid"].(string), fileSzie, task["RelativePath"].(string), modifyUserId, interfaceToInt(task["Version"]))
- if err !=nil{
- log.Printf("replaceIntoArchiveById err %v", err)
- updateTaskStatusToFail(task)
- if err.Error()=="版本冲突"{
- sendConflictNotifyToWeb(2, task)
- }
- return
- }
-
- if returnData!=nil{
- //远程对象重置
- remoteTask := returnData
- task["ModifyName"] = remoteTask["ModifyName"].(string)
- //更新工作空间文件对象
- replceIntoWorkSpaceFileObject(task[consts.TASK_ABSOLUTE_PATH].(string),remoteTask, false)
- }
-
- //完成上传
- updateTaskProgress(task, 100.00)
-
- log.Printf("叮,资源文件[ %v ]上传完成",task[consts.TASK_ABSOLUTE_PATH].(string))
- }
-
- //异步处理备份节点
- func syncUploadToBackUpNode(){
-
- key := fmt.Sprintf("/%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK)
- records,err := db.QueryWithPrefix(key)
- if err != nil{
- log.Println()
- }
-
- if records==nil || len(records)==0{
- return
- }
-
- for k, v := range records {
-
- var task = make(map[string] interface{})
- err := json.Unmarshal([]byte(v),&task)
- if err !=nil{
- log.Println(err)
- }
-
- if task[consts.TASK_IPFS_API]==""{
- log.Println("ipfs api 未配置")
- }
-
- //备份
- remoteIpfsApi := ipfs.NewShell(env.IpfsApi)
- isUp := remoteIpfsApi.IsUp()
- if !isUp {
- log.Println("盒子节点网络连接不通!")
- continue
- }
-
- //判断备份节点对等节点是否已添加连接
- localsh := ipfs.NewShell("localhost:5001")
- idOut,err :=localsh.ID()
- localId :=idOut.ID
- swarmConnInfos,err :=remoteIpfsApi.SwarmPeers(context.Background())
- hasConnect := false
- for _,swarmconn := range swarmConnInfos.Peers {
- if swarmconn.Peer==localId{
- hasConnect=true
- break
- }
- }
-
- if !hasConnect{
- log.Println("中继处理")
- swarmConnectAddr :="/ipfs/"+"12D3KooWER8uoGrzeLuJHTXoTMnR4jjHfpcA6ZcpXBaQNJvG5jMP"+"/p2p-circuit/ipfs/"+localId
- errT := remoteIpfsApi.SwarmConnect(context.Background(),swarmConnectAddr)
- if errT!=nil{
- log.Printf("中继失败,引导节点备份失败 %v", err)
- }
- }
-
- err = remoteIpfsApi.Pin(task["IpfsCid"].(string))
- if err != nil{
- log.Println(err)
- }
- log.Printf("文件对象 %v 备份成功", v)
-
- //移除
- err = db.DeleteWithPrefix(k)
- if err != nil{
- log.Println(err)
- }
- }
-
- }
-
- func CopyMap(source map[string] interface{} ) map[string] interface{}{
-
- target := make(map[string] interface{})
- for k, v := range source {
- target[k]=v
- }
- return target
- }
-
- //新增文件节点备份任务
- func insertFileNodeBackupTask(task map[string] interface{}) error{
-
- myTask := CopyMap(task)
- //key=userId:TASK_SYNC_STATUS_WAIT:taskId
- archiveByte,err := json.Marshal(myTask)
- if err !=nil{
- log.Println(err)
- return err
- }
- // key -> /userid/TASK_SYNC/417367746536689664
- key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_FILE_NODE_BACK,myTask[consts.TASK_ID].(string))
- err = db.ReplaceInto(key,string(archiveByte))
- if err !=nil{
- log.Println(err)
- return err
- }
-
- return nil
- }
-
- //执行单个下载任务
- func downLoadTask(task map[string] interface{}){
-
- //检查文件目录是否存在,不存在则创建
- directory := fmt.Sprintf("%v%v%v%v%v%v%v%v%v",env.WorkSpace,string(os.PathSeparator),env.CurrentUserPhone,string(os.PathSeparator),task["ProjName"],string(os.PathSeparator),task["NodeName"],string(os.PathSeparator),task["RelativePath"])
- _,err := os.Stat(directory)
- if err != nil {
- //创建文件目录
- err = os.MkdirAll(directory, os.ModePerm)
- if err!=nil{
- log.Println(err)
- updateTaskStatusToFail(task)
- return
- }
- }
-
- //判断文件是否冲突(本地文件存在,且已修改未提交)
- filePath := filepath.Clean(fmt.Sprintf("%v%v%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string),task["Extension"].(string)))
- orginFilePath := filePath
- conflict,err := isConflict(orginFilePath)
- if err!=nil{
- log.Println(err)
- updateTaskStatusToFail(task)
- return
- }
- log.Printf("文件冲突:%v", conflict)
- if conflict {
- //文件名_冲突文件_作者_时间戳
- modifyUnix := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(task["ModifyTime"].(string),"-",""),":",""),"+0800",""),"T","")
- filePath = filepath.Clean(fmt.Sprintf("%v%v%v_冲突文件_%v_%v.%v",directory,string(os.PathSeparator),task["ArchName"].(string), task["ModifyName"].(string), modifyUnix, task["Extension"].(string)))
- }
-
- //临时文件命名.locking
- tempFilePath :=filePath+".locking"
-
- //下载启动标识,有下载进度则设置为true
- var downloading bool = false
-
- //cmd ipfs get
- progress := make(chan string,10000)
- var stdout, stderr []byte
- var errStdout, errStderr error
- cmd := exec.Command(env.IpfsPath,"get", task["IpfsCid"].(string),"-o", tempFilePath)
- stdoutIn, _ := cmd.StdoutPipe()
- stderrIn, _ := cmd.StderrPipe()
- cmd.Start()
- go func() {
- stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn, progress)
- }()
- go func() {
- stderr, errStderr = copyAndCapture(os.Stderr, stderrIn, progress)
- }()
-
- //开始下载
- updateTaskProgress(task, 0)
-
- //异步读取进度
- //异步定时读取进度反馈给前端,每500ms返回一次进度
- go func(){
- //millSeconds := time.Now().UnixNano() / 1e6
- for content := range progress { // 通道关闭后会退出for range循环
- //current :=time.Now().UnixNano() / 1e6
- if !downloading{
- log.Printf("%v.%v %v 资源连接成功,下载中...",task["ArchName"],task["Extension"], task["IpfsCid"])
- }
- downloading = true
- //current-millSeconds>500 &&
- if strings.Index(content,"100.00%")==-1{
- pro,err := contentToJSONByte(content)
-
- if err!=nil{
- log.Print(err)
- continue
- }
-
- if pro.Process == 0{
- pro.Process = 1
- }
-
- //设置下载启动标识和下载时间戳
- //millSeconds = current
-
- //更新进度、状态
- updateTaskProgress(task, pro.Process)
- break
- }
-
- }
- }()
-
- //设置30秒连接超时,30秒未启动下载则下载失败
- go func() {
- index :=0
- for true{
- //启动下载则不做超时判断
- if downloading==true{
- return
- }
- index++
- time.Sleep(time.Duration(1)*time.Second)
- if downloading==false && index==30{
- err = cmd.Process.Kill()
- log.Println("资源连接超时(30s),下载进程被终止")
- updateTaskStatusToFail(task)
- return
- }
-
- if index>30{
- return
- }
- }
- }()
-
- //等待下载执行完成
- err = cmd.Wait()
- if err != nil {
- updateTaskStatusToFail(task)
- log.Printf("cmd.Run() failed with %s\n", err)
- return
- }
- if errStdout != nil || errStderr != nil {
- updateTaskStatusToFail(task)
- log.Printf("failed to capture stdout or stderr\n")
- return
- }
- outStr := string(stdout)
- log.Printf("下载成功,:%s", outStr)
-
- //临时文件重命名
- err = os.Rename(tempFilePath, filePath)
- if err!=nil{
- log.Println(err)
- }
- time.Sleep(1*time.Second)
-
- //完成下载
- task[consts.TASK_ABSOLUTE_PATH] = orginFilePath
- updateTaskProgress(task, 100.00)
-
- //更新工作空间文件对象
- replceIntoWorkSpaceFileObject(orginFilePath,task, conflict)
-
- //冲突发送,通知
- if conflict {
- sendConflictNotifyToWeb(3, task)
- }
-
- defer close(progress)
- }
- // type 2:上传冲突,3:下载冲突
- func sendConflictNotifyToWeb(msgType int64, param map[string] interface{}){
-
- var lockingMsg = make(map[string] interface{})
- lockingMsg["UserId"]=env.CurrentUserId
- lockingMsg["Title"]="文件冲突"
- lockingMsg["Body"]="文件冲突"
- lockingMsg["Status"]=0
- lockingMsg["Type"]= msgType
- lockingMsg["Parameter"]=param
-
- MessageToWebChanel <- lockingMsg
- }
-
- //在线通知
- func ListeningRemoteMessage(){
- log.Println("开启在线通知线程")
- for true {
-
- unixTime := strconv.FormatInt(time.Now().Unix(),10)
- time.Sleep(3*time.Minute)
- data,err := GetLockingMsgListByFilter(env.CurrentUserId,unixTime)
- if err != nil{
- log.Printf("消息通知失败! %v", err)
- continue
- }
-
- if data==nil{
- log.Println("未查询到通知消息")
- continue
- }
-
- //遍历消息
- for _, msg := range data {
- MessageToWebChanel <- msg.(map[string] interface{})
- }
- }
- }
-
- //是否存在文件冲突
- func isConflict(filePath string) (isConflict bool,err error){
-
- key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath)
- taskMap,err := db.QueryWithPrefix(key)
- if err != nil{
- log.Println(err)
- return false,err
- }
-
- //不存在则否
- taskString := taskMap[key]
- if len(taskString)==0{
- return false,err
- }
-
- var task map[string] interface{}
- err = json.Unmarshal([]byte(taskString),&task)
- if err != nil{
- log.Println(err)
- return false,err
- }
-
- return task[consts.TASK_IS_MODIFY].(bool),nil
- }
-
- //更新任务进度
- func updateTaskProgress(task map[string] interface{}, progress float64){
-
- task[consts.TASK_SYNC_PROGRESS]= progress
- if progress==100.00{
- task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_FINISH
- }else{
- task[consts.TASK_SYNC_STATUS]= consts.TASK_SYNC_STATUS_ING
- }
-
- taskByte,err := json.Marshal(task)
- if err !=nil{
- log.Printf("序列化失败 %v \n", err)
- }
- // key -> /userid/TASK_SYNC/417367746536689664
- key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
- err = db.ReplaceInto(key,string(taskByte))
- if err !=nil{
- log.Printf("etcd put 失败 %v \n", err)
- }
-
- TaskToWebChanel <- task
- }
-
- //更新任务状态为失败
- func updateTaskStatusToFail(task map[string] interface{}){
-
- task[consts.TASK_SYNC_STATUS] = consts.TASK_SYNC_STATUS_FAIL
-
- taskByte,err := json.Marshal(task)
- if err !=nil{
- log.Printf("序列化失败 %v \n", err)
- }
- // key -> /userid/TASK_SYNC/417367746536689664
- key := fmt.Sprintf("/%v%v%v",env.CurrentUserPhone, consts.ETCD_DIRECTOR_TASK_SYNC,task[consts.TASK_ID])
- err = db.ReplaceInto(key,string(taskByte))
- if err !=nil{
- log.Printf("etcd put 失败 %v \n", err)
- }
- TaskToWebChanel <- task
- }
-
- func interfaceToInt64(id interface{}) int64{
- switch id.(type) {
- case float64:
- return int64(id.(float64))
- default:
- return id.(int64)
- }
- }
-
- func interfaceToInt(id interface{}) int{
- switch id.(type) {
- case float64:
- return int(id.(float64))
- default:
- return id.(int)
- }
- }
-
- /**
- 文件上传下载进度
- */
- type processStruct struct {
- Size string `json:"size"`
- CurrentSize string `json:"currentSize"`
- Unit string `json:"unit"`
- CurrentUnit string `json:"currentUnit"`
- Process float64 `json:"process"`
- Hash string `json:"hash"`
- CommitHistoryHash string `json:"commitHistoryHash"`
- Version int `json:"version"`
- }
-
- //更新工作空间文件对象
- func replceIntoWorkSpaceFileObject(filePath string,task map[string] interface{}, conflict bool){
-
- //冲突文件不做标记
- if strings.Index(filePath,"_冲突文件_")>-1{
- return
- }
-
- key := fmt.Sprintf("/%v%v%v", env.CurrentUserPhone, consts.ETCD_DIRECTOR_WORKSPACE_FILE_OBJECT,filePath)
- if !conflict {
- task[consts.TASK_IS_MODIFY] = false
- }else{
- task[consts.TASK_IS_MODIFY] = true
- }
- taskByte,err := json.Marshal(task)
- if err !=nil{
- log.Println(err)
- return
- }
- err = db.ReplaceInto(key, string(taskByte))
- if err !=nil{
- log.Println(err)
- }
- }
-
- //读取解析cmd返回文件上传或下载进度信息
- func contentToJSONByte(content string) (pro processStruct, err error){
-
- sts :=strings.Split(content," ")
- //log.Printf("content:%v,length:%v", content, len(sts))
- if len(sts)<8{
- log.Println("字符长度小于8")
- log.Println(content)
- return pro,errors.New("字符长度小于8")
- }
- //for i, v := range sts {
- // log.Printf("%v -> %v", i, v)
- //}
- var processFloat float64
- //var err error
- //if (len(sts)==9 || len(sts)==8){
- //processFloat,err =strconv.ParseFloat(strings.Replace(sts[7],"%","",1), 64)
- //}else if (len(sts)==10 || len(sts)==11){
- //processFloat,err =strconv.ParseFloat(strings.Replace(sts[9],"%","",1), 64)
- //}else{
- //processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64)
- //}
- if strings.Index(content,"m")>-1{
- processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-2],"%","",1), 64)
- }else{
- processFloat,err =strconv.ParseFloat(strings.Replace(sts[len(sts)-1],"%","",1), 64)
- }
-
- if err !=nil{
- log.Printf("%v, length:%v, err:%v",content, len(sts), err)
- //for i, v := range sts {
- // log.Printf("%v -> %v", i, v)
- //}
- return pro,err
- }
-
- if processFloat==0{
- //log.Println("当前进度0")
- pro.Process = 0
- return pro,err
- }
-
- pro.Size = sts[4]
- pro.CurrentSize = sts[1]
- pro.Unit = sts[2]
- pro.CurrentUnit = sts[5]
- pro.Process = processFloat
-
- return pro,err
- }
-
- func copyAndCapture(w io.Writer, r io.Reader, progress chan string) ([]byte, error) {
- var out []byte
- buf := make([]byte, 1024, 1024)
- for {
- n, err := r.Read(buf[:])
- if n > 0 {
- d := buf[:n]
- out = append(out, d...)
- if strings.Index(string(d),"Saving")>-1{
- continue
- }
- progress <- string(d)
- }
- if err != nil {
- //log.Println(err)
- // Read returns io.EOF at the end of file, which is not an error for us
- if err == io.EOF {
- err = nil
- }
- return out, err
- }
- }
- // never reached
- panic(true)
- return nil, nil
- }
-
-
|