I’m building an automated workflow using Google Cloud Functions and Cloud Scheduler to run every few minutes. The process connects to an FTP server, downloads CSV files, converts them to JSON format, and then updates database records via API calls.
My cloud function has an entry point called main_handler and here’s the code:
import os
import ftplib
import pandas as pd
import requests
import json
def fetch_files_from_server(folder_path, file_names):
'''
Connects to FTP server and retrieves CSV files as dataframes
Args:
folder_path: directory path on server
file_names: list of files to download
Returns:
retrieved_data: list of dataframes
'''
# Get connection details from environment
SERVER_HOST = os.environ.get('SERVER_HOST')
SERVER_USER = os.environ.get('SERVER_USER')
SERVER_PASS = os.environ.get('SERVER_PASS')
SERVER_PORT = int(os.environ.get('SERVER_PORT'))
retrieved_data = []
with ftplib.FTP() as ftp:
ftp.connect(SERVER_HOST, SERVER_PORT)
ftp.login(SERVER_USER, SERVER_PASS)
print('FTP connection established')
for csv_file in file_names:
file_path = folder_path + csv_file
df = pd.read_csv(file_path)
retrieved_data.append(df)
print("FTP connection closed")
return retrieved_data
def process_data_to_records(dataframe):
'''
Transforms dataframe into record format for API updates
Args:
dataframe: input pandas dataframe
Returns:
records: formatted list of dictionaries
'''
records = [
{
'record_id': 'rec123ABC',
'data': {
'count': int(dataframe.iloc[1,2] + dataframe.iloc[5,2] + dataframe.iloc[9,2])
}
},
{
'record_id': 'rec456DEF',
'data': {
'count': int(dataframe.iloc[2,2] + dataframe.iloc[6,2] + dataframe.iloc[10,2])
}
},
{
'record_id': 'rec789GHI',
'data': {
'count': int(dataframe.iloc[3,2] + dataframe.iloc[7,2] + dataframe.iloc[11,2])
}
}
]
return records
def update_database_records(record_list):
'''
Sends PATCH requests to update database records
Args:
record_list: list of record dictionaries
'''
API_ENDPOINT = os.environ.get('API_ENDPOINT')
API_TOKEN = os.environ.get('API_TOKEN')
request_headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
result = requests.patch(API_ENDPOINT + '/update', headers=request_headers, data=json.dumps({'records': record_list}))
if result.status_code == 200:
print("Database update successful")
else:
print("Database update failed: " + json.dumps(result.json()))
def main_handler(request):
source_folder = "/data/files/"
target_files = ["data1.csv", "data2.csv"]
file_data = fetch_files_from_server(folder_path=source_folder, file_names=target_files)
if request.args and 'msg' in request.args:
return request.args.get('msg')
else:
return 'Function executed successfully'
I keep getting a 500 Internal Server Error when testing this. The error log shows issues with Flask’s request handling but I can’t figure out what’s wrong. Has anyone encountered similar problems with cloud functions that process files and make API calls?