First prototype
The prototype should be functional, although there might be some bugs.
It should be able to retrieve a list of thread IDs in a Google Group,
and use that list to download every thread in full, as returned by the server
(in pb+json format).
Change-Id: I434321a2db96a6e983d27b69b86efd5abebe7618
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..61065af
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+threads/
+threads.txt
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..f012df5
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module gomodules.avm99963.com/google-groups-takeout
+
+go 1.16
diff --git a/groups_takeout.go b/groups_takeout.go
new file mode 100644
index 0000000..2be8801
--- /dev/null
+++ b/groups_takeout.go
@@ -0,0 +1,440 @@
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const DefaultBL = "boq_groupsfrontendserver_20220224.07_p0"
+
+// IDs for API requests:
+const RPCIDListConversations = "Dq0xse" // /GroupsFrontendConversationService.ListConversations
+const RPCIDListConversationMessages = "H08Fi" // /GroupsFrontendConversationService.ListConversationMessages
+
+var (
+ group = flag.String("group", "", "Email of the group you want to export.")
+ getList = flag.Bool("getList", false, "Get a list of threads and write it to the file specified in --file (one of --getList or --getThreads is required).")
+ getThreads = flag.Bool("getThreads", false, "Retrieve all the threads specified in the thread list passed via STDIN (one of --getList or --getThreads is required).")
+ fileName = flag.String("file", "threads.txt", "File where thread IDs will be written when running with --getList.")
+ folderName = flag.String("folder", "threads", "Folder where threads will be saved when running with --getThreads.")
+ authenticated = flag.Bool("authenticated", false, "Whether you want to take out the forum with authentication.")
+ cookies = flag.String("cookies", "", "Cookies (if you want to take out the forum authenticated).")
+ fsid = flag.String("fsid", "", "f.sid value (if you want to take out the forum authenticated).")
+ at = flag.String("at", "", "at value (if you want to take out the forum authenticated).")
+ realCookies []*http.Cookie
+
+ reqId = rand.Intn(999999)
+)
+
+type Request struct {
+ Rpc string // RPC ID
+ Request string // Request encoded as a string
+}
+
+type Response struct {
+ Rpc string // RPC ID
+ Data string // Data
+ Index string // Order index (can be a number encoded as a string or "generic")
+ Ok bool // Whether the request finished successfully and the data is thus filled
+}
+
+type ConversationListResponse struct {
+ PaginationToken string // Next page token
+ IDs []string // List with thread IDs
+}
+
+type ConversationMessagesResponse struct {
+ PaginationToken string // Next page token
+ Data string // Thread data encoded as PB+JSON
+}
+
+func batchRequest(requests *[]Request) (*[]Response, error) {
+ var requestsArray [][]interface{}
+ var RPCIdsSlice []string
+ for i, r := range *requests {
+ requestArray := make([]interface{}, 4)
+ requestArray[0] = r.Rpc
+ requestArray[1] = r.Request
+ requestArray[2] = nil
+ requestArray[3] = strconv.Itoa(i + 1)
+ requestsArray = append(requestsArray, requestArray)
+ RPCIdsSlice = append(RPCIdsSlice, r.Rpc)
+ }
+ freq, err := json.Marshal(requestsArray)
+ if err != nil {
+ return nil, err
+ }
+ freqString := "[" + string(freq) + "]"
+
+ v := url.Values{}
+ v.Set("f.req", freqString)
+ if *authenticated {
+ v.Set("at", *at)
+ }
+ RPCIds := url.QueryEscape(strings.Join(RPCIdsSlice, ","))
+ reqUrl := "https://groups.google.com/_/GroupsFrontendUi/data/batchexecute?rpcids=" + RPCIds + "&bl=" + DefaultBL + "&hl=en&_reqid=" + strconv.Itoa(reqId)
+ if *authenticated {
+ reqUrl += "&f.sid=" + url.QueryEscape(*fsid)
+ }
+
+ req, err := http.NewRequest("POST", reqUrl, strings.NewReader(v.Encode()))
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ if *authenticated {
+ for _, c := range realCookies {
+ req.AddCookie(c)
+ }
+ }
+ c := &http.Client{}
+ resp, err := c.Do(req)
+ reqId += 100000
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("Status code is %v", resp.StatusCode)
+ }
+
+ var respBody [][]interface{}
+ io.CopyN(ioutil.Discard, resp.Body, 6) // Discard first 6 bytes
+ err = json.NewDecoder(resp.Body).Decode(&respBody)
+ if err != nil {
+ return nil, err
+ }
+
+ var responses []Response
+ for _, r := range respBody {
+ if len(r) < 7 || r[0] != "wrb.fr" {
+ continue
+ }
+
+ rpc, ok1 := r[1].(string)
+ data, ok2 := r[2].(string)
+ index, ok3 := r[6].(string)
+ if !ok1 {
+ return nil, fmt.Errorf("Couldn't parse the response (expected a string with the rpc ID).")
+ }
+
+ var response Response
+ if !ok2 || !ok3 {
+ response = Response{
+ Rpc: rpc,
+ Ok: false,
+ }
+ } else {
+ response = Response{
+ Rpc: rpc,
+ Data: data,
+ Index: index,
+ Ok: true,
+ }
+ }
+ responses = append(responses, response)
+ }
+
+ return &responses, nil
+}
+
+func getConversations(group string, paginationToken string, num int) (*ConversationListResponse, error) {
+ request := make([]interface{}, 3)
+ request[0] = group
+ request[1] = num
+ request[2] = paginationToken
+ reqText, err := json.Marshal(request)
+ if err != nil {
+ return nil, err
+ }
+
+ requests := []Request{
+ Request{
+ Rpc: RPCIDListConversations,
+ Request: string(reqText),
+ },
+ }
+
+ resp, err := batchRequest(&requests)
+ if err != nil {
+ return nil, fmt.Errorf("An error occurred while requesting the conversation list: %v\n", err)
+ }
+
+ for _, r := range *resp {
+ if r.Rpc == RPCIDListConversations {
+ if !r.Ok {
+ return nil, fmt.Errorf("The server didn't fulfill the request successfully (maybe you don't have permission to view the group?)")
+ }
+
+ var body []interface{}
+ err = json.Unmarshal([]byte(r.Data), &body)
+ if err != nil {
+ return nil, fmt.Errorf("While parsing conversation list response: %v", err)
+ }
+ if len(body) < 3 {
+ return nil, fmt.Errorf("While parsing conversation list response: body isn't long enough")
+ }
+
+ var resp ConversationListResponse
+
+ // Retrieve thread IDs
+ var IDs []string
+ threads, ok := body[2].([]interface{})
+ if !ok {
+ return nil, fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2] should be an array).")
+ }
+ for _, t := range threads {
+ ta, ok := t.([]interface{})
+ if !ok {
+ return nil, fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i] should be an array).")
+ }
+ if len(ta) < 1 {
+ return nil, fmt.Errorf("While parsing conversation list response: thread isn't long enough")
+ }
+ info, ok := ta[0].([]interface{})
+ if !ok {
+ return nil, fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i] should be an array).")
+ }
+ if len(info) < 2 {
+ return nil, fmt.Errorf("While parsing conversation list response: thread info isn't long enough")
+ }
+ threadId, ok := info[1].(string)
+ if !ok {
+ return nil, fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i][0][1] should be a string).")
+ }
+ IDs = append(IDs, threadId)
+ }
+ resp.IDs = IDs
+
+ // Retrieve pagination token
+ if len(body) >= 4 {
+ paginationToken, ok := body[3].(string)
+ if ok {
+ resp.PaginationToken = paginationToken
+ }
+ }
+
+ return &resp, nil
+ }
+ }
+
+ return nil, fmt.Errorf("The server didn't return the conversations list correctly, or we couldn't find it.")
+}
+
+func getAllConversations(group string) (*[]string, error) {
+ paginationToken := ""
+ totalRetrieved := 0
+ var IDs []string
+ for {
+ resp, err := getConversations(group, paginationToken, 50)
+ if err != nil {
+ return nil, err
+ }
+ totalRetrieved += len(resp.IDs)
+ log.Printf("Retrieved %v posts (total: %v)...\n", len(resp.IDs), totalRetrieved)
+
+ IDs = append(IDs, resp.IDs...)
+
+ if resp.PaginationToken == "" {
+ break
+ }
+ paginationToken = resp.PaginationToken
+ time.Sleep(time.Second) // Sleep for a second to prevent overwhelming the server
+ }
+ return &IDs, nil
+}
+
+func getConversation(group string, id string, paginationToken string, num int) (*ConversationMessagesResponse, error) {
+ request := make([]interface{}, 4)
+ request[0] = group
+ request[1] = id
+ if paginationToken == "" {
+ request[2] = num
+ request[3] = nil
+ request = append(request, nil, 2)
+ } else {
+ request[2] = nil
+ request[3] = paginationToken
+ }
+ reqText, err := json.Marshal(request)
+ if err != nil {
+ return nil, err
+ }
+
+ requests := []Request{
+ Request{
+ Rpc: RPCIDListConversationMessages,
+ Request: string(reqText),
+ },
+ }
+
+ resp, err := batchRequest(&requests)
+ if err != nil {
+ return nil, fmt.Errorf("An error occurred while requesting the conversation messages: %v\n", err)
+ }
+
+ for _, r := range *resp {
+ if r.Rpc == RPCIDListConversationMessages {
+ if !r.Ok {
+ return nil, fmt.Errorf("The server didn't fulfill the request successfully (maybe you don't have permission to view the group?)")
+ }
+
+ if r.Data == "" || r.Data == "[]" {
+ return nil, fmt.Errorf("No data was returned for the thread.")
+ }
+
+ var resp ConversationMessagesResponse
+ resp.Data = r.Data
+
+ // Get pagination token
+ var body []interface{}
+ err = json.Unmarshal([]byte(r.Data), &body)
+ if err != nil {
+ return nil, fmt.Errorf("While parsing conversation list response: %v", err)
+ }
+
+ if len(body) >= 4 {
+ paginationToken, ok := body[3].(string)
+ if ok {
+ resp.PaginationToken = paginationToken
+ }
+ }
+
+ return &resp, nil
+ }
+ }
+
+ return nil, fmt.Errorf("The server didn't return the conversations list correctly, or we couldn't find it.")
+}
+
+func downloadThread(group string, id string, chFailedIDs chan string, chFinishedIDs chan string) {
+ i := 0
+ paginationToken := ""
+ for {
+ resp, err := getConversation(group, id, paginationToken, 100)
+ if err != nil {
+ log.Printf("Error downloading thread %v: %v", id, err)
+ chFailedIDs <- id
+ return
+ }
+
+ name := id + ".json"
+ if i > 0 {
+ name = id + "_" + strconv.Itoa(i) + ".json"
+ }
+ fullName := *folderName + "/" + name
+
+ err = os.WriteFile(fullName, []byte(resp.Data), 0644)
+ if err != nil {
+ log.Printf("Error downloading thread %v: couldn't write file \"%v\": %v", id, fullName, err)
+ }
+
+ if resp.PaginationToken == "" {
+ break
+ }
+ paginationToken = resp.PaginationToken
+ i++
+ }
+
+ chFinishedIDs <- id
+}
+
+func queueDownloadIfNeeded(group string, chFailedIDs chan string, chFinishedIDs chan string, IDs *[]string, nextIndex *int) {
+ if *nextIndex < len(*IDs) {
+ time.Sleep(50 * time.Millisecond)
+ go downloadThread(group, (*IDs)[*nextIndex], chFailedIDs, chFinishedIDs)
+ *nextIndex++
+ }
+}
+
+func main() {
+ flag.Parse()
+ if *group == "" {
+ log.Fatalln("A Google Group wasn't provided via the \"--group\" flag.")
+ }
+
+ if (*getList && *getThreads) || (!*getList && !*getThreads) {
+ log.Fatalln("Please specify one of --getList or --getThreads (but not both).")
+ }
+
+ if *authenticated {
+ if *cookies == "" || *fsid == "" || *at == "" {
+ log.Fatalln("If you specify --authenticated, you should also specify --cookies, --fsid and --at.")
+ }
+
+ rawRequest := fmt.Sprintf("GET / HTTP/1.0\nCookie: %s\n\n", *cookies)
+ req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(rawRequest)))
+ if err == nil {
+ realCookies = req.Cookies()
+ }
+ }
+
+ if *getList {
+ log.Printf("Getting list of thread IDs for group %s...\n", *group)
+
+ file, err := os.Create(*fileName)
+ if err != nil {
+ log.Fatalf("Couldn't create file \"%v\"", *fileName)
+ }
+
+ // Get a list of conversation IDs
+ convs, err := getAllConversations(*group)
+ if err != nil {
+ log.Fatalf("Error calling getAllConversations: %v\n", err)
+ }
+
+ // Save those to the file, one per line
+ for _, id := range *convs {
+ io.WriteString(file, id+"\n")
+ }
+ }
+
+ if *getThreads {
+ log.Printf("Starting actual takeout for group %s...\n", *group)
+ scanner := bufio.NewScanner(os.Stdin)
+ var IDs []string
+ for scanner.Scan() {
+ id := scanner.Text()
+ IDs = append(IDs, id)
+ }
+ log.Printf("Total: %v threads. Beginning to download them...\n", len(IDs))
+
+ chFailedIDs := make(chan string)
+ chFinishedIDs := make(chan string)
+
+ nextIndex := -1
+ for i, id := range IDs {
+ go downloadThread(*group, id, chFailedIDs, chFinishedIDs)
+ nextIndex = i
+ if i > 10 {
+ break
+ }
+ }
+
+ failedThreads := make([]string, 0)
+ for i := 0; i < len(IDs); i++ {
+ select {
+ case id := <-chFailedIDs:
+ failedThreads = append(failedThreads, id)
+ queueDownloadIfNeeded(*group, chFailedIDs, chFinishedIDs, &IDs, &nextIndex)
+ case id := <-chFinishedIDs:
+ log.Printf("Finished downloading thread %v successfully\n", id)
+ queueDownloadIfNeeded(*group, chFailedIDs, chFinishedIDs, &IDs, &nextIndex)
+ }
+ }
+
+ log.Printf("Failed threads: %v", failedThreads)
+ }
+}