Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/cmd_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ var (
flagEnvFile string
flagCreateDebugSession bool
flagLocal bool
flagLocalGhServer bool

finalConfigFile string
finalConcurrency string
finalSessionToken string
finalConfigValueSource string
finalCreateDebugSession bool
finalLocal bool
finalLocalGhServer bool

finalGraphFile string
finalGraphArgs []string
Expand Down Expand Up @@ -112,6 +114,7 @@ var cmdRoot = &cobra.Command{
finalCreateDebugSession = finalCreateDebugSessionStr == "true" || finalCreateDebugSessionStr == "1"

finalLocal = flagLocal
finalLocalGhServer = flagLocalGhServer

// the block below is used to distinguish between implicit graph files (eg if defined in an env var) + graph flags
// vs explicit graph file (eg provided by positional arg) + graph flags.
Expand Down Expand Up @@ -201,6 +204,7 @@ func cmdRootRun(cmd *cobra.Command, args []string) {
OverrideSecrets: nil,
OverrideInputs: nil,
Args: finalGraphArgs,
LocalGhServer: finalLocalGhServer,
}

if core.IsSharedGraphURL(finalGraphFile) {
Expand Down Expand Up @@ -253,6 +257,7 @@ func init() {
cmdRoot.Flags().StringVar(&flagSessionToken, "session-token", "", "The session token from your browser")
cmdRoot.Flags().BoolVar(&flagCreateDebugSession, "create-debug-session", false, "Create a debug session by connecting to the web app")
cmdRoot.Flags().BoolVar(&flagLocal, "local", false, "Start a local WebSocket server for direct editor connection")
cmdRoot.Flags().BoolVar(&flagLocalGhServer, "local-gh-server", false, "Start a local server mimicking GitHub Actions artifact, cache, and OIDC services")

// disable interspersed flag parsing to allow passing arbitrary flags to graphs.
// it stops cobra from parsing flags once it hits positional argument
Expand Down
17 changes: 17 additions & 0 deletions core/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

"github.com/actionforge/actrun-cli/github/server"
"github.com/actionforge/actrun-cli/utils"
"github.com/google/uuid"

Expand All @@ -30,6 +31,7 @@ type RunOpts struct {
OverrideInputs map[string]any
OverrideEnv map[string]string
Args []string
LocalGhServer bool
}

type ActionGraph struct {
Expand Down Expand Up @@ -471,6 +473,21 @@ func RunGraph(ctx context.Context, graphName string, graphContent []byte, opts R
return CreateErr(nil, err, "failed to setup GitHub Actions environment")
}

if opts.LocalGhServer {
// RUNNER_TEMP is provided by the local editor over a 127.0.0.1-only WebSocket; not an external input.
storageDir, mkErr := os.MkdirTemp(finalEnv["RUNNER_TEMP"], "gh-server-storage-") // lgtm[go/path-injection]
if mkErr != nil {
return CreateErr(nil, mkErr, "failed to create storage directory for local GitHub Actions server")
}
rs, srvErr := server.StartServer(server.Config{StorageDir: storageDir})
if srvErr != nil {
return CreateErr(nil, srvErr, "failed to start local GitHub Actions server")
}
defer rs.Stop()
rs.InjectEnv(finalEnv)
utils.LogOut.Infof("local GitHub Actions server started at %s\n", rs.URL)
}

// Use the updated GITHUB_WORKSPACE as the working directory.
// SetupGitHubActionsEnv replaces GITHUB_WORKSPACE with a fresh temp folder.
if cwd, ok := finalEnv["GITHUB_WORKSPACE"]; ok {
Expand Down
298 changes: 298 additions & 0 deletions github/server/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
package server

import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)

type CacheEntry struct {
ID int64
Key string
Version string
Scope string
Size int64
Finalized bool
CreatedAt time.Time
}

// --- Twirp dispatcher ---

func (s *Server) handleCacheTwirp(w http.ResponseWriter, r *http.Request) {
if ct := r.Header.Get("Content-Type"); !strings.HasPrefix(ct, "application/json") {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "Content-Type must be application/json")
return
}

if _, _, err := parseJWT(r.Header.Get("Authorization")); err != nil {
writeTwirpError(w, http.StatusUnauthorized, "unauthenticated", err.Error())
return
}

method := r.PathValue("method")
switch method {
case "CreateCacheEntry":
s.handleCreateCacheEntry(w, r)
case "FinalizeCacheEntryUpload":
s.handleFinalizeCacheEntry(w, r)
case "GetCacheEntryDownloadURL":
s.handleGetCacheEntryDownloadURL(w, r)
case "DeleteCacheEntry":
s.handleDeleteCacheEntry(w, r)
default:
writeTwirpError(w, http.StatusNotFound, "not_found", fmt.Sprintf("unknown method: %s", method))
}
}

// --- Request/Response types ---

type CacheMetadata struct {
RepositoryID string `json:"repository_id"`
Scope string `json:"scope"`
}

type CreateCacheEntryRequest struct {
Key string `json:"key"`
Version string `json:"version"`
Metadata *CacheMetadata `json:"metadata,omitempty"`
}

type CreateCacheEntryResponse struct {
Ok bool `json:"ok"`
SignedUploadURL string `json:"signed_upload_url"`
}

// FlexInt64 unmarshals from both JSON numbers and JSON strings.
// Protobuf's canonical JSON encoding represents int64 as strings.
type FlexInt64 int64

func (f *FlexInt64) UnmarshalJSON(data []byte) error {
var n int64
if err := json.Unmarshal(data, &n); err == nil {
*f = FlexInt64(n)
return nil
}
var s string
if err := json.Unmarshal(data, &s); err != nil {
return fmt.Errorf("FlexInt64: cannot unmarshal %s", string(data))
}
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return fmt.Errorf("FlexInt64: invalid int64 string %q: %w", s, err)
}
*f = FlexInt64(n)
return nil
}

type FinalizeCacheEntryRequest struct {
Key string `json:"key"`
Version string `json:"version"`
SizeBytes FlexInt64 `json:"size_bytes"`
}

type FinalizeCacheEntryResponse struct {
Ok bool `json:"ok"`
EntryID string `json:"entry_id"`
}

type GetCacheEntryDownloadURLRequest struct {
Metadata *CacheMetadata `json:"metadata,omitempty"`
Key string `json:"key"`
RestoreKeys []string `json:"restore_keys,omitempty"`
Version string `json:"version"`
}

type GetCacheEntryDownloadURLResponse struct {
Ok bool `json:"ok"`
SignedDownloadURL string `json:"signed_download_url"`
MatchedKey string `json:"matched_key"`
}

type DeleteCacheEntryRequest struct {
Key string `json:"key"`
Version string `json:"version"`
}

type DeleteCacheEntryResponse struct {
Ok bool `json:"ok"`
}

// --- RPC handlers ---

func (s *Server) handleCreateCacheEntry(w http.ResponseWriter, r *http.Request) {
var req CreateCacheEntryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "invalid JSON")
return
}
if req.Key == "" || req.Version == "" {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "key and version are required")
return
}

scope := ""
if req.Metadata != nil {
scope = req.Metadata.Scope
}
cacheKey := scope + "/" + req.Key + "/" + req.Version

s.mu.Lock()
// If entry already exists, overwrite (caches are mutable)
if existing, ok := s.caches[cacheKey]; ok {
delete(s.cacheByID, existing.ID)
delete(s.uploadMu, existing.ID)
}
id := s.nextID
s.nextID++
entry := &CacheEntry{
ID: id,
Key: req.Key,
Version: req.Version,
Scope: scope,
CreatedAt: time.Now(),
}
s.caches[cacheKey] = entry
s.cacheByID[id] = entry
s.uploadMu[id] = &sync.Mutex{}
s.mu.Unlock()

uploadURL := s.makeSignedURL("PUT", id)
writeJSON(w, http.StatusOK, CreateCacheEntryResponse{
Ok: true,
SignedUploadURL: uploadURL,
})
}

func (s *Server) handleFinalizeCacheEntry(w http.ResponseWriter, r *http.Request) {
var req FinalizeCacheEntryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "invalid JSON")
return
}

s.mu.Lock()
var found *CacheEntry
for _, entry := range s.caches {
if entry.Key == req.Key && entry.Version == req.Version {
found = entry
break
}
}
if found == nil {
s.mu.Unlock()
writeTwirpError(w, http.StatusNotFound, "not_found", "cache entry not found")
return
}
found.Size = int64(req.SizeBytes)
found.Finalized = true
s.mu.Unlock()

writeJSON(w, http.StatusOK, FinalizeCacheEntryResponse{
Ok: true,
EntryID: strconv.FormatInt(found.ID, 10),
})
}

func (s *Server) handleGetCacheEntryDownloadURL(w http.ResponseWriter, r *http.Request) {
var req GetCacheEntryDownloadURLRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "invalid JSON")
return
}

scope := ""
if req.Metadata != nil {
scope = req.Metadata.Scope
}

type match struct {
id int64
key string
}

var found *match

s.mu.RLock()
// 1. Exact match: scope + key + version
exactKey := scope + "/" + req.Key + "/" + req.Version
if entry, ok := s.caches[exactKey]; ok && entry.Finalized {
found = &match{id: entry.ID, key: entry.Key}
}

// 2. Prefix match with restore_keys
if found == nil {
for _, rk := range req.RestoreKeys {
var best *CacheEntry
for _, entry := range s.caches {
if entry.Scope != scope || entry.Version != req.Version {
continue
}
if !entry.Finalized {
continue
}
if !strings.HasPrefix(entry.Key, rk) {
continue
}
if best == nil || entry.CreatedAt.After(best.CreatedAt) {
best = entry
}
}
if best != nil {
found = &match{id: best.ID, key: best.Key}
break
}
}
}
s.mu.RUnlock()

if found != nil {
downloadURL := s.makeSignedURL("GET", found.id)
writeJSON(w, http.StatusOK, GetCacheEntryDownloadURLResponse{
Ok: true,
SignedDownloadURL: downloadURL,
MatchedKey: found.key,
})
return
}

writeTwirpError(w, http.StatusNotFound, "not_found", "cache entry not found")
}

func (s *Server) handleDeleteCacheEntry(w http.ResponseWriter, r *http.Request) {
var req DeleteCacheEntryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeTwirpError(w, http.StatusBadRequest, "invalid_argument", "invalid JSON")
return
}

s.mu.Lock()
var found *CacheEntry
var foundKey string
for k, entry := range s.caches {
if entry.Key == req.Key && entry.Version == req.Version {
found = entry
foundKey = k
break
}
}
if found == nil {
s.mu.Unlock()
writeTwirpError(w, http.StatusNotFound, "not_found", "cache entry not found")
return
}
delete(s.caches, foundKey)
delete(s.cacheByID, found.ID)
delete(s.uploadMu, found.ID)
s.mu.Unlock()

blobPath := filepath.Join(s.storageDir, fmt.Sprintf("%d.blob", found.ID))
os.Remove(blobPath)

writeJSON(w, http.StatusOK, DeleteCacheEntryResponse{Ok: true})
}
Loading
Loading