I’m building an automated workflow using Google Cloud Functions that runs every few minutes with Cloud Scheduler. The process should connect to an SFTP server to grab CSV files, convert them to JSON format, and then update records in a database using API calls.
I’ve put everything in one main.py file with the required dependencies. The entry point is my process_data function:
import os
import paramiko
import pandas as pd
import requests
import json
def fetch_files_from_server(folder_path, file_names):
'''
Connects to remote SFTP and downloads CSV files
'''
# Environment variables
HOST = os.environ.get('SFTP_HOST')
USER = os.environ.get('SFTP_USER')
PASS = os.environ.get('SFTP_PASS')
PORT = int(os.environ.get('SFTP_PORT'))
downloaded_files = []
transport = paramiko.Transport((HOST, PORT))
transport.connect(username=USER, password=PASS)
client = paramiko.SFTPClient.from_transport(transport)
print('SFTP connection established')
for csv_file in file_names:
file_data = client.open(folder_path + csv_file)
dataframe = pd.read_csv(file_data)
downloaded_files.append(dataframe)
client.close()
transport.close()
print("SFTP disconnected")
return downloaded_files
def build_record_list(dataframe):
'''
Creates structured data from dataframe for API updates
'''
records = [
{
'record_id': 'rec123ABC',
'data': {
'count': int(dataframe.iloc[0,2] + dataframe.iloc[5,2] + dataframe.iloc[10,2])
}
},
{
'record_id': 'rec456DEF',
'data': {
'count': int(dataframe.iloc[1,2] + dataframe.iloc[6,2] + dataframe.iloc[11,2])
}
},
{
'record_id': 'rec789GHI',
'data': {
'count': int(dataframe.iloc[2,2] + dataframe.iloc[7,2] + dataframe.iloc[12,2])
}
}
]
return records
def update_database(record_list):
'''
Sends PATCH request to update database records
'''
API_URL = os.environ.get('DATABASE_URL')
API_TOKEN = os.environ.get('DATABASE_TOKEN')
request_headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
result = requests.patch(API_URL + '/records', headers=request_headers, data=json.dumps({'records': record_list}))
if result.status_code == 200:
print("Database updated successfully")
else:
print("Update failed: " + json.dumps(result.json()))
def process_data(request):
# Configuration
server_folder = '/data/exports/'
target_files = ['report1.csv', 'report2.csv']
# Execute pipeline
file_data = fetch_files_from_server(folder_path=server_folder, file_names=target_files)
return 'Processing complete'
But I keep getting a 500 Internal Server Error. The logs show:
File "/layers/google.python.pip/pip/lib/python3.11/site-packages/flask/app.py", line 2073, in wsgi_app
response = self.full_dispatch_request()
What could be causing this error in my Cloud Function?