First prototype

The prototype should be functional, although there might be some bugs.
It should be able to retrieve the list of thread IDs in a Google Group,
and use that list to download every thread in full, as returned by the
server (in pb+json format).

Change-Id: I434321a2db96a6e983d27b69b86efd5abebe7618
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..61065af
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+threads/
+threads.txt
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..f012df5
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module gomodules.avm99963.com/google-groups-takeout
+
+go 1.16
diff --git a/groups_takeout.go b/groups_takeout.go
new file mode 100644
index 0000000..2be8801
--- /dev/null
+++ b/groups_takeout.go
@@ -0,0 +1,440 @@
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"math/rand"
+	"net/http"
+	"net/url"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const (
+	DefaultBL                     = "boq_groupsfrontendserver_20220224.07_p0" // Sent as the "bl" parameter of every batchexecute request
+	RPCIDListConversations        = "Dq0xse"                                  // RPC ID of /GroupsFrontendConversationService.ListConversations
+	RPCIDListConversationMessages = "H08Fi"                                   // RPC ID of /GroupsFrontendConversationService.ListConversationMessages
+)
+
+var (
+	group         = flag.String("group", "", "Email of the group you want to export.")
+	getList       = flag.Bool("getList", false, "Get a list of threads and write it to the file specified in --file (one of --getList or --getThreads is required).")
+	getThreads    = flag.Bool("getThreads", false, "Retrieve all the threads specified in the thread list passed via STDIN (one of --getList or --getThreads is required).")
+	fileName      = flag.String("file", "threads.txt", "File where thread IDs will be written when running with --getList.")
+	folderName    = flag.String("folder", "threads", "Folder where threads will be saved when running with --getThreads.")
+	authenticated = flag.Bool("authenticated", false, "Whether you want to take out the forum with authentication.")
+	cookies       = flag.String("cookies", "", "Cookies (if you want to take out the forum authenticated).")
+	fsid          = flag.String("fsid", "", "f.sid value (if you want to take out the forum authenticated).")
+	at            = flag.String("at", "", "at value (if you want to take out the forum authenticated).")
+	realCookies   []*http.Cookie // Cookies parsed from --cookies by main; batchRequest sends them when --authenticated is set
+	// reqId is sent as "_reqid" and bumped after each request. It's seeded from the clock: with go 1.16 the default math/rand source is deterministic.
+	reqId = rand.New(rand.NewSource(time.Now().UnixNano())).Intn(999999)
+)
+
+// Request is a single RPC call to be sent by batchRequest.
+type Request struct {
+	Rpc, Request string // RPC ID, and the request encoded as a string
+}
+
+// Response is one entry of a batchexecute reply, as extracted by batchRequest.
+type Response struct {
+	Rpc, Data string // RPC ID, and the data returned for it
+	Index     string // Order index (can be a number encoded as a string or "generic")
+	Ok        bool   // Whether the request finished successfully and the data is thus filled
+}
+
+// ConversationListResponse is one page of the thread list of a group, as parsed by getConversations.
+type ConversationListResponse struct {
+	PaginationToken string   // Token of the next page (getAllConversations stops when it's empty)
+	IDs             []string // IDs of the threads listed in this page
+}
+// ConversationMessagesResponse is one page of the messages of a thread, as returned by getConversation.
+type ConversationMessagesResponse struct {
+	PaginationToken, Data string // Next page token, and the thread data encoded as PB+JSON
+}
+
+// batchRequest sends the given RPC calls to the batchexecute endpoint of Google
+// Groups in a single POST request and returns the "wrb.fr" entries of the reply.
+//
+// A returned Response has Ok set to false when its data or its order index
+// weren't strings. An error is returned if the request can't be built or sent,
+// if the status code isn't 200 or if the body can't be read and parsed.
+//
+// NOTE(review): reqId is read and updated here without synchronization, although
+// main starts several downloadThread goroutines which end up calling this function.
+func batchRequest(requests *[]Request) (*[]Response, error) {
+	var requestsArray [][]interface{}
+	var RPCIdsSlice []string
+	for i, r := range *requests {
+		// Each call is encoded as [RPC ID, request, null, 1-based order index].
+		requestsArray = append(requestsArray, []interface{}{r.Rpc, r.Request, nil, strconv.Itoa(i + 1)})
+		RPCIdsSlice = append(RPCIdsSlice, r.Rpc)
+	}
+	freq, err := json.Marshal(requestsArray)
+	if err != nil {
+		return nil, err
+	}
+	freqString := "[" + string(freq) + "]"
+
+	v := url.Values{}
+	v.Set("f.req", freqString)
+	if *authenticated {
+		v.Set("at", *at)
+	}
+	RPCIds := url.QueryEscape(strings.Join(RPCIdsSlice, ","))
+	reqUrl := "https://groups.google.com/_/GroupsFrontendUi/data/batchexecute?rpcids=" + RPCIds + "&bl=" + DefaultBL + "&hl=en&_reqid=" + strconv.Itoa(reqId)
+	if *authenticated {
+		reqUrl += "&f.sid=" + url.QueryEscape(*fsid)
+	}
+
+	req, err := http.NewRequest("POST", reqUrl, strings.NewReader(v.Encode()))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	if *authenticated {
+		for _, c := range realCookies {
+			req.AddCookie(c)
+		}
+	}
+
+	resp, err := http.DefaultClient.Do(req)
+	reqId += 100000
+	if err != nil {
+		return nil, err
+	}
+	// Close the body on every path below so the connection isn't leaked.
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("Status code is %v", resp.StatusCode)
+	}
+
+	// The JSON payload comes after a 6-byte prefix, which is skipped.
+	if _, err := io.CopyN(ioutil.Discard, resp.Body, 6); err != nil {
+		return nil, fmt.Errorf("Couldn't skip the prefix of the response: %w", err)
+	}
+	var respBody [][]interface{}
+	if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
+		return nil, err
+	}
+
+	var responses []Response
+	for _, r := range respBody {
+		if len(r) < 7 || r[0] != "wrb.fr" {
+			continue
+		}
+
+		rpc, ok := r[1].(string)
+		if !ok {
+			return nil, fmt.Errorf("Couldn't parse the response (expected a string with the rpc ID).")
+		}
+
+		// Data and Index are only filled when both came as strings.
+		response := Response{Rpc: rpc}
+		data, okData := r[2].(string)
+		index, okIndex := r[6].(string)
+		if okData && okIndex {
+			response.Data, response.Index, response.Ok = data, index, true
+		}
+		responses = append(responses, response)
+	}
+
+	return &responses, nil
+}
+
+// getConversations requests num threads of the group, from the page identified
+// by paginationToken (getAllConversations starts with an empty token).
+//
+// It returns the thread IDs of the page and the token of the next page, which
+// is left empty when the server doesn't return one as a string.
+func getConversations(group string, paginationToken string, num int) (*ConversationListResponse, error) {
+	reqText, err := json.Marshal([]interface{}{group, num, paginationToken})
+	if err != nil {
+		return nil, err
+	}
+
+	requests := []Request{{Rpc: RPCIDListConversations, Request: string(reqText)}}
+	resp, err := batchRequest(&requests)
+	if err != nil {
+		return nil, fmt.Errorf("An error occurred while requesting the conversation list: %w", err)
+	}
+
+	for _, r := range *resp {
+		if r.Rpc != RPCIDListConversations {
+			continue
+		}
+		if !r.Ok {
+			return nil, fmt.Errorf("The server didn't fulfill the request successfully (maybe you don't have permission to view the group?)")
+		}
+
+		var body []interface{}
+		if err := json.Unmarshal([]byte(r.Data), &body); err != nil {
+			return nil, fmt.Errorf("While parsing conversation list response: %w", err)
+		}
+		if len(body) < 3 {
+			return nil, fmt.Errorf("While parsing conversation list response: body isn't long enough")
+		}
+
+		// Retrieve thread IDs: body[2] is the list of threads
+		threads, ok := body[2].([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2] should be an array).")
+		}
+		var list ConversationListResponse
+		for _, t := range threads {
+			threadId, err := threadIDFromListEntry(t)
+			if err != nil {
+				return nil, err
+			}
+			list.IDs = append(list.IDs, threadId)
+		}
+
+		// Retrieve pagination token: body[3] is optional
+		if len(body) >= 4 {
+			if token, ok := body[3].(string); ok {
+				list.PaginationToken = token
+			}
+		}
+		return &list, nil
+	}
+
+	return nil, fmt.Errorf("The server didn't return the conversations list correctly, or we couldn't find it.")
+}
+
+// threadIDFromListEntry returns the thread ID stored at t[0][1], where t is one
+// of the elements of body[2] in a conversation list response.
+func threadIDFromListEntry(t interface{}) (string, error) {
+	ta, ok := t.([]interface{})
+	if !ok {
+		return "", fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i] should be an array).")
+	}
+	if len(ta) < 1 {
+		return "", fmt.Errorf("While parsing conversation list response: thread isn't long enough")
+	}
+	info, ok := ta[0].([]interface{})
+	if !ok {
+		return "", fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i][0] should be an array).")
+	}
+	if len(info) < 2 {
+		return "", fmt.Errorf("While parsing conversation list response: thread info isn't long enough")
+	}
+	threadId, ok := info[1].(string)
+	if !ok {
+		return "", fmt.Errorf("The conversation list response doesn't comply with the protobuf model we have seen (body[2][i][0][1] should be a string).")
+	}
+	return threadId, nil
+}
+
+// getAllConversations walks through all the pages of the thread list of the
+// group, asking for 50 threads at a time, and returns the IDs of every thread
+// found. It stops with an error as soon as a page can't be retrieved.
+func getAllConversations(group string) (*[]string, error) {
+	var all []string
+	token := ""
+	for {
+		page, err := getConversations(group, token, 50)
+		if err != nil {
+			return nil, err
+		}
+		all = append(all, page.IDs...)
+		log.Printf("Retrieved %v posts (total: %v)...\n", len(page.IDs), len(all))
+
+		token = page.PaginationToken
+		if token == "" {
+			return &all, nil
+		}
+		// Pause for a second between pages so the server isn't overwhelmed.
+		time.Sleep(time.Second)
+	}
+}
+
+// getConversation requests one page of the messages of thread id in the group.
+//
+// When paginationToken is empty the request carries num; otherwise it carries
+// the token and num isn't sent (downloadThread starts with an empty token).
+//
+// It returns the response data untouched, together with the token of the next
+// page, which is left empty when the server doesn't return one as a string.
+func getConversation(group string, id string, paginationToken string, num int) (*ConversationMessagesResponse, error) {
+	// The shape of the request depends on whether a page token is sent.
+	var request []interface{}
+	if paginationToken == "" {
+		request = []interface{}{group, id, num, nil, nil, 2}
+	} else {
+		request = []interface{}{group, id, nil, paginationToken}
+	}
+	reqText, err := json.Marshal(request)
+	if err != nil {
+		return nil, err
+	}
+
+	requests := []Request{
+		{
+			Rpc:     RPCIDListConversationMessages,
+			Request: string(reqText),
+		},
+	}
+	resp, err := batchRequest(&requests)
+	if err != nil {
+		return nil, fmt.Errorf("An error occurred while requesting the conversation messages: %w", err)
+	}
+
+	for _, r := range *resp {
+		if r.Rpc != RPCIDListConversationMessages {
+			continue
+		}
+		if !r.Ok {
+			return nil, fmt.Errorf("The server didn't fulfill the request successfully (maybe you don't have permission to view the group?)")
+		}
+
+		if r.Data == "" || r.Data == "[]" {
+			return nil, fmt.Errorf("No data was returned for the thread.")
+		}
+
+		// The data is returned as is; it's only parsed to look for the pagination token.
+		var body []interface{}
+		if err := json.Unmarshal([]byte(r.Data), &body); err != nil {
+			return nil, fmt.Errorf("While parsing conversation messages response: %w", err)
+		}
+
+		// body[3], when it's a string, is the token of the next page.
+		messages := ConversationMessagesResponse{Data: r.Data}
+		if len(body) >= 4 {
+			if token, ok := body[3].(string); ok {
+				messages.PaginationToken = token
+			}
+		}
+
+		return &messages, nil
+	}
+
+	return nil, fmt.Errorf("The server didn't return the conversation messages correctly, or we couldn't find them.")
+}
+
+// downloadThread saves every page of thread id in the output folder ("<id>.json", then "<id>_<n>.json"),
+// asking for 100 messages at a time. Afterwards it sends id through chFinishedIDs, or through
+// chFailedIDs as soon as a page can't be retrieved or written. It sends exactly one value.
+func downloadThread(group string, id string, chFailedIDs chan string, chFinishedIDs chan string) {
+	paginationToken := ""
+	for i := 0; ; i++ {
+		resp, err := getConversation(group, id, paginationToken, 100)
+		if err != nil {
+			log.Printf("Error downloading thread %v: %v", id, err)
+			chFailedIDs <- id
+			return
+		}
+
+		name := id + ".json"
+		if i > 0 {
+			name = id + "_" + strconv.Itoa(i) + ".json"
+		}
+		fullName := *folderName + "/" + name
+		if err := os.WriteFile(fullName, []byte(resp.Data), 0644); err != nil {
+			// A page which wasn't saved leaves the thread incomplete, so it's reported as failed.
+			log.Printf("Error downloading thread %v: couldn't write file \"%v\": %v", id, fullName, err)
+			chFailedIDs <- id
+			return
+		}
+		if resp.PaginationToken == "" {
+			chFinishedIDs <- id
+			return
+		}
+		paginationToken = resp.PaginationToken
+	}
+}
+
+// queueDownloadIfNeeded starts downloading thread (*IDs)[*nextIndex] after a 50ms pause and advances *nextIndex; it does nothing when *nextIndex is past the end of IDs.
+func queueDownloadIfNeeded(group string, chFailedIDs chan string, chFinishedIDs chan string, IDs *[]string, nextIndex *int) {
+	if next := *nextIndex; next < len(*IDs) {
+		time.Sleep(50 * time.Millisecond)
+		go downloadThread(group, (*IDs)[next], chFailedIDs, chFinishedIDs)
+		*nextIndex = next + 1
+	}
+}
+// main runs --getList (save the thread IDs of the group to --file) or --getThreads (download into --folder the threads listed in STDIN).
+func main() {
+	flag.Parse()
+	if *group == "" {
+		log.Fatalln("A Google Group wasn't provided via the \"--group\" flag.")
+	}
+	if *getList == *getThreads {
+		log.Fatalln("Please specify one of --getList or --getThreads (but not both).")
+	}
+
+	if *authenticated {
+		if *cookies == "" || *fsid == "" || *at == "" {
+			log.Fatalln("If you specify --authenticated, you should also specify --cookies, --fsid and --at.")
+		}
+		// Parse the cookies by wrapping them in a dummy request; going on without them would send unauthenticated requests.
+		req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(fmt.Sprintf("GET / HTTP/1.0\nCookie: %s\n\n", *cookies))))
+		if err != nil {
+			log.Fatalf("Couldn't parse the cookies passed via --cookies: %v\n", err)
+		}
+		realCookies = req.Cookies()
+	}
+
+	if *getList {
+		log.Printf("Getting list of thread IDs for group %s...\n", *group)
+		file, err := os.Create(*fileName)
+		if err != nil {
+			log.Fatalf("Couldn't create file \"%v\": %v", *fileName, err)
+		}
+		convs, err := getAllConversations(*group)
+		if err != nil {
+			log.Fatalf("Error calling getAllConversations: %v\n", err)
+		}
+		for _, id := range *convs {
+			if _, err := io.WriteString(file, id+"\n"); err != nil {
+				log.Fatalf("Couldn't write to file \"%v\": %v", *fileName, err)
+			}
+		}
+		if err := file.Close(); err != nil {
+			log.Fatalf("Couldn't close file \"%v\": %v", *fileName, err)
+		}
+	}
+
+	if *getThreads {
+		log.Printf("Starting actual takeout for group %s...\n", *group)
+		var IDs []string
+		scanner := bufio.NewScanner(os.Stdin)
+		for scanner.Scan() {
+			IDs = append(IDs, scanner.Text())
+		}
+		if err := scanner.Err(); err != nil {
+			log.Fatalf("Couldn't read the thread list from STDIN: %v\n", err)
+		}
+		log.Printf("Total: %v threads. Beginning to download them...\n", len(IDs))
+		// downloadThread doesn't create the folder where it saves the threads.
+		if err := os.MkdirAll(*folderName, 0755); err != nil {
+			log.Fatalf("Couldn't create folder \"%v\": %v", *folderName, err)
+		}
+
+		chFailedIDs, chFinishedIDs := make(chan string), make(chan string)
+		// Start up to 12 downloads; nextIndex is always the index of the first thread which hasn't been started.
+		nextIndex := 0
+		for ; nextIndex < len(IDs) && nextIndex < 12; nextIndex++ {
+			go downloadThread(*group, IDs[nextIndex], chFailedIDs, chFinishedIDs)
+		}
+
+		// Every thread reports once; each report frees a slot which is used to start the next download.
+		failedThreads := make([]string, 0)
+		for i := 0; i < len(IDs); i++ {
+			select {
+			case id := <-chFailedIDs:
+				failedThreads = append(failedThreads, id)
+			case id := <-chFinishedIDs:
+				log.Printf("Finished downloading thread %v successfully\n", id)
+			}
+			queueDownloadIfNeeded(*group, chFailedIDs, chFinishedIDs, &IDs, &nextIndex)
+		}
+		log.Printf("Failed threads: %v", failedThreads)
+	}
+}