mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-02 12:17:34 +08:00
111 lines
3.1 KiB
Go
111 lines
3.1 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
//"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"strconv"
|
|
"sync"
|
|
"os"
|
|
//"time"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
//"github.com/aws/aws-sdk-go/aws/credentials"
|
|
//"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
//"github.com/pingcap/tidb/config"
|
|
"github.com/pingcap/tidb/store/tikv"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
var rootCmd = &cobra.Command{
|
|
Use: "hbr",
|
|
Short: "He3DB backup&restore",
|
|
Long: "Welcome to use hbr for He3DB backup&restore",
|
|
Run: runRoot,
|
|
}
|
|
|
|
// Package-level state shared by the S3 worker functions in this file.
var (
	// wg is marked Done by s3PutKV and s3RestoreKVRaw when they return.
	wg sync.WaitGroup

	// concurrency is bound to the "concurrency" flag (default 100) in init.
	concurrency int
)
|
|
|
|
func init() {
|
|
rootCmd.PersistentFlags().String("access_key", "", "S3 Access Key")
|
|
rootCmd.PersistentFlags().String("secret_key", "", "S3 Secret Key")
|
|
rootCmd.PersistentFlags().String("endpoint", "", "S3 endpoint")
|
|
rootCmd.PersistentFlags().String("region", "", "S3 region")
|
|
rootCmd.PersistentFlags().String("bucket", "", "S3 bucket")
|
|
rootCmd.PersistentFlags().String("pd", "http://127.0.0.1:2379", "Tikv placement driber")
|
|
rootCmd.PersistentFlags().String("name", "", "Backup name")
|
|
rootCmd.PersistentFlags().String("archive_start_file", "000000010000000000000001", "start key of archive[included]")
|
|
rootCmd.PersistentFlags().String("archive_start_time_line", "0000000000000001", "start time line of archive[included]")
|
|
rootCmd.PersistentFlags().String("archive_start_lsn", "0000000000000000", "start lsn of archive[included]")
|
|
rootCmd.PersistentFlags().IntVar(&concurrency, "concurrency", 100, "concurrency")
|
|
}
|
|
|
|
func Execute() {
|
|
if err := rootCmd.Execute(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func runRoot(cmd *cobra.Command, args []string) {
|
|
fmt.Printf("Welcome to use hbr for He3DB backup&restore\n")
|
|
}
|
|
|
|
func s3PutKV(s3_client *s3.S3, bucket string, backup_name string, filename string, v []byte, sem chan bool) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
<-sem
|
|
}()
|
|
_, err := s3_client.PutObject(&s3.PutObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(backup_name + "/" + filename),
|
|
Body: bytes.NewReader(v),
|
|
})
|
|
if err != nil {
|
|
fmt.Printf("S3 PutObject Error!\n%v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
//fmt.Printf("S3 PutObject!\n")
|
|
}
|
|
|
|
func s3RestoreKVRaw(s3_client *s3.S3, bucket string, backup_name string, keys *s3.Object, client *tikv.RawKVClient, sem chan bool) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
<-sem
|
|
}()
|
|
|
|
out, err := s3_client.GetObject(&s3.GetObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(*keys.Key),
|
|
})
|
|
if err != nil {
|
|
fmt.Printf("S3 GetObject Error!\n%v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer out.Body.Close()
|
|
|
|
data, err := ioutil.ReadAll(out.Body)
|
|
if err != nil {
|
|
fmt.Printf("out.Body.Read!\n%v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
fmt.Printf("filename:%s\n", (*keys.Key)[len(backup_name)+1:])
|
|
|
|
ret := make([]byte, (len(*keys.Key)-len(backup_name)-1)/2)
|
|
index := 0
|
|
for i := len(backup_name) + 1; i < len(*keys.Key); i += 2 {
|
|
value, _ := strconv.ParseUint((*keys.Key)[i:i+2], 16, 8)
|
|
ret[index] = byte(0xff & value)
|
|
index++
|
|
|
|
}
|
|
|
|
if err := client.Put(ret, data); err != nil {
|
|
fmt.Printf("Tikv Set Error!\n%v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|