Cloud Function pipeline fails: retrieving files from SFTP, converting to JSON, and updating database records

I’m building an automated workflow using Google Cloud Functions that runs every few minutes with Cloud Scheduler. The process should connect to an SFTP server to grab CSV files, convert them to JSON format, and then update records in a database using API calls.

I’ve put everything in a single main.py, with the required dependencies listed in requirements.txt. The entry point is my process_data function:

import os
import paramiko
import pandas as pd
import requests
import json

def fetch_files_from_server(folder_path, file_names):
    '''
    Connects to remote SFTP and downloads CSV files
    '''
    # Environment variables
    HOST = os.environ.get('SFTP_HOST')
    USER = os.environ.get('SFTP_USER')
    PASS = os.environ.get('SFTP_PASS')
    PORT = int(os.environ.get('SFTP_PORT'))
    
    downloaded_files = []
    
    transport = paramiko.Transport((HOST, PORT))
    transport.connect(username=USER, password=PASS)
    client = paramiko.SFTPClient.from_transport(transport)
    
    print('SFTP connection established')
    
    for csv_file in file_names:
        file_data = client.open(folder_path + csv_file)
        dataframe = pd.read_csv(file_data)
        downloaded_files.append(dataframe)
    
    client.close()
    transport.close()
    print("SFTP disconnected")
    
    return downloaded_files

def build_record_list(dataframe):
    '''
    Creates structured data from dataframe for API updates
    '''
    records = [
        {
            'record_id': 'rec123ABC',
            'data': {
                'count': int(dataframe.iloc[0,2] + dataframe.iloc[5,2] + dataframe.iloc[10,2])
            }
        },
        {
            'record_id': 'rec456DEF', 
            'data': {
                'count': int(dataframe.iloc[1,2] + dataframe.iloc[6,2] + dataframe.iloc[11,2])
            }
        },
        {
            'record_id': 'rec789GHI',
            'data': {
                'count': int(dataframe.iloc[2,2] + dataframe.iloc[7,2] + dataframe.iloc[12,2])
            }
        }
    ]
    
    return records

def update_database(record_list):
    '''
    Sends PATCH request to update database records
    '''
    API_URL = os.environ.get('DATABASE_URL')
    API_TOKEN = os.environ.get('DATABASE_TOKEN')
    
    request_headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json"
    }
    
    result = requests.patch(API_URL + '/records', headers=request_headers, data=json.dumps({'records': record_list}))
    
    if result.status_code == 200:
        print("Database updated successfully")
    else:
        print("Update failed: " + json.dumps(result.json()))

def process_data(request):
    # Configuration
    server_folder = '/data/exports/'
    target_files = ['report1.csv', 'report2.csv']
    
    # Execute pipeline
    file_data = fetch_files_from_server(folder_path=server_folder, file_names=target_files)
    
    return 'Processing complete'

But I keep getting a 500 Internal Server Error. The logs show:

File "/layers/google.python.pip/pip/lib/python3.11/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()

What could be causing this error in my Cloud Function?

Found your problem immediately. You’re calling fetch_files_from_server and getting dataframes back, but you never actually use them to build records or update the database.

Your process_data function downloads the files and stops. You need to finish the pipeline:

def process_data(request):
    server_folder = '/data/exports/'
    target_files = ['report1.csv', 'report2.csv']
    
    # Execute pipeline
    file_data = fetch_files_from_server(folder_path=server_folder, file_names=target_files)
    
    # This is what you're missing:
    for dataframe in file_data:
        records = build_record_list(dataframe)
        update_database(records)
    
    return 'Processing complete'

I’ve hit this exact SFTP issue before. Your paramiko connection might be timing out or failing silently. Add error handling around the transport connection:

try:
    transport = paramiko.Transport((HOST, PORT))
    transport.connect(username=USER, password=PASS)
except Exception as e:
    print(f"SFTP connection failed: {e}")
    raise

One more thing - your build_record_list function assumes specific cell positions exist. If your CSV doesn’t have enough rows, you’ll get an IndexError. I learned this the hard way processing variable-length reports.
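
If the row layout isn’t guaranteed, a quick shape check before indexing avoids that IndexError. Here’s a rough sketch using the record IDs and row/column positions from your code (it still assumes the third column exists, as the original does):

def build_record_list(dataframe):
    '''
    Creates structured data from dataframe for API updates,
    skipping records whose source rows are missing
    '''
    # (record_id, rows to sum) pairs taken from the original hardcoded positions
    record_map = [
        ('rec123ABC', [0, 5, 10]),
        ('rec456DEF', [1, 6, 11]),
        ('rec789GHI', [2, 7, 12]),
    ]

    records = []
    for record_id, rows in record_map:
        # Guard against short CSVs instead of letting .iloc raise IndexError
        if max(rows) >= len(dataframe):
            print(f"Skipping {record_id}: CSV only has {len(dataframe)} rows")
            continue
        count = int(sum(dataframe.iloc[r, 2] for r in rows))
        records.append({'record_id': record_id, 'data': {'count': count}})

    return records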

Check your Cloud Function logs for the actual error message. The Flask traceback you’re seeing is just the wrapper, not the real problem.
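
One way to make the real error show up in those logs is to wrap the pipeline and log the traceback yourself - a sketch on top of the process_data above (Cloud Functions forwards output from the standard logging module to Cloud Logging, and the Flask-based framework accepts a (body, status) return):

import logging

def process_data(request):
    server_folder = '/data/exports/'
    target_files = ['report1.csv', 'report2.csv']

    try:
        file_data = fetch_files_from_server(folder_path=server_folder, file_names=target_files)
        for dataframe in file_data:
            records = build_record_list(dataframe)
            update_database(records)
    except Exception:
        # logging.exception includes the full traceback, so the log shows the
        # actual failure instead of just Flask's wsgi_app frame
        logging.exception("Pipeline failed")
        return 'Processing failed', 500

    return 'Processing complete'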

You’re also missing exception handling around the pandas operations. I ran into the same timeout issues with large CSV files in Cloud Functions. Add a timeout to your SFTP connection, and double-check that your environment variables are actually set - Cloud Functions can be weird about env vars. Something like the sketch below.
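
Rough sketch only - it switches from paramiko.Transport to paramiko.SSHClient so you can pass timeout= to connect(), validates the env vars up front, and the 10-second value is arbitrary:

import os
import paramiko

def open_sftp_with_timeout():
    # Fail fast with a clear error if any env var is missing,
    # instead of crashing later on int(None) or a None hostname
    required = ['SFTP_HOST', 'SFTP_USER', 'SFTP_PASS', 'SFTP_PORT']
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {missing}")

    ssh = paramiko.SSHClient()
    # AutoAddPolicy skips host key verification - fine for a sketch,
    # but pin the server's host key for a production function
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(
        hostname=os.environ['SFTP_HOST'],
        port=int(os.environ['SFTP_PORT']),
        username=os.environ['SFTP_USER'],
        password=os.environ['SFTP_PASS'],
        timeout=10,  # seconds for the TCP connect, so the function fails fast instead of hanging
    )
    return ssh, ssh.open_sftp()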