Service Integration Guide
Complete guide for integrating with all workbench services
Environment Setup
Required Python Packages
Install the following packages in your conda environment:
pip install mlflow feast pyspark kafka-python requests
Quick Start
All services are automatically configured when you install PySpark in a conda environment. The system will create necessary configuration files and set up environment variables automatically.
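If you want to confirm what the setup exported, you can inspect the environment from Python. A minimal sketch; MLFLOW_TRACKING_URI is MLflow's standard variable, while the other names shown are hypothetical and depend on the workbench setup:
import os
# MLFLOW_TRACKING_URI is MLflow's standard variable; the others are
# hypothetical examples - check your workbench for the exact names.
for var in ("MLFLOW_TRACKING_URI", "FEAST_REPO_PATH", "KAFKA_BOOTSTRAP_SERVERS"):
    print(var, "=", os.environ.get(var, "<not set>"))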
Service URLs
| Service | URL | Description |
|---|---|---|
| MLflow | http://mlflow:5000/ | Model tracking and experiment management |
| Feast | http://feast-ui:8080/ | Feature store for ML features |
| Ollama | http://ollama:11434/ | Local LLM inference |
| vLLM | http://vllm:8000/v1 | High-performance LLM serving |
| Spark | https://spark.workbench.relativelyobjective.com/ | Distributed computing cluster |
| Kafka | kafka:9092 | Streaming platform (internal) |
Conda Terminal
🆕 Automatic Kernel Registration
The workbench now automatically detects and registers new conda environments (including upgrades) as Jupyter kernels. No manual intervention required - just create or upgrade environments and they'll appear instantly in JupyterLab and VS Code.
Quick Start Commands
# Create a new environment
conda create -n my-env python=3.11 -y
# Activate the environment
conda activate my-env
# Install ipykernel (this is what triggers automatic kernel registration)
conda install ipykernel -y
# Environment automatically appears in JupyterLab & VS Code!
Common Commands
- `conda env list` - List all environments
- `conda list` - List packages in the current environment
- `conda info` - Show conda configuration
- `conda search package-name` - Search for packages
- `conda install package-name` - Install a package
- `conda remove package-name` - Remove a package
- `conda update package-name` - Update a package
- `conda env remove -n env-name` - Delete an environment
- `conda clean --all` - Clean unused packages
- `ls /shared_kernels/share/jupyter/kernels/` - List registered kernels
How It Works
When you install ipykernel in any conda environment, the system automatically:
- Detects the new environment
- Creates a kernel specification
- Registers it in JupyterLab
- Registers it in VS Code
No restart needed - kernels appear immediately!
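You can also confirm a registration from Python; this simply lists the shared kernel directory from the Common Commands section above:
import os
# Each registered environment gets a directory under the shared kernel path
print(os.listdir("/shared_kernels/share/jupyter/kernels/"))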
MLflow Integration
Getting Started with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
# Set tracking URI
mlflow.set_tracking_uri("http://mlflow:5000/")
# Set experiment
mlflow.set_experiment("my-first-experiment")
Experiment Tracking
# Start a run (assumes X_train, y_train, X_test, y_test are already defined)
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)

    # Log metrics
    mlflow.log_metric("accuracy", model.score(X_test, y_test))

    # Log model
    mlflow.sklearn.log_model(model, "random-forest-model")
Model Loading
# Load a specific model version (use a run ID from the MLflow UI or a previous run)
run_id = "<your-run-id>"
model_uri = f"runs:/{run_id}/random-forest-model"
loaded_model = mlflow.sklearn.load_model(model_uri)
# Make predictions
predictions = loaded_model.predict(X_new)
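If the model was also registered in the MLflow Model Registry (for example by passing registered_model_name to log_model), you can load it by name and version instead of by run; a minimal sketch assuming a registered model called "random-forest-model":
# Load version 1 of a registered model from the Model Registry
registry_model = mlflow.sklearn.load_model("models:/random-forest-model/1")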
PySpark Integration
Initializing Spark Session
Connect to the Spark Cluster
To use the distributed Spark cluster, specify the master URL in your SparkSession builder. If you omit this, Spark will run in local mode and not use the cluster.
from pyspark.sql import SparkSession
# Create Spark session (connects to the cluster)
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
Working with Data
# Read data
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Transform data
df_transformed = df.filter(df.column > 0) \
    .groupBy("category") \
    .agg({"value": "sum", "count": "count"})
# Write data
df_transformed.write.mode("overwrite").parquet("output/")
Spark Configuration
The Spark session is automatically configured with optimal settings for the workbench environment; key configurations are listed below, and you can verify them on a live session with the snippet after this list.
- Adaptive query execution enabled
- Dynamic partition pruning
- Optimized memory management
- Web UI accessible at the Spark service URL
- Automatic Python version alignment between Jupyter and Spark workers
- Seamless integration with conda-store environments
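To read these settings back from a running session, use the Spark context's configuration; a small sketch using the session created above:
# Print the adaptive-execution settings the session is actually using
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if key.startswith("spark.sql.adaptive"):
        print(key, "=", value)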
🆕 PySpark Environment Compatibility (2025-07 Update)
PySpark now works seamlessly with conda-store environments. The system automatically ensures Python version compatibility between Jupyter kernels and Spark workers, eliminating version mismatch errors. Environment upgrades (e.g., Python 3.10 → 3.12) are automatically handled.
Feast Integration
Feature Store Configuration
from feast import FeatureStore
# Initialize the feature store; repo_path must point to a directory containing
# feature_store.yaml, which configures the online and offline stores
store = FeatureStore(repo_path="feature_repo")
Feature Retrieval
# Get features for inference
features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate"
    ],
    entity_rows=[{"driver_id": 1001}]
).to_dict()
print(features)
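For training data you would typically query the offline store instead, retrieving point-in-time correct historical features. A minimal sketch against the same driver_hourly_stats view; the entity rows and timestamps are illustrative:
import pandas as pd
from datetime import datetime

# Entity rows plus event timestamps for point-in-time joins
entity_df = pd.DataFrame({
    "driver_id": [1001, 1002],
    "event_timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 1)],
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate"
    ],
).to_df()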
Feature Definitions
from datetime import timedelta

from feast import Entity, Feature, FeatureView, ValueType

# Define entity
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="driver id",
)

# Define feature view (driver_stats_source is assumed to be defined
# elsewhere in the repo, e.g. a FileSource or PushSource)
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=driver_stats_source,
)
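Definitions are usually registered by running `feast apply` in the repo directory; you can also apply them programmatically, as in this sketch:
# Register the entity and feature view with the store
store.apply([driver, driver_hourly_stats])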
Ollama Integration
Basic Usage
import requests
import json
OLLAMA_URL = "http://ollama:11434/api"
def generate_text(prompt, model="llama2"):
    response = requests.post(f"{OLLAMA_URL}/generate", json={
        "model": model,
        "prompt": prompt,
        "stream": False
    })
    return response.json()["response"]
# Example usage
response = generate_text("Explain machine learning in simple terms")
print(response)
Available Models
# List available models
response = requests.get(f"{OLLAMA_URL}/tags")
models = response.json()["models"]
for model in models:
    print(f"Model: {model['name']}, Size: {model['size']}")
Streaming Responses
def generate_streaming(prompt, model="llama2"):
    response = requests.post(f"{OLLAMA_URL}/generate", json={
        "model": model,
        "prompt": prompt,
        "stream": True
    }, stream=True)
    for line in response.iter_lines():
        if line:
            data = json.loads(line)
            if data.get("done"):
                break
            yield data.get("response", "")

# Usage
for chunk in generate_streaming("Write a short story"):
    print(chunk, end="", flush=True)
vLLM Integration
OpenAI-Compatible API
import requests
import json
VLLM_URL = "http://vllm:8000/v1"
def generate_text(prompt, model="llama2-7b-chat"):
    response = requests.post(f"{VLLM_URL}/completions", json={
        "model": model,
        "prompt": prompt,
        "max_tokens": 100,
        "temperature": 0.7,
        "stream": False
    })
    return response.json()["choices"][0]["text"]
# Example usage
response = generate_text("What is the capital of France?")
print(response)
Chat Completions
def chat_completion(messages, model="llama2-7b-chat"):
    response = requests.post(f"{VLLM_URL}/chat/completions", json={
        "model": model,
        "messages": messages,
        "max_tokens": 150,
        "temperature": 0.7
    })
    return response.json()["choices"][0]["message"]["content"]
# Example conversation
messages = [
    {"role": "user", "content": "Hello, how are you?"},
    {"role": "assistant", "content": "I'm doing well, thank you!"},
    {"role": "user", "content": "What can you help me with?"}
]
response = chat_completion(messages)
print(response)
Available Models
# List available models
response = requests.get(f"{VLLM_URL}/models")
models = response.json()["data"]
for model in models:
    print(f"Model: {model['id']}")
Kafka Integration
Producer Setup
from kafka import KafkaProducer
import json
# Create producer
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)
Sending Messages
# Send message
message = {"user_id": 123, "action": "login", "timestamp": "2024-01-01T12:00:00Z"}
producer.send('user_events', key='user_123', value=message)
# Flush to ensure delivery
producer.flush()
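send() returns a future, so you can also block for the broker's acknowledgement and inspect where the record landed:
# Wait for acknowledgement (raises on delivery failure)
future = producer.send('user_events', key='user_123', value=message)
metadata = future.get(timeout=10)
print(f"Delivered to {metadata.topic}[{metadata.partition}] at offset {metadata.offset}")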
Consumer Setup
from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    group_id='my_consumer_group'
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
Integration Examples
End-to-End ML Pipeline
import mlflow
from pyspark.sql import SparkSession
from feast import FeatureStore
import requests
# 1. Initialize services
mlflow.set_tracking_uri("http://mlflow:5000/")
spark = SparkSession.builder \
    .appName("MLPipeline") \
    .master("spark://spark-master:7077") \
    .getOrCreate()
store = FeatureStore(repo_path="feature_repo")

# 2. Load and process data with Spark
df = spark.read.csv("data.csv", header=True, inferSchema=True)
processed_data = df.filter(df.quality > 0.5).toPandas()

# 3. Get features from Feast
features = store.get_online_features(
    features=["user_features:age", "user_features:income"],
    entity_rows=[{"user_id": 123}]
).to_dict()

# 4. Train model and log with MLflow
with mlflow.start_run():
    # Your training code here (train and assign `model` before logging it)
    mlflow.log_metric("accuracy", 0.95)
    mlflow.sklearn.log_model(model, "my_model")

# 5. Generate insights with vLLM
insights = requests.post("http://vllm:8000/v1/completions", json={
    "model": "llama2-7b-chat",
    "prompt": f"Analyze this model performance: accuracy={0.95}",
    "max_tokens": 100
}).json()["choices"][0]["text"]
print(insights)
Real-time Data Processing
from kafka import KafkaProducer, KafkaConsumer
from pyspark.sql import SparkSession
import json
# Setup
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
consumer = KafkaConsumer(
    'sensor_data',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
spark = SparkSession.builder \
    .appName("Streaming") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Process streaming data
for message in consumer:
    data = message.value

    # Process with Spark
    df = spark.createDataFrame([data])
    result = df.filter(df.temperature > 25)

    # Send processed records to another topic (skip messages filtered out)
    records = result.toPandas().to_dict('records')
    if records:
        producer.send('processed_data', value=records[0])
        producer.flush()
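For production workloads, Spark Structured Streaming's native Kafka source avoids the per-message round trips above; a sketch assuming the spark-sql-kafka connector is available on the cluster:
# Read the topic as a streaming DataFrame (values arrive as binary)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "sensor_data") \
    .load()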
Troubleshooting
Connection Tests
Test connectivity to all services using the following code examples:
Quick Test Script
Run this script to test all HTTP service connections at once (Kafka uses a binary protocol and is checked separately below):
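import requests

# One-shot connectivity check across the HTTP services
services = {
    "MLflow": "http://mlflow:5000/",
    "Feast": "http://feast-ui:8080/",
    "Ollama": "http://ollama:11434/api/tags",
    "vLLM": "http://vllm:8000/v1/models",
    "Spark": "https://spark.workbench.relativelyobjective.com/",
}
for name, url in services.items():
    try:
        response = requests.get(url, timeout=5)
        status = "Connected" if response.status_code == 200 else f"Failed ({response.status_code})"
    except Exception as e:
        status = f"Error - {e}"
    print(f"{name}: {status}")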
MLflow Connection
import requests
try:
    response = requests.get("http://mlflow:5000/")
    print(f"MLflow: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"MLflow: Error - {e}")
Feast Connection
try:
    response = requests.get("http://feast-ui:8080/")
    print(f"Feast: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"Feast: Error - {e}")
vLLM Connection
try:
    response = requests.get("http://vllm:8000/v1/models")
    print(f"vLLM: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"vLLM: Error - {e}")
Spark Connection
try:
    response = requests.get("https://spark.workbench.relativelyobjective.com/")
    print(f"Spark: {'Connected' if response.status_code == 200 else 'Failed'}")
except Exception as e:
    print(f"Spark: Error - {e}")
Common Issues
If a connection fails, confirm the service container is running and reachable from your environment, and make sure you are using the internal service hostnames from the Service URLs table (e.g. mlflow:5000) rather than localhost.
🆕 Environment & Kernel Issues (2025-07 Update)
If you encounter issues with Jupyter kernels or PySpark compatibility:
Kernel Not Appearing in JupyterLab
# Check if the watcher script is running
docker exec jupyterlab ps aux | grep utility-watch-envs
# Check watcher logs
docker exec jupyterlab tail -n 20 /scripts/utility-watch-envs.sh.log
# Manually trigger kernel registration (if needed)
docker exec jupyterlab bash -c "cd /scripts && ./utility-watch-envs.sh"
PySpark Python Version Mismatch
# Check your Jupyter kernel Python version
import sys
print(f"Python version: {sys.version}")

# Check that you are using the intended conda-store environment
print(f"Python executable: {sys.executable}")
# Should show a path like: /opt/conda_store/data/store/default/.../bin/python
Automatic Resolution
The watcher script automatically handles most environment issues. If problems persist, restart JupyterLab to pick up new kernel registrations.