> ## Documentation Index
> Fetch the complete documentation index at: https://docs.qwairy.co/llms.txt
> Use this file to discover all available pages before exploring further.

# Data Export

> Export GEO data to CSV, JSON, and data warehouses like BigQuery or Snowflake

Export your Qwairy data for analysis in spreadsheets, databases, or BI platforms.

<Note>
  This guide uses the API client from the [Guides index](/developers/guides/index#api-client). Copy it to your project first.
</Note>

## What You'll Build

Export pipelines for:

* **CSV files**: For Excel, Google Sheets
* **JSON files**: For data pipelines
* **BigQuery**: For Google Cloud analytics
* **Snowflake**: For enterprise data warehousing

***

## Fetch All Data

First, fetch performance, competitor, and source-domain data in parallel and combine it into a single export object.

<CodeGroup>
  ```javascript JavaScript theme={null}
  /**
   * Fetch performance, competitor, and source-domain data for a brand and
   * combine it into a single export object.
   *
   * The three client requests are started together and awaited as a group.
   *
   * @param {object} client - API client exposing `getPerformance`,
   *   `getCompetitors`, and `getSourceDomains`.
   * @param {string} brandId - Brand whose data is exported.
   * @param {number} [period=30] - Reporting period forwarded to every request.
   * @returns {Promise<object>} Object with `exportedAt` (ISO timestamp),
   *   `brandId`, `period`, `performance`, `competitors`, and `sources`.
   */
  async function fetchAllData(client, brandId, period = 30) {
    // Keep only the fields that are part of the export; anything else on the
    // API records is dropped.
    const pickCompetitor = ({
      id,
      name,
      relationship,
      shareOfVoice,
      totalMentions,
      avgPosition,
      avgSentiment,
    }) => ({ id, name, relationship, shareOfVoice, totalMentions, avgPosition, avgSentiment });

    const pickSource = ({
      id,
      domain,
      type,
      isSelf,
      totalMentions,
      rate,
      avgPosition,
    }) => ({ id, domain, type, isSelf, totalMentions, rate, avgPosition });

    const pendingRequests = [
      client.getPerformance(brandId, { period }),
      client.getCompetitors(brandId, { period, limit: 100 }),
      client.getSourceDomains(brandId, { period, limit: 100 }),
    ];
    const [performanceResult, competitorResult, sourceResult] = await Promise.all(pendingRequests);

    const exportedAt = new Date().toISOString();
    const { scores, methodology } = performanceResult;

    return {
      exportedAt,
      brandId,
      period,
      performance: { scores, methodology },
      competitors: competitorResult.competitors.map(pickCompetitor),
      sources: sourceResult.sources.map(pickSource),
    };
  }
  ```

  ```python Python theme={null}
  def fetch_all_data(client, brand_id: str, period: int = 30) -> dict:
      """Collect performance, competitor, and source data into one export dict.

      The three client calls are submitted to a thread pool together and their
      results are collected before the export dict is assembled.

      Args:
          client: API client exposing ``get_performance``, ``get_competitors``
              and ``get_source_domains``.
          brand_id: Brand whose data is exported.
          period: Reporting period forwarded to every request.

      Returns:
          Dict with ``exported_at`` (ISO timestamp from ``datetime.now()``),
          ``brand_id``, ``period``, ``performance``, ``competitors`` and
          ``sources``. API field names are converted to snake_case for the
          competitor and source rows.
      """
      from concurrent.futures import ThreadPoolExecutor
      from datetime import datetime

      # (export key, API key) pairs; the order defines the column order of each row.
      competitor_fields = (
          ('id', 'id'),
          ('name', 'name'),
          ('relationship', 'relationship'),
          ('share_of_voice', 'shareOfVoice'),
          ('total_mentions', 'totalMentions'),
          ('avg_position', 'avgPosition'),
          ('avg_sentiment', 'avgSentiment'),
      )
      source_fields = (
          ('id', 'id'),
          ('domain', 'domain'),
          ('type', 'type'),
          ('is_self', 'isSelf'),
          ('total_mentions', 'totalMentions'),
          ('rate', 'rate'),
          ('avg_position', 'avgPosition'),
      )

      def to_row(record, fields):
          return {export_key: record[api_key] for export_key, api_key in fields}

      with ThreadPoolExecutor(max_workers=3) as pool:
          pending = [
              pool.submit(client.get_performance, brand_id, period=period),
              pool.submit(client.get_competitors, brand_id, period=period, limit=100),
              pool.submit(client.get_source_domains, brand_id, period=period, limit=100),
          ]
          performance, competitors, sources = [task.result() for task in pending]

      export = {
          'exported_at': datetime.now().isoformat(),
          'brand_id': brand_id,
          'period': period,
      }
      export['performance'] = {
          'scores': performance['scores'],
          'methodology': performance['methodology'],
      }
      export['competitors'] = [
          to_row(record, competitor_fields) for record in competitors['competitors']
      ]
      export['sources'] = [
          to_row(record, source_fields) for record in sources['sources']
      ]
      return export
  ```
</CodeGroup>

***

## Export to CSV

<CodeGroup>
  ```javascript JavaScript theme={null}
  const fs = require('fs');

  /**
   * Write the competitors, sources, and performance scores of an export to
   * three CSV files named `<table>_<YYYY-MM-DD>.csv`.
   *
   * Fields containing a comma, double quote, or line break are quoted
   * (embedded quotes doubled) so values such as "Acme, Inc." stay in a single
   * column. `null` and `undefined` are written as empty fields.
   *
   * @param {object} data - Export object with `exportedAt`, `competitors`,
   *   `sources`, and `performance.scores`.
   * @param {string} [outputDir='./exports'] - Directory to write into; created
   *   (including parents) when missing.
   * @returns {{files: string[]}} Paths of the files written, in the order
   *   competitors, sources, performance.
   */
  function exportToCSV(data, outputDir = './exports') {
    const toField = (value) => {
      const text = value == null ? '' : String(value);
      return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
    };
    const toLine = (values) => values.map(toField).join(',');
    const writeTable = (name, headers, rows) => {
      const file = `${outputDir}/${name}_${timestamp}.csv`;
      fs.writeFileSync(file, [headers, ...rows].map(toLine).join('\n'));
      return file;
    };

    // `recursive: true` makes this a no-op when the directory already exists.
    fs.mkdirSync(outputDir, { recursive: true });

    // Date part of the ISO timestamp, used as the file-name suffix.
    const timestamp = data.exportedAt.split('T')[0];

    // Competitors CSV
    const competitorHeaders = ['id', 'name', 'relationship', 'shareOfVoice', 'totalMentions', 'avgPosition', 'avgSentiment'];
    const competitorsFile = writeTable(
      'competitors',
      competitorHeaders,
      data.competitors.map(c => competitorHeaders.map(h => c[h]))
    );

    // Sources CSV
    const sourceHeaders = ['id', 'domain', 'type', 'isSelf', 'totalMentions', 'rate', 'avgPosition'];
    const sourcesFile = writeTable(
      'sources',
      sourceHeaders,
      data.sources.map(s => sourceHeaders.map(h => s[h]))
    );

    // Performance CSV (single row)
    const perfHeaders = ['mentionRate', 'sourceRate', 'coverage', 'shareOfVoice', 'sentiment'];
    const { scores } = data.performance;
    const performanceFile = writeTable(
      'performance',
      perfHeaders,
      [perfHeaders.map(h => scores[h])]
    );

    return { files: [competitorsFile, sourcesFile, performanceFile] };
  }
  ```

  ```python Python theme={null}
  import csv
  import os


  def export_to_csv(data: dict, output_dir: str = './exports') -> dict:
      """Export competitors, sources, and performance scores to CSV files.

      Files are named ``<table>_<YYYY-MM-DD>.csv`` and written as UTF-8 so
      non-ASCII names export the same way on every platform. Every file gets a
      header row, including when a table has no rows.

      Args:
          data: Export dict with ``exported_at``, ``competitors``, ``sources``
              and ``performance['scores']``.
          output_dir: Directory to write into; created when missing.

      Returns:
          ``{'files': [...]}`` with the paths written, in the order
          competitors, sources, performance.
      """
      os.makedirs(output_dir, exist_ok=True)
      # Date part of the ISO timestamp, used as the file-name suffix.
      timestamp = data['exported_at'].split('T')[0]

      def write_table(name, rows, default_fields):
          path = f'{output_dir}/{name}_{timestamp}.csv'
          # Columns follow the first row's keys; the defaults are only used to
          # emit a header for an empty table.
          fieldnames = list(rows[0].keys()) if rows else default_fields
          with open(path, 'w', newline='', encoding='utf-8') as f:
              writer = csv.DictWriter(f, fieldnames=fieldnames)
              writer.writeheader()
              writer.writerows(rows)
          return path

      files = []

      # Competitors CSV
      files.append(write_table(
          'competitors',
          data['competitors'],
          ['id', 'name', 'relationship', 'share_of_voice', 'total_mentions', 'avg_position', 'avg_sentiment'],
      ))

      # Sources CSV
      files.append(write_table(
          'sources',
          data['sources'],
          ['id', 'domain', 'type', 'is_self', 'total_mentions', 'rate', 'avg_position'],
      ))

      # Performance CSV (single row; API keys are renamed to snake_case columns)
      scores = data['performance']['scores']
      perf_row = {
          'mention_rate': scores['mentionRate'],
          'source_rate': scores['sourceRate'],
          'coverage': scores['coverage'],
          'share_of_voice': scores['shareOfVoice'],
          'sentiment': scores['sentiment'],
      }
      files.append(write_table('performance', [perf_row], list(perf_row.keys())))

      return {'files': files}
  ```
</CodeGroup>

***

## Export to JSON

<CodeGroup>
  ```javascript JavaScript theme={null}
  const fs = require('fs');

  /**
   * Serialize the whole export object to a pretty-printed JSON file named
   * `qwairy_export_<YYYY-MM-DD>.json`.
   *
   * @param {object} data - Export object; `exportedAt` supplies the date used
   *   in the file name.
   * @param {string} [outputDir='./exports'] - Directory to write into; created
   *   (including parents) when missing.
   * @returns {{file: string, size: number}} Path of the written file and its
   *   size in bytes as reported by the file system.
   */
  function exportToJSON(data, outputDir = './exports') {
    const directoryExists = fs.existsSync(outputDir);
    if (!directoryExists) {
      fs.mkdirSync(outputDir, { recursive: true });
    }

    // Only the date part of the ISO timestamp goes into the file name.
    const [exportDate] = data.exportedAt.split('T');
    const file = `${outputDir}/qwairy_export_${exportDate}.json`;

    const serialized = JSON.stringify(data, null, 2);
    fs.writeFileSync(file, serialized);

    const { size } = fs.statSync(file);
    return { file, size };
  }
  ```

  ```python Python theme={null}
  import json
  import os


  def export_to_json(data: dict, output_dir: str = './exports') -> dict:
      """Write the whole export dict to ``qwairy_export_<YYYY-MM-DD>.json``.

      Args:
          data: Export dict; ``exported_at`` supplies the date used in the
              file name.
          output_dir: Directory to write into; created when missing.

      Returns:
          Dict with the written ``file`` path and its ``size`` in bytes.
      """
      os.makedirs(output_dir, exist_ok=True)

      # Only the date part of the ISO timestamp goes into the file name.
      export_date = data['exported_at'].partition('T')[0]
      target = f'{output_dir}/qwairy_export_{export_date}.json'

      with open(target, 'w') as handle:
          json.dump(data, handle, indent=2)

      result = {'file': target}
      result['size'] = os.path.getsize(target)
      return result
  ```
</CodeGroup>

***

## Database Schemas

Create these tables before exporting to BigQuery or Snowflake.

<CodeGroup>
  ```sql BigQuery theme={null}
  -- Target tables for the BigQuery export functions in this guide.
  -- Every table carries the export timestamp and the brand ID next to the
  -- exported values, so repeated exports can share the same tables.

  -- Create dataset
  CREATE SCHEMA IF NOT EXISTS qwairy;

  -- Competitors table: one row per competitor per export
  CREATE TABLE IF NOT EXISTS qwairy.competitors (
    id STRING NOT NULL,
    name STRING NOT NULL,
    relationship STRING,
    share_of_voice FLOAT64,
    total_mentions INT64,
    avg_position FLOAT64,
    avg_sentiment FLOAT64,
    exported_at TIMESTAMP NOT NULL,
    brand_id STRING NOT NULL
  );

  -- Sources table: one row per source domain per export
  CREATE TABLE IF NOT EXISTS qwairy.sources (
    id STRING NOT NULL,
    domain STRING NOT NULL,
    type STRING,
    is_self BOOL,
    total_mentions INT64,
    rate FLOAT64,
    avg_position FLOAT64,
    exported_at TIMESTAMP NOT NULL,
    brand_id STRING NOT NULL
  );

  -- Performance table: one row per export
  -- NOTE: the score columns are camelCase (mentionRate, sourceRate,
  -- shareOfVoice), unlike the snake_case columns of the other tables. The
  -- export code spreads the API's score object directly into the row, so
  -- these names must match the API field names.
  CREATE TABLE IF NOT EXISTS qwairy.performance (
    mentionRate FLOAT64,
    sourceRate FLOAT64,
    coverage FLOAT64,
    shareOfVoice FLOAT64,
    sentiment FLOAT64,
    -- Written by the export code as a JSON-serialized string
    methodology STRING,
    exported_at TIMESTAMP NOT NULL,
    brand_id STRING NOT NULL
  );
  ```

  ```sql Snowflake theme={null}
  -- Target tables for the Snowflake export functions in this guide.
  -- Every table carries the export timestamp (without time zone) and the
  -- brand ID next to the exported values.

  -- Create schema
  CREATE SCHEMA IF NOT EXISTS QWAIRY;

  -- Competitors table: one row per competitor per export
  CREATE TABLE IF NOT EXISTS QWAIRY.COMPETITORS (
    ID VARCHAR(50) NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    RELATIONSHIP VARCHAR(50),
    SHARE_OF_VOICE FLOAT,
    TOTAL_MENTIONS INTEGER,
    AVG_POSITION FLOAT,
    AVG_SENTIMENT FLOAT,
    EXPORTED_AT TIMESTAMP_NTZ NOT NULL,
    BRAND_ID VARCHAR(50) NOT NULL
  );

  -- Sources table: one row per source domain per export
  CREATE TABLE IF NOT EXISTS QWAIRY.SOURCES (
    ID VARCHAR(50) NOT NULL,
    DOMAIN VARCHAR(255) NOT NULL,
    TYPE VARCHAR(50),
    IS_SELF BOOLEAN,
    TOTAL_MENTIONS INTEGER,
    RATE FLOAT,
    AVG_POSITION FLOAT,
    EXPORTED_AT TIMESTAMP_NTZ NOT NULL,
    BRAND_ID VARCHAR(50) NOT NULL
  );

  -- Performance table
  -- NOTE(review): the Snowflake export functions in this guide only insert
  -- into COMPETITORS and SOURCES; confirm how PERFORMANCE is meant to be
  -- populated.
  CREATE TABLE IF NOT EXISTS QWAIRY.PERFORMANCE (
    MENTION_RATE FLOAT,
    SOURCE_RATE FLOAT,
    COVERAGE FLOAT,
    SHARE_OF_VOICE FLOAT,
    SENTIMENT FLOAT,
    METHODOLOGY VARCHAR(5000),
    EXPORTED_AT TIMESTAMP_NTZ NOT NULL,
    BRAND_ID VARCHAR(50) NOT NULL
  );
  ```
</CodeGroup>

***

## Export to BigQuery

<CodeGroup>
  ```javascript JavaScript theme={null}
  const { BigQuery } = require('@google-cloud/bigquery');

  /**
   * Stream an export into the `competitors`, `sources`, and `performance`
   * tables of a BigQuery dataset.
   *
   * Row keys are mapped to the snake_case column names of the guide's table
   * definitions (`share_of_voice`, `exported_at`, `brand_id`, ...). The
   * performance scores keep their API field names because the `performance`
   * table defines those columns in camelCase. Tables with no rows are skipped,
   * since the client rejects an empty insert.
   *
   * @param {object} data - Export object with `exportedAt`, `brandId`,
   *   `competitors`, `sources`, and `performance`.
   * @param {string} [datasetId='qwairy'] - Dataset to write to; created when
   *   it does not exist. The tables themselves must already exist.
   * @param {string} [projectId=process.env.GCP_PROJECT_ID] - GCP project ID.
   * @returns {Promise<{dataset: string, tables: string[]}>} Dataset ID and the
   *   target table names.
   */
  async function exportToBigQuery(data, datasetId = 'qwairy', projectId = process.env.GCP_PROJECT_ID) {
    const bigquery = new BigQuery({ projectId });

    // Create dataset if not exists
    const [datasets] = await bigquery.getDatasets();
    if (!datasets.some(d => d.id === datasetId)) {
      await bigquery.createDataset(datasetId);
    }

    const dataset = bigquery.dataset(datasetId);

    // Columns shared by every table; both are NOT NULL in the table definitions.
    const audit = { exported_at: data.exportedAt, brand_id: data.brandId };

    const insertRows = async (tableName, rows) => {
      if (rows.length === 0) {
        return;
      }
      await dataset.table(tableName).insert(rows);
    };

    // Insert competitors
    await insertRows('competitors', data.competitors.map(c => ({
      id: c.id,
      name: c.name,
      relationship: c.relationship,
      share_of_voice: c.shareOfVoice,
      total_mentions: c.totalMentions,
      avg_position: c.avgPosition,
      avg_sentiment: c.avgSentiment,
      ...audit,
    })));

    // Insert sources
    await insertRows('sources', data.sources.map(s => ({
      id: s.id,
      domain: s.domain,
      type: s.type,
      is_self: s.isSelf,
      total_mentions: s.totalMentions,
      rate: s.rate,
      avg_position: s.avgPosition,
      ...audit,
    })));

    // Insert performance (methodology is stored as a JSON string)
    await insertRows('performance', [{
      ...data.performance.scores,
      methodology: JSON.stringify(data.performance.methodology),
      ...audit,
    }]);

    return { dataset: datasetId, tables: ['competitors', 'sources', 'performance'] };
  }
  ```

  ```python Python theme={null}
  from google.cloud import bigquery


  def export_to_bigquery(data: dict, dataset_id: str = 'qwairy', project_id: str = None) -> dict:
      """Stream an export into the competitors, sources and performance tables.

      Args:
          data: Export dict with ``exported_at``, ``brand_id``, ``competitors``,
              ``sources`` and ``performance``.
          dataset_id: Dataset to write to; created when it cannot be fetched.
              The tables themselves must already exist.
          project_id: GCP project ID. Falls back to the ``GCP_PROJECT_ID``
              environment variable, then to the project resolved by the client.

      Returns:
          Dict with the ``dataset`` ID and the target ``tables``.

      Raises:
          RuntimeError: If BigQuery reports row-level errors for an insert.
      """
      import os
      import json

      project_id = project_id or os.environ.get('GCP_PROJECT_ID')
      client = bigquery.Client(project=project_id)
      # When no project was supplied the client resolves one itself; use it so
      # the table IDs below never start with "None.".
      project_id = project_id or client.project
      dataset_ref = f'{project_id}.{dataset_id}'

      # Create dataset if not exists
      try:
          client.get_dataset(dataset_ref)
      except Exception:
          # exists_ok covers the case where the lookup failed for a reason
          # other than the dataset being absent.
          client.create_dataset(dataset_ref, exists_ok=True)

      # Columns shared by every table.
      audit = {'exported_at': data['exported_at'], 'brand_id': data['brand_id']}

      def insert(table_name, rows):
          if not rows:
              return
          # insert_rows_json reports failures through its return value instead
          # of raising, so the result has to be checked explicitly.
          errors = client.insert_rows_json(f'{project_id}.{dataset_id}.{table_name}', rows)
          if errors:
              raise RuntimeError(f'BigQuery insert into {table_name} failed: {errors}')

      # Insert competitors
      insert('competitors', [{**c, **audit} for c in data['competitors']])

      # Insert sources
      insert('sources', [{**s, **audit} for s in data['sources']])

      # Insert performance (methodology is stored as a JSON string)
      insert('performance', [{
          **data['performance']['scores'],
          'methodology': json.dumps(data['performance']['methodology']),
          **audit,
      }])

      return {'dataset': dataset_id, 'tables': ['competitors', 'sources', 'performance']}
  ```
</CodeGroup>

***

## Export to Snowflake

<CodeGroup>
  ```javascript JavaScript theme={null}
  const snowflake = require('snowflake-sdk');

  /**
   * Insert the competitors and sources of an export into Snowflake, one
   * parameterized INSERT per row.
   *
   * The connection is destroyed when the inserts finish or when one of them
   * fails, so a failed export does not leave a session open.
   *
   * @param {object} data - Export object with `exportedAt`, `brandId`,
   *   `competitors`, and `sources`.
   * @param {object} config - Connection settings: `account`, `username`,
   *   `password`, `warehouse`, `database`, and optional `schema`
   *   (defaults to `'QWAIRY'`).
   * @returns {Promise<{schema: string, tables: string[]}>} Schema used and the
   *   tables written.
   */
  async function exportToSnowflake(data, config) {
    const schema = config.schema || 'QWAIRY';
    const connection = snowflake.createConnection({
      account: config.account,
      username: config.username,
      password: config.password,
      warehouse: config.warehouse,
      database: config.database,
      schema,
    });

    await new Promise((resolve, reject) => {
      connection.connect((err) => err ? reject(err) : resolve());
    });

    // Promise adapter around the SDK's callback-based execute().
    const execute = (sql, binds = []) => new Promise((resolve, reject) => {
      connection.execute({ sqlText: sql, binds, complete: (err, stmt, rows) => err ? reject(err) : resolve(rows) });
    });

    try {
      // Insert competitors
      for (const c of data.competitors) {
        await execute(
          `INSERT INTO competitors (id, name, relationship, share_of_voice, total_mentions, avg_position, avg_sentiment, exported_at, brand_id)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
          [c.id, c.name, c.relationship, c.shareOfVoice, c.totalMentions, c.avgPosition, c.avgSentiment, data.exportedAt, data.brandId]
        );
      }

      // Insert sources
      for (const s of data.sources) {
        await execute(
          `INSERT INTO sources (id, domain, type, is_self, total_mentions, rate, avg_position, exported_at, brand_id)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
          [s.id, s.domain, s.type, s.isSelf, s.totalMentions, s.rate, s.avgPosition, data.exportedAt, data.brandId]
        );
      }
    } finally {
      // Runs on success and on failure.
      connection.destroy();
    }

    return { schema, tables: ['competitors', 'sources'] };
  }
  ```

  ```python Python theme={null}
  import snowflake.connector


  def export_to_snowflake(data: dict, config: dict) -> dict:
      """Insert the competitors and sources of an export into Snowflake.

      Rows are written with one parameterized INSERT each. The cursor and the
      connection are closed when the inserts finish or when one of them raises,
      so a failed export does not leave a session open.

      Args:
          data: Export dict with ``exported_at``, ``brand_id``, ``competitors``
              and ``sources`` (snake_case row keys).
          config: Connection settings: ``account``, ``username``, ``password``,
              ``warehouse``, ``database`` and optional ``schema``
              (defaults to ``'QWAIRY'``).

      Returns:
          Dict with the ``schema`` used and the ``tables`` written.
      """
      schema = config.get('schema', 'QWAIRY')
      conn = snowflake.connector.connect(
          account=config['account'],
          user=config['username'],
          password=config['password'],
          warehouse=config['warehouse'],
          database=config['database'],
          schema=schema,
      )
      try:
          cursor = conn.cursor()
          try:
              # Insert competitors
              for c in data['competitors']:
                  cursor.execute(
                      '''INSERT INTO competitors (id, name, relationship, share_of_voice, total_mentions, avg_position, avg_sentiment, exported_at, brand_id)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)''',
                      (c['id'], c['name'], c['relationship'], c['share_of_voice'], c['total_mentions'],
                       c['avg_position'], c['avg_sentiment'], data['exported_at'], data['brand_id'])
                  )

              # Insert sources
              for s in data['sources']:
                  cursor.execute(
                      '''INSERT INTO sources (id, domain, type, is_self, total_mentions, rate, avg_position, exported_at, brand_id)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)''',
                      (s['id'], s['domain'], s['type'], s['is_self'], s['total_mentions'],
                       s['rate'], s['avg_position'], data['exported_at'], data['brand_id'])
                  )

              conn.commit()
          finally:
              cursor.close()
      finally:
          # Runs on success and on failure.
          conn.close()

      return {'schema': schema, 'tables': ['competitors', 'sources']}
  ```
</CodeGroup>

***

## Usage

<CodeGroup>
  ```javascript JavaScript theme={null}
  // Top-level `await` is a syntax error in CommonJS modules, which the other
  // snippets in this guide target (they use `require`), so the flow runs
  // inside an async function.
  async function main() {
    const client = new QwairyClient(process.env.QWAIRY_API_TOKEN);

    // Fetch data
    const data = await fetchAllData(client, 'your-brand-id', 30);

    // Export to files
    const csvResult = exportToCSV(data);
    console.log('CSV files:', csvResult.files);

    const jsonResult = exportToJSON(data);
    console.log('JSON file:', jsonResult.file, `(${jsonResult.size} bytes)`);

    // Export to BigQuery (requires @google-cloud/bigquery)
    // const bqResult = await exportToBigQuery(data);
    // console.log('BigQuery tables:', bqResult.tables);
  }

  main().catch((err) => {
    // Report the failure and signal it through the exit code (useful for
    // scheduled runs) instead of leaving an unhandled rejection.
    console.error('Export failed:', err);
    process.exitCode = 1;
  });
  ```

  ```python Python theme={null}
  # QwairyClient is the API client from the Guides index; it must already be
  # defined or imported in this module.
  client = QwairyClient()

  # Fetch data for one brand (period=30 is also fetch_all_data's default)
  data = fetch_all_data(client, 'your-brand-id', period=30)

  # Export to files (both functions write to ./exports unless told otherwise)
  csv_result = export_to_csv(data)
  print(f"CSV files: {csv_result['files']}")

  json_result = export_to_json(data)
  print(f"JSON file: {json_result['file']} ({json_result['size']} bytes)")

  # Export to BigQuery (requires google-cloud-bigquery)
  # bq_result = export_to_bigquery(data)
  # print(f"BigQuery tables: {bq_result['tables']}")
  ```
</CodeGroup>

***

## Scheduling Exports

Automate daily or weekly exports:

| Platform           | Configuration                     |
| ------------------ | --------------------------------- |
| **Cron**           | `0 1 * * *` (daily at 1am)        |
| **GitHub Actions** | `on.schedule` with `- cron: '0 1 * * *'` (evaluated in UTC) |
| **AWS Lambda**     | EventBridge scheduled rule        |
| **Google Cloud**   | Cloud Scheduler + Cloud Functions |

***

## Next Steps

* Create a [custom dashboard](/developers/guides/custom-dashboard) for real-time monitoring
* Set up [weekly reports](/developers/guides/weekly-reports) for stakeholders
* Add [competitive analysis](/developers/guides/competitive-analysis) to your exports
